Source code for pyiso.caiso

import copy
import re
from datetime import datetime, timedelta, time
from io import BytesIO, StringIO

import pandas as pd
import pytz
from bs4 import BeautifulSoup

from pyiso import LOGGER
from pyiso.base import BaseClient


class CAISOClient(BaseClient):
    """
    Interface to CAISO data sources.
    For information about the data sources, see
    http://www.caiso.com/Documents/InterfaceSpecifications-OASISv4_1_3.pdf
    """
    NAME = 'CAISO'

    base_url_oasis = 'http://oasis.caiso.com/oasisapi/SingleZip'
    base_url_gen = 'http://content.caiso.com/green/renewrpt/'
    base_url_outlook = 'http://content.caiso.com/outlook/SP/'
    price_map_url = 'http://wwwmobile.caiso.com/Web.Service.Chart/api/v3/ChartService/PriceContourMap1'

    base_payload = {'version': 1}
    oasis_request_time_format = '%Y%m%dT%H:%M-0000'

    TZ_NAME = 'America/Los_Angeles'

    fuels = {
        'GEOTHERMAL': 'geo',
        'BIOMASS': 'biomass',
        'BIOGAS': 'biogas',
        'SMALL HYDRO': 'smhydro',
        'WIND TOTAL': 'wind',
        'SOLAR': 'solar',
        'SOLAR PV': 'solarpv',
        'SOLAR THERMAL': 'solarth',
        'NUCLEAR': 'nuclear',
        'THERMAL': 'thermal',
        'HYDRO': 'hydro',
    }

    oasis_markets = {  # {'RT5M': 'RTM', 'DAHR': 'DAM', 'RTHR': 'HASP'}
        BaseClient.MARKET_CHOICES.hourly: 'HASP',
        # OASIS actually uses three real-time codes: RTPD (real-time
        # pre-dispatch), RTD (real-time dispatch), and RTM (real-time
        # market); the distinction between them is not clearly documented.
        BaseClient.MARKET_CHOICES.fivemin: 'RTM',
        BaseClient.MARKET_CHOICES.dam: 'DAM',
        BaseClient.MARKET_CHOICES.fifteenmin: 'RTPD',
    }

    def __init__(self):
        super(CAISOClient, self).__init__()
        self.ca_tz = pytz.timezone(self.TZ_NAME)
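
    # Usage sketch (not part of the original module): pyiso clients are
    # normally obtained through the client_factory entry point rather than
    # instantiated directly.
    #
    #   from pyiso import client_factory
    #   caiso = client_factory('CAISO')
    #   print(caiso.NAME)     # 'CAISO'
    #   print(caiso.TZ_NAME)  # 'America/Los_Angeles'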

    def handle_options(self, **kwargs):
        # regular handle options
        super(CAISOClient, self).handle_options(**kwargs)

        # ensure market and freq are set
        if 'market' not in self.options:
            if self.options['forecast']:
                self.options['market'] = self.MARKET_CHOICES.dam
            elif self.options['sliceable'] and self.options['data'] == 'gen':
                self.options['market'] = self.MARKET_CHOICES.dam
            else:
                self.options['market'] = self.MARKET_CHOICES.fivemin
        if 'freq' not in self.options:
            if self.options['forecast']:
                self.options['freq'] = self.FREQUENCY_CHOICES.hourly
            elif self.options['sliceable'] and self.options['data'] == 'gen':
                self.options['freq'] = self.FREQUENCY_CHOICES.hourly
            else:
                self.options['freq'] = self.FREQUENCY_CHOICES.fivemin
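
    # Illustrative sketch (not part of the original module): when market and
    # freq are omitted, they are derived from the other options. A latest
    # request, for example, resolves to the real-time five-minute market:
    #
    #   client = CAISOClient()
    #   client.handle_options(data='gen', latest=True)
    #   assert client.options['market'] == client.MARKET_CHOICES.fivemin
    #   assert client.options['freq'] == client.FREQUENCY_CHOICES.fivemin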

    def get_generation(self, latest=False, yesterday=False,
                       start_at=False, end_at=False, **kwargs):
        # set args
        self.handle_options(data='gen', latest=latest, yesterday=yesterday,
                            start_at=start_at, end_at=end_at, **kwargs)

        if self.options['latest']:
            return self._generation_latest()
        elif self.options['forecast'] or self.options['market'] == self.MARKET_CHOICES.dam:
            return self._generation_forecast()
        else:
            return self._generation_historical()
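
    # Usage sketch (not part of the original module): the three branches above
    # correspond to three data sources.
    #
    #   from pyiso import client_factory
    #   caiso = client_factory('CAISO')
    #   caiso.get_generation(latest=True)      # "Today's Outlook" page + OASIS
    #   caiso.get_generation(yesterday=True)   # Daily Renewables Watch report
    #
    # Each returned element is a dict with 'timestamp', 'fuel_name', and
    # 'gen_MW' keys, plus 'ba_name', 'market', and 'freq' metadata.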

    def get_load(self, latest=False, start_at=False, end_at=False, **kwargs):
        # set args
        self.handle_options(data='load', latest=latest,
                            start_at=start_at, end_at=end_at, **kwargs)

        # construct and execute OASIS request
        payload = self.construct_oasis_payload('SLD_FCST')
        oasis_data = self.fetch_oasis(payload=payload)

        # parse data
        parsed_data = self.parse_oasis_demand_forecast(oasis_data)

        if self.options['latest']:
            # select latest
            latest_dp = None
            latest_ts = self.utcify('1900-01-01 12:00')
            now = self.utcify(datetime.utcnow(), tz_name='utc')
            for dp in parsed_data:
                if dp['timestamp'] < now and dp['timestamp'] > latest_ts:
                    latest_dp = dp
                    latest_ts = dp['timestamp']

            # return latest
            if latest_dp:
                return [latest_dp]
            else:
                return []
        else:
            # return all data
            return parsed_data
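
    # Sketch of the returned shape (illustrative values, not part of the
    # original module): each load data point is a dict along the lines of
    #
    #   {'timestamp': <UTC datetime>, 'freq': '5m', 'market': 'RT5M',
    #    'ba_name': 'CAISO', 'load_MW': 25000.0}
    #
    # where the exact freq/market strings are whatever the BaseClient
    # FREQUENCY_CHOICES/MARKET_CHOICES define.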

    def get_trade(self, latest=False, start_at=False, end_at=False, **kwargs):
        # set args
        self.handle_options(data='trade', latest=latest,
                            start_at=start_at, end_at=end_at, **kwargs)

        # construct and execute OASIS request
        payload = self.construct_oasis_payload('ENE_SLRS')
        oasis_data = self.fetch_oasis(payload=payload)

        # parse data
        parsed_data = self.parse_oasis_slrs(oasis_data)

        if self.options['latest']:
            # select latest
            latest_dp = None
            latest_ts = self.utcify('1900-01-01 12:00')
            now = self.utcify(datetime.utcnow(), tz_name='utc')
            for dp in parsed_data:
                if dp['timestamp'] < now and dp['timestamp'] > latest_ts:
                    latest_dp = dp
                    latest_ts = dp['timestamp']

            # return latest
            if latest_dp:
                return [latest_dp]
            else:
                return []
        else:
            # return all data
            return parsed_data

    def construct_oasis_payload(self, queryname, **kwargs):
        # get start and end times
        if self.options['latest']:
            now = self.utcify(datetime.utcnow(), tz_name='utc')
            startdatetime = now - timedelta(minutes=20)
            enddatetime = now + timedelta(minutes=20)
        else:
            startdatetime = self.options['start_at']
            enddatetime = self.options['end_at']

        # get market id
        try:
            market_run_id = self.options['market_run_id']
        except KeyError:
            market_run_id = self.oasis_markets[self.options['market']]
            self.options.update(market_run_id=market_run_id)

        # construct payload
        payload = {
            'queryname': queryname,
            'market_run_id': market_run_id,
            'startdatetime': startdatetime.strftime(self.oasis_request_time_format),
            'enddatetime': enddatetime.strftime(self.oasis_request_time_format),
        }
        payload.update(self.base_payload)
        payload.update(kwargs)

        # return
        return payload
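
    # Illustrative payload (hypothetical window, not part of the original
    # module): for a five-minute request, construct_oasis_payload('ENE_SLRS')
    # produces something like
    #
    #   {'queryname': 'ENE_SLRS', 'market_run_id': 'RTM',
    #    'startdatetime': '20160501T00:00-0000',
    #    'enddatetime': '20160501T01:00-0000',
    #    'version': 1}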

    def set_dt_index(self, df, date, hours, end_of_hour=True):
        if end_of_hour:
            offset = -1
        else:
            offset = 0

        # create list of combined datetimes
        dts = [datetime.combine(date, time(hour=(int(h) + offset))) for h in hours]

        # set list as index
        df.index = dts

        # utcify
        df.index = self.utcify_index(df.index)

        return df
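
    # Worked example (not part of the original module): the Daily Renewables
    # Watch reports hour-ending values, so with end_of_hour=True an 'Hour'
    # value of 1 becomes the local datetime at 00:00, which utcify_index then
    # converts to UTC.
    #
    #   import pandas as pd
    #   from datetime import date
    #   df = pd.DataFrame({'THERMAL': [250.0]})  # value illustrative
    #   client = CAISOClient()
    #   client.set_dt_index(df, date(2016, 5, 1), [1])
    #   # df.index[0] is 2016-05-01 00:00 America/Los_Angeles, as UTC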

    def _generation_historical(self):
        # set up storage
        parsed_data = []

        # collect data
        request_date = self.options['start_at'].astimezone(self.ca_tz).date()
        local_end_at = self.options['end_at'].astimezone(self.ca_tz).date()
        while request_date <= local_end_at:
            # set up request
            url_file = request_date.strftime('%Y%m%d_DailyRenewablesWatch.txt')
            url = self.base_url_gen + url_file

            # carry out request
            response = self.request(url)
            if not response:
                request_date += timedelta(days=1)
                continue

            dst_error_text = 'The supplied DateTime represents an invalid time. For example, when the clock is ' \
                             'adjusted forward, any time in the period that is skipped is invalid.'
            header_idx = 1
            # process both halves of the page (i.e. two parts)
            for part in [1, 2]:
                num_data_rows = 24
                # The day transitioning to daylight saving time adds extra erroneous lines of text.
                if part == 1 and dst_error_text in response.text:
                    num_data_rows = 29
                df = self.parse_to_df(response.text, nrows=num_data_rows, header=header_idx,
                                      delimiter='\t+')

                # The day transitioning to daylight saving time has errors in part two
                # of the file that need removal.
                if part == 2:
                    df = df[df.THERMAL.map(str) != '#VALUE!']

                # combine date with hours to index
                try:
                    indexed = self.set_dt_index(df, request_date, df['Hour'])
                except Exception as e:
                    LOGGER.error(e)
                    continue

                # original header is fuel names
                indexed.rename(columns=self.fuels, inplace=True)

                # remove non-fuel cols
                fuel_cols = list(set(self.fuels.values()) & set(indexed.columns))
                subsetted = indexed[fuel_cols]

                # pivot
                pivoted = self.unpivot(subsetted)
                pivoted.rename(columns={'level_1': 'fuel_name', 0: 'gen_MW'}, inplace=True)

                # slice times
                sliced = self.slice_times(pivoted)

                # store
                parsed_data += self.serialize(sliced,
                                              header=['timestamp', 'fuel_name', 'gen_MW'],
                                              extras={'ba_name': self.NAME,
                                                      'market': self.MARKET_CHOICES.hourly,
                                                      'freq': self.FREQUENCY_CHOICES.hourly})

                # If processing the first part, set the header index for the second part.
                if part == 1:
                    header_idx = num_data_rows + 3

            # finish day
            request_date += timedelta(days=1)

        # return
        return parsed_data
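
    # For reference (not part of the original module): a request_date of
    # 2016-05-01 fetches
    # http://content.caiso.com/green/renewrpt/20160501_DailyRenewablesWatch.txt,
    # a tab-delimited report whose first block breaks out hourly renewable
    # generation and whose second block reports hourly totals by resource
    # type (thermal, nuclear, hydro, renewables, imports).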

    def fetch_oasis(self, payload={}, return_all_files=False):
        """
        Returns report data from OASIS, or an empty value if an error was
        encountered.

        If return_all_files=False, returns only the content from the first file
        in the .zip; this is the default behavior and matches earlier versions
        of this function.

        If return_all_files=True, returns a list with the content from each file.
        """
        # set up the default return value
        if return_all_files is True:
            default_return_val = []
        else:
            default_return_val = ''

        # try get
        response = self.request(self.base_url_oasis, params=payload)
        if not response:
            return default_return_val

        # read data from zip
        # This will be a list of content if successful, and None if unsuccessful
        content = self.unzip(response.content)
        if not content:
            return default_return_val

        # check xml content for errors
        soup = BeautifulSoup(content[0], 'xml')
        error = soup.find(['error', 'ERROR'])
        if error:
            code = error.find(['err_code', 'ERR_CODE'])
            desc = error.find(['err_desc', 'ERR_DESC'])
            msg = 'XML error for CAISO OASIS with payload %s: %s %s' % (payload, code, desc)
            LOGGER.error(msg)
            return default_return_val

        # return xml or csv data
        if payload.get('resultformat', False) == 6:
            # CSV files were requested
            if return_all_files:
                return content
            else:
                return content[0]
        else:
            # XML files were requested
            if return_all_files:
                return [BeautifulSoup(thisfile, 'xml').find_all(['REPORT_DATA', 'report_data'])
                        for thisfile in content]
            else:
                return soup.find_all(['REPORT_DATA', 'report_data'])
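
    # Usage sketch (not part of the original module): fetch_oasis is normally
    # driven by construct_oasis_payload; setting resultformat=6 asks OASIS for
    # CSV instead of the default XML.
    #
    #   client = CAISOClient()
    #   client.handle_options(data='gen', latest=True)
    #   payload = client.construct_oasis_payload('ENE_SLRS')
    #   report_data = client.fetch_oasis(payload=payload)   # REPORT_DATA tags
    #   payload['resultformat'] = 6
    #   csv_text = client.fetch_oasis(payload=payload)      # raw CSV string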

    def parse_oasis_renewable(self, raw_data):
        """Parse raw data output of fetch_oasis for renewables."""
        # set up storage
        preparsed_data = {}
        parsed_data = []

        # extract values from xml
        for raw_soup_dp in raw_data:
            # set up storage for timestamp
            ts = self.utcify(raw_soup_dp.find(['INTERVAL_START_GMT', 'interval_start_gmt']).string)
            if ts not in preparsed_data:
                preparsed_data[ts] = {'wind': 0, 'solar': 0}

            # store generation value
            try:
                fuel_name = raw_soup_dp.find(['RENEWABLE_TYPE', 'renewable_type']).string.lower()
                gen_MW = float(raw_soup_dp.find(['VALUE', 'value']).string)
                preparsed_data[ts][fuel_name] += gen_MW
            except TypeError:
                LOGGER.error('Error in schema for CAISO OASIS result %s' % raw_soup_dp.prettify())
                continue

        # collect values into dps
        freq = self.options.get('freq', self.FREQUENCY_CHOICES.hourly)
        market = self.options.get('market', self.MARKET_CHOICES.hourly)
        for ts, preparsed_dp in preparsed_data.items():
            # set up base
            base_parsed_dp = {'timestamp': ts,
                              'freq': freq,
                              'market': market,
                              'gen_MW': 0,
                              'ba_name': self.NAME}

            # collect data
            for fuel_name in ['wind', 'solar']:
                parsed_dp = copy.deepcopy(base_parsed_dp)
                parsed_dp['fuel_name'] = fuel_name
                parsed_dp['gen_MW'] += preparsed_dp[fuel_name]
                parsed_data.append(parsed_dp)

        # return
        return parsed_data

    def parse_oasis_slrs(self, raw_data):
        """Parse raw data output of fetch_oasis for System Load and Resource Schedules."""
        # set strings to search on
        if self.options['data'] == 'gen':
            data_items = ['ISO_TOT_GEN_MW']
            data_label = 'gen_MW'
        elif self.options['data'] == 'trade':
            data_items = ['ISO_TOT_EXP_MW', 'ISO_TOT_IMP_MW']
            data_label = 'net_exp_MW'
        else:
            data_items = []
            data_label = None

        freq = self.options.get('freq', self.FREQUENCY_CHOICES.fivemin)
        market = self.options.get('market', self.MARKET_CHOICES.fivemin)

        # set up storage
        extracted_data = {}
        parsed_data = []

        # extract values from xml
        for raw_soup_dp in raw_data:
            data_item = raw_soup_dp.find(['DATA_ITEM', 'data_item']).string
            if data_item in data_items:
                # parse timestamp
                ts = self.utcify(raw_soup_dp.find(['INTERVAL_START_GMT', 'interval_start_gmt']).string)

                # parse val
                if data_item == 'ISO_TOT_IMP_MW':
                    val = -float(raw_soup_dp.find(['VALUE', 'value']).string)
                else:
                    val = float(raw_soup_dp.find(['VALUE', 'value']).string)

                # add to storage
                try:
                    extracted_data[ts] += val
                except KeyError:
                    extracted_data[ts] = val

        # assemble data
        for ts in sorted(extracted_data.keys()):
            parsed_dp = {data_label: extracted_data[ts]}
            parsed_dp.update({'timestamp': ts, 'freq': freq, 'market': market, 'ba_name': self.NAME})
            if self.options['data'] == 'gen':
                parsed_dp.update({'fuel_name': 'other'})

            # add to storage
            parsed_data.append(parsed_dp)

        # return
        return parsed_data
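
    # Worked example (hypothetical values, not part of the original module):
    # for trade data, imports are negated before summing, so an interval with
    # ISO_TOT_EXP_MW = 500 and ISO_TOT_IMP_MW = 800 yields
    # net_exp_MW = 500 - 800 = -300, i.e. CAISO is a net importer in that
    # interval.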

    def parse_oasis_demand_forecast(self, raw_data):
        """Parse raw data output of fetch_oasis for system-wide 5-min RTM demand forecast."""
        # set up storage
        parsed_data = []

        # set up freq and market
        freq = self.options.get('freq', self.FREQUENCY_CHOICES.fivemin)
        market = self.options.get('market', self.MARKET_CHOICES.fivemin)
        if market == self.MARKET_CHOICES.dam:
            data_item_key = 'SYS_FCST_DA_MW'
        else:
            data_item_key = 'SYS_FCST_5MIN_MW'

        # extract values from xml
        for raw_soup_dp in raw_data:
            if raw_soup_dp.find(['DATA_ITEM', 'data_item']).string == data_item_key and \
                    raw_soup_dp.find(['RESOURCE_NAME', 'resource_name']).string == 'CA ISO-TAC':
                # parse timestamp
                ts = self.utcify(raw_soup_dp.find(['INTERVAL_START_GMT', 'interval_start_gmt']).string)

                # set up base
                parsed_dp = {'timestamp': ts, 'freq': freq, 'market': market, 'ba_name': self.NAME}

                # store load value
                parsed_dp['load_MW'] = float(raw_soup_dp.find(['VALUE', 'value']).string)
                parsed_data.append(parsed_dp)

        # return
        return parsed_data

    def todays_outlook_time(self, demand_soup):
        for ts_soup in demand_soup.find_all(class_='docdate'):
            match = re.search(r'\d{1,2}-[a-zA-Z]+-\d{4} \d{1,2}:\d{2}', str(ts_soup))
            if match:
                ts_str = match.group(0)
                return self.utcify(ts_str)
        return None
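
    # Example (not part of the original module): the regex matches timestamps
    # like '01-May-2016 13:25' embedded in a docdate element.
    #
    #   import re
    #   html = '<span class="docdate">Last updated 01-May-2016 13:25</span>'
    #   m = re.search(r'\d{1,2}-[a-zA-Z]+-\d{4} \d{1,2}:\d{2}', html)
    #   m.group(0)  # '01-May-2016 13:25'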

    def fetch_todays_outlook_renewables(self):
        # get renewables data
        response = self.request(self.base_url_outlook + 'renewables.html')
        try:
            return BeautifulSoup(response.content, 'lxml')
        except AttributeError:
            LOGGER.warning('No response for CAISO today outlook renewables')
            return None

    def parse_todays_outlook_renewables(self, soup, ts):
        # set up storage
        parsed_data = []

        # get all renewables values
        for (id_name, fuel_name) in [('totalrenewables', 'renewable'),
                                     ('currentsolar', 'solar'),
                                     ('currentwind', 'wind')]:
            resource_soup = soup.find(id=id_name)
            if resource_soup:
                match = re.search(r'(?P<val>\d+.?\d+)\s+MW', resource_soup.string)
                if match:
                    parsed_dp = {
                        'timestamp': ts,
                        'freq': self.FREQUENCY_CHOICES.tenmin,
                        'market': self.MARKET_CHOICES.tenmin,
                        'ba_name': self.NAME,
                        'gen_MW': float(match.group('val')),
                        'fuel_name': fuel_name,
                    }
                    parsed_data.append(parsed_dp)

        # the 'renewable' value should include only renewables that aren't
        # already accounted for in the other categories
        accounted_for_ren = 0
        for dp in parsed_data:
            if dp['fuel_name'] != 'renewable':
                accounted_for_ren += dp['gen_MW']
        for dp in parsed_data:
            if dp['fuel_name'] == 'renewable':
                dp['gen_MW'] -= accounted_for_ren

        return parsed_data
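
    # Worked example (hypothetical values, not part of the original module):
    # if the page reports totalrenewables = 5000 MW, currentsolar = 2000 MW,
    # and currentwind = 1500 MW, the 'renewable' data point is adjusted to
    # 5000 - (2000 + 1500) = 1500 MW, i.e. only the renewables not already
    # counted as solar or wind.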

    def _generation_latest(self, **kwargs):
        # set up
        parsed_data = []

        # override market and freq to 10 minute
        self.options['market'] = self.MARKET_CHOICES.tenmin
        self.options['freq'] = self.FREQUENCY_CHOICES.tenmin

        # get "Today's Outlook" data
        soup = self.fetch_todays_outlook_renewables()
        if not soup:
            return []

        # parse "Today's Outlook" data
        # get timestamp
        response = self.request(self.base_url_outlook + 'systemconditions.html')
        ts = None
        if response:
            demand_soup = BeautifulSoup(response.content, 'lxml')
            ts = self.todays_outlook_time(demand_soup)
        parsed_data += self.parse_todays_outlook_renewables(soup, ts)
        if len(parsed_data) == 0:
            return parsed_data
        total_ren_MW = sum([dp['gen_MW'] for dp in parsed_data])
        ts = parsed_data[0]['timestamp']

        # get OASIS total gen data
        payload = self.construct_oasis_payload(queryname='ENE_SLRS', schedule='ALL')
        oasis_data = self.fetch_oasis(payload=payload)

        # parse OASIS data
        for dp in self.parse_oasis_slrs(oasis_data):
            if dp['timestamp'] == ts:
                dp['gen_MW'] -= total_ren_MW
                dp['freq'] = self.options['freq']
                parsed_data.append(dp)
                return parsed_data

        # no matching OASIS data found, so return an empty list
        return []

    def _generation_forecast(self, **kwargs):
        # set up
        parsed_data = []

        # get OASIS total gen data
        gen_payload = self.construct_oasis_payload(queryname='ENE_SLRS', schedule='ALL')
        gen_oasis_data = self.fetch_oasis(payload=gen_payload)
        gen_dps = self.parse_oasis_slrs(gen_oasis_data)

        # get OASIS renewable gen data
        ren_payload = self.construct_oasis_payload(queryname='SLD_REN_FCST')
        ren_oasis_data = self.fetch_oasis(payload=ren_payload)
        ren_dps = self.parse_oasis_renewable(ren_oasis_data)

        # set of times with both gen and renewable data
        times = set([dp['timestamp'] for dp in ren_dps]) & set([dp['timestamp'] for dp in gen_dps])

        # handle renewables
        total_ren_MW = {}
        for dp in ren_dps:
            if dp['timestamp'] in times:
                # assemble renewable totals for each time
                try:
                    total_ren_MW[dp['timestamp']] += dp['gen_MW']
                except KeyError:
                    total_ren_MW[dp['timestamp']] = dp['gen_MW']

                # add to storage
                parsed_data.append(dp)

        # handle generation
        for dp in gen_dps:
            if dp['timestamp'] in times:
                # subtract off renewable totals
                dp['gen_MW'] -= total_ren_MW[dp['timestamp']]

                # add to storage
                parsed_data.append(dp)

        # return
        return parsed_data
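
    # Worked example (hypothetical values, not part of the original module):
    # if, for some timestamp, OASIS total generation is 30000 MW while the
    # renewable forecast contributes wind = 4000 MW and solar = 6000 MW, the
    # fuel_name='other' data point is reported as 30000 - 10000 = 20000 MW
    # alongside the wind and solar points.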