Source code for grid2op.Environment.BaseEnv

# Copyright (c) 2019-2020, RTE (
# See AUTHORS.txt
# This Source Code Form is subject to the terms of the Mozilla Public License, version 2.0.
# If a copy of the Mozilla Public License, version 2.0 was not distributed with this file,
# you can obtain one at
# SPDX-License-Identifier: MPL-2.0
# This file is part of Grid2Op, Grid2Op a testbed platform to model sequential decision making in power systems.

from datetime import datetime
import logging
import time
import copy
import os
import json
from typing import Optional, Tuple

import numpy as np
from scipy.optimize import minimize
from scipy.optimize import LinearConstraint
from abc import ABC, abstractmethod
from grid2op.Action.ActionSpace import ActionSpace
from grid2op.Observation.baseObservation import BaseObservation
from grid2op.Observation.observationSpace import ObservationSpace
from grid2op.Backend import Backend
from grid2op.dtypes import dt_int, dt_float, dt_bool
from grid2op.Space import GridObjects, RandomObject
from grid2op.Exceptions import *
from grid2op.Parameters import Parameters
from grid2op.Reward import BaseReward
from grid2op.Reward import RewardHelper
from grid2op.Opponent import OpponentSpace, NeverAttackBudget
from grid2op.Action import DontAct, BaseAction
from grid2op.Rules import AlwaysLegal
from grid2op.Opponent import BaseOpponent
from grid2op.operator_attention import LinearAttentionBudget
from grid2op.Action._BackendAction import _BackendAction

# TODO put in a separate class the redispatching function

    "\nThis is an attempt to explain why the dispatch did not succeed and caused a game over.\n"
    "To compensate the {increase} of loads and / or {decrease} of "
    "renewable energy (due to naturl causes but also through curtailment) and / or variation in the storage units, "
    "the generators should {increase} their total production of {sum_move:.2f}MW (in total).\n"
    "But, if you take into account the generator constraints ({pmax} and {max_ramp_up}) you "
    "can have at most {avail_up_sum:.2f}MW.\n"
    "Indeed at time t, generators are in state:\n\t{gen_setpoint}\ntheir ramp max is:"
    "\n\t{ramp_up}\n and pmax is:\n\t{gen_pmax}\n"
    "Wrapping up, each generator can {increase} at {maximum} of:\n\t{avail_up}\n"
    "NB: if you did not do any dispatch during this episode, it would have been possible to "
    "meet these constraints. This situation is caused by not having enough degree of freedom "
    'to "compensate" the variation of the load due to (most likely) an "over usage" of '
    "redispatching feature (some generators stuck at {pmax} as a consequence of your "
    "redispatching. They can't increase their productions to meet the {increase} in demand or "
    "{decrease} of renewables)"

BASE_TXT_COPYRIGHT = """# Copyright (c) 2019-2020, RTE (
# See AUTHORS.txt
# This Source Code Form is subject to the terms of the Mozilla Public License, version 2.0.
# If a copy of the Mozilla Public License, version 2.0 was not distributed with this file,
# you can obtain one at
# SPDX-License-Identifier: MPL-2.0
# This file is part of Grid2Op, Grid2Op a testbed platform to model sequential decision making in power systems.


[docs]class BaseEnv(GridObjects, RandomObject, ABC): """ INTERNAL .. warning:: /!\\\\ Internal, do not use unless you know what you are doing /!\\\\ This class represent some usefull abstraction that is re used by :class:`Environment` and :class:`grid2op.Observation._Obsenv` for example. The documentation is showed here to document the common attributes of an "BaseEnvironment". Attributes ---------- parameters: :class:`grid2op.Parameters.Parameters` The parameters of the game (to expose more control on what is being simulated) with_forecast: ``bool`` Whether the chronics allow to have some kind of "forecast". See :func:`BaseEnv.activate_forceast` for more information logger: TO BE DONE: a way to log what is happening (**currently not implemented**) time_stamp: ``datetime.datetime`` The actual time stamp of the current observation. nb_time_step: ``int`` Number of time steps played in the current environment current_obs: :class:`grid2op.Observation.BaseObservation` The current observation (or None if it's not intialized) backend: :class:`grid2op.Backend.Backend` The backend used to compute the powerflows and cascading failures. done: ``bool`` Whether the environment is "done". If ``True`` you need to call :func:`Environment.reset` in order to continue. current_reward: ``float`` The last computed reward (reward of the current step) other_rewards: ``dict`` Dictionary with key being the name (identifier) and value being some RewardHelper. At each time step, all the values will be computed by the :class:`Environment` and the information about it will be returned in the "reward" key of the "info" dictionnary of the :func:`Environment.step`. chronics_handler: :class:`grid2op.Chronics.ChronicsHandler` The object in charge managing the "chronics", which store the information about load and generator for example. reward_range: ``tuple`` For open ai gym compatibility. It represents the range of the rewards: reward min, reward max viewer: For open ai gym compatibility. viewer_fig: For open ai gym compatibility. _gen_activeprod_t: .. warning:: /!\\\\ Internal, do not use unless you know what you are doing /!\\\\ Should be initialized at 0. for "step" to properly recognize it's the first time step of the game _no_overflow_disconnection: ``bool`` .. warning:: /!\\\\ Internal, do not use unless you know what you are doing /!\\\\ Whether or not cascading failures are computed or not (TRUE = the powerlines above their thermal limits will not be disconnected). This is initialized based on the attribute :attr:`grid2op.Parameters.Parameters.NO_OVERFLOW_DISCONNECTION`. _timestep_overflow: ``numpy.ndarray``, dtype: int .. warning:: /!\\\\ Internal, do not use unless you know what you are doing /!\\\\ Number of consecutive timesteps each powerline has been on overflow. _nb_timestep_overflow_allowed: ``numpy.ndarray``, dtype: int .. warning:: /!\\\\ Internal, do not use unless you know what you are doing /!\\\\ Number of consecutive timestep each powerline can be on overflow. It is usually read from :attr:`grid2op.Parameters.Parameters.NB_TIMESTEP_POWERFLOW_ALLOWED`. _hard_overflow_threshold: ``float`` .. warning:: /!\\\\ Internal, do not use unless you know what you are doing /!\\\\ Number of timestep before an :class:`grid2op.BaseAgent.BaseAgent` can reconnet a powerline that has been disconnected by the environment due to an overflow. _env_dc: ``bool`` .. warning:: /!\\\\ Internal, do not use unless you know what you are doing /!\\\\ Whether the environment computes the powerflow using the DC approximation or not. It is usually read from :attr:`grid2op.Parameters.Parameters.ENV_DC`. _names_chronics_to_backend: ``dict`` .. warning:: /!\\\\ Internal, do not use unless you know what you are doing /!\\\\ Configuration file used to associated the name of the objects in the backend (both extremities of powerlines, load or production for example) with the same object in the data (:attr:`Environment.chronics_handler`). The idea is that, usually data generation comes from a different software that does not take into account the powergrid infrastructure. Hence, the same "object" can have a different name. This mapping is present to avoid the need to rename the "object" when providing data. A more detailed description is available at :func:`grid2op.ChronicsHandler.GridValue.initialize`. _env_modification: :class:`grid2op.Action.Action` .. warning:: /!\\\\ Internal, do not use unless you know what you are doing /!\\\\ Representation of the actions of the environment for the modification of the powergrid. _rewardClass: ``type`` .. warning:: /!\\\\ Internal, do not use unless you know what you are doing /!\\\\ Type of reward used. Should be a subclass of :class:`grid2op.BaseReward.BaseReward` _init_grid_path: ``str`` .. warning:: /!\\\\ Internal, do not use unless you know what you are doing /!\\\\ The path where the description of the powergrid is located. _game_rules: :class:`grid2op.Rules.RulesChecker` .. warning:: /!\\\\ Internal, do not use unless you know what you are doing /!\\\\ The rules of the game (define which actions are legal and which are not) _action_space: :class:`grid2op.Action.ActionSpace` .. warning:: /!\\\\ Internal, do not use unless you know what you are doing /!\\\\ Helper used to manipulate more easily the actions given to / provided by the :class:`grid2op.Agent.BaseAgent` (player) _helper_action_env: :class:`grid2op.Action.ActionSpace` .. warning:: /!\\\\ Internal, do not use unless you know what you are doing /!\\\\ Helper used to manipulate more easily the actions given to / provided by the environment to the backend. _observation_space: :class:`grid2op.Observation.ObservationSpace` .. warning:: /!\\\\ Internal, do not use unless you know what you are doing /!\\\\ Helper used to generate the observation that will be given to the :class:`grid2op.BaseAgent` _reward_helper: :class:`grid2p.BaseReward.RewardHelper` .. warning:: /!\\\\ Internal, do not use unless you know what you are doing /!\\\\ Helper that is called to compute the reward at each time step. kwargs_observation: ``dict`` TODO # TODO add the units (eg MW, MWh, MW/time step,etc.) in the redispatching related attributes """ ALARM_FILE_NAME = "alerts_info.json" def __init__( self, init_env_path: os.PathLike, init_grid_path: os.PathLike, parameters: Parameters, voltagecontrolerClass: type, thermal_limit_a: Optional[np.ndarray] = None, epsilon_poly: float = 1e-4, # precision of the redispatching algorithm tol_poly: float = 1e-2, # i need to compute a redispatching if the actual values are "more than tol_poly" the values they should be other_rewards: dict = None, with_forecast: bool = True, opponent_space_type: type = OpponentSpace, opponent_action_class: type = DontAct, opponent_class: type = BaseOpponent, opponent_init_budget: float = 0.0, opponent_budget_per_ts: float = 0.0, opponent_budget_class: type = NeverAttackBudget, opponent_attack_duration: int = 0, opponent_attack_cooldown: int = 99999, kwargs_opponent: dict = None, has_attention_budget: bool = False, attention_budget_cls: type = LinearAttentionBudget, kwargs_attention_budget: dict = None, logger: Optional[logging.Logger] = None, kwargs_observation: Optional[dict] = None, _is_test: bool = False, # TODO not implemented !! ): GridObjects.__init__(self) RandomObject.__init__(self) if other_rewards is None: other_rewards = {} if kwargs_attention_budget is None: kwargs_attention_budget = {} if kwargs_opponent is None: kwargs_opponent = {} self._is_test: bool = _is_test if logger is None: self.logger = logging.getLogger(__name__) self.logger.disabled = True else: self.logger: logging.Logger = logger.getChild("grid2op_BaseEnv") if init_grid_path is not None: self._init_grid_path: os.PathLike = os.path.abspath(init_grid_path) else: self._init_grid_path = None self._DEBUG: bool = False self._complete_action_cls: type = None self.__closed: bool = False # by default the environment is not closed # specific to power system if not isinstance(parameters, Parameters): raise Grid2OpException( 'Parameter "parameters" used to build the Environment should derived form the ' 'grid2op.Parameters class, type provided is "{}"'.format( type(parameters) ) ) parameters.check_valid() # check the provided parameters are valid self._parameters: Parameters = copy.deepcopy(parameters) self.with_forecast: bool = with_forecast # some timers self._time_apply_act: float = dt_float(0) self._time_powerflow: float = dt_float(0) self._time_extract_obs: float = dt_float(0) self._time_create_bk_act: float = dt_float(0) self._time_opponent: float = dt_float(0) self._time_redisp: float = dt_float(0) self._time_step: float = dt_float(0) # data relative to interpolation self._epsilon_poly: float = dt_float(epsilon_poly) self._tol_poly: float = dt_float(tol_poly) # class used for the action spaces self._helper_action_class: ActionSpace = None self._helper_observation_class: ActionSpace = None # and calendar data self.time_stamp: time.struct_time = None self.nb_time_step: datetime.timedelta = dt_int(0) self.delta_time_seconds = None # number of seconds between two consecutive step # observation self.current_obs: BaseObservation = None self._line_status: np.ndarray = None self._ignore_min_up_down_times: bool = self._parameters.IGNORE_MIN_UP_DOWN_TIME self._forbid_dispatch_off: bool = ( not self._parameters.ALLOW_DISPATCH_GEN_SWITCH_OFF ) # type of power flow to play # if True, then it will not disconnect lines above their thermal limits self._no_overflow_disconnection: bool = ( self._parameters.NO_OVERFLOW_DISCONNECTION ) self._timestep_overflow: np.ndarray = None self._nb_timestep_overflow_allowed: np.ndarray = None self._hard_overflow_threshold: float = self._parameters.HARD_OVERFLOW_THRESHOLD # store actions "cooldown" self._times_before_line_status_actionable: np.ndarray = None self._max_timestep_line_status_deactivated: int = ( self._parameters.NB_TIMESTEP_COOLDOWN_LINE ) self._times_before_topology_actionable: np.ndarray = None self._max_timestep_topology_deactivated: int = ( self._parameters.NB_TIMESTEP_COOLDOWN_SUB ) self._nb_ts_reco: int = self._parameters.NB_TIMESTEP_RECONNECTION # for maintenance operation self._time_next_maintenance: np.ndarray = None self._duration_next_maintenance: np.ndarray = None # hazard (not used outside of this class, information is given in `times_before_line_status_actionable` self._hazard_duration: np.ndarray = None self._env_dc = self._parameters.ENV_DC # redispatching data self._target_dispatch: np.ndarray = None self._actual_dispatch: np.ndarray = None self._gen_uptime: np.ndarray = None self._gen_downtime: np.ndarray = None self._gen_activeprod_t: np.ndarray = None self._gen_activeprod_t_redisp: np.ndarray = None self._thermal_limit_a: np.ndarray = thermal_limit_a self._disc_lines: np.ndarray = None # store environment modifications self._injection = None self._maintenance = None self._hazards = None self._env_modification = None # to use the data self.done = False self.current_reward = None self._helper_action_env: ActionSpace = None self.chronics_handler = None self._game_rules = None self._action_space: ActionSpace = None self._rewardClass: type = None self._actionClass: type = None self._observationClass: type = None self._legalActClass: type = None self._observation_space: ObservationSpace = None self._names_chronics_to_backend: dict = None self._reward_helper = None # gym compatibility self.reward_range = None, None self.viewer = None self.viewer_fig = None # other rewards self.other_rewards = {} for k, v in other_rewards.items(): if isinstance(v, type): if not issubclass(v, BaseReward): raise Grid2OpException( 'All values of "rewards" key word argument should be classes that inherit ' 'from "grid2op.BaseReward"' ) else: if not isinstance(v, BaseReward): raise Grid2OpException( 'All values of "rewards" key word argument should be classes that inherit ' 'from "grid2op.BaseReward"' ) if not isinstance(k, str): raise Grid2OpException( 'All keys of "rewards" should be of string type.' ) self.other_rewards[k] = RewardHelper(v, self.logger) # opponent self._opponent_action_class = ( opponent_action_class # class of the action of the opponent ) self._opponent_space_type = opponent_space_type # type of the opponent action space self._opponent_class = opponent_class # class of the opponent self._opponent_init_budget = dt_float(opponent_init_budget) self._opponent_attack_duration = dt_int(opponent_attack_duration) self._opponent_attack_cooldown = dt_int(opponent_attack_cooldown) self._opponent_budget_per_ts = dt_float(opponent_budget_per_ts) self._kwargs_opponent = kwargs_opponent self._opponent_budget_class = opponent_budget_class # below initialized by _create_env, above: need to be called self._opponent_action_space = None self._compute_opp_budget = None self._opponent = None self._oppSpace = None # voltage self._voltagecontrolerClass = voltagecontrolerClass self._voltage_controler = None # backend action self._backend_action_class = None self._backend_action = None # specific to Basic Env, do not change self.backend :Backend = None self.__is_init = False self.debug_dispatch = False # to change the parameters self.__new_param = None self.__new_forecast_param = None self.__new_reward_func = None # storage units # TODO storage: what to do when self.storage_Emin >0. and self.storage_loss > 0. # TODO and we have self._storage_current_charge - self.storage_loss < self.storage_Emin self._storage_current_charge = None # the current storage charge self._storage_previous_charge = None # the previous storage charge self._action_storage = None # the storage action performed self._amount_storage = None # total amount of storage to be dispatched self._amount_storage_prev = None self._storage_power = None self._storage_power_prev = None # curtailment self._limit_curtailment = None self._limit_curtailment_prev = None self._gen_before_curtailment = None self._sum_curtailment_mw = None self._sum_curtailment_mw_prev = None self._limited_before = 0.0 # TODO curt # attention budget self._has_attention_budget = has_attention_budget self._attention_budget = None self._attention_budget_cls = attention_budget_cls self._is_alarm_illegal = False self._is_alarm_used_in_reward = False self._kwargs_attention_budget = copy.deepcopy(kwargs_attention_budget) # to ensure self.get_obs() has a reproducible behaviour self._last_obs = None # to retrieve previous result (before 1.6.5 the seed of the # action space or observation space was not done each reset) self._has_just_been_seeded = False if kwargs_observation is not None: self._kwargs_observation = copy.deepcopy(kwargs_observation) else: self._kwargs_observation = {} if init_env_path is not None: self._init_env_path = os.path.abspath(init_env_path) else: self._init_env_path = None def _custom_deepcopy_for_copy(self, new_obj, dict_=None): if self.__closed: raise RuntimeError("Impossible to make a copy of a closed environment !") if not self.backend._can_be_copied: raise RuntimeError("Impossible to copy your environment: the backend " "class you used cannot be copied.") RandomObject._custom_deepcopy_for_copy(self, new_obj) if dict_ is None: dict_ = {} new_obj._init_grid_path = copy.deepcopy(self._init_grid_path) new_obj._init_env_path = copy.deepcopy(self._init_env_path) new_obj._DEBUG = self._DEBUG new_obj._parameters = copy.deepcopy(self._parameters) new_obj.with_forecast = self.with_forecast # some timers new_obj._time_apply_act = self._time_apply_act new_obj._time_powerflow = self._time_powerflow new_obj._time_extract_obs = self._time_extract_obs new_obj._time_create_bk_act = self._time_create_bk_act new_obj._time_opponent = self._time_opponent new_obj._time_redisp = self._time_redisp new_obj._time_step = self._time_step # data relative to interpolation new_obj._epsilon_poly = self._epsilon_poly new_obj._tol_poly = self._tol_poly # new_obj._complete_action_cls = copy.deepcopy(self._complete_action_cls) # define logger new_obj.logger = copy.deepcopy(self.logger) # TODO does that make any sense ? # class used for the action spaces new_obj._helper_action_class = self._helper_action_class # const new_obj._helper_observation_class = self._helper_observation_class # and calendar data new_obj.time_stamp = self.time_stamp new_obj.nb_time_step = self.nb_time_step new_obj.delta_time_seconds = self.delta_time_seconds # observation if self.current_obs is not None: new_obj.current_obs = self.current_obs.copy() # backend # backend action new_obj._backend_action_class = self._backend_action_class new_obj._backend_action = copy.deepcopy(self._backend_action) # specific to Basic Env, do not change new_obj.backend = self.backend.copy() if self._thermal_limit_a is not None: new_obj.backend.set_thermal_limit(self._thermal_limit_a) new_obj._thermal_limit_a = copy.deepcopy(self._thermal_limit_a) new_obj.__is_init = self.__is_init new_obj.__closed = self.__closed new_obj.debug_dispatch = self.debug_dispatch new_obj._line_status = copy.deepcopy(self._line_status) new_obj._ignore_min_up_down_times = self._ignore_min_up_down_times new_obj._forbid_dispatch_off = self._forbid_dispatch_off # type of power flow to play # if True, then it will not disconnect lines above their thermal limits new_obj._no_overflow_disconnection = self._no_overflow_disconnection new_obj._timestep_overflow = copy.deepcopy(self._timestep_overflow) new_obj._nb_timestep_overflow_allowed = copy.deepcopy( self._nb_timestep_overflow_allowed ) new_obj._hard_overflow_threshold = self._hard_overflow_threshold # store actions "cooldown" new_obj._times_before_line_status_actionable = copy.deepcopy( self._times_before_line_status_actionable ) new_obj._max_timestep_line_status_deactivated = ( self._max_timestep_line_status_deactivated ) new_obj._times_before_topology_actionable = copy.deepcopy( self._times_before_topology_actionable ) new_obj._max_timestep_topology_deactivated = ( self._max_timestep_topology_deactivated ) new_obj._nb_ts_reco = self._nb_ts_reco # for maintenance operation new_obj._time_next_maintenance = copy.deepcopy(self._time_next_maintenance) new_obj._duration_next_maintenance = copy.deepcopy( self._duration_next_maintenance ) # hazard (not used outside of this class, information is given in `times_before_line_status_actionable` new_obj._hazard_duration = copy.deepcopy(self._hazard_duration) new_obj._env_dc = self._env_dc # redispatching data new_obj._target_dispatch = copy.deepcopy(self._target_dispatch) new_obj._actual_dispatch = copy.deepcopy(self._actual_dispatch) new_obj._gen_uptime = copy.deepcopy(self._gen_uptime) new_obj._gen_downtime = copy.deepcopy(self._gen_downtime) new_obj._gen_activeprod_t = copy.deepcopy(self._gen_activeprod_t) new_obj._gen_activeprod_t_redisp = copy.deepcopy(self._gen_activeprod_t_redisp) new_obj._disc_lines = copy.deepcopy(self._disc_lines) # store environment modifications new_obj._injection = copy.deepcopy(self._injection) new_obj._maintenance = copy.deepcopy(self._maintenance) new_obj._hazards = copy.deepcopy(self._hazards) new_obj._env_modification = copy.deepcopy(self._env_modification) # to use the data new_obj.done = self.done new_obj.current_reward = copy.deepcopy(self.current_reward) new_obj.chronics_handler = copy.deepcopy(self.chronics_handler) new_obj._game_rules = copy.deepcopy(self._game_rules) new_obj._helper_action_env = self._helper_action_env.copy() new_obj._helper_action_env.legal_action = new_obj._game_rules.legal_action new_obj._action_space = self._action_space.copy() new_obj._action_space.legal_action = new_obj._game_rules.legal_action new_obj._rewardClass = self._rewardClass new_obj._actionClass = self._actionClass new_obj._observationClass = self._observationClass new_obj._legalActClass = self._legalActClass new_obj._observation_space = self._observation_space.copy(copy_backend=True) new_obj._observation_space._legal_action = ( new_obj._game_rules.legal_action ) # TODO this does not respect SOLID principles at all ! new_obj._kwargs_observation = copy.deepcopy(self._kwargs_observation) new_obj._observation_space._ptr_kwargs_observation = new_obj._kwargs_observation new_obj._names_chronics_to_backend = self._names_chronics_to_backend new_obj._reward_helper = copy.deepcopy(self._reward_helper) # gym compatibility new_obj.reward_range = copy.deepcopy(self.reward_range) new_obj.viewer = copy.deepcopy(self.viewer) new_obj.viewer_fig = copy.deepcopy(self.viewer_fig) # other rewards new_obj.other_rewards = copy.deepcopy(self.other_rewards) # opponent new_obj._opponent_space_type = self._opponent_space_type new_obj._opponent_action_class = self._opponent_action_class # const new_obj._opponent_class = self._opponent_class # const new_obj._opponent_init_budget = self._opponent_init_budget new_obj._opponent_attack_duration = self._opponent_attack_duration new_obj._opponent_attack_cooldown = self._opponent_attack_cooldown new_obj._opponent_budget_per_ts = self._opponent_budget_per_ts new_obj._kwargs_opponent = copy.deepcopy(self._kwargs_opponent) new_obj._opponent_budget_class = copy.deepcopy( self._opponent_budget_class ) # const new_obj._opponent_action_space = self._opponent_action_space # const new_obj._compute_opp_budget = self._opponent_budget_class( self._opponent_action_space ) # init the opponent new_obj._opponent = new_obj._opponent_class.__new__(new_obj._opponent_class) self._opponent._custom_deepcopy_for_copy( new_obj._opponent, {"partial_env": new_obj, **new_obj._kwargs_opponent} ) new_obj._oppSpace = new_obj._opponent_space_type( compute_budget=new_obj._compute_opp_budget, init_budget=new_obj._opponent_init_budget, attack_duration=new_obj._opponent_attack_duration, attack_cooldown=new_obj._opponent_attack_cooldown, budget_per_timestep=new_obj._opponent_budget_per_ts, opponent=new_obj._opponent, ) state_me, state_opp = self._oppSpace._get_state() new_obj._oppSpace._set_state(state_me) # voltage new_obj._voltagecontrolerClass = self._voltagecontrolerClass new_obj._voltage_controler = self._voltage_controler.copy() # to change the parameters new_obj.__new_param = copy.deepcopy(self.__new_param) new_obj.__new_forecast_param = copy.deepcopy(self.__new_forecast_param) new_obj.__new_reward_func = copy.deepcopy(self.__new_reward_func) # storage units new_obj._storage_current_charge = copy.deepcopy(self._storage_current_charge) new_obj._storage_previous_charge = copy.deepcopy(self._storage_previous_charge) new_obj._action_storage = copy.deepcopy(self._action_storage) new_obj._amount_storage = copy.deepcopy(self._amount_storage) new_obj._amount_storage_prev = copy.deepcopy(self._amount_storage_prev) new_obj._storage_power = copy.deepcopy(self._storage_power) new_obj._storage_power_prev = copy.deepcopy(self._storage_power_prev) # curtailment new_obj._limit_curtailment = copy.deepcopy(self._limit_curtailment) new_obj._limit_curtailment_prev = copy.deepcopy(self._limit_curtailment_prev) new_obj._gen_before_curtailment = copy.deepcopy(self._gen_before_curtailment) new_obj._sum_curtailment_mw = copy.deepcopy(self._sum_curtailment_mw) new_obj._sum_curtailment_mw_prev = copy.deepcopy(self._sum_curtailment_mw_prev) new_obj._limited_before = copy.deepcopy(self._limited_before) # attention budget new_obj._has_attention_budget = self._has_attention_budget new_obj._attention_budget = copy.deepcopy(self._attention_budget) new_obj._attention_budget_cls = self._attention_budget_cls # const new_obj._is_alarm_illegal = copy.deepcopy(self._is_alarm_illegal) new_obj._is_alarm_used_in_reward = copy.deepcopy(self._is_alarm_used_in_reward) new_obj._kwargs_attention_budget = copy.deepcopy(self._kwargs_attention_budget) new_obj._last_obs = self._last_obs.copy() new_obj._has_just_been_seeded = self._has_just_been_seeded
[docs] def get_path_env(self): """ Get the path that allows to create this environment. It can be used for example in `grid2op.utils.underlying_statistics` to save the information directly inside the environment data. """ if self.__closed: raise EnvError("This environment is closed, you cannot get its path.") # return os.path.split(self._init_grid_path)[0] res = self._init_env_path if self._init_env_path is not None else "" return res
[docs] def load_alarm_data(self): """ Internal Notes ------ This is called when the environment class is not created, so i need to read the data of the grid from the backend. I cannot use "self.name_line" for example. This function update the backend INSTANCE. The backend class is then updated in the env._init_backend(...) function with a call to `self.backend.assert_grid_correct()` Returns ------- """ file_alerts = os.path.join(self.get_path_env(), BaseEnv.ALARM_FILE_NAME) if os.path.exists(file_alerts) and os.path.isfile(file_alerts): with open(file_alerts, mode="r", encoding="utf-8") as f: dict_alarm = json.load(f) key = "fixed" if key not in dict_alarm: raise EnvError( 'The key "fixed" should be present in the alarm data json, for now.' ) nb_areas = len(dict_alarm[key]) # need to be remembered line_names = { el: [] for el in self.backend.name_line } # need to be remembered area_names = sorted(dict_alarm[key].keys()) # need to be remembered area_lines = [[] for _ in range(nb_areas)] # need to be remembered for area_id, area_name in enumerate(area_names): # check that: all lines in files are in the grid area = dict_alarm[key][area_name] for line in area: if line not in line_names: raise EnvError( f"You provided a description of the area of the grid for the alarms, but a " f'line named "{line}" is present in your file but not in the grid. Please ' f"check the file {file_alerts} and make sure it contains only the line named " f"{sorted(self.backend.name_line)}." ) # update the list and dictionary that remembers everything line_names[line].append(area_name) area_lines[area_id].append(line) for line, li_area in line_names.items(): # check that all lines in the grid are in at least one area if not li_area: raise EnvError( f"Line (on the grid) named {line} is not in any areas. This is not supported at " f"the moment" ) # every check pass, i update the backend class bk_cls = type(self.backend) bk_cls.dim_alarms = nb_areas bk_cls.alarms_area_names = copy.deepcopy(area_names) bk_cls.alarms_lines_area = copy.deepcopy(line_names) bk_cls.alarms_area_lines = copy.deepcopy(area_lines)
@property def action_space(self) -> ActionSpace: """this represent a view on the action space""" return self._action_space @action_space.setter def action_space(self, other): raise EnvError( "Impossible to modify the action space of the environment. You probably want to modify " "the action with which the agent is interacting. You can do that with a converter, or " "using the GymEnv. Please consult the documentation." ) @property def observation_space(self) -> ObservationSpace: """this represent a view on the action space""" return self._observation_space @observation_space.setter def observation_space(self, other): raise EnvError( "Impossible to modify the observation space of the environment. You probably want to modify " "the observation with which the agent is interacting. You can do that with a converter, or " "using the GymEnv. Please consult the documentation." )
[docs] def change_parameters(self, new_parameters): """ Allows to change the parameters of an environment. Notes ------ This only affects the environment AFTER `env.reset()` has been called. This only affects the environment and NOT the forecast. Parameters ---------- new_parameters: :class:`grid2op.Parameters.Parameters` The new parameters you want the environment to get. Examples --------- You can use this function like: .. code-block:: python import grid2op from grid2op.Parameters import Parameters env_name = ... env = grid2op.make(env_name) env.parameters.NO_OVERFLOW_DISCONNECTION # -> False new_param = Parameters() new_param.A_MEMBER = A_VALUE # eg new_param.NO_OVERFLOW_DISCONNECTION = True env.change_parameters(new_param) obs = env.reset() env.parameters.NO_OVERFLOW_DISCONNECTION # -> True """ if self.__closed: raise EnvError( "This environment is closed, you cannot change its parameters." ) if not isinstance(new_parameters, Parameters): raise EnvError( 'The new parameters "new_parameters" should be an instance of ' "grid2op.Parameters.Parameters." ) new_parameters.check_valid() # check the provided parameters are valid self.__new_param = new_parameters
[docs] def change_forecast_parameters(self, new_parameters): """ Allows to change the parameters of a "forecast environment". Notes ------ This only affects the environment AFTER `env.reset()` has been called. This only affects the "forecast env" and NOT the env itself. Parameters ---------- new_parameters: :class:`grid2op.Parameters.Parameters` The new parameters you want the environment to get. """ if self.__closed: raise EnvError( "This environment is closed, you cannot change its parameters (for the forecast / simulate)." ) if not isinstance(new_parameters, Parameters): raise EnvError( 'The new parameters "new_parameters" should be an instance of ' "grid2op.Parameters.Parameters." ) new_parameters.check_valid() # check the provided parameters are valid self.__new_forecast_param = new_parameters
def _create_attention_budget(self): if not self.__is_init: raise EnvError( "Impossible to create an attention budget with a non initialized environment!" ) if self._has_attention_budget: self._attention_budget = self._attention_budget_cls() try: self._attention_budget.init( partial_env=self, **self._kwargs_attention_budget ) except TypeError as exc_: raise EnvError( "Impossible to create the attention budget with the provided argument. Please " 'change the content of the argument "kwargs_attention_budget".' ) from exc_ def _create_opponent(self): if not self.__is_init: raise EnvError( "Impossible to create an opponent with a non initialized environment!" ) if not issubclass(self._opponent_action_class, BaseAction): raise EnvError( "Impossible to make an environment with an opponent action class not derived from BaseAction" ) try: self._opponent_init_budget = dt_float(self._opponent_init_budget) except Exception as e: raise EnvError( 'Impossible to convert "opponent_init_budget" to a float with error {}'.format( e ) ) if self._opponent_init_budget < 0.0: raise EnvError( "If you want to deactivate the opponent, please don't set its budget to a negative number." 'Prefer the use of the DontAct action type ("opponent_action_class=DontAct" ' "and / or set its budget to 0." ) if not issubclass(self._opponent_class, BaseOpponent): raise EnvError( "Impossible to make an opponent with a type that does not inherit from BaseOpponent." ) self._opponent_action_class._add_shunt_data() self._opponent_action_class._update_value_set() self._opponent_action_space = self._helper_action_class( gridobj=type(self.backend), legal_action=AlwaysLegal, actionClass=self._opponent_action_class, ) self._compute_opp_budget = self._opponent_budget_class( self._opponent_action_space ) self._opponent = self._opponent_class(self._opponent_action_space) self._oppSpace = self._opponent_space_type( compute_budget=self._compute_opp_budget, init_budget=self._opponent_init_budget, attack_duration=self._opponent_attack_duration, attack_cooldown=self._opponent_attack_cooldown, budget_per_timestep=self._opponent_budget_per_ts, opponent=self._opponent, ) self._oppSpace.init_opponent(partial_env=self, **self._kwargs_opponent) self._oppSpace.reset() def _init_myclass(self): if self._backend_action_class is not None: # the class has already been initialized return # remember the original grid2op class type(self)._INIT_GRID_CLS = type(self) bk_type = type( self.backend ) # be careful here: you need to initialize from the class, and not from the object # create the proper environment class for this specific environment self.__class__ = type(self).init_grid(bk_type) def _has_been_initialized(self): # type of power flow to play # if True, then it will not disconnect lines above their thermal limits self._init_myclass() bk_type = type(self.backend) if np.min([self.n_line, self.n_gen, self.n_load, self.n_sub]) <= 0: raise EnvironmentError("Environment has not been initialized properly") self._backend_action_class = _BackendAction.init_grid(bk_type) self._backend_action = self._backend_action_class() # initialize maintenance / hazards self._time_next_maintenance = np.full(self.n_line, -1, dtype=dt_int) self._duration_next_maintenance = np.zeros(shape=(self.n_line,), dtype=dt_int) self._times_before_line_status_actionable = np.full( shape=(self.n_line,), fill_value=0, dtype=dt_int ) # create the vector to the proper shape self._target_dispatch = np.zeros(self.n_gen, dtype=dt_float) self._actual_dispatch = np.zeros(self.n_gen, dtype=dt_float) self._gen_uptime = np.zeros(self.n_gen, dtype=dt_int) self._gen_downtime = np.zeros(self.n_gen, dtype=dt_int) self._gen_activeprod_t = np.zeros(self.n_gen, dtype=dt_float) self._gen_activeprod_t_redisp = np.zeros(self.n_gen, dtype=dt_float) self._nb_timestep_overflow_allowed = np.ones(shape=self.n_line, dtype=dt_int) self._max_timestep_line_status_deactivated = ( self._parameters.NB_TIMESTEP_COOLDOWN_LINE ) self._times_before_line_status_actionable = np.zeros( shape=(self.n_line,), dtype=dt_int ) self._times_before_topology_actionable = np.zeros( shape=(self.n_sub,), dtype=dt_int ) self._nb_timestep_overflow_allowed = np.full( shape=(self.n_line,), fill_value=self._parameters.NB_TIMESTEP_OVERFLOW_ALLOWED, dtype=dt_int, ) self._timestep_overflow = np.zeros(shape=(self.n_line,), dtype=dt_int) # update the parameters self.__new_param = self._parameters # small hack to have it working as expected self._update_parameters() self._reset_redispatching() # storage self._storage_current_charge = np.zeros(self.n_storage, dtype=dt_float) self._storage_previous_charge = np.zeros(self.n_storage, dtype=dt_float) self._action_storage = np.zeros(self.n_storage, dtype=dt_float) self._storage_power = np.zeros(self.n_storage, dtype=dt_float) self._storage_power_prev = np.zeros(self.n_storage, dtype=dt_float) self._amount_storage = 0.0 self._amount_storage_prev = 0.0 # curtailment self._limit_curtailment = np.ones( self.n_gen, dtype=dt_float ) # in ratio of pmax self._limit_curtailment_prev = np.ones( self.n_gen, dtype=dt_float ) # in ratio of pmax self._gen_before_curtailment = np.zeros(self.n_gen, dtype=dt_float) # in MW self._sum_curtailment_mw = dt_float(0.0) self._sum_curtailment_mw_prev = dt_float(0.0) self._reset_curtailment() # register this is properly initialized self.__is_init = True def _update_parameters(self): """update value for the new parameters""" self._parameters = self.__new_param self._ignore_min_up_down_times = self._parameters.IGNORE_MIN_UP_DOWN_TIME self._forbid_dispatch_off = not self._parameters.ALLOW_DISPATCH_GEN_SWITCH_OFF # type of power flow to play # if True, then it will not disconnect lines above their thermal limits self._no_overflow_disconnection = self._parameters.NO_OVERFLOW_DISCONNECTION self._hard_overflow_threshold = self._parameters.HARD_OVERFLOW_THRESHOLD # store actions "cooldown" self._max_timestep_line_status_deactivated = ( self._parameters.NB_TIMESTEP_COOLDOWN_LINE ) self._max_timestep_topology_deactivated = ( self._parameters.NB_TIMESTEP_COOLDOWN_SUB ) self._nb_ts_reco = self._parameters.NB_TIMESTEP_RECONNECTION self._nb_timestep_overflow_allowed[ : ] = self._parameters.NB_TIMESTEP_OVERFLOW_ALLOWED # hard overflow part self._env_dc = self._parameters.ENV_DC self.__new_param = None
[docs] def reset(self): """ Reset the base environment (set the appropriate variables to correct initialization). It is (and must be) overloaded in other :class:`grid2op.Environment` """ if self.__closed: raise EnvError("This environment is closed. You cannot use it anymore.") self.__is_init = True # current = None is an indicator that this is the first step of the environment # so don't change the setting of current_obs = None unless you are willing to change that self.current_obs = None self._line_status[:] = True if self.__new_param is not None: self._update_parameters() # reset __new_param to None too if self.__new_forecast_param is not None: self._observation_space._change_parameters(self.__new_forecast_param) self.__new_forecast_param = None if self.__new_reward_func is not None: self._reward_helper.change_reward(self.__new_reward_func) self._reward_helper.initialize(self) self.reward_range = self._reward_helper.range() # change also the reward used in simulate self._observation_space.change_reward(self._reward_helper.template_reward) self.__new_reward_func = None self._last_obs = None # seeds (so that next episode does not depend on what happened in previous episode) if self.seed_used is not None and not self._has_just_been_seeded: self.seed(None, _seed_me=False) self._reset_storage() self._reset_curtailment() self._has_just_been_seeded = False
def _reset_storage(self): """reset storage capacity at the beginning of new environment if needed""" if self.n_storage > 0: tmp = self._parameters.INIT_STORAGE_CAPACITY * self.storage_Emax if self._parameters.ACTIVATE_STORAGE_LOSS: tmp += self.storage_loss * self.delta_time_seconds / 3600.0 self._storage_previous_charge[ : ] = tmp # might not be needed, but it's not for the time it takes... self._storage_current_charge[:] = tmp self._storage_power[:] = 0.0 self._storage_power_prev[:] = 0.0 self._amount_storage = 0.0 self._amount_storage_prev = 0.0 # TODO storage: check in simulate too! def _reset_curtailment(self): self._limit_curtailment[self.gen_renewable] = 1.0 self._limit_curtailment_prev[self.gen_renewable] = 1.0 self._gen_before_curtailment[:] = 0.0 self._sum_curtailment_mw = dt_float(0.0) self._sum_curtailment_mw_prev = dt_float(0.0) self._limited_before = dt_float(0.0)
[docs] def seed(self, seed=None, _seed_me=True): """ Set the seed of this :class:`Environment` for a better control and to ease reproducible experiments. Parameters ---------- seed: ``int`` The seed to set. _seed_me: ``bool`` Whether to seed this instance or just the other things. Used internally only. Returns --------- seed: ``tuple`` The seed used to set the prng (pseudo random number generator) for the environment seed_chron: ``tuple`` The seed used to set the prng for the chronics_handler (if any), otherwise ``None`` seed_obs: ``tuple`` The seed used to set the prng for the observation space (if any), otherwise ``None`` seed_action_space: ``tuple`` The seed used to set the prng for the action space (if any), otherwise ``None`` seed_env_modif: ``tuple`` The seed used to set the prng for the modification of th environment (if any otherwise ``None``) seed_volt_cont: ``tuple`` The seed used to set the prng for voltage controler (if any otherwise ``None``) seed_opponent: ``tuple`` The seed used to set the prng for the opponent (if any otherwise ``None``) Examples --------- Seeding an environment should be done with: .. code-block:: python import grid2op env = grid2op.make() env.seed(0) obs = env.reset() As long as the environment instance (variable `env` in the above code) is not `reset` the `env.seed` has no real effect (but can have side effect). For a full control on the seed mechanism it is more than advised to reset it after it has been seeded. """ if self.__closed: raise EnvError("This environment is closed. You cannot use it anymore.") seed_init = None seed_chron = None seed_obs = None seed_action_space = None seed_env_modif = None seed_volt_cont = None seed_opponent = None if _seed_me: max_int = np.iinfo(dt_int).max if seed > max_int: raise Grid2OpException("Seed is too big. Max value is {}, provided value is {}".format(max_int, seed)) try: seed = np.array(seed).astype(dt_int) except Exception as exc_: raise Grid2OpException( "Impossible to seed with the seed provided. Make sure it can be converted to a" "numpy 32 bits integer." ) # example from gym # self.np_random, seed = seeding.np_random(seed) # inspiration from @ seed_init = seed super().seed(seed_init) max_seed = np.iinfo(dt_int).max # 2**32 - 1 if self.chronics_handler is not None: seed = self.space_prng.randint(max_seed) seed_chron = self.chronics_handler.seed(seed) if self._observation_space is not None: seed = self.space_prng.randint(max_seed) seed_obs = self._observation_space.seed(seed) if self._action_space is not None: seed = self.space_prng.randint(max_seed) seed_action_space = self._action_space.seed(seed) if self._helper_action_env is not None: seed = self.space_prng.randint(max_seed) seed_env_modif = self._helper_action_env.seed(seed) if self._voltage_controler is not None: seed = self.space_prng.randint(max_seed) seed_volt_cont = self._voltage_controler.seed(seed) if self._opponent is not None: seed = self.space_prng.randint(max_seed) seed_opponent = self._opponent.seed(seed) self._has_just_been_seeded = True return ( seed_init, seed_chron, seed_obs, seed_action_space, seed_env_modif, seed_volt_cont, seed_opponent, )
[docs] def deactivate_forecast(self): """ This function will have the effect to deactivate the `obs.simulate`, the forecast will not be updated in the observation space. This will most likely lead to some performance increase (~10-15% faster) if you don't use the `obs.simulate` function. Notes ------ If you really don't want to use the `obs.simulate` functionality, you should rather disable it at the creation of the environment. For example, if you use the recommended `make` function, you can pass an argument that will ignore the chronics even when reading it (using `GridStateFromFile` instead of `GridStateFromFileWithForecast` for example) this would give something like: .. code-block:: python import grid2op from grid2op.Chronics import GridStateFromFile # tell grid2op not to read the "forecast" env = grid2op.make("rte_case14_realistic", data_feeding_kwargs={"gridvalueClass": GridStateFromFile}) do_nothing_action = env.action_space() # improve speed ups to not even try to use forecast env.deactivate_forecast() # this is normal behavior obs = env.reset() # but this will make the programm stop working # obs.simulate(do_nothing_action) # DO NOT RUN IT RAISES AN ERROR """ if self.__closed: raise EnvError("This environment is closed, you cannot use it.") if self._observation_space is not None: self._observation_space.with_forecast = False self.with_forecast = False
[docs] def reactivate_forecast(self): """ This function will have the effect to reactivate the `obs.simulate`, the forecast will be updated in the observation space. This will most likely lead to some performance decrease but you will be able to use `obs.simulate` function. .. warning:: Forecast are deactivated by default (and cannot be reactivated) if the backend cannot be copied. Notes ------ You can use this function as followed: .. code-block:: python import grid2op from grid2op.Chronics import GridStateFromFile # tell grid2op not to read the "forecast" env = grid2op.make("rte_case14_realistic", data_feeding_kwargs={"gridvalueClass": GridStateFromFile}) do_nothing_action = env.action_space() # improve speed ups to not even try to use forecast env.deactivate_forecast() # this is normal behavior obs = env.reset() # but this will make the programm stop working # obs.simulate(do_nothing_action) # DO NOT RUN IT RAISES AN ERROR env.reactivate_forecast() obs, reward, done, info = env.step(do_nothing_action) # and now forecast are available again simobs, sim_r, sim_d, sim_info = obs.simulate(do_nothing_action) """ if self.__closed: raise EnvError("This environment is closed, you cannot use it.") if not self.backend._can_be_copied: raise EnvError("Impossible to activate the forecasts with a " "backend that cannot be copied.") if self._observation_space is not None: self._observation_space.with_forecast = True self.with_forecast = True
@abstractmethod def _init_backend( self, chronics_handler, backend, names_chronics_to_backend, actionClass, observationClass, rewardClass, legalActClass, ): """ INTERNAL .. warning:: /!\\\\ Internal, do not use unless you know what you are doing /!\\\\ This method is used for Environment specific implementation. Only use it if you know exactly what you are doing. """ pass
[docs] def set_thermal_limit(self, thermal_limit): """ Set the thermal limit effectively. Parameters ---------- thermal_limit: ``numpy.ndarray`` The new thermal limit. It must be a numpy ndarray vector (or convertible to it). For each powerline it gives the new thermal limit. Alternatively, this can be a dictionary mapping the line names (keys) to its thermal limits (values). In that case, all thermal limits for all powerlines should be specified (this is a safety measure to reduce the odds of misuse). Examples --------- This function can be used like this: .. code-block:: python import grid2op # I create an environment env = grid2op.make("rte_case5_example", test=True) # i set the thermal limit of each powerline to 20000 amps env.set_thermal_limit([20000 for _ in range(env.n_line)]) Notes ----- As of grid2op > 1.5.0, it is possible to set the thermal limit by using a dictionary with the keys being the name of the powerline and the values the thermal limits. """ if self.__closed: raise EnvError("This environment is closed, you cannot use it.") if not self.__is_init: raise Grid2OpException( "Impossible to set the thermal limit to a non initialized Environment" ) if isinstance(thermal_limit, dict): tmp = np.full(self.n_line, fill_value=np.NaN, dtype=dt_float) for key, val in thermal_limit.items(): if key not in self.name_line: raise Grid2OpException( f"When setting a thermal limit with a dictionary, the keys should be line " f"names. We found: {key} which is not a line name. The names of the " f"powerlines are {self.name_line}" ) ind_line = np.where(self.name_line == key)[0][0] if np.isfinite(tmp[ind_line]): raise Grid2OpException( f"Humm, there is a really strange bug, some lines are set twice." ) try: val_fl = float(val) except Exception as exc_: raise Grid2OpException( f"When setting thermal limit with a dictionary, the keys should be " f"the values of the thermal limit (in amps) you provided something that " f'cannot be converted to a float. Error was "{exc_}".' ) tmp[ind_line] = val_fl elif isinstance(thermal_limit, (np.ndarray, list)): try: tmp = np.array(thermal_limit).flatten().astype(dt_float) except Exception as exc_: raise Grid2OpException( f"Impossible to convert the vector as input into a 1d numpy float array. " f"Error was: \n {exc_}" ) if tmp.shape[0] != self.n_line: raise Grid2OpException( "Attempt to set thermal limit on {} powerlines while there are {}" "on the grid".format(tmp.shape[0], self.n_line) ) if np.any(~np.isfinite(tmp)): raise Grid2OpException( "Impossible to use non finite value for thermal limits." ) else: raise Grid2OpException( f"You can only set the thermal limits of the environment with a dictionary (in that " f"case the keys are the line names, and the values the thermal limits) or with " f"a numpy array that has as many components of the number of powerlines on " f'the grid. You provided something with type "{type(thermal_limit)}" which ' f"is not supported." ) self._thermal_limit_a = tmp self.backend.set_thermal_limit(self._thermal_limit_a)
def _reset_redispatching(self): # redispatching self._target_dispatch[:] = 0.0 self._actual_dispatch[:] = 0.0 self._gen_uptime[:] = 0 self._gen_downtime[:] = 0 self._gen_activeprod_t[:] = 0.0 self._gen_activeprod_t_redisp[:] = 0.0 def _get_new_prod_setpoint(self, action): """ NB this is overidden in _ObsEnv where the data are read from the action to set this environment instead """ # get the modification of generator active setpoint from the action new_p = 1.0 * self._gen_activeprod_t if "prod_p" in action._dict_inj: tmp = action._dict_inj["prod_p"] indx_ok = np.isfinite(tmp) new_p[indx_ok] = tmp[indx_ok] # modification of the environment always override the modification of the agents (if any) # TODO have a flag there if this is the case. if "prod_p" in self._env_modification._dict_inj: # modification of the production setpoint value tmp = self._env_modification._dict_inj["prod_p"] indx_ok = np.isfinite(tmp) new_p[indx_ok] = tmp[indx_ok] return new_p def _get_already_modified_gen(self, action): redisp_act_orig = 1.0 * action._redispatch already_modified_gen = self._target_dispatch != 0.0 self._target_dispatch[already_modified_gen] += redisp_act_orig[ already_modified_gen ] first_modified = (~already_modified_gen) & (redisp_act_orig != 0) self._target_dispatch[first_modified] = ( self._actual_dispatch[first_modified] + redisp_act_orig[first_modified] ) already_modified_gen |= first_modified return already_modified_gen def _prepare_redisp(self, action, new_p, already_modified_gen): # trying with an optimization method except_ = None info_ = [] valid = True # get the redispatching action (if any) redisp_act_orig = 1.0 * action._redispatch if ( np.all(redisp_act_orig == 0.0) and np.all(self._target_dispatch == 0.0) and np.all(self._actual_dispatch == 0.0) ): return valid, except_, info_ # check that everything is consistent with pmin, pmax: if np.any(self._target_dispatch > self.gen_pmax - self.gen_pmin): # action is invalid, the target redispatching would be above pmax for at least a generator cond_invalid = self._target_dispatch > self.gen_pmax - self.gen_pmin except_ = InvalidRedispatching( "You cannot ask for a dispatch higher than pmax - pmin [it would be always " "invalid because, even if the sepoint is pmin, this dispatch would set it " "to a number higher than pmax, which is impossible]. Invalid dispatch for " "generator(s): " "{}".format(np.where(cond_invalid)[0]) ) self._target_dispatch -= redisp_act_orig return valid, except_, info_ if np.any(self._target_dispatch < self.gen_pmin - self.gen_pmax): # action is invalid, the target redispatching would be below pmin for at least a generator cond_invalid = self._target_dispatch < self.gen_pmin - self.gen_pmax except_ = InvalidRedispatching( "You cannot ask for a dispatch lower than pmin - pmax [it would be always " "invalid because, even if the sepoint is pmax, this dispatch would set it " "to a number bellow pmin, which is impossible]. Invalid dispatch for " "generator(s): " "{}".format(np.where(cond_invalid)[0]) ) self._target_dispatch -= redisp_act_orig return valid, except_, info_ # i can't redispatch turned off generators [turned off generators need to be turned on before redispatching] if np.any(redisp_act_orig[new_p == 0.0]) and self._forbid_dispatch_off: # action is invalid, a generator has been redispatched, but it's turned off except_ = InvalidRedispatching( "Impossible to dispatch a turned off generator" ) self._target_dispatch -= redisp_act_orig return valid, except_, info_ if self._forbid_dispatch_off is True: redisp_act_orig_cut = 1.0 * redisp_act_orig redisp_act_orig_cut[new_p == 0.0] = 0.0 if np.any(redisp_act_orig_cut != redisp_act_orig): info_.append( { "INFO: redispatching cut because generator will be turned_off": np.where( redisp_act_orig_cut != redisp_act_orig )[ 0 ] } ) return valid, except_, info_ def _make_redisp(self, already_modified_gen, new_p): """this computes the redispaching vector, taking into account the storage units""" except_ = None valid = True mismatch = self._actual_dispatch - self._target_dispatch mismatch = np.abs(mismatch) if ( np.abs(np.sum(self._actual_dispatch)) >= self._tol_poly or np.max(mismatch) >= self._tol_poly or np.abs(self._amount_storage) >= self._tol_poly or np.abs(self._sum_curtailment_mw) >= self._tol_poly ): except_ = self._compute_dispatch_vect(already_modified_gen, new_p) valid = except_ is None return valid, except_ def _compute_dispatch_vect(self, already_modified_gen, new_p): except_ = None # first i define the participating generators # these are the generators that will be adjusted for redispatching gen_participating = ( (new_p > 0.0) | (self._actual_dispatch != 0.0) | (self._target_dispatch != self._actual_dispatch) ) gen_participating[~self.gen_redispatchable] = False incr_in_chronics = new_p - ( self._gen_activeprod_t_redisp - self._actual_dispatch ) # check if the constraints are violated ## total available "juice" to go down (incl ramp and pmin / pmax) p_min_down = ( self.gen_pmin[gen_participating] - self._gen_activeprod_t_redisp[gen_participating] ) avail_down = np.maximum(p_min_down, -self.gen_max_ramp_down[gen_participating]) ## total available "juice" to go up (incl. ramp and pmin / pmax) p_max_up = ( self.gen_pmax[gen_participating] - self._gen_activeprod_t_redisp[gen_participating] ) avail_up = np.minimum(p_max_up, self.gen_max_ramp_up[gen_participating]) except_ = self._detect_infeasible_dispatch( incr_in_chronics[gen_participating], avail_down, avail_up ) if except_ is not None: # try to force the turn on of turned off generators (if parameters allow it) if ( self._parameters.IGNORE_MIN_UP_DOWN_TIME and self._parameters.ALLOW_DISPATCH_GEN_SWITCH_OFF ): gen_participating_tmp = self.gen_redispatchable p_min_down_tmp = ( self.gen_pmin[gen_participating_tmp] - self._gen_activeprod_t_redisp[gen_participating_tmp] ) avail_down_tmp = np.maximum( p_min_down_tmp, -self.gen_max_ramp_down[gen_participating_tmp] ) p_max_up_tmp = ( self.gen_pmax[gen_participating_tmp] - self._gen_activeprod_t_redisp[gen_participating_tmp] ) avail_up_tmp = np.minimum( p_max_up_tmp, self.gen_max_ramp_up[gen_participating_tmp] ) except_tmp = self._detect_infeasible_dispatch( incr_in_chronics[gen_participating_tmp], avail_down_tmp, avail_up_tmp, ) if except_tmp is None: # I can "save" the situation by turning on all generators, I do it # TODO logger here gen_participating = gen_participating_tmp except_ = None else: return except_tmp else: return except_ # define the objective value target_vals = ( self._target_dispatch[gen_participating] - self._actual_dispatch[gen_participating] ) already_modified_gen_me = already_modified_gen[gen_participating] target_vals_me = target_vals[already_modified_gen_me] nb_dispatchable = np.sum(gen_participating) tmp_zeros = np.zeros((1, nb_dispatchable), dtype=dt_float) coeffs = 1.0 / ( self.gen_max_ramp_up + self.gen_max_ramp_down + self._epsilon_poly ) weights = np.ones(nb_dispatchable) * coeffs[gen_participating] weights /= weights.sum() if target_vals_me.shape[0] == 0: # no dispatch means all dispatchable, otherwise i will never get to 0 already_modified_gen_me[:] = True target_vals_me = target_vals[already_modified_gen_me] # for numeric stability # to scale the input also: # see scale_x = max(np.max(np.abs(self._actual_dispatch)), 1.0) scale_x = dt_float(scale_x) target_vals_me_optim = 1.0 * (target_vals_me / scale_x) target_vals_me_optim = target_vals_me_optim.astype(dt_float) # see # where they advised to scale the function scale_objective = max(0.5 * np.sum(np.abs(target_vals_me_optim)) ** 2, 1.0) scale_objective = np.round(scale_objective, decimals=4) scale_objective = dt_float(scale_objective) # add the "sum to 0" mat_sum_0_no_turn_on = np.ones((1, nb_dispatchable), dtype=dt_float) # this is where the storage is taken into account # storages are "load convention" this means that i need to sum the amount of production to sum of storage # hence the "+ self._amount_storage" below # self._sum_curtailment_mw is "generator convention" hence the "-" there const_sum_0_no_turn_on = ( np.zeros(1, dtype=dt_float) + self._amount_storage - self._sum_curtailment_mw ) # gen increase in the chronics new_p_th = new_p[gen_participating] + self._actual_dispatch[gen_participating] # minimum value available for disp ## first limit delta because of pmin p_min_const = self.gen_pmin[gen_participating] - new_p_th ## second limit delta because of ramps ramp_down_const = ( -self.gen_max_ramp_down[gen_participating] - incr_in_chronics[gen_participating] ) ## take max of the 2 min_disp = np.maximum(p_min_const, ramp_down_const) min_disp = min_disp.astype(dt_float) # maximum value available for disp ## first limit delta because of pmin p_max_const = self.gen_pmax[gen_participating] - new_p_th ## second limit delta because of ramps ramp_up_const = ( self.gen_max_ramp_up[gen_participating] - incr_in_chronics[gen_participating] ) ## take min of the 2 max_disp = np.minimum(p_max_const, ramp_up_const) max_disp = max_disp.astype(dt_float) # add everything into a linear constraint object # equality added = 0.5 * self._epsilon_poly equality_const = LinearConstraint( mat_sum_0_no_turn_on, # do the sum (const_sum_0_no_turn_on) / scale_x, # lower bound (const_sum_0_no_turn_on) / scale_x, # upper bound ) mat_pmin_max_ramps = np.eye(nb_dispatchable) ineq_const = LinearConstraint( mat_pmin_max_ramps, (min_disp - added) / scale_x, (max_disp + added) / scale_x, ) # choose a good initial point (close to the solution) # the idea here is to chose a initial point that would be close to the # desired solution (split the (sum of the) dispatch to the available generators) x0 = np.zeros(np.sum(gen_participating)) if np.any(self._target_dispatch != 0.) or np.any(already_modified_gen): gen_for_x0 = self._target_dispatch[gen_participating] != 0. gen_for_x0 |= already_modified_gen[gen_participating] x0[gen_for_x0] = ( self._target_dispatch[gen_participating][gen_for_x0] - self._actual_dispatch[gen_participating][gen_for_x0] ) / scale_x # at this point x0 is made of the difference between the target and the # actual dispatch for all generators that have a # target dispatch non 0. # in this "if" block I set the other component of x0 to # their "right" value can_adjust = (x0 == 0.0) if np.any(can_adjust): init_sum = np.sum(x0) denom_adjust = np.sum(1.0 / weights[can_adjust]) if denom_adjust <= 1e-2: # i don't want to divide by something too cloose to 0. denom_adjust = 1.0 x0[can_adjust] = -init_sum / (weights[can_adjust] * denom_adjust) else: # to "force" the exact reset to 0.0 for all components x0 -= self._actual_dispatch[gen_participating] / scale_x def target(actual_dispatchable): # define my real objective quad_ = ( actual_dispatchable[already_modified_gen_me] - target_vals_me_optim ) ** 2 coeffs_quads = weights[already_modified_gen_me] * quad_ coeffs_quads_const = coeffs_quads.sum() coeffs_quads_const /= scale_objective # scaling the function return coeffs_quads_const def jac(actual_dispatchable): res_jac = 1.0 * tmp_zeros res_jac[0, already_modified_gen_me] = ( 2.0 * weights[already_modified_gen_me] * (actual_dispatchable[already_modified_gen_me] - target_vals_me_optim) ) res_jac /= scale_objective # scaling the function return res_jac # objective function def f(init): this_res = minimize( target, init, method="SLSQP", constraints=[equality_const, ineq_const], options={ "eps": max(self._epsilon_poly / scale_x, 1e-6), "ftol": max(self._epsilon_poly / scale_x, 1e-6), "disp": False, }, jac=jac # hess=hess # not used for SLSQP ) return this_res res = f(x0) if res.success: self._actual_dispatch[gen_participating] += res.x * scale_x else: # check if constraints are "approximately" met mat_const = np.concatenate((mat_sum_0_no_turn_on, mat_pmin_max_ramps)) downs = np.concatenate( (const_sum_0_no_turn_on / scale_x, (min_disp - added) / scale_x) ) ups = np.concatenate( (const_sum_0_no_turn_on / scale_x, (max_disp + added) / scale_x) ) vals = np.matmul(mat_const, res.x) ok_down = np.all( vals - downs >= -self._tol_poly ) # i don't violate "down" constraints ok_up = np.all(vals - ups <= self._tol_poly) if ok_up and ok_down: # it's ok i can tolerate "small" perturbations self._actual_dispatch[gen_participating] += res.x * scale_x else: # TODO try with another method here, maybe error_dispatch = ( "Redispatching automaton terminated with error (no more information available " 'at this point):\n"{}"'.format(res.message) ) except_ = InvalidRedispatching(error_dispatch) return except_ def _detect_infeasible_dispatch(self, incr_in_chronics, avail_down, avail_up): """This function is an attempt to give more detailed log by detecting infeasible dispatch""" except_ = None sum_move = ( np.sum(incr_in_chronics) + self._amount_storage - self._sum_curtailment_mw ) avail_down_sum = np.sum(avail_down) avail_up_sum = np.sum(avail_up) gen_setpoint = self._gen_activeprod_t_redisp[self.gen_redispatchable] if sum_move > avail_up_sum: # infeasible because too much is asked msg = DETAILED_REDISP_ERR_MSG.format( sum_move=sum_move, avail_up_sum=avail_up_sum, gen_setpoint=np.round(gen_setpoint, decimals=2), ramp_up=self.gen_max_ramp_up[self.gen_redispatchable], gen_pmax=self.gen_pmax[self.gen_redispatchable], avail_up=np.round(avail_up, decimals=2), increase="increase", decrease="decrease", maximum="maximum", pmax="pmax", max_ramp_up="max_ramp_up", ) except_ = InvalidRedispatching(msg) elif sum_move < avail_down_sum: # infeasible because not enough is asked msg = DETAILED_REDISP_ERR_MSG.format( sum_move=sum_move, avail_up_sum=avail_down_sum, gen_setpoint=np.round(gen_setpoint, decimals=2), ramp_up=self.gen_max_ramp_down[self.gen_redispatchable], gen_pmax=self.gen_pmin[self.gen_redispatchable], avail_up=np.round(avail_up, decimals=2), increase="decrease", decrease="increase", maximum="minimum", pmax="pmin", max_ramp_up="max_ramp_down", ) except_ = InvalidRedispatching(msg) return except_ def _update_actions(self): """ INTERNAL .. warning:: /!\\\\ Internal, do not use unless you know what you are doing /!\\\\ Retrieve the actions to perform the update of the underlying powergrid represented by the :class:`grid2op.Backend`in the next time step. A call to this function will also read the next state of :attr:`chronics_handler`, so it must be called only once per time step. Returns -------- res: :class:`grid2op.Action.Action` The action representing the modification of the powergrid induced by the Backend. """ ( timestamp, tmp, maintenance_time, maintenance_duration, hazard_duration, prod_v, ) = self.chronics_handler.next_time_step() if "injection" in tmp: self._injection = tmp["injection"] else: self._injection = None if "maintenance" in tmp: self._maintenance = tmp["maintenance"] else: self._maintenance = None if "hazards" in tmp: self._hazards = tmp["hazards"] else: self._hazards = None self.time_stamp = timestamp self._duration_next_maintenance = maintenance_duration self._time_next_maintenance = maintenance_time self._hazard_duration = hazard_duration act = self._helper_action_env( { "injection": self._injection, "maintenance": self._maintenance, "hazards": self._hazards, } ) return act, prod_v def _update_time_reconnection_hazards_maintenance(self): """ INTERNAL .. warning:: /!\\\\ Internal, do not use unless you know what you are doing /!\\\\ This supposes that :attr:`Environment.times_before_line_status_actionable` is already updated with the cascading failure, soft overflow and hard overflow. It also supposes that :func:`Environment._update_actions` has been called, so that the vectors :attr:`Environment.duration_next_maintenance`, :attr:`Environment._time_next_maintenance` and :attr:`Environment._hazard_duration` are updated with the most recent values. Finally the Environment supposes that this method is called before calling :func:`Environment.get_obs` This function integrates the hazards and maintenance in the :attr:`Environment.times_before_line_status_actionable` vector. For example, if a powerline `i` has no problem of overflow, but is affected by a hazard, :attr:`Environment.times_before_line_status_actionable` should be updated with the duration of this hazard (stored in one of the three vector mentionned in the above paragraph) For this Environment, we suppose that the maximum of the 3 values are taken into account. The reality would be more complicated. """ first_time_maintenance = self._time_next_maintenance == 0 self._times_before_line_status_actionable[first_time_maintenance] = np.maximum( self._times_before_line_status_actionable[first_time_maintenance], self._duration_next_maintenance[first_time_maintenance], ) self._times_before_line_status_actionable[:] = np.maximum( self._times_before_line_status_actionable, self._hazard_duration ) def _voltage_control(self, agent_action, prod_v_chronics): """ INTERNAL .. warning:: /!\\\\ Internal, do not use unless you know what you are doing /!\\\\ Update the environment action "action_env" given a possibly new voltage setpoint for the generators. This function can be overide for a more complex handling of the voltages. It must update (if needed) the voltages of the environment action :attr:`BaseEnv.env_modification` Parameters ---------- agent_action: :class:`grid2op.Action.Action` The action performed by the player (or do nothing is player action were not legal or ambiguous) prod_v_chronics: ``numpy.ndarray`` or ``None`` The voltages that has been specified in the chronics """ res = self._helper_action_env() if prod_v_chronics is not None: res.update({"injection": {"prod_v": prod_v_chronics}}) return res def _handle_updown_times(self, gen_up_before, redisp_act): """ INTERNAL .. warning:: /!\\\\ Internal, do not use unless you know what you are doing /!\\\\ Handles the up and down tims for the generators. """ # get the generators that are not connected after the action except_ = None # computes which generator will be turned on after the action gen_up_after = 1.0 * self._gen_activeprod_t if "prod_p" in self._env_modification._dict_inj: tmp = self._env_modification._dict_inj["prod_p"] indx_ok = np.isfinite(tmp) gen_up_after[indx_ok] = self._env_modification._dict_inj["prod_p"][indx_ok] gen_up_after += redisp_act gen_up_after = gen_up_after > 0.0 # update min down time, min up time etc. gen_disconnected_this = gen_up_before & (~gen_up_after) gen_connected_this_timestep = (~gen_up_before) & (gen_up_after) gen_still_connected = gen_up_before & gen_up_after gen_still_disconnected = (~gen_up_before) & (~gen_up_after) if ( np.any( self._gen_downtime[gen_connected_this_timestep] < self.gen_min_downtime[gen_connected_this_timestep] ) and not self._ignore_min_up_down_times ): # i reconnected a generator before the minimum time allowed id_gen = ( self._gen_downtime[gen_connected_this_timestep] < self.gen_min_downtime[gen_connected_this_timestep] ) id_gen = np.where(id_gen)[0] id_gen = np.where(gen_connected_this_timestep[id_gen])[0] except_ = GeneratorTurnedOnTooSoon( "Some generator has been connected too early ({})".format(id_gen) ) return except_ else: self._gen_downtime[gen_connected_this_timestep] = -1 self._gen_uptime[gen_connected_this_timestep] = 1 if ( np.any( self._gen_uptime[gen_disconnected_this] < self.gen_min_uptime[gen_disconnected_this] ) and not self._ignore_min_up_down_times ): # i disconnected a generator before the minimum time allowed id_gen = ( self._gen_uptime[gen_disconnected_this] < self.gen_min_uptime[gen_disconnected_this] ) id_gen = np.where(id_gen)[0] id_gen = np.where(gen_connected_this_timestep[id_gen])[0] except_ = GeneratorTurnedOffTooSoon( "Some generator has been disconnected too early ({})".format(id_gen) ) return except_ else: self._gen_downtime[gen_connected_this_timestep] = 0 self._gen_uptime[gen_connected_this_timestep] = 1 self._gen_uptime[gen_still_connected] += 1 self._gen_downtime[gen_still_disconnected] += 1 return except_
[docs] def get_obs(self, _update_state=True): """ Return the observations of the current environment made by the :class:`grid2op.BaseAgent.BaseAgent`. .. note:: This function is called twice when the env is reset, otherwise once per step Returns ------- res: :class:`grid2op.Observation.Observation` The current BaseObservation given to the :class:`grid2op.BaseAgent.BaseAgent` / bot / controler. Examples --------- This function can be use at any moment, even if the actual observation is not present. .. code-block:: python import grid2op # I create an environment env = grid2op.make() obs = env.reset() # have a big piece of code obs2 = env.get_obs() # obs2 and obs are identical. """ if self.__closed: raise EnvError("This environment is closed. You cannot use it anymore.") if not self.__is_init: raise EnvError( "This environment is not initialized. You cannot retrieve its observation." ) if self._last_obs is None: self._last_obs = self._observation_space( env=self, _update_state=_update_state ) return self._last_obs.copy()
[docs] def get_thermal_limit(self): """ Get the current thermal limit in amps registered for the environment. Examples --------- It can be used like this: .. code-block:: python import grid2op # I create an environment env = grid2op.make() thermal_limits = env.get_thermal_limit() """ if self.__closed: raise EnvError("This environment is closed, you cannot use it.") return 1.0 * self._thermal_limit_a
def _withdraw_storage_losses(self): """ empty the energy in the storage units depending on the `storage_loss` NB this is a loss, this is not seen grid side, so `storage_discharging_efficiency` has no impact on this """ # NB this should be done AFTER the computation of self._amount_storage, because this energy is dissipated # in the storage units, thus NOT seen as power from the grid. if self._parameters.ACTIVATE_STORAGE_LOSS: tmp_ = self.storage_loss * self.delta_time_seconds / 3600.0 self._storage_current_charge -= tmp_ # charge cannot be negative, but it can be below Emin if there are some uncompensated losses self._storage_current_charge[:] = np.maximum( self._storage_current_charge, 0.0 ) def _aux_remove_power_too_high(self, delta_, indx_too_high): """ delta_ is given in energy (and NOT power) handles self._storage_power in case we need to cut the storage action because the power would be too high """ coeff_p_to_E = ( self.delta_time_seconds / 3600.0 ) # TODO optim this is const for all time steps tmp_ = 1.0 / coeff_p_to_E * delta_ if self._parameters.ACTIVATE_STORAGE_LOSS: # from the storage i need to reduce of tmp_ MW (to compensate the delta_ MWh) # but when it's "transfer" to the grid i don't have the same amount (due to inefficiencies) # it's a "/" because i need more energy from the grid than what the actual charge will be tmp_ /= self.storage_charging_efficiency[indx_too_high] self._storage_power[indx_too_high] -= tmp_ def _aux_remove_power_too_low(self, delta_, indx_too_low): """ delta_ is given in energy (and NOT power) handles self._storage_power in case we need to cut the storage action because the power would be too low """ coeff_p_to_E = ( self.delta_time_seconds / 3600.0 ) # TODO optim this is const for all time steps tmp_ = 1.0 / coeff_p_to_E * delta_ if self._parameters.ACTIVATE_STORAGE_LOSS: # from the storage i need to increase of tmp_ MW (to compensate the delta_ MWh) # but when it's "transfer" to the grid i don't have the same amount (due to inefficiencies) # it's a "*" because i have less power on the grid than what is removed from the battery tmp_ *= self.storage_discharging_efficiency[indx_too_low] self._storage_power[indx_too_low] -= tmp_ def _compute_storage(self, action_storage_power): self._storage_previous_charge[:] = self._storage_current_charge storage_act = np.isfinite(action_storage_power) & (action_storage_power != 0.0) self._action_storage[:] = 0.0 self._storage_power[:] = 0.0 modif = False coeff_p_to_E = ( self.delta_time_seconds / 3600.0 ) # TODO optim this is const for all time steps if np.any(storage_act): modif = True this_act_stor = action_storage_power[storage_act] eff_ = np.ones(np.sum(storage_act)) if self._parameters.ACTIVATE_STORAGE_LOSS: fill_storage = ( this_act_stor > 0.0 ) # index of storages that sees their charge increasing unfill_storage = ( this_act_stor < 0.0 ) # index of storages that sees their charge decreasing eff_[fill_storage] *= self.storage_charging_efficiency[storage_act][ fill_storage ] eff_[unfill_storage] /= self.storage_discharging_efficiency[ storage_act ][unfill_storage] self._storage_current_charge[storage_act] += ( this_act_stor * coeff_p_to_E * eff_ ) self._action_storage[storage_act] += action_storage_power[storage_act] self._storage_power[storage_act] = this_act_stor if modif: # indx when there is too much energy on the battery indx_too_high = self._storage_current_charge > self.storage_Emax if np.any(indx_too_high): delta_ = ( self._storage_current_charge[indx_too_high] - self.storage_Emax[indx_too_high] ) self._aux_remove_power_too_high(delta_, indx_too_high) self._storage_current_charge[indx_too_high] = self.storage_Emax[ indx_too_high ] # indx when there is not enough energy on the battery indx_too_low = self._storage_current_charge < self.storage_Emin if np.any(indx_too_low): delta_ = ( self._storage_current_charge[indx_too_low] - self.storage_Emin[indx_too_low] ) self._aux_remove_power_too_low(delta_, indx_too_low) self._storage_current_charge[indx_too_low] = self.storage_Emin[ indx_too_low ] self._storage_current_charge[:] = np.maximum( self._storage_current_charge, self.storage_Emin ) # storage is "load convention", dispatch is "generator convention" # i need the generator to have the same sign as the action on the batteries self._amount_storage = np.sum(self._storage_power) else: # battery effect should be removed, so i multiply it by -1. self._amount_storage = 0.0 tmp = self._amount_storage self._amount_storage -= self._amount_storage_prev self._amount_storage_prev = tmp # dissipated energy, it's not seen on the grid, just lost in the storage unit. # this is why it should not be taken into account in self._amount_storage # and NOT absorbed by the generators either # NB loss in the storage unit can make it got below Emin in energy, but never below 0. self._withdraw_storage_losses() # end storage def _compute_max_ramp_this_step(self, new_p): """ compute the total "power" i can add or remove this step that takes into account generators ramps and Pmin / Pmax new_p: array of the (temporary) new production in the chronics that should happen """ # TODO # maximum value it can take th_max = np.minimum( self._gen_activeprod_t_redisp[self.gen_redispatchable] + self.gen_max_ramp_up[self.gen_redispatchable], self.gen_pmax[self.gen_redispatchable], ) # minimum value it can take th_min = np.maximum( self._gen_activeprod_t_redisp[self.gen_redispatchable] - self.gen_max_ramp_down[self.gen_redispatchable], self.gen_pmin[self.gen_redispatchable], ) max_total_up = np.sum(th_max - new_p[self.gen_redispatchable]) max_total_down = np.sum( th_min - new_p[self.gen_redispatchable] ) # TODO is that it ? return max_total_down, max_total_up def _aux_update_curtail_env_act(self, new_p): if "prod_p" in self._env_modification._dict_inj: self._env_modification._dict_inj["prod_p"][:] = new_p else: self._env_modification._dict_inj["prod_p"] = 1.0 * new_p self._env_modification._modif_inj = True def _aux_update_curtailment_act(self, action): curtailment_act = 1.0 * action._curtail ind_curtailed_in_act = (curtailment_act != -1.0) & self.gen_renewable self._limit_curtailment_prev[:] = self._limit_curtailment self._limit_curtailment[ind_curtailed_in_act] = curtailment_act[ ind_curtailed_in_act ] def _aux_compute_new_p_curtailment(self, new_p, curtailment_vect): """modifies the new_p argument !!!!""" gen_curtailed = ( curtailment_vect != 1.0 ) # curtailed either right now, or in a previous action max_action = self.gen_pmax[gen_curtailed] * curtailment_vect[gen_curtailed] new_p[gen_curtailed] = np.minimum(max_action, new_p[gen_curtailed]) return gen_curtailed def _aux_handle_curtailment_without_limit(self, action, new_p): """modifies the new_p argument !!!! (but not the action)""" if self.redispatching_unit_commitment_availble and ( action._modif_curtailment or np.any(self._limit_curtailment != 1.0) ): self._aux_update_curtailment_act(action) gen_curtailed = self._aux_compute_new_p_curtailment( new_p, self._limit_curtailment ) tmp_sum_curtailment_mw = dt_float( np.sum(new_p[gen_curtailed]) - np.sum(self._gen_before_curtailment[gen_curtailed]) ) self._sum_curtailment_mw = ( tmp_sum_curtailment_mw - self._sum_curtailment_mw_prev ) self._sum_curtailment_mw_prev = tmp_sum_curtailment_mw self._aux_update_curtail_env_act(new_p) else: self._sum_curtailment_mw = -self._sum_curtailment_mw_prev self._sum_curtailment_mw_prev = dt_float(0.0) gen_curtailed = self._limit_curtailment != 1.0 return gen_curtailed def _aux_readjust_curtailment_after_limiting( self, total_curtailment, new_p_th, new_p ): self._sum_curtailment_mw += total_curtailment self._sum_curtailment_mw_prev += total_curtailment if total_curtailment > self._tol_poly: # in this case, the curtailment is too strong, I need to make it less strong curtailed = new_p_th - new_p else: # in this case, the curtailment is too low, this can happen, for example when there is a # "strong" curtailment but afterwards you ask to set everything to 1. (so no curtailment) # I cannot reuse the previous case (too_much > self._tol_poly) because the # curtailment is already computed there... new_p_with_previous_curtailment = 1.0 * new_p_th self._aux_compute_new_p_curtailment( new_p_with_previous_curtailment, self._limit_curtailment_prev ) curtailed = new_p_th - new_p_with_previous_curtailment curt_sum = curtailed.sum() if abs(curt_sum) > self._tol_poly: curtailed[~self.gen_renewable] = 0.0 curtailed *= total_curtailment / curt_sum new_p[self.gen_renewable] += curtailed[self.gen_renewable] def _aux_readjust_storage_after_limiting(self, total_storage): new_act_storage = 1.0 * self._storage_power sum_this_step = new_act_storage.sum() if abs(total_storage) < abs(sum_this_step): # i can modify the current action modif_storage = new_act_storage * total_storage / sum_this_step else: # i need to retrieve what I did in a previous action # because the current action is not enough (the previous actions # cause a problem right now) new_act_storage = 1.0 * self._storage_power_prev sum_this_step = new_act_storage.sum() if abs(sum_this_step) > 1e-1: modif_storage = new_act_storage * total_storage / sum_this_step else: # TODO: this is not cover by any test :-( # it happens when you do an action too strong, then a do nothing, # then you decrease the limit to rapidly # (game over would jappen after at least one do nothing) # In this case I reset it completely or do I ? I don't really # know what to do ! modif_storage = new_act_storage # or self._storage_power ??? # handle self._storage_power and self._storage_current_charge coeff_p_to_E = ( self.delta_time_seconds / 3600.0 ) # TODO optim this is const for all time steps self._storage_power -= modif_storage # now compute the state of charge of the storage units (with efficiencies) is_discharging = self._storage_power < 0.0 is_charging = self._storage_power > 0.0 modif_storage[is_discharging] /= type(self).storage_discharging_efficiency[ is_discharging ] modif_storage[is_charging] *= type(self).storage_charging_efficiency[ is_charging ] self._storage_current_charge -= coeff_p_to_E * modif_storage # inform the grid that the storage is reduced self._amount_storage -= total_storage self._amount_storage_prev -= total_storage def _aux_limit_curtail_storage_if_needed(self, new_p, new_p_th, gen_curtailed): gen_redisp = self.gen_redispatchable normal_increase = new_p - ( self._gen_activeprod_t_redisp - self._actual_dispatch ) normal_increase = normal_increase[gen_redisp] p_min_down = ( self.gen_pmin[gen_redisp] - self._gen_activeprod_t_redisp[gen_redisp] ) avail_down = np.maximum(p_min_down, -self.gen_max_ramp_down[gen_redisp]) p_max_up = self.gen_pmax[gen_redisp] - self._gen_activeprod_t_redisp[gen_redisp] avail_up = np.minimum(p_max_up, self.gen_max_ramp_up[gen_redisp]) sum_move = ( np.sum(normal_increase) + self._amount_storage - self._sum_curtailment_mw ) total_storage_curtail = self._amount_storage - self._sum_curtailment_mw update_env_act = False if abs(total_storage_curtail) >= self._tol_poly: # if there is an impact on the curtailment / storage (otherwise I cannot fix anything) too_much = 0.0 if sum_move > np.sum(avail_up): # I need to limit curtailment (not enough ramps up available) too_much = dt_float(sum_move - np.sum(avail_up) + self._tol_poly) self._limited_before = too_much elif sum_move < np.sum(avail_down): # I need to limit storage unit (not enough ramps down available) too_much = dt_float(sum_move - np.sum(avail_down) - self._tol_poly) self._limited_before = too_much elif np.abs(self._limited_before) >= self._tol_poly: # adjust the "mess" I did before by not curtailing enough # max_action = self.gen_pmax[gen_curtailed] * self._limit_curtailment[gen_curtailed] update_env_act = True too_much = min(np.sum(avail_up) - self._tol_poly, self._limited_before) self._limited_before -= too_much too_much = self._limited_before if abs(too_much) > self._tol_poly: total_curtailment = ( -self._sum_curtailment_mw / total_storage_curtail * too_much ) total_storage = ( self._amount_storage / total_storage_curtail * too_much ) # TODO !!! update_env_act = True # TODO "log" the total_curtailment and total_storage somewhere (in the info part of the step function) if np.sign(total_curtailment) != np.sign(total_storage): # curtailment goes up, storage down, i only "limit" the one that # has the same sign as too much total_curtailment = ( too_much if np.sign(total_curtailment) == np.sign(too_much) else 0.0 ) total_storage = ( too_much if np.sign(total_storage) == np.sign(too_much) else 0.0 ) # NB i can directly assign all the "curtailment" to the maximum because in this case, too_much will # necessarily be > than total_curtail (or total_storage) because the other # one is of opposite sign # fix curtailment self._aux_readjust_curtailment_after_limiting( total_curtailment, new_p_th, new_p ) # fix storage self._aux_readjust_storage_after_limiting(total_storage) if update_env_act: self._aux_update_curtail_env_act(new_p) def _aux_handle_act_inj(self, action: BaseAction): for inj_key in ["load_p", "prod_p", "load_q"]: # modification of the injections in the action, this erases the actions in the environment if inj_key in action._dict_inj: if inj_key in self._env_modification._dict_inj: this_p_load = 1.0 * self._env_modification._dict_inj[inj_key] act_modif = action._dict_inj[inj_key] this_p_load[np.isfinite(act_modif)] = act_modif[ np.isfinite(act_modif) ] self._env_modification._dict_inj[inj_key][:] = this_p_load else: self._env_modification._dict_inj[inj_key] = ( 1.0 * action._dict_inj[inj_key] ) self._env_modification._modif_inj = True def _aux_handle_attack(self, action: BaseAction): # TODO code the opponent part here and split more the timings! here "opponent time" is # TODO included in time_apply_act lines_attacked, subs_attacked = None, None attack, attack_duration = self._oppSpace.attack( observation=self.current_obs, agent_action=action, env_action=self._env_modification, ) if attack is not None: # the opponent choose to attack # i update the "cooldown" on these things lines_attacked, subs_attacked = attack.get_topological_impact() self._times_before_line_status_actionable[lines_attacked] = np.maximum( attack_duration, self._times_before_line_status_actionable[lines_attacked], ) self._times_before_topology_actionable[subs_attacked] = np.maximum( attack_duration, self._times_before_topology_actionable[subs_attacked] ) self._backend_action += attack return lines_attacked, subs_attacked, attack_duration def _aux_apply_redisp(self, action, new_p, new_p_th, gen_curtailed, except_): is_illegal_redisp = False is_done = False is_illegal_reco = False # remember generator that were "up" before the action gen_up_before = self._gen_activeprod_t > 0.0 # compute the redispatching and the new productions active setpoint already_modified_gen = self._get_already_modified_gen(action) valid_disp, except_tmp, info_ = self._prepare_redisp( action, new_p, already_modified_gen ) if except_tmp is not None: action = self._action_space({}) is_illegal_redisp = True except_.append(except_tmp) if self.n_storage > 0: # TODO curtailment: cancel it here too ! self._storage_current_charge[:] = self._storage_previous_charge self._amount_storage -= self._amount_storage_prev # dissipated energy, it's not seen on the grid, just lost in the storage unit. # this is why it should not be taken into account in self._amount_storage # and NOT absorbed by the generators either self._withdraw_storage_losses() # end storage # fix redispatching for curtailment storage if ( self.redispatching_unit_commitment_availble and self._parameters.LIMIT_INFEASIBLE_CURTAILMENT_STORAGE_ACTION ): # limit the curtailment / storage in case of infeasible redispatching self._aux_limit_curtail_storage_if_needed(new_p, new_p_th, gen_curtailed) self._storage_power_prev[:] = self._storage_power # case where the action modifies load (TODO maybe make a different env for that...) self._aux_handle_act_inj(action) valid_disp, except_tmp = self._make_redisp(already_modified_gen, new_p) if not valid_disp or except_tmp is not None: # game over case (divergence of the scipy routine to compute redispatching) action = self._action_space({}) is_illegal_redisp = True except_.append(except_tmp) is_done = True except_.append( InvalidRedispatching( "Game over due to infeasible redispatching state. " 'The routine used to compute the "next state" has diverged. ' "This means that there is no way to compute a physically valid generator state " "(one that meets all pmin / pmax - ramp min / ramp max with the information " "provided. As one of the physical constraints would be violated, this means that " "a generator would be damaged in real life. This is a game over." ) ) return action, is_illegal_redisp, is_illegal_reco, is_done # check the validity of min downtime and max uptime except_tmp = self._handle_updown_times(gen_up_before, self._actual_dispatch) if except_tmp is not None: is_illegal_reco = True action = self._action_space({}) except_.append(except_tmp) return action, is_illegal_redisp, is_illegal_reco, is_done def _aux_update_backend_action(self, action, action_storage_power, init_disp): # make sure the dispatching action is not implemented "as is" by the backend. # the environment must make sure it's a zero-sum action. # same kind of limit for the storage action._redispatch[:] = 0.0 action._storage_power[:] = self._storage_power self._backend_action += action action._storage_power[:] = action_storage_power action._redispatch[:] = init_disp # TODO storage: check the original action, even when replaced by do nothing is not modified self._backend_action += self._env_modification self._backend_action.set_redispatch(self._actual_dispatch) def _aux_register_env_converged(self, disc_lines, action, init_line_status, new_p): beg_res = time.perf_counter() self.backend.update_thermal_limit( self ) # update the thermal limit, for DLR for example overflow_lines = self.backend.get_line_overflow() # save the current topology as "last" topology (for connected powerlines) # and update the state of the disconnected powerline due to cascading failure self._backend_action.update_state(disc_lines) # one timestep passed, i can maybe reconnect some lines self._times_before_line_status_actionable[ self._times_before_line_status_actionable > 0 ] -= 1 # update the vector for lines that have been disconnected self._times_before_line_status_actionable[disc_lines >= 0] = int( self._nb_ts_reco ) self._update_time_reconnection_hazards_maintenance() # for the powerline that are on overflow, increase this time step self._timestep_overflow[overflow_lines] += 1 # set to 0 the number of timestep for lines that are not on overflow self._timestep_overflow[~overflow_lines] = 0 # build the topological action "cooldown" aff_lines, aff_subs = action.get_topological_impact(init_line_status) if self._max_timestep_line_status_deactivated > 0: # i update the cooldown only when this does not impact the line disconnected for the # opponent or by maintenance for example cond = aff_lines # powerlines i modified # powerlines that are not affected by any other "forced disconnection" cond &= ( self._times_before_line_status_actionable < self._max_timestep_line_status_deactivated ) self._times_before_line_status_actionable[ cond ] = self._max_timestep_line_status_deactivated if self._max_timestep_topology_deactivated > 0: self._times_before_topology_actionable[ self._times_before_topology_actionable > 0 ] -= 1 self._times_before_topology_actionable[ aff_subs ] = self._max_timestep_topology_deactivated # extract production active value at this time step (should be independent of action class) self._gen_activeprod_t[:], *_ = self.backend.generators_info() # problem with the gen_activeprod_t above, is that the slack bus absorbs alone all the losses # of the system. So basically, when it's too high (higher than the ramp) it can # mess up the rest of the environment self._gen_activeprod_t_redisp[:] = new_p + self._actual_dispatch # set the line status self._line_status[:] = copy.deepcopy(self.backend.get_line_status()) # finally, build the observation (it's a different one at each step, we cannot reuse the same one) # THIS SHOULD BE DONE AFTER EVERYTHING IS INITIALIZED ! self.current_obs = self.get_obs() # TODO storage: get back the result of the storage ! with the illegal action when a storage unit # TODO is non zero and disconnected, this should be ok. self._time_extract_obs += time.perf_counter() - beg_res def _aux_run_pf_after_state_properly_set( self, action, init_line_status, new_p, except_ ): has_error = True try: # compute the next _grid state beg_pf = time.perf_counter() disc_lines, detailed_info, conv_ = self.backend.next_grid_state( env=self, is_dc=self._env_dc ) self._disc_lines[:] = disc_lines self._time_powerflow += time.perf_counter() - beg_pf if conv_ is None: # everything went well, so i register what is needed self._aux_register_env_converged( disc_lines, action, init_line_status, new_p ) has_error = False else: except_.append(conv_) except Grid2OpException as exc_: except_.append(exc_) if self.logger is not None: self.logger.error( 'Impossible to compute next grid state with error "{}"'.format(exc_) ) return detailed_info, has_error
[docs] def step(self, action: BaseAction) -> Tuple[BaseObservation, float, bool, dict]: """ Run one timestep of the environment's dynamics. When end of episode is reached, you are responsible for calling `reset()` to reset this environment's state. Accepts an action and returns a tuple (observation, reward, done, info). If the :class:`grid2op.BaseAction.BaseAction` is illegal or ambiguous, the step is performed, but the action is replaced with a "do nothing" action. Parameters ---------- action: :class:`grid2op.Action.Action` an action provided by the agent that is applied on the underlying through the backend. Returns ------- observation: :class:`grid2op.Observation.Observation` agent's observation of the current environment reward: ``float`` amount of reward returned after previous action done: ``bool`` whether the episode has ended, in which case further step() calls will return undefined results info: ``dict`` contains auxiliary diagnostic information (helpful for debugging, and sometimes learning). It is a dictionary with keys: - "disc_lines": a numpy array (or ``None``) saying, for each powerline if it has been disconnected due to overflow (if not disconnected it will be -1, otherwise it will be a positive integer: 0 meaning that is one of the cause of the cascading failure, 1 means that it is disconnected just after, 2 that it's disconnected just after etc.) - "is_illegal" (``bool``) whether the action given as input was illegal - "is_ambiguous" (``bool``) whether the action given as input was ambiguous. - "is_dispatching_illegal" (``bool``) was the action illegal due to redispatching - "is_illegal_reco" (``bool``) was the action illegal due to a powerline reconnection - "reason_alarm_illegal" (``None`` or ``Exception``) reason for which the alarm is illegal (it's None if no alarm are raised or if the alarm feature is not used) - "opponent_attack_line" (``np.ndarray``, ``bool``) for each powerline, say if the opponent attacked it (``True``) or not (``False``). - "opponent_attack_sub" (``np.ndarray``, ``bool``) for each substation, say if the opponent attacked it (``True``) or not (``False``). - "opponent_attack_duration" (``int``) the duration of the current attack (if any) - "exception" (``list`` of :class:`Exceptions.Exceptions.Grid2OpException` if an exception was raised or ``[]`` if everything was fine.) - "detailed_infos_for_cascading_failures" (optional, only if the backend has been create with `detailed_infos_for_cascading_failures=True`) the list of the intermediate steps computed during the simulation of the "cascading failures". Examples --------- As any openAI gym environment, this is used like: .. code-block:: python import grid2op from grid2op.Agent import RandomAgent # I create an environment env = grid2op.make() # define an agent here, this is an example agent = RandomAgent(env.action_space) # environment need to be "reset" before usage: obs = env.reset() reward = env.reward_range[0] done = False # now run through each steps like this while not done: action = agent.act(obs, reward, done) obs, reward, done, info = env.step(action) Notes ----- If the flag `done=True` is raised (*ie* this is the end of the episode) then the observation is NOT properly updated and should not be used at all. Actually, it will be in a "game over" state (see :class:`grid2op.Observation.BaseObservation.set_game_over`). """ if self.__closed: raise EnvError("This environment is closed. You cannot use it anymore.") if not self.__is_init: raise Grid2OpException( "Impossible to make a step with a non initialized backend. Have you called " '"env.reset()" after the last game over ?' ) # I did something after calling "env.seed()" which is # somehow "env.step()" or "env.reset()" self._has_just_been_seeded = False has_error = True is_done = False is_illegal = False is_ambiguous = False is_illegal_redisp = False is_illegal_reco = False reason_alarm_illegal = None self._is_alarm_illegal = False self._is_alarm_used_in_reward = False except_ = [] detailed_info = [] init_disp = 1.0 * action._redispatch # dispatching action action_storage_power = 1.0 * action._storage_power # battery information attack_duration = 0 lines_attacked, subs_attacked = None, None conv_ = None init_line_status = copy.deepcopy(self.backend.get_line_status()) self.nb_time_step += 1 self._disc_lines[:] = -1 beg_step = time.perf_counter() self._last_obs = None try: beg_ = time.perf_counter() is_legal, reason = self._game_rules(action=action, env=self) if not is_legal: # action is replace by do nothing action = self._action_space({}) init_disp = 1.0 * action._redispatch # dispatching action action_storage_power = ( 1.0 * action._storage_power ) # battery information except_.append(reason) is_illegal = True ambiguous, except_tmp = action.is_ambiguous() if ambiguous: # action is replace by do nothing action = self._action_space({}) init_disp = 1.0 * action._redispatch # dispatching action action_storage_power = ( 1.0 * action._storage_power ) # battery information is_ambiguous = True except_.append(except_tmp) if self._has_attention_budget: # this feature is implemented, so i do it reason_alarm_illegal = self._attention_budget.register_action( self, action, is_illegal, is_ambiguous ) self._is_alarm_illegal = reason_alarm_illegal is not None # get the modification of generator active setpoint from the environment self._env_modification, prod_v_chronics = self._update_actions() self._env_modification._single_act = ( False # because it absorbs all redispatching actions ) new_p = self._get_new_prod_setpoint(action) new_p_th = 1.0 * new_p # storage unit if self.n_storage > 0: # limiting the storage units is done in `_aux_apply_redisp` # this only ensure the Emin / Emax and all the actions self._compute_storage(action_storage_power) # curtailment (does not attempt to "limit" the curtailment to make sure # it is feasible) self._gen_before_curtailment[self.gen_renewable] = new_p[self.gen_renewable] gen_curtailed = self._aux_handle_curtailment_without_limit(action, new_p) beg__redisp = time.perf_counter() if self.redispatching_unit_commitment_availble or self.n_storage > 0.0: # this computes the "optimal" redispatching # and it is also in this function that the limiting of the curtailment / storage actions # is perform to make the state "feasible" res_disp = self._aux_apply_redisp( action, new_p, new_p_th, gen_curtailed, except_ ) action, is_illegal_redisp, is_illegal_reco, is_done = res_disp self._time_redisp += time.perf_counter() - beg__redisp if not is_done: self._aux_update_backend_action(action, action_storage_power, init_disp) # now get the new generator voltage setpoint voltage_control_act = self._voltage_control(action, prod_v_chronics) self._backend_action += voltage_control_act # handle the opponent here tick = time.perf_counter() lines_attacked, subs_attacked, attack_duration = self._aux_handle_attack( action ) tock = time.perf_counter() self._time_opponent += tock - tick self._time_create_bk_act += tock - beg_ self.backend.apply_action(self._backend_action) self._time_apply_act += time.perf_counter() - beg_ # now it's time to run the powerflow properly # and to update the time dependant properties detailed_info, has_error = self._aux_run_pf_after_state_properly_set( action, init_line_status, new_p, except_ ) else: has_error = True except StopIteration: # episode is over is_done = True self._backend_action.reset() end_step = time.perf_counter() self._time_step += end_step - beg_step if conv_ is not None: except_.append(conv_) self.infos = { "disc_lines": self._disc_lines, "is_illegal": is_illegal, "is_ambiguous": is_ambiguous, "is_dispatching_illegal": is_illegal_redisp, "is_illegal_reco": is_illegal_reco, "reason_alarm_illegal": reason_alarm_illegal, "opponent_attack_line": lines_attacked, "opponent_attack_sub": subs_attacked, "opponent_attack_duration": attack_duration, "exception": except_, } if self.backend.detailed_infos_for_cascading_failures: self.infos["detailed_infos_for_cascading_failures"] = detailed_info self.done = self._is_done(has_error, is_done) self.current_reward, other_reward = self._get_reward( action, has_error, self.done, # is_done is_illegal or is_illegal_redisp or is_illegal_reco, is_ambiguous, ) self.infos["rewards"] = other_reward if has_error and self.current_obs is not None: # forward to the observation if an alarm is used or not if hasattr(self._reward_helper.template_reward, "has_alarm_component"): self._is_alarm_used_in_reward = ( self._reward_helper.template_reward.is_alarm_used ) self.current_obs = self.get_obs(_update_state=False) # update the observation so when it's plotted everything is "shutdown" self.current_obs.set_game_over(self) # TODO documentation on all the possible way to be illegal now if self.done: self.__is_init = False return self.current_obs, self.current_reward, self.done, self.infos
def _get_reward(self, action, has_error, is_done, is_illegal, is_ambiguous): res = self._reward_helper( action, self, has_error, is_done, is_illegal, is_ambiguous ) other_rewards = { k: v(action, self, has_error, is_done, is_illegal, is_ambiguous) for k, v in self.other_rewards.items() } return res, other_rewards
[docs] def get_reward_instance(self): """ INTERNAL .. warning:: /!\\\\ Internal, do not use unless you know what you are doing /!\\\\ Returns the instance of the object that is used to compute the reward. """ if self.__closed: raise EnvError("This environment is closed, you cannot use it.") return self._reward_helper.template_reward
def _is_done(self, has_error, is_done): no_more_data = self.chronics_handler.done() return has_error or is_done or no_more_data def _reset_vectors_and_timings(self): """ INTERNAL .. warning:: /!\\\\ Internal, do not use unless you know what you are doing /!\\\\ Maintenance are not reset, otherwise the data are not read properly (skip the first time step) """ self._no_overflow_disconnection = self._parameters.NO_OVERFLOW_DISCONNECTION self._timestep_overflow[:] = 0 self._nb_timestep_overflow_allowed[ : ] = self._parameters.NB_TIMESTEP_OVERFLOW_ALLOWED self.nb_time_step = 0 # to have the first step at 0 self._hard_overflow_threshold = self._parameters.HARD_OVERFLOW_THRESHOLD self._env_dc = self._parameters.ENV_DC self._times_before_line_status_actionable[:] = 0 self._max_timestep_line_status_deactivated = ( self._parameters.NB_TIMESTEP_COOLDOWN_LINE ) self._times_before_topology_actionable[:] = 0 self._max_timestep_topology_deactivated = ( self._parameters.NB_TIMESTEP_COOLDOWN_SUB ) # reset timings self._time_apply_act = dt_float(0.0) self._time_powerflow = dt_float(0.0) self._time_extract_obs = dt_float(0.0) self._time_opponent = dt_float(0.0) self._time_create_bk_act = dt_float(0.0) self._time_redisp = dt_float(0.0) self._time_step = dt_float(0.0) if self._has_attention_budget: self._attention_budget.reset() # reward and others self.current_reward = self.reward_range[0] self.done = False def _reset_maintenance(self): self._time_next_maintenance[:] = -1 self._duration_next_maintenance[:] = 0 def __enter__(self): """ Support *with-statement* for the environment. Examples -------- .. code-block:: python import grid2op import grid2op.BaseAgent with grid2op.make() as env: agent = grid2op.BaseAgent.DoNothingAgent(env.action_space) act = env.action_space() obs, r, done, info = env.step(act) act = agent.act(obs, r, info) obs, r, done, info = env.step(act) """ return self def __exit__(self, *args): """ Support *with-statement* for the environment. """ self.close() # propagate exception return False
[docs] def close(self): """close an environment: this will attempt to free as much memory as possible. Note that after an environment is closed, you will not be able to use anymore. Any attempt to use a closed environment might result in non deterministic behaviour. """ if self.__closed: raise EnvError( "This environment is closed already, you cannot close it a second time." ) # todo there might be some side effect if hasattr(self, "viewer") and self.viewer is not None: self.viewer = None self.viewer_fig = None if hasattr(self, "backend") and self.backend is not None: self.backend.close() del self.backend self.backend :Backend = None if hasattr(self, "observation_space") and self.observation_space is not None: # do not forget to close the backend of the observation (used for simulate) self.observation_space.close() if hasattr(self, "_voltage_controler") and self._voltage_controler is not None: # in case there is a backend in the voltage controler self._voltage_controler.close() if hasattr(self, "_oppSpace") and self._oppSpace is not None: # in case there is a backend in the opponent space self._oppSpace.close() if hasattr(self, "_helper_action_env") and self._helper_action_env is not None: # close the action helper self._helper_action_env.close() if hasattr(self, "action_space") and self.action_space is not None: # close the action space if needed self.action_space.close() if hasattr(self, "_reward_helper") and self._reward_helper is not None: # close the reward if needed self._reward_helper.close() if hasattr(self, "other_rewards"): for el, reward in self.other_rewards.items(): # close the "other rewards" reward.close() self.backend : Backend = None self.__is_init = False self.__closed = True # clean all the attributes for attr_nm in [ "logger", "_init_grid_path", "_DEBUG", "_complete_action_cls", "_parameters", "with_forecast", "_time_apply_act", "_time_powerflow", "_time_extract_obs", "_time_create_bk_act", "_time_opponent", "_time_redisp", "_time_step", "_epsilon_poly", "_helper_action_class", "_helper_observation_class", "time_stamp", "nb_time_step", "delta_time_seconds", "current_obs", "_line_status", "_ignore_min_up_down_times", "_forbid_dispatch_off", "_no_overflow_disconnection", "_timestep_overflow", "_nb_timestep_overflow_allowed", "_hard_overflow_threshold", "_times_before_line_status_actionable", "_max_timestep_line_status_deactivated", "_times_before_topology_actionable", "_nb_ts_reco", "_time_next_maintenance", "_duration_next_maintenance", "_hazard_duration", "_env_dc", "_target_dispatch", "_actual_dispatch", "_gen_uptime", "_gen_downtime", "_gen_activeprod_t", "_gen_activeprod_t_redisp", "_thermal_limit_a", "_disc_lines", "_injection", "_maintenance", "_hazards", "_env_modification", "done", "current_reward", "_helper_action_env", "chronics_handler", "_game_rules", "_action_space", "_rewardClass", "_actionClass", "_observationClass", "_legalActClass", "_observation_space", "_names_chronics_to_backend", "_reward_helper", "reward_range", "viewer", "viewer_fig", "other_rewards", "_opponent_action_class", "_opponent_class", "_opponent_init_budget", "_opponent_attack_duration", "_opponent_attack_cooldown", "_opponent_budget_per_ts", "_kwargs_opponent", "_opponent_budget_class", "_opponent_action_space", "_compute_opp_budget", "_opponent", "_oppSpace", "_voltagecontrolerClass", "_voltage_controler", "_backend_action_class", "_backend_action", "backend", "debug_dispatch", # "__new_param", "__new_forecast_param", "__new_reward_func", "_storage_current_charge", "_storage_previous_charge", "_action_storage", "_amount_storage", "_amount_storage_prev", "_storage_power", "_storage_power_prev", "_limit_curtailment", "_limit_curtailment_prev", "_gen_before_curtailment", "_sum_curtailment_mw", "_sum_curtailment_mw_prev", "_has_attention_budget", "_attentiong_budget", "_attention_budget_cls", "_is_alarm_illegal", "_is_alarm_used_in_reward", "_kwargs_attention_budget", "_limited_before", ]: if hasattr(self, attr_nm): delattr(self, attr_nm) setattr(self, attr_nm, None)
[docs] def attach_layout(self, grid_layout): """ Compare to the method of the base class, this one performs a check. This method must be called after initialization. Parameters ---------- grid_layout: ``dict`` The layout of the grid (*i.e* the coordinates (x,y) of all substations). The keys should be the substation names, and the values a tuple (with two float) representing the coordinate of the substation. Examples --------- Here is an example on how to attach a layout for an environment: .. code-block:: python import grid2op # create the environment env = grid2op.make() # assign coordinates (0., 0.) to all substations (this is a dummy thing to do here!) layout = {sub_name: (0., 0.) for sub_name in env.name_sub} env.attach_layout(layout) """ if self.__closed: raise EnvError("This environment is closed, you cannot use it.") if isinstance(grid_layout, dict): pass elif isinstance(grid_layout, list): grid_layout = {k: v for k, v in zip(self.name_sub, grid_layout)} else: raise EnvError( "Attempt to set a layout from something different than a dictionary or a list. " "This is for now not supported." ) if self.__is_init: res = {} for el in self.name_sub: if not el in grid_layout: raise EnvError( 'The substation "{}" is not present in grid_layout while in the powergrid.' "".format(el) ) tmp = grid_layout[el] try: x, y = tmp x = dt_float(x) y = dt_float(y) res[el] = (x, y) except Exception as e_: raise EnvError( 'attach_layout: impossible to convert the value of "{}" to a pair of float ' 'that will be used the grid layout. The error is: "{}"' "".format(el, e_) ) super().attach_layout(res) if self._action_space is not None: self._action_space.attach_layout(res) if self._helper_action_env is not None: self._helper_action_env.attach_layout(res) if self._observation_space is not None: self._observation_space.attach_layout(res) if self._voltage_controler is not None: self._voltage_controler.attach_layout(res) if self._opponent_action_space is not None: self._opponent_action_space.attach_layout(res)
[docs] def fast_forward_chronics(self, nb_timestep): """ This method allows you to skip some time step at the beginning of the chronics. This is usefull at the beginning of the training, if you want your agent to learn on more diverse scenarios. Indeed, the data provided in the chronics usually starts always at the same date time (for example Jan 1st at 00:00). This can lead to suboptimal exploration, as during this phase, only a few time steps are managed by the agent, so in general these few time steps will correspond to grid state around Jan 1st at 00:00. Parameters ---------- nb_timestep: ``int`` Number of time step to "fast forward" Examples --------- This can be used like this: .. code-block:: python import grid2op # create the environment env = grid2op.make() # skip the first 150 steps of the chronics env.fast_forward_chronics(150) done = env.is_done if not done: obs = env.get_obs() # do something else: # there was a "game over" # you need to reset the env (which will "cancel" the fast_forward) pass # do something else Notes ----- This method can set the state of the environment in a 'game over' state (`done=True`) for example if the chronics last `xxx` time steps and you ask to "fast foward" more than `xxx` steps. This is why we advise to check the state of the environment after the call to this method if you use it (see the "Examples" paragaph) """ if self.__closed: raise EnvError("This environment is closed, you cannot use it.") nb_timestep = int(nb_timestep) # Go to the timestep requested minus one nb_timestep = max(1, nb_timestep - 1) self.chronics_handler.fast_forward(nb_timestep) self.nb_time_step += nb_timestep # Update the timing vectors min_time_line_reco = np.zeros(self.n_line, dtype=dt_int) min_time_topo = np.zeros(self.n_sub, dtype=dt_int) ff_time_line_act = self._times_before_line_status_actionable - nb_timestep ff_time_topo_act = self._times_before_topology_actionable - nb_timestep self._times_before_line_status_actionable[:] = np.maximum( ff_time_line_act, min_time_line_reco ) self._times_before_topology_actionable[:] = np.maximum( ff_time_topo_act, min_time_topo ) # Update to the fast forward state using a do nothing action self.step(self._action_space({}))
[docs] def get_current_line_status(self): """ INTERNAL .. warning:: /!\\\\ Internal, do not use unless you know what you are doing /!\\\\ prefer using :attr:`grid2op.BaseObservation.line_status` This method allows to retrieve the line status. """ if self.current_obs is not None: powerline_status = self._line_status else: # at first time step, every powerline is connected powerline_status = np.full(self.n_line, fill_value=True, dtype=dt_bool) # powerline_status = self._line_status return powerline_status
@property def parameters(self): """ Return a deepcopy of the parameters used by the environment It is a deepcopy, so modifying it will have absolutely no effect. If you want to change the parameters of an environment, please use either :func:`grid2op.Environment.BaseEnv.change_parameters` to change the parameters of this environment or :func:`grid2op.Environment.BaseEnv.change_forecast_parameters` to change the parameter of the environment used by `simulate`. """ if self.__closed: raise EnvError("This environment is closed, you cannot use it.") return copy.deepcopy(self._parameters) @parameters.setter def parameters(self, value): raise RuntimeError( "Use the env.change_parameters(new_parameters) to change the parameters. " "NB: it will only have an effect AFTER the env is reset." )
[docs] def change_reward(self, new_reward_func): """ Change the reward function used for the environment. Parameters ---------- new_reward_func: Either an object of class BaseReward, or a subclass of BaseReward: the new reward function to use Notes ------ This only affects the environment AFTER `env.reset()` has been called. """ if self.__closed: raise EnvError("This environment is closed, you cannot use it.") is_ok = isinstance(new_reward_func, BaseReward) or issubclass( new_reward_func, BaseReward ) if not is_ok: raise EnvError( f"Impossible to change the reward function with type {type(new_reward_func)}. " f"It should be an object from a class that inherit grid2op.Reward.BaseReward " f"or a subclass of grid2op.Reward.BaseReward" ) self.__new_reward_func = new_reward_func
def _aux_gen_classes(self, cls, sys_path): if not isinstance(cls, type): raise RuntimeError(f"cls should be a type and not an object !: {cls}") if not issubclass(cls, GridObjects): raise RuntimeError(f"cls should inherit from GridObjects: {cls}") from pathlib import Path path_env = cls._PATH_ENV cls._PATH_ENV = str(Path(self.get_path_env()).as_posix()) res = cls._get_full_cls_str() cls._PATH_ENV = path_env output_file = os.path.join(sys_path, f"{cls.__name__}") if not os.path.exists(output_file): # if the file is not already saved, i save it and add it to the __init__ file with open(output_file, "w", encoding="utf-8") as f: f.write(res) return f"\nfrom .{cls.__name__}_file import {cls.__name__}" else: # otherwise i do nothing return ""
[docs] def generate_classes(self, _guard=None, _is_base_env__=True, sys_path=None): """ Use with extra care ! If you get into trouble like : AttributeError: Can't get attribute 'ActionSpace_l2rpn_icaps_2021_small' on <module 'grid2op.Space.GridObjects' from '/home/benjamin/Documents/grid2op_dev/grid2op/Space/'> You might want to call this function and that MIGHT solve your problem. This function will create a subdirectory ino the env directory, that will be accessed when loadin the class used for the environment. The default behaviour is to build the class on the fly. Examples -------- Here is how to best leverage this functionality: First step, generated the classes once and for all. **NB** you need to redo this step each time you customize the environment. This customization includes, but is not limited to: - change the backend type: `grid2op.make(..., backend=...)` - change the action class: `grid2op.make(..., action_class=...)` - change observation class: `grid2op.make(..., observation_class=...)` - change the `volagecontroler_class` - change the `grid_path` - change the `opponent_action_class` - etc. .. code-block:: python import grid2op env_name = ... env = grid2op.make(env_name, ...) # again: redo this step each time you customize "..." # for example if you change the `action_class` or the `backend` etc. env.generate_classes() Then, next time you want to use the SAME environment, you can do: .. code-block:: python import grid2op env_name = SAME NAME AS ABOVE env = grid2op.make(env_name, experimental_read_from_local_dir=True, SAME ENV CUSTOMIZATION AS ABOVE) And it should (this is experimerimental for now, and we expect feedback on the matter) solve the issues involving pickle. Again, if you customize your environment (see above for more information) you'll have to redo this step ! """ if self.__closed: return # create the folder if _guard is not None: raise RuntimeError("use `env.generate_classes()` with no arguments !") if type(self)._PATH_ENV is not None: raise RuntimeError( "This function should only be called ONCE without specifying that the classes " "need to be read from disk (class attribute type(self)._PATH_ENV should be None)" ) import shutil if sys_path is None: if not _is_base_env__: raise RuntimeError("Cannot generate file from a \"sub env\" " "(eg no the top level env) if I don't know the path of " "the top level environment.") sys_path = os.path.join(self.get_path_env(), "_grid2op_classes") if _is_base_env__: if os.path.exists(sys_path): shutil.rmtree(sys_path) os.mkdir(sys_path) # initialized the "__init__" file _init_txt = "" mode = "w" if not _is_base_env__: _init_txt = BASE_TXT_COPYRIGHT + _init_txt else: # i am apppending to the __init__ file in case of obs_env mode = "a" # generate the classes _init_txt += self._aux_gen_classes(type(self), sys_path) _init_txt += self._aux_gen_classes(type(self.backend), sys_path) _init_txt += self._aux_gen_classes( self.backend._complete_action_class, sys_path ) _init_txt += self._aux_gen_classes(self._backend_action_class, sys_path) _init_txt += self._aux_gen_classes(type(self.action_space), sys_path) _init_txt += self._aux_gen_classes(self._actionClass, sys_path) _init_txt += self._aux_gen_classes(self._complete_action_cls, sys_path) _init_txt += self._aux_gen_classes(type(self.observation_space), sys_path) _init_txt += self._aux_gen_classes(self._observationClass, sys_path) _init_txt += self._aux_gen_classes( self._opponent_action_space.subtype, sys_path ) # now do the same for the obs_env if _is_base_env__: _init_txt += self._aux_gen_classes( self._voltage_controler.action_space.subtype, sys_path ) init_grid_tmp = self._observation_space.obs_env._init_grid_path self._observation_space.obs_env._init_grid_path = self._init_grid_path self._observation_space.obs_env.generate_classes(_is_base_env__=False, sys_path=sys_path) self._observation_space.obs_env._init_grid_path = init_grid_tmp # now write the __init__ file _init_txt += "\n" with open(os.path.join(sys_path, ""), mode, encoding="utf-8") as f: f.write(_init_txt)
def __del__(self): """when the environment is garbage collected, free all the memory, including cross reference to itself in the observation space.""" if hasattr(self, "_BaseEnv__closed") and not self.__closed: self.close()