Source code for grid2op.Observation.baseObservation

# Copyright (c) 2019-2020, RTE (https://www.rte-france.com)
# See AUTHORS.txt
# This Source Code Form is subject to the terms of the Mozilla Public License, version 2.0.
# If a copy of the Mozilla Public License, version 2.0 was not distributed with this file,
# you can obtain one at http://mozilla.org/MPL/2.0/.
# SPDX-License-Identifier: MPL-2.0
# This file is part of Grid2Op, Grid2Op a testbed platform to model sequential decision making in power systems.

import copy
import datetime
import warnings
import networkx
from abc import abstractmethod
import numpy as np
from scipy.sparse import csr_matrix

from grid2op.dtypes import dt_int, dt_float, dt_bool
from grid2op.Exceptions import (
    Grid2OpException,
    NoForecastAvailable,
    EnvError,
    BaseObservationError,
)
from grid2op.Space import GridObjects

# TODO have a method that could do "forecast" by giving the _injection by the agent,
# TODO if he wants to make custom forecasts

# TODO fix "bug" when action not initialized it should return nan in to_vect

# TODO be consistent with gen_* and prod_* also in dictionaries

# Message of the exception raised by `BaseObservation.state_of` when more than one element id is provided.
ERROR_ONLY_SINGLE_EL = "You can only inspect the effect of an action on one single element"

[docs]class BaseObservation(GridObjects): """ Basic class representing an observation. All observation must derive from this class and implement all its abstract methods. Attributes ---------- action_helper: :class:`grid2op.Action.ActionSpace` A representation of the possible action space. year: ``int`` The current year month: ``int`` The current month (1 = january, 12 = december) day: ``int`` The current day of the month (1 = first day of the month) hour_of_day: ``int`` The current hour of the day (from O to 23) minute_of_hour: ``int`` The current minute of the current hour (from 0 to 59) day_of_week: ``int`` The current day of the week (monday = 0 and sunday = 6) support_theta: ``bool`` This flag indicates whether the backend supports the retrieval of the voltage angle. If so (which is the case for most backend) then some supplementary attributes are available, such as :attr:`BaseObservation.gen_theta`, :attr:`BaseObservation.load_theta`, :attr:`BaseObservation.storage_theta`, :attr:`BaseObservation.theta_or` or :attr:`BaseObservation.theta_ex` . gen_p: :class:`numpy.ndarray`, dtype:float The active production value of each generator (expressed in MW). (the old name "prod_p" is still usable) gen_q: :class:`numpy.ndarray`, dtype:float The reactive production value of each generator (expressed in MVar). (the old name "prod_q" is still usable) gen_v: :class:`numpy.ndarray`, dtype:float The voltage magnitude of the bus to which each generator is connected (expressed in kV). (the old name "prod_v" is still usable) gen_theta: :class:`numpy.ndarray`, dtype:float The voltage angle (in degree) of the bus to which each generator is connected. Only availble if the backend supports the retrieval of voltage angles (see :attr:`BaseObservation.support_theta`). load_p: :class:`numpy.ndarray`, dtype:float The active load value of each consumption (expressed in MW). load_q: :class:`numpy.ndarray`, dtype:float The reactive load value of each consumption (expressed in MVar). 
load_v: :class:`numpy.ndarray`, dtype:float The voltage magnitude of the bus to which each consumption is connected (expressed in kV). load_theta: :class:`numpy.ndarray`, dtype:float The voltage angle (in degree) of the bus to which each consumption is connected. Only availble if the backend supports the retrieval of voltage angles (see :attr:`BaseObservation.support_theta`). p_or: :class:`numpy.ndarray`, dtype:float The active power flow at the origin end of each powerline (expressed in MW). q_or: :class:`numpy.ndarray`, dtype:float The reactive power flow at the origin end of each powerline (expressed in MVar). v_or: :class:`numpy.ndarray`, dtype:float The voltage magnitude at the bus to which the origin end of each powerline is connected (expressed in kV). theta_or: :class:`numpy.ndarray`, dtype:float The voltage angle at the bus to which the origin end of each powerline is connected (expressed in degree). Only availble if the backend supports the retrieval of voltage angles (see :attr:`BaseObservation.support_theta`). a_or: :class:`numpy.ndarray`, dtype:float The current flow at the origin end of each powerline (expressed in A). p_ex: :class:`numpy.ndarray`, dtype:float The active power flow at the extremity end of each powerline (expressed in MW). q_ex: :class:`numpy.ndarray`, dtype:float The reactive power flow at the extremity end of each powerline (expressed in MVar). v_ex: :class:`numpy.ndarray`, dtype:float The voltage magnitude at the bus to which the extremity end of each powerline is connected (expressed in kV). theta_ex: :class:`numpy.ndarray`, dtype:float The voltage angle at the bus to which the extremity end of each powerline is connected (expressed in degree). Only availble if the backend supports the retrieval of voltage angles (see :attr:`BaseObservation.support_theta`). a_ex: :class:`numpy.ndarray`, dtype:float The current flow at the extremity end of each powerline (expressed in A). 
rho: :class:`numpy.ndarray`, dtype:float The capacity of each powerline. It is defined at the observed current flow divided by the thermal limit of each powerline (no unit) topo_vect: :class:`numpy.ndarray`, dtype:int For each object (load, generator, ends of a powerline) it gives on which bus this object is connected in its substation. See :func:`grid2op.Backend.Backend.get_topo_vect` for more information. line_status: :class:`numpy.ndarray`, dtype:bool Gives the status (connected / disconnected) for every powerline (``True`` at position `i` means the powerline `i` is connected) timestep_overflow: :class:`numpy.ndarray`, dtype:int Gives the number of time steps since a powerline is in overflow. time_before_cooldown_line: :class:`numpy.ndarray`, dtype:int For each powerline, it gives the number of time step the powerline is unavailable due to "cooldown" (see :attr:`grid2op.Parameters.NB_TIMESTEP_COOLDOWN_LINE` for more information). 0 means the an action will be able to act on this same powerline, a number > 0 (eg 1) means that an action at this time step cannot act on this powerline (in the example the agent have to wait 1 time step) time_before_cooldown_sub: :class:`numpy.ndarray`, dtype:int Same as :attr:`BaseObservation.time_before_cooldown_line` but for substations. For each substation, it gives the number of timesteps to wait before acting on this substation (see see :attr:`grid2op.Parameters.NB_TIMESTEP_COOLDOWN_SUB` for more information). time_next_maintenance: :class:`numpy.ndarray`, dtype:int For each powerline, it gives the time of the next planned maintenance. For example if there is: - `1` at position `i` it means that the powerline `i` will be disconnected for maintenance operation at the next time step. - `0` at position `i` means that powerline `i` is disconnected from the powergrid for maintenance operation at the current time step. - `-1` at position `i` means that powerline `i` will not be disconnected for maintenance reason for this episode. 
- `k` > 1 at position `i` it means that the powerline `i` will be disconnected for maintenance operation at in `k` time steps When a powerline is "in maintenance", it cannot be reconnected by the `Agent` before the end of this maintenance. duration_next_maintenance: :class:`numpy.ndarray`, dtype:int For each powerline, it gives the number of time step that the maintenance will last (if any). This means that, if at position `i` of this vector: - there is a `0`: the powerline is not disconnected from the grid for maintenance - there is a `1`, `2`, ... the powerline will be disconnected for at least `1`, `2`, ... timestep (**NB** in all case, the powerline will stay disconnected until a :class:`grid2op.BaseAgent.BaseAgent` performs the proper :class:`grid2op.BaseAction.BaseAction` to reconnect it). When a powerline is "in maintenance", it cannot be reconnected by the `Agent` before the end of this maintenance. target_dispatch: :class:`numpy.ndarray`, dtype:float For **each** generators, it gives the target redispatching, asked by the agent. This is the sum of all redispatching asked by the agent for during all the episode. It for each generator it is a number between: - pmax and pmax. Note that there is information about all generators there, even the one that are not dispatchable. actual_dispatch: :class:`numpy.ndarray`, dtype:float For **each** generators, it gives the redispatching currently implemented by the environment. Indeed, the environment tries to implement at best the :attr:`BaseObservation.target_dispatch`, but sometimes, due to physical limitation (pmin, pmax, ramp min and ramp max) it cannot. In this case, only the best possible redispatching is implemented at the current time step, and this is what this vector stores. Note that there is information about all generators there, even the one that are not dispatchable. storage_charge: :class:`numpy.ndarray`, dtype:float The actual 'state of charge' of each storage unit, expressed in MWh. 
storage_power_target: :class:`numpy.ndarray`, dtype:float For each storage units, give the setpoint of production / consumption as given by the agent storage_power: :class:`numpy.ndarray`, dtype:float Give the actual storage production / loads at the given state. storage_theta: :class:`numpy.ndarray`, dtype:float The voltage angle (in degree) of the bus to which each storage units is connected. Only availble if the backend supports the retrieval of voltage angles (see :attr:`BaseObservation.support_theta`). gen_p_before_curtail: :class:`numpy.ndarray`, dtype:float Give the production of renewable generator there would have been if no curtailment were applied (**NB** it returns 0.0 for non renewable generators that cannot be curtailed) curtailment_limit: :class:`numpy.ndarray`, dtype:float Limit (in ratio of gen_pmax) imposed on each renewable generator as set by the agent. It is always 1. if no curtailment actions is acting on the generator. This is the "curtailment" given in the action by the agent. curtailment_limit_effective: :class:`numpy.ndarray`, dtype:float Limit (in ratio of gen_pmax) imposed on each renewable generator effectively imposed by the environment. It matches :attr:`BaseObservation.curtailment_limit` if `param.LIMIT_INFEASIBLE_CURTAILMENT_STORAGE_ACTION` is ``False`` (default) otherwise the environment is able to limit the curtailment actions if too much power would be needed to compensate the "loss" of generation due to renewables. It is always 1. if no curtailment actions is acting on the generator. curtailment_mw: :class:`numpy.ndarray`, dtype:float Gives the amount of power curtailed for each generator (it is 0. for all non renewable generators) This is NOT the "curtailment" given in the action by the agent. curtailment: :class:`numpy.ndarray`, dtype:float Give the power curtailed for each generator. It is expressed in ratio of gen_pmax (so between 0. 
- meaning no curtailment in effect for this generator - to 1.0 - meaning this generator should have produced pmax, but a curtailment action limits it to 0.) This is NOT the "curtailment" given in the action by the agent. current_step: ``int`` Current number of step performed up until this observation (NB this is not given in the observation if it is transformed into a vector) max_step: ``int`` Maximum number of steps possible for this episode delta_time: ``float`` Time (in minutes) between the last step and the current step (usually constant in an episode, even in an environment) is_alarm_illegal: ``bool`` whether the last alarm has been illegal (due to budget constraint). It can only be ``True`` if an alarm was raised by the agent on the previous step. Otherwise it is always ``False`` time_since_last_alarm: ``int`` Number of steps since the last successful alarm has been raised. It is `-1` if no alarm has been raised yet. last_alarm: :class:`numpy.ndarray`, dtype:int For each zones, gives how many steps since the last alarm was raised successfully for this zone attention_budget: ``int`` The current attention budget was_alarm_used_after_game_over: ``bool`` Was the last alarm used to compute anything related to the attention budget when there was a game over (can only be set to ``True`` if the observation corresponds to a game over, but not necessarily) gen_margin_up: :class:`numpy.ndarray`, dtype:float From how much can you increase each generators production between this step and the next. It is always 0. for non renewable generators. For the others it is defined as `np.minimum(type(self).gen_pmax - self.gen_p, self.gen_max_ramp_up)` gen_margin_down: :class:`numpy.ndarray`, dtype:float From how much can you decrease each generators production between this step and the next. It is always 0. for non renewable generators. 
For the others it is defined as `np.minimum(self.gen_p - type(self).gen_pmin, self.gen_max_ramp_down)` _shunt_p: :class:`numpy.ndarray`, dtype:float Shunt active value (only available if shunts are available) (in MW) _shunt_q: :class:`numpy.ndarray`, dtype:float Shunt reactive value (only available if shunts are available) (in MVAr) _shunt_v: :class:`numpy.ndarray`, dtype:float Shunt voltage (only available if shunts are available) (in kV) _shunt_bus: :class:`numpy.ndarray`, dtype:float Bus (-1 disconnected, 1 for bus 1, 2 for bus 2) at which each shunt is connected (only available if shunts are available) """ _attr_eq = [ "line_status", "topo_vect", "timestep_overflow", "gen_p", "gen_q", "gen_v", "load_p", "load_q", "load_v", "p_or", "q_or", "v_or", "a_or", "p_ex", "q_ex", "v_ex", "a_ex", "time_before_cooldown_line", "time_before_cooldown_sub", "time_next_maintenance", "duration_next_maintenance", "target_dispatch", "actual_dispatch", "_shunt_p", "_shunt_q", "_shunt_v", "_shunt_bus", # storage "storage_charge", "storage_power_target", "storage_power", # curtailment "gen_p_before_curtail", "curtailment", "curtailment_limit", "curtailment_limit_effective", # attention budget "is_alarm_illegal", "time_since_last_alarm", "last_alarm", "attention_budget", "was_alarm_used_after_game_over", # gen up / down "gen_margin_up", "gen_margin_down", ] attr_list_vect = None # value to assess if two observations are equal _tol_equal = 1e-3
def __init__(self, obs_env=None, action_helper=None, random_prng=None):
    """
    Allocate every attribute of the observation with its final shape and dtype.

    Apart from the two alarm flags (created with ``np.ones`` / ``np.zeros``) all the
    vectors are created with ``np.empty``: their content is arbitrary until something
    writes into them.

    Parameters
    ----------
    obs_env:
        Stored unchanged in ``self._obs_env``.

    action_helper:
        Stored unchanged in ``self.action_helper``.

    random_prng:
        Stored unchanged in ``self.random_prng``.

    """
    GridObjects.__init__(self)

    # small factories, only there to avoid repeating the dtype everywhere
    def _float_vect(size):
        return np.empty(shape=size, dtype=dt_float)

    def _int_vect(size):
        return np.empty(shape=size, dtype=dt_int)

    nb_line = self.n_line
    nb_gen = self.n_gen
    nb_load = self.n_load
    nb_storage = self.n_storage

    self._is_done = True
    self.random_prng = random_prng
    self.action_helper = action_helper

    # containers used by the forecasts
    self._forecasted_grid_act = {}
    self._forecasted_inj = []
    self._obs_env = obs_env

    # date and time
    self.year = dt_int(1970)
    self.month = dt_int(1)
    self.day = dt_int(1)
    self.hour_of_day = dt_int(0)
    self.minute_of_hour = dt_int(0)
    self.day_of_week = dt_int(0)

    self.timestep_overflow = _int_vect((nb_line,))

    # status of the powerlines (boolean vector, one entry per powerline)
    self.line_status = np.empty(shape=nb_line, dtype=dt_bool)

    # topology
    self.topo_vect = _int_vect(self.dim_topo)

    # generators
    self.gen_p = _float_vect(nb_gen)
    self.gen_q = _float_vect(nb_gen)
    self.gen_v = _float_vect(nb_gen)
    self.gen_margin_up = _float_vect(nb_gen)
    self.gen_margin_down = _float_vect(nb_gen)

    # loads
    self.load_p = _float_vect(nb_load)
    self.load_q = _float_vect(nb_load)
    self.load_v = _float_vect(nb_load)

    # origin side of the powerlines
    self.p_or = _float_vect(nb_line)
    self.q_or = _float_vect(nb_line)
    self.v_or = _float_vect(nb_line)
    self.a_or = _float_vect(nb_line)

    # extremity side of the powerlines
    self.p_ex = _float_vect(nb_line)
    self.q_ex = _float_vect(nb_line)
    self.v_ex = _float_vect(nb_line)
    self.a_ex = _float_vect(nb_line)

    # relative flows
    self.rho = _float_vect(nb_line)

    # cooldowns and maintenance (the two maintenance vectors start as copies of the
    # line cooldown vector, so they share its shape and dtype)
    self.time_before_cooldown_line = _int_vect(nb_line)
    self.time_before_cooldown_sub = _int_vect(self.n_sub)
    self.time_next_maintenance = 1 * self.time_before_cooldown_line
    self.duration_next_maintenance = 1 * self.time_before_cooldown_line

    # redispatching
    self.target_dispatch = _float_vect(nb_gen)
    self.actual_dispatch = _float_vect(nb_gen)

    # storage units
    self.storage_charge = _float_vect(nb_storage)  # in MWh
    self.storage_power_target = _float_vect(nb_storage)  # in MW
    self.storage_power = _float_vect(nb_storage)  # in MW

    # attention budget
    self.is_alarm_illegal = np.ones(shape=1, dtype=dt_bool)
    self.time_since_last_alarm = _int_vect(1)
    self.last_alarm = _int_vect(self.dim_alarms)
    self.attention_budget = _float_vect(1)
    self.was_alarm_used_after_game_over = np.zeros(shape=1, dtype=dt_bool)

    # caches, filled lazily to save computation time
    self._connectivity_matrix_ = None
    self._bus_connectivity_matrix_ = None
    self._dictionnarized = None
    self._vectorized = None

    # shunts (these are not stored!), only allocated when the grid exposes them
    if self.shunts_data_available:
        nb_shunt = self.n_shunt
        self._shunt_p = _float_vect(nb_shunt)
        self._shunt_q = _float_vect(nb_shunt)
        self._shunt_v = _float_vect(nb_shunt)
        self._shunt_bus = _int_vect(nb_shunt)

    self._thermal_limit = _float_vect(nb_line)

    # curtailment
    self.gen_p_before_curtail = _float_vect(nb_gen)
    self.curtailment = _float_vect(nb_gen)
    self.curtailment_limit = _float_vect(nb_gen)
    self.curtailment_limit_effective = _float_vect(nb_gen)

    # voltage angles ("theta", in degree)
    self.support_theta = False
    self.theta_or = _float_vect(nb_line)
    self.theta_ex = _float_vect(nb_line)
    self.load_theta = _float_vect(nb_load)
    self.gen_theta = _float_vect(nb_gen)
    self.storage_theta = _float_vect(nb_storage)

    # counters
    self.current_step = dt_int(0)
    self.max_step = dt_int(np.iinfo(dt_int).max)
    self.delta_time = dt_float(5.0)
def _aux_copy(self, other):
    """
    Copy the content of this observation into `other`.

    Scalar-like attributes are re-assigned on `other`, whereas vectors are copied
    **in place** (``other.attr[:] = self.attr``): `other` must therefore already hold
    arrays of compatible shape, which is the case when it has been built by the
    constructor of the same class.

    Parameters
    ----------
    other:
        The observation that receives the values (modified in place).

    """
    attr_simple = [
        "max_step",
        "current_step",
        "support_theta",
        "day_of_week",
        "minute_of_hour",
        "hour_of_day",
        "day",
        "month",
        "year",
        "delta_time",
        "_is_done",
    ]

    attr_vect = [
        "storage_theta",
        "gen_theta",
        "load_theta",
        "theta_ex",
        "theta_or",
        "curtailment_limit",
        "curtailment",
        "gen_p_before_curtail",
        "_thermal_limit",
        "is_alarm_illegal",
        "time_since_last_alarm",
        "last_alarm",
        "attention_budget",
        "was_alarm_used_after_game_over",
        "storage_power",
        "storage_power_target",
        "storage_charge",
        "actual_dispatch",
        "target_dispatch",
        "duration_next_maintenance",
        "time_next_maintenance",
        "time_before_cooldown_sub",
        "time_before_cooldown_line",
        "rho",
        "a_ex",
        "v_ex",
        "q_ex",
        "p_ex",
        "a_or",
        "v_or",
        "q_or",
        "p_or",
        "load_p",
        "load_q",
        "load_v",
        "gen_p",
        "gen_q",
        "gen_v",
        "topo_vect",
        "line_status",
        "timestep_overflow",
        "gen_margin_up",
        "gen_margin_down",
        "curtailment_limit_effective",
    ]

    # the shunt vectors only exist when the grid exposes shunt data
    if self.shunts_data_available:
        attr_vect += ["_shunt_bus", "_shunt_v", "_shunt_q", "_shunt_p"]

    for attr_nm in attr_simple:
        setattr(other, attr_nm, getattr(self, attr_nm))

    for attr_nm in attr_vect:
        getattr(other, attr_nm)[:] = getattr(self, attr_nm)

def __copy__(self):
    """
    Shallow copy of the observation.

    A new observation of the same type is built around the same `_obs_env` and
    `action_helper` objects, the regular attributes are copied with
    :func:`BaseObservation._aux_copy` and the caches / forecast containers are copied
    with ``copy.copy``.

    Returns
    -------
    res:
        The new observation.

    """
    res = type(self)(obs_env=self._obs_env, action_helper=self.action_helper)

    # copy regular attributes
    self._aux_copy(other=res)

    # just copy
    res._connectivity_matrix_ = copy.copy(self._connectivity_matrix_)
    res._bus_connectivity_matrix_ = copy.copy(self._bus_connectivity_matrix_)
    res._dictionnarized = copy.copy(self._dictionnarized)
    res._vectorized = copy.copy(self._vectorized)

    # handles the forecasts here
    res._forecasted_grid_act = copy.copy(self._forecasted_grid_act)
    res._forecasted_inj = copy.copy(self._forecasted_inj)
    return res

def __deepcopy__(self, memodict=None):
    """
    Deep copy of the observation.

    Same as ``__copy__`` except that the caches and the forecast containers are
    copied with ``copy.deepcopy``. The `_obs_env` and `action_helper` objects are
    still shared with the original observation.

    Parameters
    ----------
    memodict: ``dict`` or ``None``
        The memo dictionary forwarded by ``copy.deepcopy``. When it is not provided
        a fresh dictionary is used for this call only (a mutable default value would
        be shared, and grow, across all the direct calls to this method).

    Returns
    -------
    res:
        The new observation.

    """
    if memodict is None:
        memodict = {}

    res = type(self)(obs_env=self._obs_env, action_helper=self.action_helper)

    # copy regular attributes
    self._aux_copy(other=res)

    # just deepcopy
    res._connectivity_matrix_ = copy.deepcopy(self._connectivity_matrix_, memodict)
    res._bus_connectivity_matrix_ = copy.deepcopy(
        self._bus_connectivity_matrix_, memodict
    )
    res._dictionnarized = copy.deepcopy(self._dictionnarized, memodict)
    res._vectorized = copy.deepcopy(self._vectorized, memodict)

    # handles the forecasts here
    res._forecasted_grid_act = copy.deepcopy(self._forecasted_grid_act, memodict)
    res._forecasted_inj = copy.deepcopy(self._forecasted_inj, memodict)
    return res
def state_of(
    self,
    _sentinel=None,
    load_id=None,
    gen_id=None,
    line_id=None,
    storage_id=None,
    substation_id=None,
):
    """
    Return the state of a given unique load, generator, powerline, storage unit or
    substation, as seen in this observation.

    Exactly one of `load_id`, `gen_id`, `line_id`, `storage_id` or `substation_id`
    should be provided, and it must be given as a named argument. Objects can only
    be queried by id here (ie by giving the integer index of the object).

    Parameters
    ----------
    _sentinel: ``None``
        Used to prevent positional parameters. Internal, do not use.

    load_id: ``int``
        ID of the load we want to inspect

    gen_id: ``int``
        ID of the generator we want to inspect

    line_id: ``int``
        ID of the powerline we want to inspect

    storage_id: ``int``
        ID of the storage unit we want to inspect

    substation_id: ``int``
        ID of the substation we want to inspect

    Returns
    -------
    res: :class:`dict`
        A dictionary with keys and value depending on which object needs to be inspected:

        - if a load is inspected, then the keys are:

            - "p" the active value consumed by the load
            - "q" the reactive value consumed by the load
            - "v" the voltage magnitude of the bus to which the load is connected
            - "theta" (only if `support_theta` is set) the voltage angle of the bus
              to which the load is connected
            - "bus" on which bus the load is connected in the substation
            - "sub_id" the id of the substation to which the load is connected

        - if a generator is inspected, then the keys are:

            - "p" the active value produced by the generator
            - "q" the reactive value of the generator
            - "v" the voltage magnitude of the bus to which the generator is connected
            - "theta" (only if `support_theta` is set) the voltage angle of the bus
              to which the generator is connected
            - "bus" on which bus the generator is connected in the substation
            - "sub_id" the id of the substation to which the generator is connected
            - "actual_dispatch" the actual dispatch implemented for this generator
            - "target_dispatch" the target dispatch for this generator
            - "curtailment", "curtailment_limit", "curtailment_limit_effective" and
              "p_before_curtail": the curtailment related values of this generator
            - "margin_up" and "margin_down": the entries of `gen_margin_up` and
              `gen_margin_down` for this generator

        - if a powerline is inspected then the keys are "origin" and "extremity" each
          being dictionary with keys:

            - "p" the active flow on line side (extremity or origin)
            - "q" the reactive flow on line side (extremity or origin)
            - "v" the voltage magnitude of the bus to which the line side is connected
            - "theta" (only if `support_theta` is set) the voltage angle of the bus to
              which the line side is connected
            - "bus" on which bus the line side is connected in the substation
            - "sub_id" the id of the substation to which the line side is connected
            - "a" the current flow on the line side (extremity or origin)

          In the case of a powerline, additional information are:

            - "maintenance": a dictionary with keys "next" (time of the next
              maintenance) and "duration_next" (duration of this next maintenance)
            - "cooldown_time": the entry of `time_before_cooldown_line` for this
              powerline

        - if a storage unit is inspected, information are:

            - "storage_power": the entry of `storage_power` for this unit
            - "storage_charge": the entry of `storage_charge` for this unit
            - "storage_power_target": the entry of `storage_power_target` for this unit
            - "theta": (only if `support_theta` is set) the voltage angle of the bus
              at which the storage unit is connected
            - "bus": the bus to which the storage unit is connected
            - "sub_id" : the id of the substation to which the storage unit is connected

        - if a substation is inspected, it returns the topology of this substation in
          a dictionary with keys:

            - "topo_vect": the part of `topo_vect` that concerns this substation
            - "nb_bus": computed as ``max - min + 1`` over the strictly positive bus
              ids of this substation (0 if there are none)
            - "cooldown_time": the entry of `time_before_cooldown_sub` for this
              substation

    Notes
    -----
    This function can only be used to retrieve the state of the element of the grid,
    and not the alarm sent or not, to the operator.

    Raises
    ------
    Grid2OpException
        If _sentinel is modified, if none of the arguments are set, if 2 or more of
        the ids are set, or if the id provided is negative or too large for
        this grid.

    """
    if _sentinel is not None:
        raise Grid2OpException(
            "obs.state_of should only be called with named argument."
        )

    nb_requested = sum(
        el_id is not None
        for el_id in (load_id, gen_id, line_id, storage_id, substation_id)
    )
    if nb_requested == 0:
        raise Grid2OpException(
            "You ask the state of an object in a observation without specifying the object id. "
            'Please provide "load_id", "gen_id", "line_id", "storage_id" or '
            '"substation_id"'
        )
    if nb_requested > 1:
        raise Grid2OpException(ERROR_ONLY_SINGLE_EL)

    if load_id is not None:
        if load_id >= len(self.load_p):
            raise Grid2OpException(
                'There are no load of id "load_id={}" in this grid.'.format(load_id)
            )
        if load_id < 0:
            raise Grid2OpException("`load_id` should be a positive integer")

        res = {
            "p": self.load_p[load_id],
            "q": self.load_q[load_id],
            "v": self.load_v[load_id],
            "bus": self.topo_vect[self.load_pos_topo_vect[load_id]],
            "sub_id": self.load_to_subid[load_id],
        }
        if self.support_theta:
            res["theta"] = self.load_theta[load_id]

    elif gen_id is not None:
        if gen_id >= len(self.gen_p):
            raise Grid2OpException(
                'There are no generator of id "gen_id={}" in this grid.'.format(
                    gen_id
                )
            )
        if gen_id < 0:
            raise Grid2OpException("`gen_id` should be a positive integer")

        res = {
            "p": self.gen_p[gen_id],
            "q": self.gen_q[gen_id],
            "v": self.gen_v[gen_id],
            "bus": self.topo_vect[self.gen_pos_topo_vect[gen_id]],
            "sub_id": self.gen_to_subid[gen_id],
            "target_dispatch": self.target_dispatch[gen_id],
            # read from `actual_dispatch` (and not from `target_dispatch`): the two
            # vectors differ when the target cannot be fully implemented
            "actual_dispatch": self.actual_dispatch[gen_id],
            "curtailment": self.curtailment[gen_id],
            "curtailment_limit": self.curtailment_limit[gen_id],
            "curtailment_limit_effective": self.curtailment_limit_effective[gen_id],
            "p_before_curtail": self.gen_p_before_curtail[gen_id],
            "margin_up": self.gen_margin_up[gen_id],
            "margin_down": self.gen_margin_down[gen_id],
        }
        if self.support_theta:
            res["theta"] = self.gen_theta[gen_id]

    elif line_id is not None:
        if line_id >= len(self.p_or):
            raise Grid2OpException(
                'There are no powerline of id "line_id={}" in this grid.'.format(
                    line_id
                )
            )
        if line_id < 0:
            raise Grid2OpException("`line_id` should be a positive integer")

        res = {}

        # origin information
        res["origin"] = {
            "p": self.p_or[line_id],
            "q": self.q_or[line_id],
            "v": self.v_or[line_id],
            "a": self.a_or[line_id],
            "bus": self.topo_vect[self.line_or_pos_topo_vect[line_id]],
            "sub_id": self.line_or_to_subid[line_id],
        }
        if self.support_theta:
            res["origin"]["theta"] = self.theta_or[line_id]

        # extremity information
        res["extremity"] = {
            "p": self.p_ex[line_id],
            "q": self.q_ex[line_id],
            "v": self.v_ex[line_id],
            "a": self.a_ex[line_id],
            "bus": self.topo_vect[self.line_ex_pos_topo_vect[line_id]],
            "sub_id": self.line_ex_to_subid[line_id],
        }
        if self.support_theta:
            # stored under "extremity" so that the origin angle is not overwritten
            res["extremity"]["theta"] = self.theta_ex[line_id]

        # maintenance information
        res["maintenance"] = {
            "next": self.time_next_maintenance[line_id],
            "duration_next": self.duration_next_maintenance[line_id],
        }

        # cooldown
        res["cooldown_time"] = self.time_before_cooldown_line[line_id]

    elif storage_id is not None:
        if storage_id >= self.n_storage:
            raise Grid2OpException(
                'There are no storage unit with id "storage_id={}" in this grid.'.format(
                    storage_id
                )
            )
        if storage_id < 0:
            raise Grid2OpException("`storage_id` should be a positive integer")

        res = {
            "storage_power": self.storage_power[storage_id],
            "storage_charge": self.storage_charge[storage_id],
            "storage_power_target": self.storage_power_target[storage_id],
            "bus": self.topo_vect[self.storage_pos_topo_vect[storage_id]],
            "sub_id": self.storage_to_subid[storage_id],
        }
        if self.support_theta:
            res["theta"] = self.storage_theta[storage_id]

    else:
        # only `substation_id` can be set at this stage
        if substation_id >= len(self.sub_info):
            raise Grid2OpException(
                'There are no substation of id "substation_id={}" in this grid.'.format(
                    substation_id
                )
            )
        if substation_id < 0:
            # a negative id would silently slice `sub_info` from the end
            raise Grid2OpException("`substation_id` should be a positive integer")

        # the elements of a substation are contiguous in the topology vector
        beg_ = int(np.sum(self.sub_info[:substation_id]))
        end_ = int(beg_ + self.sub_info[substation_id])
        topo_sub = self.topo_vect[beg_:end_]

        connected_buses = topo_sub[topo_sub > 0]
        if connected_buses.size > 0:
            nb_bus = np.max(connected_buses) - np.min(connected_buses) + 1
        else:
            nb_bus = 0

        res = {
            "topo_vect": topo_sub,
            "nb_bus": nb_bus,
            "cooldown_time": self.time_before_cooldown_sub[substation_id],
        }

    return res
@classmethod
def process_shunt_data(cls):
    """
    Remove the shunt attributes from the vector representation of the observation
    when `cls.shunts_data_available` is not set, then defer to the parent
    implementation.

    Returns
    -------
    Whatever the parent `process_shunt_data` returns.

    """
    if not cls.shunts_data_available:
        # work on copies owned by this class: without them the containers shared
        # with the grid2op base types would be modified too
        cls.attr_list_vect = copy.deepcopy(cls.attr_list_vect)
        cls.attr_list_set = copy.deepcopy(cls.attr_list_set)

        # drop each shunt attribute that is currently listed
        for shunt_attr in ("_shunt_p", "_shunt_q", "_shunt_v", "_shunt_bus"):
            if shunt_attr in cls.attr_list_vect:
                cls.attr_list_vect.remove(shunt_attr)

        # keep the set in sync with the list
        cls.attr_list_set = set(cls.attr_list_vect)
    return super().process_shunt_data()
@classmethod
def process_grid2op_compat(cls):
    """
    Remove from the vector representation of the observation the attributes that
    did not exist in the grid2op version given by `cls.glop_version`.

    The versions are compared on their numeric components (eg "1.10.0" is more
    recent than "1.6.0"), and not as plain strings. `cls.BEFORE_COMPAT_VERSION` is
    treated as older than every version checked here.

    `cls.attr_list_vect`, `cls.attr_list_set` and (for versions before 1.6.0)
    `cls.dim_alarms` are modified in place on the class.
    """

    def _numeric_release(version_str):
        # leading numeric components of a dotted version: "1.6.4.dev0" -> (1, 6, 4)
        release = []
        for chunk in str(version_str).split("."):
            digits = ""
            for char in chunk:
                if char not in "0123456789":
                    break
                digits += char
            if not digits:
                break
            release.append(int(digits))
            if len(digits) != len(chunk):
                # something like "0rc1": the numeric part of the version stops here
                break
        return tuple(release)

    current = cls.glop_version
    is_oldest = current == cls.BEFORE_COMPAT_VERSION
    current_release = _numeric_release(current)

    def _is_before(reference):
        # True if the attributes introduced in version `reference` must be removed
        if is_oldest:
            return True
        if not current_release:
            # not a dotted numeric version: keep the plain string comparison
            return current < reference
        return current_release < _numeric_release(reference)

    def _detach_lists():
        # this is really important, otherwise things from grid2op base types will be affected
        cls.attr_list_vect = copy.deepcopy(cls.attr_list_vect)
        cls.attr_list_set = copy.deepcopy(cls.attr_list_set)

    def _forget(attr_names):
        # remove the attributes (when present) and keep the set in sync with the list
        for attr_nm in attr_names:
            try:
                cls.attr_list_vect.remove(attr_nm)
            except ValueError:
                # this attribute was not there in the first place
                pass
        cls.attr_list_set = set(cls.attr_list_vect)

    if is_oldest:
        # oldest version: no storage and no curtailment available
        _detach_lists()

        # deactivate storage
        cls.set_no_storage()
        _forget(
            [
                # storage
                "storage_charge",
                "storage_power_target",
                "storage_power",
                # curtailment
                "gen_p_before_curtail",
                "curtailment",
                "curtailment_limit",
            ]
        )

    if _is_before("1.6.0"):
        # this feature did not exist before and was introduced in grid2op 1.6.0
        _detach_lists()
        cls.dim_alarms = 0
        _forget(
            [
                "is_alarm_illegal",
                "time_since_last_alarm",
                "last_alarm",
                "attention_budget",
                "was_alarm_used_after_game_over",
                # added in grid2op 1.6.0 mainly for the EpisodeReboot
                "_shunt_p",
                "_shunt_q",
                "_shunt_v",
                "_shunt_bus",
            ]
        )

    if _is_before("1.6.4"):
        # "current_step", "max_step" were added in grid2Op 1.6.4
        _detach_lists()
        _forget(["max_step", "current_step"])

    if _is_before("1.6.5"):
        # "delta_time" was added in grid2Op 1.6.5
        _detach_lists()
        _forget(["delta_time"])

    if _is_before("1.6.6"):
        # "gen_margin_up", "gen_margin_down" were added in grid2Op 1.6.6
        _detach_lists()
        _forget(
            [
                "gen_margin_up",
                "gen_margin_down",
                "curtailment_limit_effective",
            ]
        )
def reset(self):
    """
    INTERNAL

    .. warning:: /!\\\\ Internal, do not use unless you know what you are doing /!\\\\

        Resetting a single observation is unlikely to do what you want to do.

    Reset the :class:`BaseObservation` to a blank state, where everything is set to either ``None`` or to
    its default value.

    Notes
    -----
    Floating point quantities are filled with ``np.nan`` (the ``np.NaN`` alias no longer exists in
    NumPy 2.0 and above).
    """
    self._is_done = True

    # line status (True = connected) and topological vector
    self.line_status[:] = True
    self.topo_vect[:] = 0

    # every electrical quantity is unknown in a blank observation
    nan_attrs = [
        # generators information
        "gen_p", "gen_q", "gen_v",
        # loads information
        "load_p", "load_q", "load_v",
        # lines origin information
        "p_or", "q_or", "v_or", "a_or",
        # lines extremity information
        "p_ex", "q_ex", "v_ex", "a_ex",
        # lines relative flows
        "rho",
        # redispatching
        "target_dispatch", "actual_dispatch",
        # storage units
        "storage_charge", "storage_power_target", "storage_power",
        # voltage angles
        "theta_or", "theta_ex", "load_theta", "gen_theta", "storage_theta",
    ]
    if self.shunts_data_available:
        nan_attrs += ["_shunt_p", "_shunt_q", "_shunt_v"]
        self._shunt_bus[:] = -1
    for attr_nm in nan_attrs:
        getattr(self, attr_nm)[:] = np.nan
    self.support_theta = False

    # cool down and reconnection time after hard overflow, soft overflow or cascading failure
    self.time_before_cooldown_line[:] = -1
    self.time_before_cooldown_sub[:] = -1
    self.time_next_maintenance[:] = -1
    self.duration_next_maintenance[:] = -1
    self.timestep_overflow[:] = 0

    # calendar data
    self.year = dt_int(1970)
    self.month = dt_int(0)
    self.day = dt_int(0)
    self.hour_of_day = dt_int(0)
    self.minute_of_hour = dt_int(0)
    self.day_of_week = dt_int(0)

    # forecasts
    self._forecasted_inj = []
    self._forecasted_grid_act = {}

    # cached results (to save up computation time) are invalidated
    self._dictionnarized = None
    self._connectivity_matrix_ = None
    self._bus_connectivity_matrix_ = None

    # alarm feature
    self.is_alarm_illegal[:] = False
    self.time_since_last_alarm[:] = -1
    self.last_alarm[:] = False
    self.attention_budget[:] = 0
    self.was_alarm_used_after_game_over[:] = False

    # step counters and duration of a step
    self.current_step = dt_int(0)
    self.max_step = dt_int(np.iinfo(dt_int).max)
    self.delta_time = dt_float(5.0)
def set_game_over(self, env=None):
    """
    Set the observation to the "game over" state:

    - all powerlines are disconnected
    - all loads are 0.
    - all prods are 0.
    - etc.

    Parameters
    ----------
    env:
        The environment (optional). When provided, the date, the thermal limits, the step
        counters and the alarm flag are read from it; otherwise default values are used.

    Notes
    -----
    As some attributes are initialized with `np.empty` it is recommended to reset here all attributes to avoid
    non deterministic behaviour.
    """
    has_env = env is not None
    self._is_done = True

    # generators, loads, both sides of the powerlines and relative flows
    for attr_nm in (
        "gen_p", "gen_q", "gen_v", "gen_margin_up", "gen_margin_down",
        "load_p", "load_q", "load_v",
        "p_or", "q_or", "v_or", "a_or",
        "p_ex", "q_ex", "v_ex", "a_ex",
        "rho",
    ):
        getattr(self, attr_nm)[:] = 0.0

    # everything is disconnected
    self.line_status[:] = False
    self.topo_vect[:] = -1

    # forecasts
    self._forecasted_inj = []
    self._forecasted_grid_act = {}

    # redispatching, storage and curtailment
    for attr_nm in (
        "target_dispatch", "actual_dispatch",
        "storage_charge", "storage_power_target", "storage_power",
        "curtailment",
    ):
        getattr(self, attr_nm)[:] = 0.0
    self.curtailment_limit[:] = 1.0
    self.curtailment_limit_effective[:] = 1.0
    self.gen_p_before_curtail[:] = 0.0

    # cooldown, maintenance and overflow
    self.time_before_cooldown_line[:] = 0
    self.time_before_cooldown_sub[:] = 0
    self.time_next_maintenance[:] = -1
    self.duration_next_maintenance[:] = 0
    self.timestep_overflow[:] = 0

    if self.shunts_data_available:
        self._shunt_p[:] = 0.0
        self._shunt_q[:] = 0.0
        self._shunt_v[:] = 0.0
        self._shunt_bus[:] = -1

    if has_env:
        # retrieve the date from the environment
        self.year = dt_int(env.time_stamp.year)
        self.month = dt_int(env.time_stamp.month)
        self.day = dt_int(env.time_stamp.day)
        self.hour_of_day = dt_int(env.time_stamp.hour)
        self.minute_of_hour = dt_int(env.time_stamp.minute)
        self.day_of_week = dt_int(env.time_stamp.weekday())
    else:
        # set an old date (as i don't know anything about the env)
        self.year = 1970
        self.month = 1
        self.day = 1
        self.hour_of_day = 0
        self.minute_of_hour = 0
        self.day_of_week = 1

    if has_env:
        self._thermal_limit[:] = env.get_thermal_limit()
    else:
        self._thermal_limit[:] = 0.

    # by convention, I say it's 0 if the grid is in total blackout
    for attr_nm in ("theta_or", "theta_ex", "load_theta", "gen_theta", "storage_theta"):
        getattr(self, attr_nm)[:] = 0.0

    # counter
    if has_env:
        self.current_step = dt_int(env.nb_time_step)
        self.max_step = dt_int(env.max_episode_duration())

    # stuff related to alarm
    self.is_alarm_illegal[:] = False
    self.time_since_last_alarm[:] = -1
    self.last_alarm[:] = False
    self.attention_budget[:] = 0
    if has_env:
        self.was_alarm_used_after_game_over[:] = env._is_alarm_used_in_reward
    else:
        self.was_alarm_used_after_game_over[:] = False
def __compare_stats(self, other, name):
    """
    Tell whether the attribute `name` is the same in `self` and in `other`.

    Two attributes are considered equal when they are both ``None``, or when they have the same
    shape, the same dtype and the same content. Floating point vectors must have their non finite
    values at the same positions, and their finite values are compared up to ``self._tol_equal``.
    """
    mine = getattr(self, name)
    theirs = getattr(other, name)

    if mine is None or theirs is None:
        # equal only if the attribute is missing on both sides
        return mine is None and theirs is None

    if mine.shape != theirs.shape or mine.dtype != theirs.dtype:
        return False

    if not np.issubdtype(mine.dtype, np.dtype(dt_float).type):
        # integers, booleans, etc.: exact comparison
        return bool(np.all(mine == theirs))

    # first special case: there can be Nan there
    finite_mine = np.isfinite(mine)
    finite_theirs = np.isfinite(theirs)
    if np.any(finite_mine != finite_theirs):
        return False

    # special case of floating points, otherwise vector are never equal
    gap = np.abs(mine[finite_mine] - theirs[finite_theirs])
    return bool(np.all(gap <= self._tol_equal))
def __eq__(self, other):
    """
    INTERNAL

    .. warning:: /!\\\\ Internal, do not use unless you know what you are doing /!\\\\

    Test the equality of two observations.

    Two observations are declared equal when:

    - they have the same date (year, month, day, day of the week, hour and minute)
    - they come from the same grid description, as reported by ``same_grid_class``
    - every attribute listed in ``_attr_eq`` is equal in both (floating point values being
      compared up to a tolerance)

    Parameters
    ----------
    other: :class:`BaseObservation`
        The observation to which "self" will be compared.

    Returns
    -------
    ``True`` if the observations are equal, ``False`` otherwise.

    """
    # the calendar data must match
    for date_attr in (
        "year",
        "month",
        "day",
        "day_of_week",
        "hour_of_day",
        "minute_of_hour",
    ):
        if getattr(self, date_attr) != getattr(other, date_attr):
            return False

    # check that the underlying grid is the same in both instances
    if not type(self).same_grid_class(type(other)):
        return False

    # stops at the first attribute that differs
    return all(self.__compare_stats(other, stat_nm) for stat_nm in self._attr_eq)
def __sub__(self, other):
    """
    Computes the difference between two observations, and returns an observation corresponding to
    this difference.

    This can be used to easily plot the difference between two observations at different steps for
    example.

    For each attribute listed in ``_attr_eq``:

    - numerical vectors are subtracted (``self - other``)
    - boolean vectors are ``True`` where both observations agree and ``False`` where they differ
    - if the attribute is ``None`` in both observations, it is ``None`` in the result
    - if it is ``None`` only in `other`, the value of `self` is kept
    - if it is ``None`` only in `self`, the result is ``-other`` (or full of ``False`` for booleans)

    Parameters
    ----------
    other: :class:`BaseObservation`
        The observation to subtract from this one.

    Returns
    -------
    res: :class:`BaseObservation`
        A copy of `self` (made without its ``_obs_env``) holding the differences.

    Raises
    ------
    Grid2OpException
        If the two observations do not come from the same powergrid.
    """
    if not type(self).same_grid_class(type(other)):
        raise Grid2OpException(
            "Cannot compare to observation not coming from the same powergrid."
        )

    # keep aside the backend: it must not be copied. It is put back even when the copy
    # fails, otherwise this observation would be left without its `_obs_env`
    tmp_obs_env = self._obs_env
    self._obs_env = None
    try:
        res = copy.deepcopy(self)
    finally:
        self._obs_env = tmp_obs_env

    for stat_nm in self._attr_eq:
        me_ = getattr(self, stat_nm)
        oth_ = getattr(other, stat_nm)
        if oth_ is None:
            # nothing to subtract (`me_` might be None too)
            diff_ = me_
        elif me_ is None:
            if oth_.dtype == dt_bool:
                diff_ = np.full(oth_.shape, fill_value=False, dtype=dt_bool)
            else:
                diff_ = -oth_
        elif oth_.dtype == dt_bool:
            # True where both agree
            diff_ = ~np.logical_xor(me_, oth_)
        else:
            diff_ = me_ - oth_
        setattr(res, stat_nm, diff_)
    return res
def where_different(self, other):
    """
    Returns the difference between two observations.

    Parameters
    ----------
    other: :class:`BaseObservation`
        Other observation to compare

    Returns
    -------
    diff_: :class:`grid2op.Observation.BaseObservation`
        The observation showing the difference between `self` and `other`
    attr_nm: ``list``
        List of string representing the names of the different attributes. It's [] if the two observations are
        identical.

    Notes
    -----
    Attributes that are ``None`` in the difference (which happens when they are ``None`` in both
    observations) are not reported as different.
    """
    diff_ = self - other
    res = []
    for attr_nm in self._attr_eq:
        array_ = getattr(diff_, attr_nm)
        if array_ is None:
            # attribute not set in either observation: nothing to compare
            continue
        if array_.dtype == dt_bool:
            # in the difference, booleans are False where the observations disagree
            if np.any(~array_):
                res.append(attr_nm)
        elif (array_.shape[0] > 0) and np.max(np.abs(array_)):
            res.append(attr_nm)
    return diff_, res
@abstractmethod
def update(self, env, with_forecast=True):
    """
    INTERNAL

    .. warning:: /!\\\\ Internal, do not use unless you know what you are doing /!\\\\

        This is carried out automatically by the environment in `env.step`

    Update the actual instance of BaseObservation with the new received value from the environment.

    An observation is a description of the powergrid perceived by an agent. The agent takes his decision based on
    the current observation and the past rewards.

    This method `update` receives complete detailed information about the powergrid, but that does not mean an
    agent sees everything. For example, it is possible to derive this class to implement some noise in the
    generator or load, or flows to mimic sensor inaccuracy. It is also possible to give fake information about
    the topology, the line status etc.

    In the Grid2Op framework it is also through the observation that the agent has access to some forecast (the
    way forecasts are handled is implemented in this class). For example, forecast data (retrieved thanks to
    `chronics_handler`) are processed, but can be processed differently. One can apply load / production forecast
    to each grid state, or make a forecast for one "reference" grid state valid a whole day and update this one
    only etc. All these different mechanisms can be implemented in the Grid2Op framework by overloading the
    `update` observation method.

    This class is really what a dispatcher observes from its environment. Temperatures, nebulosity, wind etc.
    can also be included in this class.

    Parameters
    ----------
    env:
        The environment from which the observation is updated.

    with_forecast: ``bool``
        Flag related to the forecasts (``True`` by default); its handling is left to the implementation.

    Notes
    -----
    We strongly recommend to call :attr:`BaseObservation.reset` when implementing this function.

    """
def connectivity_matrix(self, as_csr_matrix=False):
    """
    Computes and return the "connectivity matrix" `con_mat`.
    Let "dim_topo := 2 * n_line + n_prod + n_conso + n_storage" (the total number of elements on the grid)

    It is a matrix of size dim_topo, dim_topo, with values 0 or 1.
    For two objects (lines extremity, generator unit, load) i,j :

        - if i and j are connected on the same substation:
            - if `conn_mat[i,j] = 0` it means the objects id'ed i and j are not connected to the same bus.
            - if `conn_mat[i,j] = 1` it means the objects id'ed i and j are connected to the same bus

        - if i and j are not connected on the same substation then `conn_mat[i,j] = 0` except if i and j are
          the two extremities of the same power line, in this case `conn_mat[i,j] = 1` (if the powerline is
          in service or 0 otherwise).

    An object that is not disconnected is always connected to itself: `conn_mat[i,i] = 1` for every
    connected object, and the row and column of a disconnected object only contain 0.

    Parameters
    ----------
    as_csr_matrix: ``bool``
        Whether to return a scipy sparse matrix (csr format) or a dense matrix (default).

    Returns
    -------
    res: ``numpy.ndarray``, shape:dim_topo,dim_topo, dtype:float
        The connectivity matrix, as defined above

    Notes
    -------
    Matrix can be either a sparse matrix or a dense matrix depending on the argument `as_csr_matrix`

    The result is stored on the observation: as long as the same format is requested, the very same
    object is returned by the next calls.

    Examples
    ---------
    If you want to know if powerline 0 is connected at its "extremity" side with the load of id 0 you can do

    .. code-block:: python

        import grid2op
        env = grid2op.make()
        obs = env.reset()

        # retrieve the position (in the topology vector) of the extremity of powerline 0
        # and of the load 0
        id_lineex_0 = obs.line_ex_pos_topo_vect[0]
        id_load_0 = obs.load_pos_topo_vect[0]

        # get the connectivity matrix
        connectivity_matrix = obs.connectivity_matrix()

        # know if the objects are connected or not
        are_connected = connectivity_matrix[id_lineex_0, id_load_0]
        # `are_connected` is 1.0 if these objects are connected to the same bus

    And now, supposes we do an action that changes the topology of the substation to which these
    two objects are connected, then we get (same example continues)

    .. code-block:: python

        topo_action = env.action_space({"set_bus": {"substations_id": [(1, [1,1,1,2,2,2])]}})
        obs, reward, done, info = env.step(topo_action)

        # and now retrieve the matrix
        connectivity_matrix = obs.connectivity_matrix()
        are_connected = connectivity_matrix[id_lineex_0, id_load_0]
        # `are_connected` is 0.0 if the action assigned the two objects to different buses

    """
    cached = self._connectivity_matrix_
    if cached is not None and isinstance(cached, csr_matrix) == bool(as_csr_matrix):
        # already computed, in the requested format
        return cached

    rows = []
    cols = []

    # fill it by block for the objects: one block per substation
    sub_start = 0
    for n_elem in self.sub_info:
        # it must be a vanilla python integer, otherwise it's not handled by some backend
        # especially if written in c++
        n_elem = int(n_elem)
        for first in range(n_elem):
            pos_first = sub_start + first
            bus_first = self.topo_vect[pos_first]
            if bus_first == -1:
                # object is disconnected, nothing is done
                continue

            # connect an object to itself
            rows.append(pos_first)
            cols.append(pos_first)

            # connect the other objects of the substation located on the same bus
            # (a disconnected object can never match, as `bus_first` is not -1)
            for second in range(first + 1, n_elem):
                pos_second = sub_start + second
                if self.topo_vect[pos_second] == bus_first:
                    rows.append(pos_second)
                    cols.append(pos_first)
                    rows.append(pos_first)
                    cols.append(pos_second)
        sub_start += n_elem

    # both ends of a line are connected together (if line is connected)
    for line_id in range(self.n_line):
        if not self.line_status[line_id]:
            continue
        pos_or = self.line_or_pos_topo_vect[line_id]
        pos_ex = self.line_ex_pos_topo_vect[line_id]
        rows.append(pos_or)
        cols.append(pos_ex)
        rows.append(pos_ex)
        cols.append(pos_or)

    rows = np.array(rows).astype(dt_int)
    cols = np.array(cols).astype(dt_int)
    mat_shape = (self.dim_topo, self.dim_topo)
    if as_csr_matrix:
        ones = np.ones(rows.shape[0], dtype=dt_float)
        self._connectivity_matrix_ = csr_matrix(
            (ones, (rows, cols)), shape=mat_shape, dtype=dt_float
        )
    else:
        dense = np.zeros(shape=mat_shape, dtype=dt_float)
        dense[rows, cols] = 1.0
        self._connectivity_matrix_ = dense
    return self._connectivity_matrix_
def _aux_fun_get_bus(self):
    """
    Identify the buses to which the powerlines are connected (see bus_connectivity_matrix).

    Only the powerlines having both sides connected are considered. A bus is numbered
    ``sub_id + (local_bus - 1) * n_sub``.

    Returns
    -------
    nb_bus: ``int``
        Number of different buses found
    unique_bus:
        The sorted ids of these buses
    bus_or:
        For each connected powerline, the id of the bus at its origin side
    bus_ex:
        For each connected powerline, the id of the bus at its extremity side
    """
    local_or = self.topo_vect[self.line_or_pos_topo_vect]
    local_ex = self.topo_vect[self.line_ex_pos_topo_vect]
    in_service = (local_or > 0) & (local_ex > 0)

    bus_or = self.line_or_to_subid[in_service] + (local_or[in_service] - 1) * self.n_sub
    bus_ex = self.line_ex_to_subid[in_service] + (local_ex[in_service] - 1) * self.n_sub

    # np.unique already returns its result sorted
    unique_bus = np.unique(np.concatenate((bus_or, bus_ex)))
    return unique_bus.shape[0], unique_bus, bus_or, bus_ex
def bus_connectivity_matrix(self, as_csr_matrix=False, return_lines_index=False):
    """
    If we denote by `nb_bus` the total number bus of the powergrid (you can think of a "bus" being
    a "node" if you represent a powergrid as a graph [mathematical object, not a plot] with the lines
    being the "edges"].

    The `bus_connectivity_matrix` will have a size nb_bus, nb_bus and will be made of 0 and 1.

    If `bus_connectivity_matrix[i,j] = 1` then at least a power line connects bus i and bus j.
    Otherwise, nothing connects it.

    .. warning::
        The matrix returned by this function has not a fixed size. Its
        number of nodes and edges can change depending on the state of the grid.
        See :ref:`get-the-graph-gridgraph` for more information.

        Also, note that when "done=True" this matrix has size (1, 1) and contains only 0.

    Parameters
    ----------
    as_csr_matrix: ``bool``
        Whether to return the bus connectivity matrix as a sparse matrix (csr format) or as a
        dense matrix. By default it's ``False`` meaning a dense matrix is returned.

    return_lines_index: ``bool``
        Whether to also return the bus index associated to both side of each powerline.

    Returns
    -------
    res: ``numpy.ndarray``, shape: (nb_bus, nb_bus) dtype:float
        The bus connectivity matrix defined above. If `return_lines_index` is ``True`` the tuple
        ``(matrix, (line_or_bus, line_ex_bus))`` is returned instead, whatever the state of the
        observation.

    Notes
    ------
    By convention we say that a bus is connected to itself. So the diagonal of this matrix is 1.

    When the observation is "done", `line_or_bus` and `line_ex_bus` are vectors full of 0.

    Examples
    --------

    Here is how you can use this function:

    .. code-block:: python

        bus_bus_graph, (line_or_bus, line_ex_bus) = obs.bus_connectivity_matrix(return_lines_index=True)

        # bus_bus_graph is the matrix described above.
        # line_or_bus[0] give the id of the bus to which the origin side of powerline 0 is connected
        # line_ex_bus[0] give the id of the bus to which the extremity side of powerline 0 is connected
        # (NB: if the powerline is disconnected, both are -1)
        # this means that if line 0 is connected: bus_bus_graph[line_or_bus[0], line_ex_bus[0]] = 1
        # and bus_bus_graph[line_ex_bus[0], line_or_bus[0]] = 1
        # (of course you can replace 0 with any integer `0 <= l_id < obs.n_line`

    """
    if self._is_done:
        # game over: by convention a (1, 1) matrix full of 0
        self._bus_connectivity_matrix_ = None
        if as_csr_matrix:
            done_mat = csr_matrix((1, 1), dtype=dt_float)
        else:
            done_mat = np.zeros(shape=(1, 1), dtype=dt_float)
        if not return_lines_index:
            return done_mat
        n_line = type(self).n_line
        lor_bus = np.zeros(n_line, dtype=dt_int)
        lex_bus = np.zeros(n_line, dtype=dt_int)
        # same nesting as in the regular case, so callers can always unpack
        # `mat, (lor_bus, lex_bus)`
        return done_mat, (lor_bus, lex_bus)

    cached = self._bus_connectivity_matrix_
    reusable = (
        cached is not None
        and isinstance(cached, csr_matrix) == bool(as_csr_matrix)
        and not return_lines_index
    )
    if reusable:
        return cached

    nb_bus, unique_bus, bus_or, bus_ex = self._aux_fun_get_bus()

    # convert the bus id (from 0 to 2 * n_sub) to the row / column in the matrix
    # (number between 0 and nb_bus)
    all_indx = np.arange(nb_bus)
    tmplate = np.arange(np.max(unique_bus) + 1)
    tmplate[unique_bus] = all_indx
    bus_or_in_mat = tmplate[bus_or]
    bus_ex_in_mat = tmplate[bus_ex]

    if as_csr_matrix:
        data = np.ones(
            nb_bus + bus_or_in_mat.shape[0] + bus_ex_in_mat.shape[0],
            dtype=dt_float,
        )
        row_ind = np.concatenate((all_indx, bus_or_in_mat, bus_ex_in_mat))
        col_ind = np.concatenate((all_indx, bus_ex_in_mat, bus_or_in_mat))
        self._bus_connectivity_matrix_ = csr_matrix(
            (data, (row_ind, col_ind)), shape=(nb_bus, nb_bus), dtype=dt_float
        )
    else:
        dense = np.zeros(shape=(nb_bus, nb_bus), dtype=dt_float)
        dense[bus_or_in_mat, bus_ex_in_mat] = 1.0
        dense[bus_ex_in_mat, bus_or_in_mat] = 1.0
        dense[all_indx, all_indx] = 1.0
        self._bus_connectivity_matrix_ = dense

    if not return_lines_index:
        return self._bus_connectivity_matrix_

    lor_bus, lor_conn = self._get_bus_id(
        self.line_or_pos_topo_vect, self.line_or_to_subid
    )
    lex_bus, lex_conn = self._get_bus_id(
        self.line_ex_pos_topo_vect, self.line_ex_to_subid
    )
    lor_in_mat = tmplate[lor_bus]
    lex_in_mat = tmplate[lex_bus]
    # a disconnected side has bus -1, which `tmplate[-1]` would silently turn into the
    # last bus of the matrix: put back the documented -1
    lor_in_mat[~lor_conn] = -1
    lex_in_mat[~lex_conn] = -1
    return self._bus_connectivity_matrix_, (lor_in_mat, lex_in_mat)
def _get_bus_id(self, id_topo_vect, sub_id):
    """
    Get the bus id of some objects, with the internal convention that:

    - if object on bus 1, its bus is `sub_id`
    - if object on bus 2, its bus is `sub_id` + n_sub
    - if object on bus 3, its bus is `sub_id` + 2 * n_sub
    - etc.

    Parameters
    ----------
    id_topo_vect:
        Position of each object in the topology vector
    sub_id:
        Id of the substation to which each object belongs

    Returns
    -------
    bus_id:
        The bus id of each object (an object that is not connected keeps its value in the
        topology vector)
    connected:
        Boolean mask, ``True`` for the objects that are connected (local bus > 0)
    """
    # copy: the topology vector of the observation must not be modified
    global_bus = self.topo_vect[id_topo_vect].copy()
    is_connected = global_bus > 0
    local_bus = global_bus[is_connected]
    global_bus[is_connected] = sub_id[is_connected] + (local_bus - 1) * self.n_sub
    return global_bus, is_connected
def flow_bus_matrix(self, active_flow=True, as_csr_matrix=False):
    """
    A matrix of size "nb bus" "nb bus". Each row and columns represent a "bus" of the grid ("bus" is a power
    system word that for computer scientist means "nodes" if the powergrid is represented as a graph).
    See the note in case of a grid in "game over" mode.

    The diagonal will sum the power produced and consumed at each bus.

    The other element of each **row** of this matrix will be the flow of power from the bus represented
    by the line i to the bus represented by column j.

    .. warning::
        The matrix returned by this function has not a fixed size. Its
        number of nodes and edges can change depending on the state of the grid.
        See :ref:`get-the-graph-gridgraph` for more information.

        Also, note that when "done=True" this matrix has size (1, 1) and contains only 0.

    Notes
    ------
    When the observation is in a "done" state (*eg* there has been a game over) then this function returns a
    "matrix" of dimension (1,1) [yes, yes it's a scalar] with only one element that is 0.

    In this case, `load_bus`, `prod_bus`, `stor_bus`, `lor_bus` and `lex_bus` are vectors full of 0.

    Parameters
    ----------
    active_flow: ``bool``
        Whether to get the active flow (in MW) or the reactive flow (in MVAr). Defaults to active flow.

    as_csr_matrix: ``bool``
        Whether to retrieve the results as a scipy csr sparse matrix or as a dense matrix (default)

    Returns
    -------
    res: ``matrix``
        Which can either be a sparse matrix or a dense matrix depending on the value of the argument
        "as_csr_matrix".

    mappings: ``tuple``
        The mapping that makes the correspondence between each object and the bus to which it is connected.
        It is made of 5 elements: (load_bus, prod_bus, stor_bus, lor_bus, lex_bus).

        For example if `load_bus[i] = 14` it means that the load with id `i` is connected to the
        bus 14. If `load_bus[i] = -1` then the object is disconnected.

    Examples
    --------

    Here is how you can use this function:

    .. code-block:: python

        flow_mat, (load, prod, stor, ind_lor, ind_lex) = obs.flow_bus_matrix()

        # flow_mat is the matrix described above.

    Lots of information can be deduce from this matrix. For example if you want to know
    how much power goes from one bus say bus `i` to another bus (say bus `j` )
    you can look at the associated coefficient `flow_mat[i,j]` which will also be related to the flow on the
    origin (or extremity) side of the powerline connecting bus `i` to bus `j`

    You can also know how much power
    (total generation + total storage discharging - total load - total storage charging)
    is injected at each bus `i` by looking at the `i` th diagonal coefficient.

    Another use would be to check if the current powergrid state (as seen by grid2op) meet the
    Kirchhoff circuit laws (conservation of energy), by doing the sum (row by row) of this matrix.
    `flow_mat.sum(axis=1)`

    """
    if self._is_done:
        flow_mat = csr_matrix((1, 1), dtype=dt_float)
        if not as_csr_matrix:
            flow_mat = flow_mat.toarray()
        cls = type(self)
        load_bus = np.zeros(cls.n_load, dtype=dt_int)
        prod_bus = np.zeros(cls.n_gen, dtype=dt_int)
        stor_bus = np.zeros(cls.n_storage, dtype=dt_int)
        lor_bus = np.zeros(cls.n_line, dtype=dt_int)
        lex_bus = np.zeros(cls.n_line, dtype=dt_int)
        return flow_mat, (load_bus, prod_bus, stor_bus, lor_bus, lex_bus)

    nb_bus, unique_bus, _, _ = self._aux_fun_get_bus()

    # bus of each object, with the "grid2op convention" (see `_get_bus_id`)
    prod_bus, prod_conn = self._get_bus_id(self.gen_pos_topo_vect, self.gen_to_subid)
    load_bus, load_conn = self._get_bus_id(self.load_pos_topo_vect, self.load_to_subid)
    stor_bus, stor_conn = self._get_bus_id(
        self.storage_pos_topo_vect, self.storage_to_subid
    )
    lor_bus, lor_conn = self._get_bus_id(
        self.line_or_pos_topo_vect, self.line_or_to_subid
    )
    lex_bus, lex_conn = self._get_bus_id(
        self.line_ex_pos_topo_vect, self.line_ex_to_subid
    )

    # convert the bus to be "id of row or column in the matrix" instead of the bus id with
    # the "grid2op convention"
    all_indx = np.arange(nb_bus)
    tmplate = np.arange(np.max(unique_bus) + 1)
    tmplate[unique_bus] = all_indx

    def _row_in_matrix(bus_ids, connected):
        """Row / column of each object in the matrix, -1 for the disconnected ones."""
        rows = tmplate[bus_ids]
        # `tmplate[-1]` would silently give the last bus of the matrix
        rows[~connected] = -1
        return rows

    prod_bus = _row_in_matrix(prod_bus, prod_conn)
    load_bus = _row_in_matrix(load_bus, load_conn)
    stor_bus = _row_in_matrix(stor_bus, stor_conn)
    lor_bus = _row_in_matrix(lor_bus, lor_conn)
    lex_bus = _row_in_matrix(lex_bus, lex_conn)

    if self.shunts_data_available:
        sh_conn = self._shunt_bus != -1
        sh_bus = 1 * self._shunt_bus
        on_bus = sh_bus > 0
        # same convention as for the other objects: sub_id + (local_bus - 1) * n_sub ...
        sh_bus[on_bus] = self.shunt_to_subid[on_bus] + (sh_bus[on_bus] - 1) * self.n_sub
        # ... then converted to a row of the matrix, like every other object
        sh_bus = tmplate[sh_bus]

    if active_flow:
        prod_vect = self.gen_p
        load_vect = self.load_p
        or_vect = self.p_or
        ex_vect = self.p_ex
        stor_vect = self.storage_power
        if self.shunts_data_available:
            sh_vect = self._shunt_p
    else:
        prod_vect = self.gen_q
        load_vect = self.load_q
        or_vect = self.q_or
        ex_vect = self.q_ex
        stor_vect = np.zeros(self.n_storage, dtype=dt_float)
        if self.shunts_data_available:
            sh_vect = self._shunt_q

    nb_lor = np.sum(lor_conn)
    nb_lex = np.sum(lex_conn)
    # coefficients of the matrix: the diagonal first, then the flows at the origin side and
    # finally the flows at the extremity side of the powerlines
    data = np.zeros(nb_bus + nb_lor + nb_lex, dtype=dt_float)

    def _sum_per_bus(bus_rows, connected, values):
        """Sum `values` of the connected objects bus by bus (covers rows 0 to the highest one used)."""
        nb_el = np.sum(connected)
        if not nb_el:
            return np.zeros(0, dtype=dt_float)
        rows = bus_rows[connected]
        # several generators / loads / storage units can be connected at the same bus
        # this is why i go with matrix product and sparse matrices
        map_mat = csr_matrix(
            (np.ones(nb_el), (rows, np.arange(nb_el))),
            shape=(rows.max() + 1, nb_el),
            dtype=dt_float,
        )
        return map_mat.dot(values[connected])

    injected = _sum_per_bus(prod_bus, prod_conn, prod_vect)
    data[: injected.shape[0]] += injected
    absorbed = [
        (load_bus, load_conn, load_vect),
        (stor_bus, stor_conn, stor_vect),
    ]
    if self.shunts_data_available:
        absorbed.append((sh_bus, sh_conn, sh_vect))
    for bus_rows, connected, values in absorbed:
        consumed = _sum_per_bus(bus_rows, connected, values)
        data[: consumed.shape[0]] -= consumed

    # powerlines
    data[np.arange(nb_lor) + nb_bus] -= or_vect[lor_conn]
    data[np.arange(nb_lex) + nb_bus + nb_lor] -= ex_vect[lex_conn]
    row_ind = np.concatenate((all_indx, lor_bus[lor_conn], lex_bus[lex_conn]))
    col_ind = np.concatenate((all_indx, lex_bus[lex_conn], lor_bus[lor_conn]))
    res = csr_matrix(
        (data, (row_ind, col_ind)), shape=(nb_bus, nb_bus), dtype=dt_float
    )
    if not as_csr_matrix:
        res = res.toarray()
    return res, (load_bus, prod_bus, stor_bus, lor_bus, lex_bus)
def _add_edges_simple(self, vector, attr_nm, lor_bus, lex_bus, graph):
    """
    Attach one value per powerline as an edge attribute of `graph`.

    Used for attributes that are identical when seen from either end of a powerline.

    Parameters
    ----------
    vector:
        One value per powerline.
    attr_nm: ``str``
        Name of the edge attribute to set.
    lor_bus, lex_bus:
        For each powerline, the graph node at its origin / extremity end.
    graph:
        The networkx graph to update (in place).

    Notes
    -----
    When several powerlines map to the same pair of nodes the values are NOT combined:
    a later one replaces an earlier one.
    """
    attr_by_edge = {
        (lor_bus[lid], lex_bus[lid]): val for lid, val in enumerate(vector)
    }
    # register each pair in both orientations
    mirrored = {(k2, k1): val for (k1, k2), val in attr_by_edge.items()}
    attr_by_edge.update(mirrored)
    networkx.set_edge_attributes(graph, attr_by_edge, attr_nm)

def _add_edges_multi(self, vector_or, vector_ex, attr_nm, lor_bus, lex_bus, graph):
    """
    Attach "origin" and "extremity" values to the edges of `graph`.

    The attributes are stored under the names ``"{attr_nm}_or"`` and ``"{attr_nm}_ex"``.
    In the graph, the "origin" of an edge is the node with the lowest id. When a powerline
    has its origin on the highest node, its origin / extremity values are therefore swapped.

    Values of all the powerlines joining the same two nodes are summed, whatever their
    orientation (previously two powerlines with opposite orientations overwrote each other).

    Parameters
    ----------
    vector_or, vector_ex:
        One value per powerline, at its origin / extremity end.
    attr_nm: ``str``
        Prefix of the edge attributes to set.
    lor_bus, lex_bus:
        For each powerline, the graph node at its origin / extremity end.
    graph:
        The networkx graph to update (in place).
    """
    dict_or = {}
    dict_ex = {}
    for lid, (val_or, val_ex) in enumerate(zip(vector_or, vector_ex)):
        bus_or = lor_bus[lid]
        bus_ex = lex_bus[lid]
        if bus_or < bus_ex:
            # networkx and grid2op share the same "direction"
            key, val_low, val_high = (bus_or, bus_ex), val_or, val_ex
        else:
            # opposite "direction": the grid2op extremity is the graph origin
            key, val_low, val_high = (bus_ex, bus_or), val_ex, val_or

        if key in dict_or:
            # another powerline already joins these two nodes: sum the contributions
            dict_or[key] += val_low
            dict_ex[key] += val_high
        else:
            dict_or[key] = val_low
            dict_ex[key] = val_high

    networkx.set_edge_attributes(graph, dict_or, "{}_or".format(attr_nm))
    networkx.set_edge_attributes(graph, dict_ex, "{}_ex".format(attr_nm))
def as_networkx(self):
    """
    Convert this observation as a networkx graph.

    Notes
    ------
    The resulting graph is "frozen" this means that you cannot add / remove attribute on nodes or edges, nor add /
    remove edges or nodes.

    This graphs has the following properties:

    - it counts as many nodes as the number of buses of the grid
    - it counts less edges than the number of lines of the grid (two lines connecting the same buses are "merged"
      into one single edge - this is the case for parallel line, that are hence "merged" into the same edge)
    - nodes have attributes:

        - `p`: the active power produced at this node (negative means the sum of power produce minus power absorbed
          is negative)
        - `q`: the reactive power produced at this node
        - `v`: the voltage magnitude at this node
        - `sub_id`: the id of the substation of this node
        - `theta`: the voltage angle at this node (only if `support_theta` is set)
        - `cooldown`: how much longer you need to wait before being able to merge / split or change this node

    - edges have attributes too:

        - `rho`: the relative flow on this powerline
        - `cooldown`: the number of step you need to wait before being able to act on this powerline
        - `status`: whether this powerline is connected or not
        - `thermal_limit`: maximum flow allowed on the powerline (this is the "a_or" flow)
        - `timestep_overflow`: number of time steps during which the powerline is on overflow
        - `p_or`: active power injected at this node at the "origin side".
        - `p_ex`: active power injected at this node at the "extremity side".
        - `q_or`: reactive power injected at this node at the "origin side".
        - `q_ex`: reactive power injected at this node at the "extremity side".
        - `a_or`: current flow injected at this node at the "origin side".
        - `a_ex`: current flow injected at this node at the "extremity side".
        - `theta_or` / `theta_ex`: voltage angle at each side (only if `support_theta` is set)

    .. danger::
        **IMPORTANT NOTE** the "origin" and "extremity" of the networkx graph is not necessarily the same as
        the one in grid2op. The "origin" side will always be the nodes with the lowest id. For example, if
        an edges connects the bus 6 to the bus 8, then the "origin" of this powerline is bus 6 (**eg** the active
        power injected at node 6 from this edge will be *p_or*) and the "extremity" side is bus 8 (**eg** the
        active power injected at node 8 from this edge will be *p_ex*).

    .. warning::
        The graph returned by this function has not a fixed size. Its number of nodes and edges can change
        depending on the state of the grid. See :ref:`get-the-graph-gridgraph` for more information.

        Also, note that when "done=True" this graph has only one node and no edge.

    .. note::
        The graph returned by this function is "frozen" to prevent its modification. If you really want to
        modify it you can "unfroze" it.

    Returns
    -------
    graph: ``networkx graph``
        A possible representation of the observation as a networkx graph

    Examples
    --------
    The following code explains how to check that a grid meet the kirchoffs law (conservation of energy)

    .. code-block:: python

        # create an environment and get the observation
        import grid2op
        env_name = ...
        env = grid2op.make(env_name)
        obs = env.reset()

        # retrieve the networkx graph
        graph = obs.as_networkx()

        # perform the check for every nodes
        for node_id in graph.nodes:
            # retrieve power (active and reactive) produced at this node
            p_ = graph.nodes[node_id]["p"]
            q_ = graph.nodes[node_id]["q"]

            # get the edges
            edges = graph.edges(node_id)
            p_lines = 0
            q_lines = 0
            # get the power that is "evacuated" at each nodes on all the edges connecting it to the other nodes
            # of the network
            for (k1, k2) in edges:
                # now retrieve the active / reactive power injected at this node (looking at either *_or or *_ex
                # depending on the direction of the powerline: remember that the "origin" is always the lowest
                # bus id.
                if k1 < k2:
                    # the current inspected node is the lowest, so on the "origin" side
                    p_lines += graph.edges[(k1, k2)]["p_or"]
                    q_lines += graph.edges[(k1, k2)]["q_or"]
                else:
                    # the current node is the largest, so on the "extremity" side
                    p_lines += graph.edges[(k1, k2)]["p_ex"]
                    q_lines += graph.edges[(k1, k2)]["q_ex"]
            assert abs(p_lines - p_) <= 1e-5, "error for kirchoff's law for graph for P"
            assert abs(q_lines - q_) <= 1e-5, "error for kirchoff's law for graph for Q"

    """
    # TODO save this graph somewhere, in a self._as_networkx attributes for example
    mat_p, (_, _, _, lor_bus, lex_bus) = self.flow_bus_matrix(
        active_flow=True, as_csr_matrix=True
    )
    mat_q, *_ = self.flow_bus_matrix(active_flow=False, as_csr_matrix=True)

    # for efficiency
    mat_p = mat_p.tocoo()
    nb_bus = mat_p.shape[0]

    # Only connected powerlines may write bus level quantities: flow_bus_matrix itself
    # only ever uses lor_bus / lex_bus through its connectivity masks, so the entries of
    # disconnected powerlines must not be used to index a bus (they could overwrite the
    # voltage / angle of a real bus with the values of a disconnected line).
    line_on = np.asarray(self.line_status, dtype=dt_bool)
    or_on = lor_bus[line_on]
    ex_on = lex_bus[line_on]

    # bus voltage
    bus_v = np.zeros(nb_bus)
    bus_v[or_on] = self.v_or[line_on]
    bus_v[ex_on] = self.v_ex[line_on]

    # substation of each bus
    bus_subid = np.zeros(nb_bus, dtype=dt_int)
    bus_subid[or_on] = self.line_or_to_subid[line_on]
    bus_subid[ex_on] = self.line_ex_to_subid[line_on]

    # bus voltage angle (stays at 0. if not supported)
    bus_theta = np.zeros(nb_bus)
    if self.support_theta:
        bus_theta[or_on] = self.theta_or[line_on]
        bus_theta[ex_on] = self.theta_ex[line_on]

    # bus active injection: stored on the diagonal, which is then removed so that
    # only the flows between different buses become edges
    bus_p = mat_p.diagonal().copy()
    mat_p.setdiag(0.0)
    mat_p.eliminate_zeros()

    # create the networkx graph
    try:
        build_graph = networkx.from_scipy_sparse_array
    except AttributeError:
        # this networkx does not expose `from_scipy_sparse_array`
        build_graph = networkx.from_scipy_sparse_matrix
    graph = build_graph(mat_p, edge_attribute="p")

    # add the nodes attributes
    networkx.set_node_attributes(graph, dict(enumerate(bus_p)), "p")
    networkx.set_node_attributes(graph, dict(enumerate(mat_q.diagonal())), "q")
    networkx.set_node_attributes(graph, dict(enumerate(bus_v)), "v")
    networkx.set_node_attributes(graph, dict(enumerate(bus_subid)), "sub_id")
    if self.support_theta:
        networkx.set_node_attributes(graph, dict(enumerate(bus_theta)), "theta")

    # the same cooldown is registered for node `sub_id` and for node `sub_id + n_sub`
    dict_cooldown = dict(enumerate(self.time_before_cooldown_sub))
    dict_cooldown.update(
        {sub_id + self.n_sub: val for sub_id, val in list(dict_cooldown.items())}
    )
    networkx.set_node_attributes(graph, dict_cooldown, "cooldown")

    # add the edges attributes
    self._add_edges_multi(self.p_or, self.p_ex, "p", lor_bus, lex_bus, graph)
    self._add_edges_multi(self.q_or, self.q_ex, "q", lor_bus, lex_bus, graph)
    self._add_edges_multi(self.a_or, self.a_ex, "a", lor_bus, lex_bus, graph)
    if self.support_theta:
        self._add_edges_multi(
            self.theta_or, self.theta_ex, "theta", lor_bus, lex_bus, graph
        )

    self._add_edges_simple(self.rho, "rho", lor_bus, lex_bus, graph)
    self._add_edges_simple(
        self.time_before_cooldown_line, "cooldown", lor_bus, lex_bus, graph
    )
    self._add_edges_simple(self.line_status, "status", lor_bus, lex_bus, graph)
    self._add_edges_simple(
        self.thermal_limit, "thermal_limit", lor_bus, lex_bus, graph
    )
    self._add_edges_simple(
        self.timestep_overflow, "timestep_overflow", lor_bus, lex_bus, graph
    )

    # extra layer of security: prevent accidental modification of this graph
    networkx.freeze(graph)
    return graph
def get_forecasted_inj(self, time_step=1):
    """
    This function allows you to retrieve directly the "forecast" injections for the step `time_step`.

    We remind that the environment, under some conditions, can produce these forecasts automatically.
    This function allows to retrieve what has been forecast.

    Values that are absent from the forecast, or not finite, are replaced by the corresponding
    value of the current observation. The arrays stored in the observation are never
    modified: the returned arrays are copies.

    Parameters
    ----------
    time_step: ``int``
        The horizon of the forecast (given in number of time steps)

    Raises
    ------
    :class:`grid2op.Exceptions.NoForecastAvailable`
        if `time_step` is negative or if no forecast is stored for this horizon.

    Returns
    -------
    gen_p_f: ``numpy.ndarray``
        The forecast generators active values
    gen_v_f: ``numpy.ndarray``
        The forecast generators voltage setpoints
    load_p_f: ``numpy.ndarray``
        The forecast load active consumption
    load_q_f: ``numpy.ndarray``
        The forecast load reactive consumption
    """
    if time_step < 0:
        # a negative index would silently read the forecasts from the end
        raise NoForecastAvailable("Impossible to forecast in the past.")
    if time_step >= len(self._forecasted_inj):
        raise NoForecastAvailable(
            "Forecast for {} timestep ahead is not possible with your chronics.".format(
                time_step
            )
        )

    _, forecast = self._forecasted_inj[time_step]
    injection = forecast["injection"]

    def _forecast_or_current(key, size, current):
        """Copy of the forecast for `key`, completed with `current` where it is missing."""
        if key in injection:
            # copy: the stored forecast must not be altered by the completion below
            values = np.array(injection[key])
        else:
            values = np.full(size, fill_value=np.nan, dtype=dt_float)
        missing = ~np.isfinite(values)
        values[missing] = current[missing]
        return values

    return (
        _forecast_or_current("prod_p", self.n_gen, self.gen_p),
        _forecast_or_current("prod_v", self.n_gen, self.gen_v),
        _forecast_or_current("load_p", self.n_load, self.load_p),
        _forecast_or_current("load_q", self.n_load, self.load_q),
    )
def get_time_stamp(self):
    """
    Build the `datetime.datetime` matching the date attributes of this observation.

    Returns
    -------
    ``datetime.datetime``
        Made of the `year`, `month`, `day`, `hour_of_day` and `minute_of_hour` attributes.
    """
    return datetime.datetime(
        self.year,
        self.month,
        self.day,
        self.hour_of_day,
        self.minute_of_hour,
    )
def simulate(self, action, time_step=1):
    """
    Simulate the effect of an action on a forecast powergrid state.

    The forecast state is built upon the current observation, from the forecasts stored in it.
    If forecasts have not been deactivated (see
    :func:`grid2op.Environment.BaseEnv.deactivate_forecast`) and the environment is able to
    generate them, this function returns the "next state" (often the state you would get at
    time `t + 5` mins) you would obtain if you were to do the action at this step.

    It has the same return value as the :func:`grid2op.Environment.Environment.step` function.

    Parameters
    ----------
    action: :class:`grid2op.Action.Action`
        The action to simulate

    time_step: ``int``
        The time step of the forecasted grid to perform the action on. If no forecast are available for this
        time step, a :class:`grid2op.Exceptions.NoForecastAvailable` is thrown.

    Raises
    ------
    :class:`grid2op.Exceptions.NoForecastAvailable`
        if no forecast are available for the time_step queried (this includes negative time steps
        and observations without action space).

    :class:`grid2op.Exceptions.BaseObservationError`
        if this observation has no environment to perform the simulation.

    Returns
    -------
    simulated_observation: :class:`grid2op.Observation.Observation`
        agent's observation of the current environment after the application of the action "act" on the
        current state.

    reward: ``float``
        amount of reward returned after previous action

    done: ``bool``
        whether the episode has ended, in which case further step() calls will return undefined results

    info: ``dict``
        contains auxiliary diagnostic information (helpful for debugging, and sometimes learning)

    Notes
    ------
    This is a simulation in the sense that the "next grid state" is not the real grid state you will get. As you
    don't know the future, the "injections you forecast for the next step" will not be the real injection you
    will get in the next step.

    Also, in some circumstances, the "Backend" (ie the powerflow) used to do the simulation may not be the
    same one as the one used by the environment. This is to model a real fact: as accurate your powerflow is, it
    does not model all the reality (*"all models are wrong"*). Having a different solver for the environment (
    "the reality") than the one used to anticipate the impact of the action (this "simulate" function)
    is a way to represent this fact.

    Examples
    --------
    To simulate what would be the effect of the action "act" if you were to take this action at this step
    you can do the following:

    .. code-block:: python

        import grid2op
        # retrieve an environment
        env = grid2op.make()

        # retrieve an observation, this is the same for all observations
        obs = env.reset()

        # and now you can simulate the effect of doing nothing in the next time step
        act = env.action_space()  # this can be any action that grid2op understands
        simulated_obs, simulated_reward, simulated_done, simulated_info = obs.simulate(act)

        # `simulated_obs` will be the "observation" after the application of action `act` on the
        #                 " forecast of the grid state (it will be the "forecast state at time t+5mins usually)
        # `simulated_reward` will be the reward for the same action on the same forecast state
        # `simulated_done` will indicate whether or not the simulation ended up in a "game over"
        # `simulated_info` gives extra information on this forecast state

    """
    # guard clauses: everything required for the simulation must be available
    if self.action_helper is None:
        raise NoForecastAvailable(
            "No forecasts are available for this instance of BaseObservation "
            "(no action_space "
            "and no simulated environment are set)."
        )

    if self._obs_env is None:
        raise BaseObservationError(
            'This observation has no "environment used for simulation" (_obs_env) is not created. '
            "This is the case if you loaded this observation from a disk (for example using "
            "EpisodeData) "
            'or used a Runner with multi processing with the "add_detailed_output=True" '
            "flag or even if you use an environment with a non serializable backend. "
            "This is a feature of grid2op: it does not require backends to be serializable."
        )

    if time_step < 0:
        raise NoForecastAvailable("Impossible to forecast in the past.")

    if time_step >= len(self._forecasted_inj):
        raise NoForecastAvailable(
            "Forecast for {} timestep(s) ahead is not possible with your chronics."
            "".format(time_step)
        )

    # the injection action of each horizon is built once, then reused
    known_forecasts = self._forecasted_grid_act
    if time_step not in known_forecasts:
        stamp, injections = self._forecasted_inj[time_step]
        known_forecasts[time_step] = {
            "timestamp": stamp,
            "inj_action": self.action_helper(injections),
        }
    forecast = known_forecasts[time_step]
    timestamp = forecast["timestamp"]
    inj_action = forecast["inj_action"]

    self._obs_env.init(
        inj_action,
        time_stamp=timestamp,
        timestep_overflow=self.timestep_overflow,
        topo_vect=self.topo_vect,
        time_step=time_step,
    )

    simulated, *others = self._obs_env.simulate(action)
    # the observation handed back to the caller is a deep copy of the simulated one
    return (copy.deepcopy(simulated), *others)
def copy(self):
    """
    INTERNAL

    .. warning:: /!\\\\ Internal, do not use unless you know what you are doing /!\\\\

    Make a copy of the observation.

    Returns
    -------
    res: :class:`BaseObservation`
        The deep copy of the observation

    Notes
    --------
    The `_obs_env` and `action_helper` attributes are not copied: the returned observation
    references the very same objects as this one. They are temporarily removed from this
    observation while it is deep copied, and always put back afterwards, even if the deep
    copy fails.
    """
    from copy import deepcopy

    # _obs_env is a pointer, it is not held by this !
    obs_env = self._obs_env
    action_helper = self.action_helper
    self._obs_env = None
    self.action_helper = None
    try:
        res = deepcopy(self)
    finally:
        # restore the original observation whatever happened during the copy
        self._obs_env = obs_env
        self.action_helper = action_helper

    res._obs_env = obs_env
    res.action_helper = action_helper
    return res
@property
def line_or_bus(self):
    """
    Busbar to which the origin end of each powerline is connected.

    The result follows the grid2op convention:

    - -1 means the powerline is disconnected
    - 1 means it is connected to busbar 1
    - 2 means it is connected to busbar 2
    - etc.

    The returned array is read-only.

    Notes
    -----
    In a same substation, two objects are connected together if (and only if) they are connected
    to the same busbar.
    """
    buses = self.topo_vect[self.line_or_pos_topo_vect]
    buses.setflags(write=False)
    return buses

@property
def line_ex_bus(self):
    """
    Busbar to which the extremity end of each powerline is connected.

    The result follows the grid2op convention:

    - -1 means the powerline is disconnected
    - 1 means it is connected to busbar 1
    - 2 means it is connected to busbar 2
    - etc.

    The returned array is read-only.

    Notes
    -----
    In a same substation, two objects are connected together if (and only if) they are connected
    to the same busbar.
    """
    buses = self.topo_vect[self.line_ex_pos_topo_vect]
    buses.setflags(write=False)
    return buses

@property
def gen_bus(self):
    """
    Busbar to which each generator is connected.

    The result follows the grid2op convention:

    - -1 means the generator is disconnected
    - 1 means it is connected to busbar 1
    - 2 means it is connected to busbar 2
    - etc.

    The returned array is read-only.

    Notes
    -----
    In a same substation, two objects are connected together if (and only if) they are connected
    to the same busbar.
    """
    buses = self.topo_vect[self.gen_pos_topo_vect]
    buses.setflags(write=False)
    return buses

@property
def load_bus(self):
    """
    Busbar to which each load is connected.

    The result follows the grid2op convention:

    - -1 means the load is disconnected
    - 1 means it is connected to busbar 1
    - 2 means it is connected to busbar 2
    - etc.

    The returned array is read-only.

    Notes
    -----
    In a same substation, two objects are connected together if (and only if) they are connected
    to the same busbar.
    """
    buses = self.topo_vect[self.load_pos_topo_vect]
    buses.setflags(write=False)
    return buses

@property
def storage_bus(self):
    """
    Busbar to which each storage unit is connected.

    The result follows the grid2op convention:

    - -1 means the storage unit is disconnected
    - 1 means it is connected to busbar 1
    - 2 means it is connected to busbar 2
    - etc.

    The returned array is read-only.

    Notes
    -----
    In a same substation, two objects are connected together if (and only if) they are connected
    to the same busbar.
    """
    buses = self.topo_vect[self.storage_pos_topo_vect]
    buses.setflags(write=False)
    return buses

@property
def prod_p(self):
    """
    Backward compatible alias of `gen_p`.

    As of grid2op version 1.5.0, for better consistency, the "prod_p" attribute has been renamed "gen_p".

    Returns
    -------
    :attr:`BaseObservation.gen_p`
    """
    return self.gen_p

@property
def prod_q(self):
    """
    Backward compatible alias of `gen_q`.

    As of grid2op version 1.5.0, for better consistency, the "prod_q" attribute has been renamed "gen_q".

    Returns
    -------
    :attr:`BaseObservation.gen_q`
    """
    return self.gen_q

@property
def prod_v(self):
    """
    Backward compatible alias of `gen_v`.

    As of grid2op version 1.5.0, for better consistency, the "prod_v" attribute has been renamed "gen_v".

    Returns
    -------
    :attr:`BaseObservation.gen_v`
    """
    return self.gen_v
def sub_topology(self, sub_id):
    """
    Returns the topology of the given substation

    Parameters
    ----------
    sub_id:
        Id of the substation, compared to the entries of `_topo_vect_to_sub`.

    Returns
    -------
    ``numpy.ndarray``
        The entries of `topo_vect` whose substation is `sub_id`, as a read-only array.
    """
    in_substation = self._topo_vect_to_sub == sub_id
    topo = self.topo_vect[in_substation]
    topo.setflags(write=False)
    return topo
def _reset_matrices(self): self._vectorized = None
def from_vect(self, vect, check_legit=True):
    """
    INTERNAL

    .. warning:: /!\\\\ Internal, do not use unless you know what you are doing /!\\\\

        To reload an observation from a vector, use the "env.observation_space.from_vect()".

    Convert back an observation represented as a vector into a proper observation.

    Some conversion are done silently from float to the type of the corresponding observation attribute.

    Parameters
    ----------
    vect: ``numpy.ndarray``
        A representation of an BaseObservation in the form of a vector that is used to convert back the current
        observation to be equal to the vect.

    check_legit: ``bool``
        Forwarded unchanged to the `from_vect` method of the parent class.

    """
    # what was stored by `_reset_matrices` is dropped before the attributes are reloaded
    self._reset_matrices()

    # the reload itself is delegated to the parent class
    super().from_vect(vect, check_legit=check_legit)
def to_dict(self):
    """
    Transform this observation as a dictionary. This dictionary allows you to inspect the state of this
    observation and is simply a shortcut of the class instance.

    The dictionary is built at the first call and stored in `_dictionnarized`: the following
    calls return the very same object. It is only stored once completely built, so a failure
    while building it never leaves a partial dictionary behind.

    Returns
    -------
    A dictionary representing the observation.

    Notes
    -------
    The returned dictionary is not necessarily json serializable. To have a grid2op observation that you can
    serialize in a json fashion, please use the :func:`grid2op.Space.GridObjects.to_json` function.

    """
    if self._dictionnarized is not None:
        return self._dictionnarized

    res = {
        "timestep_overflow": self.timestep_overflow,
        "line_status": self.line_status,
        "topo_vect": self.topo_vect,
        "loads": {"p": self.load_p, "q": self.load_q, "v": self.load_v},
        # TODO "prods" will be removed in future versions (use "gens")
        "prods": {"p": self.gen_p, "q": self.gen_q, "v": self.gen_v},
        "gens": {"p": self.gen_p, "q": self.gen_q, "v": self.gen_v},
        "lines_or": {
            "p": self.p_or,
            "q": self.q_or,
            "v": self.v_or,
            "a": self.a_or,
        },
        "lines_ex": {
            "p": self.p_ex,
            "q": self.q_ex,
            "v": self.v_ex,
            "a": self.a_ex,
        },
        "rho": self.rho,
        "maintenance": {
            "time_next_maintenance": self.time_next_maintenance,
            "duration_next_maintenance": self.duration_next_maintenance,
        },
        "cooldown": {
            "line": self.time_before_cooldown_line,
            "substation": self.time_before_cooldown_sub,
        },
        "redispatching": {
            "target_redispatch": self.target_dispatch,
            "actual_dispatch": self.actual_dispatch,
        },
        # storage (the multiplication by 1.0 stores a new object, not the attribute itself)
        "storage_charge": 1.0 * self.storage_charge,
        "storage_power_target": 1.0 * self.storage_power_target,
        "storage_power": 1.0 * self.storage_power,
        # curtailment
        "gen_p_before_curtail": 1.0 * self.gen_p_before_curtail,
        "curtailment": 1.0 * self.curtailment,
        "curtailment_limit": 1.0 * self.curtailment_limit,
        "curtailment_limit_effective": 1.0 * self.curtailment_limit_effective,
        # alarm / attention budget (only the first element of these attributes is kept)
        "is_alarm_illegal": self.is_alarm_illegal[0],
        "time_since_last_alarm": self.time_since_last_alarm[0],
        "last_alarm": copy.deepcopy(self.last_alarm),
        "attention_budget": self.attention_budget[0],
        "was_alarm_used_after_game_over": self.was_alarm_used_after_game_over[0],
        # current_step / max step
        "current_step": self.current_step,
        "max_step": self.max_step,
    }
    self._dictionnarized = res
    return res
def add_act(self, act, issue_warn=True):
    """
    Easier access to the impact on the observation if an action were applied.

    This is for now only useful to get a topology in which the grid would be without doing an expensive
    `obs.simulate`

    Notes
    -----
    This will not give the real topology of the grid in all cases for many reasons amongst:

    1) past topologies are not known by the observation. If you reconnect a powerline in the action without
       having specified on which bus, it has no way to know (but the environment does!) on which bus it should
       be reconnected (which is the last known bus)
    2) some "protections" are emulated in the environment. This means that the environment can disconnect some
       powerline under certain conditions. This is absolutely not taken into account here.
    3) the environment is stochastic, for example there can be maintenance or attacks (hazards) and the generators
       and loads change each step. This is not taken into account in this function.
    4) no checks are performed to see if the action meets the rules of the game (number of elements you can modify
       in the action, cooldowns etc.) This method **supposes** that the action is legal and non ambiguous.
    5) It do not check for possible "game over", for example due to isolated elements or non-connected grid
       (grid with 2 or more connex components)

    If these issues are important for you, you will need to use the
    :func:`grid2op.Observation.BaseObservation.simulate` method. It can be used like
    `obs.simulate(act, time_step=0)` but it is much more expensive.

    Parameters
    ----------
    act: :class:`grid2op.Action.BaseAction`
        The action you want to add to the observation
    issue_warn: ``bool``
        Issue a warning when this method might not compute the proper resulting topologies. Default to ``True``:
        it issues warning when something not supported is done in the action.

    Raises
    ------
    ``RuntimeError``
        if `act` is not a grid2op action, or if it is ambiguous.

    Returns
    -------
    res: :class:`grid2op.Observation.Observation`
        The resulting observation. Note that this observation is not initialized with everything.
        It is only relevant when you want to study the resulting topology after you applied an
        action. Lots of `res` attributes are empty.

    Examples
    --------
    You can use it this way, for example if you want to retrieve the topology you would get (see the restriction
    in the above description) after applying an action:

    .. code-block:: python

        import grid2op

        # create the environment
        env_name = ...
        env = grid2op.make(env_name)

        # generate the first observation
        obs = env.reset()

        # make some action
        act = ...  # see the dedicated page

        # have a look at the impact on the action on the topology
        partial_obs = obs + act
        # or `partial_obs = obs.add_act(act, issue_warn=False)` if you want to silence the warnings

        # and now you can inspect the topology with any method you want:
        partial_obs.topo_vect
        partial_obs.load_bus
        bus_mat = partial_obs.bus_connectivity_matrix()
        # or even
        elem_mat = partial_obs.connectivity_matrix()

        # but you cannot use
        partial_obs.prod_p
        # or
        partial_obs.load_q
        etc.

    """
    from grid2op.Action import BaseAction

    if not isinstance(act, BaseAction):
        raise RuntimeError("You can only add actions to observation at the moment")

    # work on a copy: the action is modified below (missing buses of reconnected lines are set)
    act = copy.deepcopy(act)

    # the result is a new observation of the same type, in which only the topology
    # and the line status are copied from this observation
    res = type(self)()
    res.set_game_over(env=None)

    res.topo_vect[:] = self.topo_vect
    res.line_status[:] = self.line_status

    ambiguous, except_tmp = act.is_ambiguous()
    if ambiguous:
        raise RuntimeError(
            f"Impossible to add an ambiguous action to an observation. Your action was "
            f'ambiguous because: "{except_tmp}"'
        )

    # if a powerline has been reconnected without specific bus, i issue a warning
    if "set_line_status" in act.authorized_keys:
        reco_powerline = act.line_set_status
        if "set_bus" in act.authorized_keys:
            line_ex_set_bus = act.line_ex_set_bus
            line_or_set_bus = act.line_or_set_bus
        else:
            # the action cannot set buses: behave as if no bus was provided
            line_ex_set_bus = np.zeros(res.n_line, dtype=dt_int)
            line_or_set_bus = np.zeros(res.n_line, dtype=dt_int)
        error_no_bus_set = (
            "You reconnected a powerline with your action but did not specify on which bus "
            "to reconnect both its end. This behaviour, also perfectly fine for an environment "
            "will not be accurate in the method obs + act. Consult the documentation for more "
            "information. Problem arose for powerlines with id {}"
        )

        # extremity side: reconnected, no bus given, and currently disconnected
        tmp = (
            (reco_powerline == 1)
            & (line_ex_set_bus <= 0)
            & (res.topo_vect[self.line_ex_pos_topo_vect] == -1)
        )
        if np.any(tmp):
            id_issue_ex = np.where(tmp)[0]
            if issue_warn:
                warnings.warn(error_no_bus_set.format(id_issue_ex))
            if "set_bus" in act.authorized_keys:
                # assign 1 in the bus in this case
                act.line_ex_set_bus = [(el, 1) for el in id_issue_ex]

        # origin side: same check as for the extremity side
        tmp = (
            (reco_powerline == 1)
            & (line_or_set_bus <= 0)
            & (res.topo_vect[self.line_or_pos_topo_vect] == -1)
        )
        if np.any(tmp):
            id_issue_or = np.where(tmp)[0]
            if issue_warn:
                warnings.warn(error_no_bus_set.format(id_issue_or))
            if "set_bus" in act.authorized_keys:
                # assign 1 in the bus in this case
                act.line_or_set_bus = [(el, 1) for el in id_issue_or]

    # topo vect
    if "set_bus" in act.authorized_keys:
        # only the elements for which a bus is set (non zero) are modified
        res.topo_vect[act.set_bus != 0] = act.set_bus[act.set_bus != 0]

    if "change_bus" in act.authorized_keys:
        do_change_bus_on = act.change_bus & (
            res.topo_vect > 0
        )  # change bus of elements that were on
        # "3 - bus" turns 1 into 2 and 2 into 1
        res.topo_vect[do_change_bus_on] = 3 - res.topo_vect[do_change_bus_on]

    # topo vect: reco of powerline that should be
    # (a powerline is "on" when both its ends are on a bus >= 1)
    res.line_status = (res.topo_vect[self.line_or_pos_topo_vect] >= 1) & (
        res.topo_vect[self.line_ex_pos_topo_vect] >= 1
    )

    # powerline status
    if "set_line_status" in act.authorized_keys:
        # disconnection of the powerlines that were connected
        disco_line = (act.line_set_status == -1) & res.line_status
        res.topo_vect[res.line_or_pos_topo_vect[disco_line]] = -1
        res.topo_vect[res.line_ex_pos_topo_vect[disco_line]] = -1
        res.line_status[disco_line] = False

        # reconnection of the powerlines that were disconnected
        reco_line = (act.line_set_status >= 1) & (~res.line_status)
        # i can do that because i already "fixed" the action to have it put 1 in case it
        # bus were not provided
        if "set_bus" in act.authorized_keys:
            # I assign previous bus (because it could have been modified)
            res.topo_vect[
                res.line_or_pos_topo_vect[reco_line]
            ] = act.line_or_set_bus[reco_line]
            res.topo_vect[
                res.line_ex_pos_topo_vect[reco_line]
            ] = act.line_ex_set_bus[reco_line]
        else:
            # I assign one (action do not allow me to modify the bus)
            res.topo_vect[res.line_or_pos_topo_vect[reco_line]] = 1
            res.topo_vect[res.line_ex_pos_topo_vect[reco_line]] = 1

        res.line_status[reco_line] = True

    if "change_line_status" in act.authorized_keys:
        # a "change" disconnects the connected powerlines and reconnects the others
        disco_line = act.line_change_status & res.line_status
        reco_line = act.line_change_status & (~res.line_status)

        # handle disconnected powerlines
        res.topo_vect[res.line_or_pos_topo_vect[disco_line]] = -1
        res.topo_vect[res.line_ex_pos_topo_vect[disco_line]] = -1
        res.line_status[disco_line] = False

        # handle reconnected powerlines
        if np.any(reco_line):
            if "set_bus" in act.authorized_keys:
                # "1 *" creates new arrays: they are modified in place below
                line_ex_set_bus = 1 * act.line_ex_set_bus
                line_or_set_bus = 1 * act.line_or_set_bus
            else:
                line_ex_set_bus = np.zeros(res.n_line, dtype=dt_int)
                line_or_set_bus = np.zeros(res.n_line, dtype=dt_int)

            if issue_warn and (
                np.any(line_or_set_bus[reco_line] == 0)
                or np.any(line_ex_set_bus[reco_line] == 0)
            ):
                warnings.warn(
                    'A powerline has been reconnected with a "change_status" action without '
                    "specifying on which bus it was supposed to be reconnected. This is "
                    "perfectly fine in regular grid2op environment, but this behaviour "
                    "cannot be properly implemented with the only information in the "
                    "observation. Please see the documentation for more information."
                )

            # reconnected ends for which no bus is given are put on bus 1
            line_or_set_bus[reco_line & (line_or_set_bus == 0)] = 1
            line_ex_set_bus[reco_line & (line_ex_set_bus == 0)] = 1

            res.topo_vect[res.line_or_pos_topo_vect[reco_line]] = line_or_set_bus[
                reco_line
            ]
            res.topo_vect[res.line_ex_pos_topo_vect[reco_line]] = line_ex_set_bus[
                reco_line
            ]
            res.line_status[reco_line] = True

    # redispatching and storage are not applied to `res`: at most a warning is issued
    if "redispatch" in act.authorized_keys:
        redisp = act.redispatch
        if np.any(redisp != 0) and issue_warn:
            warnings.warn(
                "You did redispatching on this action. Redispatching is heavily transformed "
                "by the environment (consult the documentation about the modeling of the "
                "generators for example) so we will not even try to mimic this here."
            )

    if "set_storage" in act.authorized_keys:
        storage_p = act.storage_p
        if np.any(storage_p != 0) and issue_warn:
            warnings.warn(
                "You did action on storage units in this action. This implies performing some "
                "redispatching which is heavily transformed "
                "by the environment (consult the documentation about the modeling of the "
                "generators for example) so we will not even try to mimic this here."
            )
    return res
def __add__(self, act):
    """
    Implement the ``observation + action`` syntax.

    The call is forwarded to ``self.add_act(act, issue_warn=True)``.

    Parameters
    ----------
    act:
        The object added to this observation, it has to be a grid2op action.

    Raises
    ------
    Grid2OpException
        If ``act`` is not an instance of ``grid2op.Action.BaseAction``

    """
    # imported when the method is called rather than at module level
    from grid2op.Action import BaseAction

    if not isinstance(act, BaseAction):
        raise Grid2OpException(
            "Only grid2op action can be added to grid2op observation at the moment."
        )
    return self.add_act(act, issue_warn=True)


@property
def thermal_limit(self):
    """
    Return the thermal limit of the powergrid, given in Amps (A)

    The value returned is a read-only copy: the internal ``_thermal_limit``
    vector is not exposed.

    Examples
    --------

    .. code-block:: python

        import grid2op
        env_name = ...
        env = grid2op.make(env_name)

        obs = env.reset()
        thermal_limit = obs.thermal_limit

    """
    limits = 1.0 * self._thermal_limit  # the multiplication creates a new array
    limits.setflags(write=False)
    return limits


@property
def curtailment_mw(self):
    """
    return the curtailment, expressed in MW rather than in ratio of pmax.

    Examples
    --------

    .. code-block:: python

        import grid2op
        env_name = ...
        env = grid2op.make(env_name)

        obs = env.reset()
        curtailment_mw = obs.curtailment_mw

    """
    ratio = self.curtailment
    return ratio * self.gen_pmax


@property
def curtailment_limit_mw(self):
    """
    return the limit of production of a generator in MW rather in ratio

    Examples
    --------

    .. code-block:: python

        import grid2op
        env_name = ...
        env = grid2op.make(env_name)

        obs = env.reset()
        curtailment_limit_mw = obs.curtailment_limit_mw

    """
    ratio_limit = self.curtailment_limit
    return ratio_limit * self.gen_pmax


def _update_attr_backend(self, backend):
    """
    Refresh, in place, the attributes of the observation that depend only on the backend.

    Parameters
    ----------
    backend :
        The backend from which to update the observation

    """
    cls = type(self)

    # status of the powerlines and topology
    self.line_status[:] = backend.get_line_status()
    self.topo_vect[:] = backend.get_topo_vect()

    # continuous values: generators, loads, then both sides of the powerlines
    gen_p, gen_q, gen_v = backend.generators_info()
    self.gen_p[:] = gen_p
    self.gen_q[:] = gen_q
    self.gen_v[:] = gen_v

    load_p, load_q, load_v = backend.loads_info()
    self.load_p[:] = load_p
    self.load_q[:] = load_q
    self.load_v[:] = load_v

    p_or, q_or, v_or, a_or = backend.lines_or_info()
    self.p_or[:] = p_or
    self.q_or[:] = q_or
    self.v_or[:] = v_or
    self.a_or[:] = a_or

    p_ex, q_ex, v_ex, a_ex = backend.lines_ex_info()
    self.p_ex[:] = p_ex
    self.q_ex[:] = q_ex
    self.v_ex[:] = v_ex
    self.a_ex[:] = a_ex

    self.rho[:] = backend.get_relative_flow().astype(dt_float)

    # margin up and down
    if cls.redispatching_unit_commitment_availble:
        renewable = cls.gen_renewable
        room_up = np.minimum(cls.gen_pmax - self.gen_p, self.gen_max_ramp_up)
        room_down = np.minimum(self.gen_p - cls.gen_pmin, self.gen_max_ramp_down)
        for margin, room in (
            (self.gen_margin_up, room_up),
            (self.gen_margin_down, room_down),
        ):
            margin[:] = room
            margin[renewable] = 0.0
            # because of the slack, sometimes it's negative...
            # see https://github.com/rte-france/Grid2Op/issues/313
            margin[margin < 0.] = 0.
    else:
        self.gen_margin_up[:] = 0.0
        self.gen_margin_down[:] = 0.0

    # handle shunts (if available)
    if self.shunts_data_available:
        shunt_p, shunt_q, shunt_v, shunt_bus = backend.shunt_info()
        self._shunt_p[:] = shunt_p
        self._shunt_q[:] = shunt_q
        self._shunt_v[:] = shunt_v
        self._shunt_bus[:] = shunt_bus

    # voltage angles
    if not backend.can_output_theta:
        # theta will be always 0. by convention
        for angles in (
            self.theta_or,
            self.theta_ex,
            self.load_theta,
            self.gen_theta,
            self.storage_theta,
        ):
            angles[:] = 0.
        return

    self.support_theta = True  # backend supports the computation of theta
    theta_or, theta_ex, load_theta, gen_theta, storage_theta = backend.get_theta()
    self.theta_or[:] = theta_or
    self.theta_ex[:] = theta_ex
    self.load_theta[:] = load_theta
    self.gen_theta[:] = gen_theta
    self.storage_theta[:] = storage_theta
def _update_obs_complete(self, env, with_forecast=True):
    """
    update all the observation attributes as if it was a complete, fully
    observable and without noise observation

    Parameters
    ----------
    env:
        The environment from which the values are read (its backend is forwarded
        to ``_update_attr_backend``)

    with_forecast: ``bool``
        If ``True`` the lists ``_forecasted_inj`` and ``_forecasted_grid`` are rebuilt,
        otherwise they are left untouched.

    """
    self._is_done = False

    # counter
    self.current_step = dt_int(env.nb_time_step)
    self.max_step = dt_int(env.max_episode_duration())

    # extract the time stamps
    stamp = env.time_stamp
    self.year = dt_int(stamp.year)
    self.month = dt_int(stamp.month)
    self.day = dt_int(stamp.day)
    self.hour_of_day = dt_int(stamp.hour)
    self.minute_of_hour = dt_int(stamp.minute)
    self.day_of_week = dt_int(stamp.weekday())

    # get the values related to topology
    self.timestep_overflow[:] = env._timestep_overflow

    # attribute that depends only on the backend state
    self._update_attr_backend(env.backend)

    # storage units
    self.storage_charge[:] = env._storage_current_charge
    self.storage_power_target[:] = env._action_storage
    self.storage_power[:] = env._storage_power

    # handles forecasts here
    if with_forecast:
        # the first "forecast" is the current state of the injections
        current_inj = {
            "injection": {
                "load_p": dt_float(1.0 * self.load_p),
                "load_q": dt_float(1.0 * self.load_q),
                "prod_p": dt_float(1.0 * self.gen_p),
                "prod_v": dt_float(1.0 * self.gen_v),
            }
        }
        now = self.get_time_stamp()
        self._forecasted_inj = [(now, current_inj)]
        self._forecasted_inj.extend(env.chronics_handler.forecasts())
        self._forecasted_grid = [None] * len(self._forecasted_inj)

    # cool down and reconnection time after hard overflow, soft overflow or cascading failure
    self.time_before_cooldown_line[:] = env._times_before_line_status_actionable
    self.time_before_cooldown_sub[:] = env._times_before_topology_actionable
    self.time_next_maintenance[:] = env._time_next_maintenance
    self.duration_next_maintenance[:] = env._duration_next_maintenance

    # redispatching
    self.target_dispatch[:] = env._target_dispatch
    self.actual_dispatch[:] = env._actual_dispatch

    self._thermal_limit[:] = env.get_thermal_limit()

    # curtailment
    if not self.redispatching_unit_commitment_availble:
        self.curtailment[:] = 0.0
        self.gen_p_before_curtail[:] = self.gen_p
        self.curtailment_limit[:] = 1.0
        self.curtailment_limit_effective[:] = 1.0
    else:
        self.gen_p_before_curtail[:] = env._gen_before_curtailment
        self.curtailment[:] = (
            self.gen_p_before_curtail - self.gen_p
        ) / self.gen_pmax
        self.curtailment[~self.gen_renewable] = 0.0

        self.curtailment_limit[:] = env._limit_curtailment
        self.curtailment_limit[self.curtailment_limit >= 1.0] = 1.0

        renewable = self.gen_renewable
        modified = self.gen_p_before_curtail != self.gen_p
        curtailed_now = renewable & modified
        not_curtailed = renewable & ~modified
        # renewable generators whose production differs from the one before curtailment
        self.curtailment_limit_effective[curtailed_now] = (
            self.gen_p[curtailed_now] / self.gen_pmax[curtailed_now]
        )
        # renewable generators whose production is unchanged keep the limit
        self.curtailment_limit_effective[not_curtailed] = self.curtailment_limit[
            not_curtailed
        ]
        self.curtailment_limit_effective[~renewable] = 1.0

    # alarms
    if self.dim_alarms and env._has_attention_budget:
        budget = env._attention_budget
        self.is_alarm_illegal[:] = env._is_alarm_illegal
        last_raised = budget.time_last_successful_alarm_raised
        if last_raised > 0:
            self.time_since_last_alarm[:] = self.current_step - last_raised
        else:
            self.time_since_last_alarm[:] = -1
        self.last_alarm[:] = budget.last_successful_alarm_raised
        self.attention_budget[:] = budget.current_budget

    # `delta_time_seconds` divided by 60, i.e. minutes if the attribute name is accurate
    self.delta_time = dt_float(1.0 * env.delta_time_seconds / 60.0)
def get_simulator(self) -> "Simulator":
    """
    This function allows to retrieve a valid and properly initialized "Simulator"

    A :class:`grid2op.simulator.Simulator` can be used to simulate the impact of
    multiple consecutive actions, without taking into account any kind of rules.

    It can also be used with forecast of the productions / consumption to predict
    whether or not a given state is "robust" to variation of the injections for example.

    You can find more information about simulator on the dedicated page of the
    documentation.

    Returns
    -------
    res: :class:`grid2op.simulator.Simulator`
        A simulator built with the backend of ``self._obs_env`` and whose state
        has been set to this observation (``res.set_state(self)``).

    Raises
    ------
    BaseObservationError
        If ``self._obs_env`` is ``None`` or if ``self._obs_env.is_valid()`` is false.

    """
    # BaseObservation is only used for typing in the simulator...
    if self._obs_env is None:
        raise BaseObservationError(
            "Impossible to build a simulator if the "
            "observation space does not support it. This can be the case if the "
            "observation is loaded from disk or if the backend cannot be copied "
            "for example."
        )
    if not self._obs_env.is_valid():
        raise BaseObservationError(
            "Impossible to use a Simulator backend with an "
            "environment that cannot be copied (most "
            "likely due to the backend that cannot be "
            "copied)."
        )

    from grid2op.simulator import (
        Simulator,
    )  # lazy import to prevent circular references

    res = Simulator(backend=self._obs_env.backend)
    res.set_state(self)
    return res