Source code for grid2op.Chronics.gridStateFromFileWithForecasts

# Copyright (c) 2019-2020, RTE (https://www.rte-france.com)
# See AUTHORS.txt
# This Source Code Form is subject to the terms of the Mozilla Public License, version 2.0.
# If a copy of the Mozilla Public License, version 2.0 was not distributed with this file,
# you can obtain one at http://mozilla.org/MPL/2.0/.
# SPDX-License-Identifier: MPL-2.0
# This file is part of Grid2Op, Grid2Op a testbed platform to model sequential decision making in power systems.

import os
import copy
import numpy as np
import pandas as pd
from datetime import timedelta

from grid2op.dtypes import dt_float, dt_bool
from grid2op.Exceptions import (
    EnvError,
    IncorrectNumberOfLoads,
    IncorrectNumberOfLines,
    IncorrectNumberOfGenerators,
)
from grid2op.Exceptions import ChronicsError
from grid2op.Chronics.gridStateFromFile import GridStateFromFile


class GridStateFromFileWithForecasts(GridStateFromFile):
    """
    An extension of :class:`GridStateFromFile` that implements the "forecast" functionality.

    Forecasts are also read from files. One forecast is expected per horizon listed in
    ``h_forecast`` and per time step. The rows of the forecast arrays are interleaved: the
    forecast made at step ``t`` for the ``k``-th horizon is stored at row
    ``len(h_forecast) * t + k``.

    To have more advanced forecasts, this class could be overridden.

    Attributes
    ----------
    load_p_forecast: ``numpy.ndarray``, dtype: ``float``
        Array used to store the forecasts of the load active values.

    load_q_forecast: ``numpy.ndarray``, dtype: ``float``
        Array used to store the forecasts of the load reactive values.

    prod_p_forecast: ``numpy.ndarray``, dtype: ``float``
        Array used to store the forecasts of the generator active production setpoint.

    prod_v_forecast: ``numpy.ndarray``, dtype: ``float``
        Array used to store the forecasts of the generator voltage magnitude setpoint.

    """

    MULTI_CHRONICS = False

    def __init__(
        self,
        path,
        sep=";",
        time_interval=timedelta(minutes=5),
        max_iter=-1,
        chunk_size=None,
        h_forecast=(5, ),
    ):
        """
        Parameters
        ----------
        path, sep, time_interval, max_iter, chunk_size:
            Forwarded unchanged to :func:`GridStateFromFile.__init__`.

        h_forecast:
            Forecast horizons, in minutes. They must be the consecutive multiples of
            ``time_interval`` starting at one step (see :func:`_check_hs_consistent`).

        Raises
        ------
        ChronicsError
            If ``h_forecast`` is not made of contiguous horizons.
        """
        self.load_p_forecast = None
        self.load_q_forecast = None
        self.prod_p_forecast = None
        self.prod_v_forecast = None

        # for when you read data in chunk
        self._order_load_p_forecasted = None
        self._order_load_q_forecasted = None
        self._order_prod_p_forecasted = None
        self._order_prod_v_forecasted = None
        # says if the "main" value from the base class had to be reloaded (used for chunk)
        self._data_already_in_mem = False

        self._nb_forecast = len(h_forecast)
        self._h_forecast = copy.deepcopy(h_forecast)
        self._check_hs_consistent(self._h_forecast, time_interval)

        # init base class (done last, as in the original: the forecast attributes
        # above already exist when the base constructor runs)
        GridStateFromFile.__init__(
            self,
            path,
            sep=sep,
            time_interval=time_interval,
            max_iter=max_iter,
            chunk_size=chunk_size,
        )

    def _clear(self):
        """Reset the base class state, then drop every forecast array and column order."""
        super()._clear()
        self.load_p_forecast = None
        self.load_q_forecast = None
        self.prod_p_forecast = None
        self.prod_v_forecast = None

        # for when you read data in chunk
        self._order_load_p_forecasted = None
        self._order_load_q_forecasted = None
        self._order_prod_p_forecasted = None
        self._order_prod_v_forecasted = None
        # says if the "main" value from the base class had to be reloaded (used for chunk)
        self._data_already_in_mem = False

    def _check_hs_consistent(self, h_forecast, time_interval):
        """
        Check that the k-th horizon equals ``(k + 1) * time_interval`` expressed in minutes.

        Raises
        ------
        ChronicsError
            As soon as one horizon does not match the expected value.
        """
        expected = timedelta(minutes=0)
        for h in h_forecast:
            expected += time_interval
            if expected.total_seconds() // 60 != h:
                raise ChronicsError("For now you cannot build non contiuguous forecast. "
                                    "Forecast should look like [5, 10, 15, 20] "
                                    "but not [10, 15, 20] (missing h=5mins) or [5, 10, 20] (missing h=15)")

    def _get_next_chunk_forecasted(self):
        """
        Pull the next chunk from each forecast reader stored in ``self._data_chunk``.

        Returns
        -------
        tuple
            ``(load_p, load_q, prod_p, prod_v)``; an entry is ``None`` when the
            corresponding reader is ``None``.

        Raises
        ------
        StopIteration
            Propagated from ``next`` when a reader is exhausted.
        """
        chunks = []
        for key in (
            "load_p_forecasted",
            "load_q_forecasted",
            "prod_p_forecasted",
            "prod_v_forecasted",
        ):
            reader = self._data_chunk[key]
            chunks.append(None if reader is None else next(reader))
        return tuple(chunks)

    def _data_in_memory(self):
        """Delegate to the base class and remember its answer in ``_data_already_in_mem``."""
        res = super()._data_in_memory()
        self._data_already_in_mem = res
        return res

    def initialize(
        self,
        order_backend_loads,
        order_backend_prods,
        order_backend_lines,
        order_backend_subs,
        names_chronics_to_backend=None,
    ):
        """
        The same condition as :class:`GridStateFromFile.initialize` applies also for
        :attr:`GridStateFromFileWithForecasts.load_p_forecast`,
        :attr:`GridStateFromFileWithForecasts.load_q_forecast`,
        :attr:`GridStateFromFileWithForecasts.prod_p_forecast`, and
        :attr:`GridStateFromFileWithForecasts.prod_v_forecast`.

        Parameters
        ----------
        See help of :func:`GridValue.initialize` for a detailed help about the _parameters.

        """
        super().initialize(
            order_backend_loads,
            order_backend_prods,
            order_backend_lines,
            order_backend_subs,
            names_chronics_to_backend,
        )

        # there are `_nb_forecast` forecast rows per time step, hence the scaling
        if self.chunk_size is not None:
            chunk_size = self.chunk_size * self._nb_forecast
        else:
            chunk_size = None

        # Always bind the name: the original only assigned it when `_max_iter > 0`
        # and crashed with UnboundLocalError otherwise.
        # NOTE(review): None is presumably understood by `_get_data` as "no row
        # limit" -- confirm against GridStateFromFile._get_data.
        nrows_to_load = None
        if self._max_iter > 0:
            nrows_to_load = (self._max_iter + 1) * self._nb_forecast

        load_p_iter = self._get_data("load_p_forecasted", chunk_size, nrows_to_load)
        load_q_iter = self._get_data("load_q_forecasted", chunk_size, nrows_to_load)
        prod_p_iter = self._get_data("prod_p_forecasted", chunk_size, nrows_to_load)
        prod_v_iter = self._get_data("prod_v_forecasted", chunk_size, nrows_to_load)

        hazards = None  # no hazards in forecast
        maintenance = None  # maintenance are read from the real data and propagated in the chronics

        if self.chunk_size is None:
            # everything has been loaded at once
            load_p = load_p_iter
            load_q = load_q_iter
            prod_p = prod_p_iter
            prod_v = prod_v_iter
        else:
            # keep the readers for later and only load the first chunk now
            self._data_chunk["load_p_forecasted"] = load_p_iter
            self._data_chunk["load_q_forecasted"] = load_q_iter
            self._data_chunk["prod_p_forecasted"] = prod_p_iter
            self._data_chunk["prod_v_forecasted"] = prod_v_iter
            load_p, load_q, prod_p, prod_v = self._get_next_chunk_forecasted()

        # name -> position lookup tables for the backend ordering
        pos_loads = {el: i for i, el in enumerate(order_backend_loads)}
        pos_prods = {el: i for i, el in enumerate(order_backend_prods)}
        pos_lines = {el: i for i, el in enumerate(order_backend_lines)}

        # the last two orders (hazards, maintenance) are not used for forecasts
        (
            order_load_p,
            order_load_q,
            order_prod_p,
            order_prod_v,
            _,
            _,
        ) = self._get_orders(
            load_p,
            load_q,
            prod_p,
            prod_v,
            hazards,
            maintenance,
            pos_loads,
            pos_prods,
            pos_lines,
        )

        self._order_load_p_forecasted = np.argsort(order_load_p)
        self._order_load_q_forecasted = np.argsort(order_load_q)
        self._order_prod_p_forecasted = np.argsort(order_prod_p)
        self._order_prod_v_forecasted = np.argsort(order_prod_v)

        self._init_attrs_forecast(load_p, load_q, prod_p, prod_v)

    @staticmethod
    def _reorder_forecast(frame, col_order):
        """Return ``frame.values`` with columns picked by ``col_order`` as ``dt_float``, or ``None``."""
        if frame is None:
            return None
        # `astype` returns a new array by default, so no extra copy is needed to
        # detach the result from the dataframe
        return frame.values[:, col_order].astype(dt_float)

    def _init_attrs_forecast(self, load_p, load_q, prod_p, prod_v):
        """
        Store the four forecast tables as arrays whose columns follow the orders
        computed in :func:`initialize`. A ``None`` input gives a ``None`` attribute.
        """
        # TODO refactor that with _init_attrs from super()
        self.load_p_forecast = self._reorder_forecast(load_p, self._order_load_p_forecasted)
        self.load_q_forecast = self._reorder_forecast(load_q, self._order_load_q_forecasted)
        self.prod_p_forecast = self._reorder_forecast(prod_p, self._order_prod_p_forecasted)
        self.prod_v_forecast = self._reorder_forecast(prod_v, self._order_prod_v_forecasted)

    def check_validity(self, backend):
        """
        Run the base class checks, then check the forecast arrays against ``backend``.

        Raises
        ------
        IncorrectNumberOfLoads
            If a load forecast array does not have ``backend.n_load`` columns.
        IncorrectNumberOfGenerators
            If a generator forecast array does not have ``backend.n_gen`` columns.
        ChronicsError
            If none of the four forecast arrays is available.
        EnvError
            If data are not read by chunk and a forecast array has fewer than
            ``self.n_ * len(h_forecast)`` rows.
        """
        super().check_validity(backend)

        at_least_one = False
        for arr, size_attr, exc_cls, part in (
            (self.load_p_forecast, "n_load", IncorrectNumberOfLoads, "active"),
            (self.load_q_forecast, "n_load", IncorrectNumberOfLoads, "reactive"),
            (self.prod_p_forecast, "n_gen", IncorrectNumberOfGenerators, "active"),
            (self.prod_v_forecast, "n_gen", IncorrectNumberOfGenerators, "voltage"),
        ):
            if arr is None:
                continue
            expected = getattr(backend, size_attr)
            if arr.shape[1] != expected:
                # report the size that was actually compared (number of columns of
                # the forecast array), not the length of the non-forecast series
                raise exc_cls(
                    "for the {} part. It should be {} but is in fact {}"
                    "".format(part, expected, arr.shape[1])
                )
            at_least_one = True

        if not at_least_one:
            raise ChronicsError(
                "You used a class that read forecasted data, yet there is no forecasted data in"
                '"{}". Please fall back to using class "GridStateFromFile" instead of '
                '"{}"'.format(self.path, type(self))
            )

        # the number of rows can only be checked when everything is in memory
        if self.chunk_size is None:
            min_rows = self.n_ * self._nb_forecast
            for name_arr, arr in zip(
                ["load_q", "load_p", "prod_v", "prod_p"],
                [
                    self.load_q_forecast,
                    self.load_p_forecast,
                    self.prod_v_forecast,
                    self.prod_p_forecast,
                ],
            ):
                if arr is not None and arr.shape[0] < min_rows:
                    raise EnvError(
                        "Array for forecast {}_forecasted as not the same number of rows of {} x nb_forecast. "
                        "The chronics cannot be loaded properly.".format(name_arr, name_arr)
                    )

    def _load_next_chunk_in_memory_forecast(self):
        """Read the next forecast chunk and store it with the columns in the right order."""
        # i load the next chunk as dataframes
        load_p, load_q, prod_p, prod_v = self._get_next_chunk_forecasted()
        # i put these dataframes in the right order (columns)
        self._init_attrs_forecast(load_p, load_q, prod_p, prod_v)
        # resetting the index has been done in _load_next_chunk_in_memory, or at least it should have

    def forecasts(self):
        """
        This is the major difference between :class:`GridStateFromFileWithForecasts` and
        :class:`GridStateFromFile`. It returns non empty forecasts.

        One entry is returned per horizon of ``h_forecast``. Each entry is a tuple with
        exactly 2 elements:

          1. the time stamp of the forecast (current date time plus the horizon, in minutes)
          2. a dictionary describing the forecasted modification. It has an ``"injection"``
             key (mapping to the available ``load_p``, ``load_q``, ``prod_p`` and ``prod_v``
             forecasts) when at least one forecast array is available, and is empty otherwise.

        Returns
        -------
        See :func:`GridValue.forecasts` for more information.

        Raises
        ------
        StopIteration
            If a new forecast chunk had to be loaded and the data are exhausted.

        """
        if not self._data_already_in_mem:
            # a StopIteration raised here is deliberately left to propagate
            self._load_next_chunk_in_memory_forecast()

        sources = (
            ("load_p", self.load_p_forecast),
            ("load_q", self.load_q_forecast),
            ("prod_p", self.prod_p_forecast),
            ("prod_v", self.prod_v_forecast),
        )

        res = []
        for h_id, h in enumerate(self._h_forecast):
            # forecasts of one time step are stored on consecutive rows
            indx_to_look = self._nb_forecast * self.current_index + h_id
            dict_ = {
                key: dt_float(1.0 * arr[indx_to_look, :])
                for key, arr in sources
                if arr is not None
            }
            res_d = {}
            if dict_:
                res_d["injection"] = dict_
            forecast_datetime = self.current_datetime + timedelta(minutes=h)
            res.append((forecast_datetime, res_d))
        return res

    def get_id(self) -> str:
        """Return ``self.path``, used as the identifier of these chronics."""
        return self.path

    def _init_res_split(self, nb_rows):
        """
        Extend the arrays allocated by the base class with four zero-filled forecast
        arrays of ``nb_rows`` rows (``None`` for each forecast that is not available).
        """
        res_load_p_f = None
        res_load_q_f = None
        res_prod_p_f = None
        res_prod_v_f = None
        if self.prod_p_forecast is not None:
            res_prod_p_f = np.zeros((nb_rows, self.n_gen), dtype=dt_float)
        if self.prod_v_forecast is not None:
            res_prod_v_f = np.zeros((nb_rows, self.n_gen), dtype=dt_float)
        if self.load_p_forecast is not None:
            res_load_p_f = np.zeros((nb_rows, self.n_load), dtype=dt_float)
        if self.load_q_forecast is not None:
            res_load_q_f = np.zeros((nb_rows, self.n_load), dtype=dt_float)

        res = super()._init_res_split(nb_rows)
        res += (res_prod_p_f, res_prod_v_f, res_load_p_f, res_load_q_f)
        return res

    def _update_res_split(self, i, tmp, *arrays):
        """
        Fill row ``i`` of every array: the leading ones are handled by the base class,
        the last four (prod_p, prod_v, load_p, load_q forecasts) from ``tmp``.
        """
        (
            *args_super,
            res_prod_p_f,
            res_prod_v_f,
            res_load_p_f,
            res_load_q_f,
        ) = arrays
        super()._update_res_split(i, tmp, *args_super)
        if res_prod_p_f is not None:
            res_prod_p_f[i, :] = tmp._extract_array("prod_p_forecast")
        if res_prod_v_f is not None:
            res_prod_v_f[i, :] = tmp._extract_array("prod_v_forecast")
        if res_load_p_f is not None:
            res_load_p_f[i, :] = tmp._extract_array("load_p_forecast")
        if res_load_q_f is not None:
            res_load_q_f[i, :] = tmp._extract_array("load_q_forecast")

    def _clean_arrays(self, i, *arrays):
        """
        Keep only the first ``i`` rows of the four forecast arrays and append them to
        what the base class returns for the leading arrays.
        """
        (
            *args_super,
            res_prod_p_f,
            res_prod_v_f,
            res_load_p_f,
            res_load_q_f,
        ) = arrays
        res = super()._clean_arrays(i, *args_super)
        if res_prod_p_f is not None:
            res_prod_p_f = res_prod_p_f[:i, :]
        if res_prod_v_f is not None:
            res_prod_v_f = res_prod_v_f[:i, :]
        if res_load_p_f is not None:
            res_load_p_f = res_load_p_f[:i, :]
        if res_load_q_f is not None:
            res_load_q_f = res_load_q_f[:i, :]
        res += (res_prod_p_f, res_prod_v_f, res_load_p_f, res_load_q_f)
        return res

    def _get_name_arrays_for_saving(self):
        """Names of the base class arrays followed by the four forecast names."""
        res = super()._get_name_arrays_for_saving()
        res += [
            "prod_p_forecasted",
            "prod_v_forecasted",
            "load_p_forecasted",
            "load_q_forecasted",
        ]
        return res

    def _get_colorder_arrays_for_saving(self):
        """
        Column orders of the base class arrays followed by the ones of the four
        forecast arrays (generators twice, then loads twice, matching
        :func:`_get_name_arrays_for_saving`).
        """
        res = super()._get_colorder_arrays_for_saving()
        res += (
            self._order_backend_prods,
            self._order_backend_prods,
            self._order_backend_loads,
            self._order_backend_loads,
        )
        return res