# Source code for pandaprosumer.run_time_series

import pandas as pd
import tqdm

from pandapower.control import get_controller_order
from pandapower.create import _get_multiple_index_with_check
from pandapower.timeseries import DFData
from pandapower.timeseries.run_time_series import run_loop
from pandaprosumer.run_control import run_control, prepare_run_ctrl

# Prefer the pandaplan logging wrapper when it is installed; otherwise bind
# the standard library's logging module to the same ``pplog`` alias so the
# rest of the module can use one name.
try:
    import pandaplan.core.pplog as pplog
except ImportError:
    import logging as pplog

# Module-level logger, named after this module, with its threshold set to WARNING.
logger = pplog.getLogger(__name__)
logger.setLevel(level=pplog.WARNING)


def run_timeseries(prosumer, period_index, verbose=True):
    """
    Run a time series simulation of a prosumer over one of its periods.

    The row ``period_index`` of ``prosumer.period`` provides the start, the end,
    the resolution in seconds and the timezone from which the simulated time
    steps are generated. The controllers' ``time_series_initialization`` hooks
    are called before the loop and their ``time_series_finalization`` hooks
    after it.

    INPUT:
        **prosumer** - The prosumer container; must provide a ``period`` table
        with the columns 'start', 'end', 'resolution_s' and 'timezone'

        **period_index** - index of the row in ``prosumer.period`` to simulate

    OPTIONAL:
        **verbose** (bool, True) - forwarded to init_time_series, where it
        controls whether a progress bar is created
    """
    period = prosumer.period
    start = period.at[period_index, 'start']
    end = period.at[period_index, 'end']
    step_seconds = int(period.at[period_index, 'resolution_s'])
    timezone = period.at[period_index, 'timezone']

    # One timestamp per simulated step, spaced by the period resolution.
    time_steps = pd.date_range(start, end, freq='%ss' % step_seconds, tz=timezone)

    # NOTE: the consistency check of the controllers' periods is currently disabled:
    # control_diagnostic_pandaprosumer(prosumer, start, end, step_seconds)
    ts_variables = init_time_series(prosumer, time_steps, verbose)
    controller_order = ts_variables['controller_order']

    time_series_initialization(controller_order)
    run_loop(
        prosumer,
        ts_variables,
        output_writer_fct=output_writer_fct,
        evaluate_net_fct=evaluate_prosumer_fct,
        run_control_fct=run_control,
    )
    # Looked up again after the loop, as in the original call sequence.
    time_series_finalization(ts_variables['controller_order'])
def time_series_initialization(controller_order):
    """
    Call the ``time_series_initialization`` hook of the controllers and store
    what they return (see retrieve_data).

    INPUT:
        **controller_order** - iterable of levels, each level being an iterable
        of (controller, prosumer) pairs
    """
    retrieve_data(controller_order, 'time_series_initialization')


def time_series_finalization(controller_order):
    """
    Call the ``time_series_finalization`` hook of the controllers and store
    what they return (see retrieve_data).

    INPUT:
        **controller_order** - iterable of levels, each level being an iterable
        of (controller, prosumer) pairs
    """
    retrieve_data(controller_order, 'time_series_finalization')


def retrieve_data(controller_order, fct_name):
    """
    Call the method ``fct_name`` on every eligible controller and write the
    returned data into the ``time_series`` table of the associated prosumer.

    A (controller, prosumer) pair is skipped when the controller has no truthy
    ``has_elements`` attribute, when it does not provide a method called
    ``fct_name``, or when the prosumer has no 'time_series' entry.

    INPUT:
        **controller_order** - iterable of levels, each level being an iterable
        of (controller, prosumer) pairs

        **fct_name** (str) - name of the controller method to call; it is called
        with the prosumer as only argument and must return an iterable with one
        entry per element of the controller
    """
    # Columns of the time_series table filled for each stored result.
    columns = ['name', 'element', 'element_index', 'period_index', 'data_source']

    for levelorder in controller_order:
        for ctrl, prosumer in levelorder:
            # FixMe: Should be has_elements or has_period ?
            # NOTE: controllers without elements are currently not stored at all.
            if not getattr(ctrl, 'has_elements', False):
                continue

            fct = getattr(ctrl, fct_name, None)
            if fct is None or 'time_series' not in prosumer:
                continue

            res = fct(prosumer)
            # Wrap every returned entry in a data source indexed by the controller's time index.
            data = [DFData(pd.DataFrame(entry, columns=ctrl.result_columns, index=ctrl.time_index))
                    for entry in res]
            # Presumably provides len(data) row indices in the time_series table
            # (pandapower helper) - confirm against its implementation.
            index = _get_multiple_index_with_check(prosumer, 'time_series', None, len(data))

            for i, idx in enumerate(index):
                element_index = ctrl.element_index[i]
                name = prosumer[ctrl.element_name].loc[element_index, 'name']
                prosumer['time_series'].loc[idx, columns] = (name, ctrl.element_name, int(element_index),
                                                             ctrl.period_index, data[i])


def control_diagnostic_pandaprosumer(prosumer, start, end, resolution_s):
    """
    Check that the controllers refer to the period that is being simulated.

    INPUT:
        **prosumer** - The prosumer container holding the ``controller`` table

        **start**, **end**, **resolution_s** - values compared with the
        ``start``, ``end`` and ``resol`` attributes of each controller that has
        a ``period_index`` attribute

    RAISES:
        UserWarning - if one of these controllers differs in start, end or
        resolution
    """
    _, controller_order = get_controller_order(prosumer, prosumer.controller)
    for levelorder in controller_order:
        for ctrl, _ in levelorder:
            if not hasattr(ctrl, 'period_index'):
                continue
            if ctrl.start != start or ctrl.end != end or ctrl.resol != resolution_s:
                raise UserWarning('if you run run_timeseries, all controllers interacting with each other '
                                  'need to refer to the same period_index')


def output_writer_fct(prosumer, time_step, pf_converged, ctrl_converged, ts_variables):
    """
    Output writer callback handed to run_loop; intentionally does nothing.
    """
    pass


def evaluate_prosumer_fct(prosumer, levelorder, ctrl_variables, **kwargs):
    """
    Evaluation callback handed to run_loop; returns ``ctrl_variables`` unchanged.
    """
    return ctrl_variables


def init_time_series(prosumer, time_steps, verbose=True, **kwargs):
    """
    inits the time series calculation
    creates the dict ts_variables, which includes necessary variables for the time series / control function

    INPUT:
        **prosumer** - The prosumer container, passed on to prepare_run_ctrl

        **time_steps** - sized sequence of the time steps to calculate
        (run_timeseries passes a pandas DatetimeIndex); its length is the total
        of the progress bar

    OPTIONAL:
        **verbose** (bool, True) - if True and the module logger is not at DEBUG
        level, a tqdm progress bar is created

        **kwargs** - forwarded unchanged to prepare_run_ctrl (the original
        documentation lists ``continue_on_divergence`` here - confirm that
        prepare_run_ctrl supports it)

    OUTPUT:
        **ts_variables** - the object returned by prepare_run_ctrl, extended
        with the keys 'time_steps' and 'verbose' and, when a progress bar is
        created, 'progress_bar'
    """
    ts_variables = prepare_run_ctrl(prosumer, **kwargs)
    ts_variables['time_steps'] = time_steps
    ts_variables['verbose'] = verbose

    # 10 is the numeric value of the DEBUG level in the standard logging module.
    if logger.level != 10 and verbose:
        # simple progress bar
        ts_variables['progress_bar'] = tqdm.tqdm(total=len(time_steps))

    return ts_variables