# Copyright (c) 2019-2020, RTE (https://www.rte-france.com)
# See AUTHORS.txt
# This Source Code Form is subject to the terms of the Mozilla Public License, version 2.0.
# If a copy of the Mozilla Public License, version 2.0 was not distributed with this file,
# you can obtain one at http://mozilla.org/MPL/2.0/.
# SPDX-License-Identifier: MPL-2.0
# This file is part of Grid2Op, Grid2Op a testbed platform to model sequential decision making in power systems.
from datetime import datetime
import tempfile
import logging
import time
import copy
import os
import json
from typing import Optional, Tuple, Union, Dict, Any, Literal
import importlib
import sys
import warnings
import numpy as np
from scipy.optimize import (minimize, LinearConstraint)
from abc import ABC, abstractmethod
from grid2op.Observation import (BaseObservation,
ObservationSpace,
HighResSimCounter)
from grid2op.Backend import Backend
from grid2op.dtypes import dt_int, dt_float, dt_bool
from grid2op.Space import GridObjects, RandomObject
from grid2op.Exceptions import (Grid2OpException,
EnvError,
InvalidRedispatching,
GeneratorTurnedOffTooSoon,
GeneratorTurnedOnTooSoon,
AmbiguousActionRaiseAlert,
ImpossibleTopology)
from grid2op.Parameters import Parameters
from grid2op.Reward import BaseReward, RewardHelper
from grid2op.Opponent import OpponentSpace, NeverAttackBudget, BaseOpponent
from grid2op.Action import DontAct, BaseAction, ActionSpace
from grid2op.operator_attention import LinearAttentionBudget
from grid2op.Action._backendAction import _BackendAction
from grid2op.Chronics import ChronicsHandler
from grid2op.Rules import AlwaysLegal, BaseRules
from grid2op.typing_variables import STEP_INFO_TYPING, RESET_OPTIONS_TYPING
# TODO put in a separate class the redispatching function
DETAILED_REDISP_ERR_MSG = (
"\nThis is an attempt to explain why the dispatch did not succeed and caused a game over.\n"
"To compensate the {increase} of loads and / or {decrease} of "
"renewable energy (due to naturl causes but also through curtailment) and / or variation in the storage units, "
"the generators should {increase} their total production of {sum_move:.2f}MW (in total).\n"
"But, if you take into account the generator constraints ({pmax} and {max_ramp_up}) you "
"can have at most {avail_up_sum:.2f}MW.\n"
"Indeed at time t, generators are in state:\n\t{gen_setpoint}\ntheir ramp max is:"
"\n\t{ramp_up}\n and pmax is:\n\t{gen_pmax}\n"
"Wrapping up, each generator can {increase} at {maximum} of:\n\t{avail_up}\n"
"NB: if you did not do any dispatch during this episode, it would have been possible to "
"meet these constraints. This situation is caused by not having enough degree of freedom "
'to "compensate" the variation of the load due to (most likely) an "over usage" of '
"redispatching feature (some generators stuck at {pmax} as a consequence of your "
"redispatching. They can't increase their productions to meet the {increase} in demand or "
"{decrease} of renewables)"
)
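# NB (illustrative sketch, not used by the code): the per-generator headroom reported in the
# message above is essentially
#     avail_up = min(gen_pmax - gen_setpoint, max_ramp_up)
# and the dispatch is declared infeasible when sum(avail_up) < sum_move.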
BASE_TXT_COPYRIGHT = """# Copyright (c) 2019-2020, RTE (https://www.rte-france.com)
# See AUTHORS.txt
# This Source Code Form is subject to the terms of the Mozilla Public License, version 2.0.
# If a copy of the Mozilla Public License, version 2.0 was not distributed with this file,
# you can obtain one at http://mozilla.org/MPL/2.0/.
# SPDX-License-Identifier: MPL-2.0
# This file is part of Grid2Op, Grid2Op a testbed platform to model sequential decision making in power systems.
# THIS FILE HAS BEEN AUTOMATICALLY GENERATED BY "env.generate_classes()"
# WE DO NOT RECOMMEND TO ALTER IT IN ANY WAY
"""
class BaseEnv(GridObjects, RandomObject, ABC):
"""
INTERNAL
.. warning:: /!\\\\ Internal, do not use unless you know what you are doing /!\\\\
This class represents some useful abstractions that are reused by :class:`Environment` and
:class:`grid2op.Observation._Obsenv` for example.
The documentation is shown here to document the common attributes of a "BaseEnvironment".
.. _danger-env-ownership:
Notes
------------------------
Note on environment data ownership
.. danger::
A non-pythonic decision has been implemented in grid2op for various reasons: an environment
owns everything created from it.
This means that if you (or the python interpreter) delete the environment, you might no
longer be able to use some data generated with this environment.
More precisely, you cannot do something like:
.. code-block:: python
import grid2op
env = grid2op.make("l2rpn_case14_sandbox")
saved_obs = []
obs = env.reset()
saved_obs.append(obs)
obs2, reward, done, info = env.step(env.action_space())
saved_obs.append(obs2)
saved_obs[0].simulate(env.action_space()) # works
del env
saved_obs[0].simulate(env.action_space()) # DOES NOT WORK
It will raise an error like `Grid2OpException EnvError "This environment is closed. You cannot use it anymore."`
This will also happen if you do things inside functions, for example like this:
.. code-block:: python
import grid2op
def foo(manager):
env = grid2op.make("l2rpn_case14_sandbox")
obs = env.reset()
manager.append(obs)
obs2, reward, done, info = env.step(env.action_space())
manager.append(obs2)
manager[0].simulate(env.action_space()) # works
return manager
manager = []
manager = foo(manager)
manager[0].simulate(env.action_space()) # DOES NOT WORK
The same error is raised because the environment `env` is automatically deleted by python when the function `foo` ends
(it might work in some cases, if the call happens before the variable `env` is actually garbage collected, but you
should not rely on this behaviour).
Attributes
----------
parameters: :class:`grid2op.Parameters.Parameters`
The parameters of the game (to expose more control on what is being simulated)
with_forecast: ``bool``
Whether the chronics allow some kind of "forecast". See :func:`BaseEnv.reactivate_forecast`
for more information
logger:
TO BE DONE: a way to log what is happening (**currently not implemented**)
time_stamp: ``datetime.datetime``
The actual time stamp of the current observation.
nb_time_step: ``int``
Number of time steps played in the current environment
current_obs: :class:`grid2op.Observation.BaseObservation`
The current observation (or None if it's not initialized)
backend: :class:`grid2op.Backend.Backend`
The backend used to compute the powerflows.
done: ``bool``
Whether the environment is "done". If ``True`` you need to call :func:`Environment.reset` in order
to continue.
current_reward: ``float``
The last computed reward (reward of the current step)
other_rewards: ``dict``
Dictionary with key being the name (identifier) and value being some RewardHelper. At each time step, all the
values will be computed by the :class:`Environment` and the information about it will be returned in the
"rewards" key of the "info" dictionary of :func:`Environment.step`.
chronics_handler: :class:`grid2op.Chronics.ChronicsHandler`
The object in charge of managing the "chronics", which stores the information about loads and generators for example.
reward_range: ``tuple``
For open ai gym compatibility. It represents the range of the rewards: reward min, reward max
_viewer:
For open ai gym compatibility.
viewer_fig:
For open ai gym compatibility.
_gen_activeprod_t:
.. warning:: /!\\\\ Internal, do not use unless you know what you are doing /!\\\\
Should be initialized at 0. for "step" to properly recognize it's the first time step of the game
_no_overflow_disconnection: ``bool``
.. warning:: /!\\\\ Internal, do not use unless you know what you are doing /!\\\\
Whether cascading failures are simulated or not (``True`` = the powerlines above their thermal limits will
not be disconnected). This is initialized based on the attribute
:attr:`grid2op.Parameters.Parameters.NO_OVERFLOW_DISCONNECTION`.
_timestep_overflow: ``numpy.ndarray``, dtype: int
.. warning:: /!\\\\ Internal, do not use unless you know what you are doing /!\\\\
Number of consecutive timesteps each powerline has been on overflow.
_nb_timestep_overflow_allowed: ``numpy.ndarray``, dtype: int
.. warning:: /!\\\\ Internal, do not use unless you know what you are doing /!\\\\
Number of consecutive timesteps each powerline can be on overflow. It is usually read from
:attr:`grid2op.Parameters.Parameters.NB_TIMESTEP_POWERFLOW_ALLOWED`.
_hard_overflow_threshold: ``numpy.ndarray``, dtype: float
.. warning:: /!\\\\ Internal, do not use unless you know what you are doing /!\\\\
Ratio above the thermal limit at which a powerline is disconnected immediately ("hard overflow"),
without waiting for the soft overflow delay. It is usually read from
:attr:`grid2op.Parameters.Parameters.HARD_OVERFLOW_THRESHOLD`.
_env_dc: ``bool``
.. warning:: /!\\\\ Internal, do not use unless you know what you are doing /!\\\\
Whether the environment computes the powerflow using the DC approximation or not. It is usually read from
:attr:`grid2op.Parameters.Parameters.ENV_DC`.
_names_chronics_to_backend: ``dict``
.. warning:: /!\\\\ Internal, do not use unless you know what you are doing /!\\\\
Configuration file used to associate the name of the objects in the backend
(both extremities of powerlines, load or production for
example) with the same object in the data (:attr:`Environment.chronics_handler`). The idea is that, usually
data generation comes from a different software that does not take into account the powergrid infrastructure.
Hence, the same "object" can have a different name. This mapping is present to avoid the need to rename
the "object" when providing data. A more detailed description is available at
:func:`grid2op.ChronicsHandler.GridValue.initialize`.
_env_modification: :class:`grid2op.Action.Action`
.. warning:: /!\\\\ Internal, do not use unless you know what you are doing /!\\\\
Representation of the actions of the environment for the modification of the powergrid.
_rewardClass: ``type``
.. warning:: /!\\\\ Internal, do not use unless you know what you are doing /!\\\\
Type of reward used. Should be a subclass of :class:`grid2op.BaseReward.BaseReward`
_init_grid_path: ``str``
.. warning:: /!\\\\ Internal, do not use unless you know what you are doing /!\\\\
The path where the description of the powergrid is located.
_game_rules: :class:`grid2op.Rules.RulesChecker`
.. warning:: /!\\\\ Internal, do not use unless you know what you are doing /!\\\\
The rules of the game (define which actions are legal and which are not)
_action_space: :class:`grid2op.Action.ActionSpace`
.. warning:: /!\\\\ Internal, do not use unless you know what you are doing /!\\\\
Helper used to manipulate more easily the actions given to / provided by the :class:`grid2op.Agent.BaseAgent`
(player)
_helper_action_env: :class:`grid2op.Action.ActionSpace`
.. warning:: /!\\\\ Internal, do not use unless you know what you are doing /!\\\\
Helper used to manipulate more easily the actions given to / provided by the environment to the backend.
_observation_space: :class:`grid2op.Observation.ObservationSpace`
.. warning:: /!\\\\ Internal, do not use unless you know what you are doing /!\\\\
Helper used to generate the observation that will be given to the :class:`grid2op.BaseAgent`
_reward_helper: :class:`grid2op.BaseReward.RewardHelper`
.. warning:: /!\\\\ Internal, do not use unless you know what you are doing /!\\\\
Helper that is called to compute the reward at each time step.
kwargs_observation: ``dict``
TODO
# TODO add the units (eg MW, MWh, MW/time step,etc.) in the redispatching related attributes
"""
ALARM_FILE_NAME = "alerts_info.json"
ALARM_KEY = "fixed"
ALERT_FILE_NAME = "alerts_info.json"
ALERT_KEY = "by_line"
CAN_SKIP_TS = False # each step is exactly one time step
#: these are the keys of the dictionary `options`
#: that can be used when calling `env.reset(..., options={})`
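#: e.g. (illustrative values only): ``env.reset(options={"time serie id": 0, "max step": 288})``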
KEYS_RESET_OPTIONS = {"time serie id", "init state", "init ts", "max step"}
def __init__(
self,
init_env_path: os.PathLike,
init_grid_path: os.PathLike,
parameters: Parameters,
voltagecontrolerClass: type,
name="unknown",
thermal_limit_a: Optional[np.ndarray] = None,
epsilon_poly: float = 1e-4, # precision of the redispatching algorithm
tol_poly: float = 1e-2, # I need to compute a redispatching if the actual values deviate by more than tol_poly from the values they should be
other_rewards: dict = None,
with_forecast: bool = True,
opponent_space_type: type = OpponentSpace,
opponent_action_class: type = DontAct,
opponent_class: type = BaseOpponent,
opponent_init_budget: float = 0.0,
opponent_budget_per_ts: float = 0.0,
opponent_budget_class: type = NeverAttackBudget,
opponent_attack_duration: int = 0,
opponent_attack_cooldown: int = 99999,
kwargs_opponent: dict = None,
has_attention_budget: bool = False,
attention_budget_cls: type = LinearAttentionBudget,
kwargs_attention_budget: dict = None,
logger: Optional[logging.Logger] = None,
kwargs_observation: Optional[dict] = None,
observation_bk_class=None, # type of backend for the observation space
observation_bk_kwargs=None, # type of backend for the observation space
highres_sim_counter=None,
update_obs_after_reward=False,
n_busbar=2,
_is_test: bool = False, # TODO not implemented !!
_init_obs: Optional[BaseObservation] = None,
_local_dir_cls=None,
_read_from_local_dir=None,
_raw_backend_class=None,
):
#: flag to indicate not to erase the directory when the env has been used
self._do_not_erase_local_dir_cls = False
GridObjects.__init__(self)
RandomObject.__init__(self)
self.name = name
self._local_dir_cls = _local_dir_cls # suppose it's the second path to the environment, so the classes are already in the files
self._read_from_local_dir = _read_from_local_dir
if self._read_from_local_dir is not None:
if os.path.split(self._read_from_local_dir)[1] == "_grid2op_classes":
# legacy behaviour (using experimental_read_from_local_dir kwargs in env.make)
self._do_not_erase_local_dir_cls = True
else:
self._do_not_erase_local_dir_cls = True
self._actionClass_orig = None
self._observationClass_orig = None
self._raw_backend_class = _raw_backend_class
self._n_busbar = n_busbar # env attribute not class attribute !
if other_rewards is None:
other_rewards = {}
if kwargs_attention_budget is None:
kwargs_attention_budget = {}
if kwargs_opponent is None:
kwargs_opponent = {}
self._is_test: bool = _is_test
if logger is None:
self.logger = logging.getLogger(__name__)
self.logger.disabled = True
else:
self.logger: logging.Logger = logger.getChild("grid2op_BaseEnv")
if init_grid_path is not None:
self._init_grid_path: os.PathLike = os.path.abspath(init_grid_path)
else:
self._init_grid_path = None
self._DEBUG: bool = False
self._complete_action_cls: type = None
self.__closed: bool = False # by default the environment is not closed
# specific to power system
if not isinstance(parameters, Parameters):
raise Grid2OpException(
'Parameter "parameters" used to build the Environment should derived form the '
'grid2op.Parameters class, type provided is "{}"'.format(
type(parameters)
)
)
parameters.check_valid() # check the provided parameters are valid
self._parameters: Parameters = copy.deepcopy(parameters)
self.with_forecast: bool = with_forecast
self._forecasts = None
# some timers
self._time_apply_act: float = dt_float(0)
self._time_powerflow: float = dt_float(0)
self._time_extract_obs: float = dt_float(0)
self._time_create_bk_act: float = dt_float(0)
self._time_opponent: float = dt_float(0)
self._time_redisp: float = dt_float(0)
self._time_step: float = dt_float(0)
# data relative to interpolation
self._epsilon_poly: float = dt_float(epsilon_poly)
self._tol_poly: float = dt_float(tol_poly)
# class used for the action spaces
self._helper_action_class: ActionSpace = None
self._helper_observation_class: ObservationSpace = None
# and calendar data
self.time_stamp: datetime = datetime(year=2019, month=1, day=1)
self.nb_time_step: int = dt_int(0)
self.delta_time_seconds = None # number of seconds between two consecutive steps
# observation
self.current_obs: Optional[BaseObservation] = None
self._line_status: np.ndarray = None
self._ignore_min_up_down_times: bool = self._parameters.IGNORE_MIN_UP_DOWN_TIME
self._forbid_dispatch_off: bool = (
not self._parameters.ALLOW_DISPATCH_GEN_SWITCH_OFF
)
# type of power flow to play
# if True, then it will not disconnect lines above their thermal limits
self._no_overflow_disconnection: bool = (
self._parameters.NO_OVERFLOW_DISCONNECTION
)
self._timestep_overflow: np.ndarray = None
self._nb_timestep_overflow_allowed: np.ndarray = None
self._hard_overflow_threshold: np.ndarray = None
# store actions "cooldown"
self._times_before_line_status_actionable: np.ndarray = None
self._max_timestep_line_status_deactivated: int = (
self._parameters.NB_TIMESTEP_COOLDOWN_LINE
)
self._times_before_topology_actionable: np.ndarray = None
self._max_timestep_topology_deactivated: int = (
self._parameters.NB_TIMESTEP_COOLDOWN_SUB
)
self._nb_ts_reco: int = self._parameters.NB_TIMESTEP_RECONNECTION
# for maintenance operation
self._time_next_maintenance: np.ndarray = None
self._duration_next_maintenance: np.ndarray = None
# hazard (not used outside of this class, information is given in `times_before_line_status_actionable`)
self._hazard_duration: np.ndarray = None
self._env_dc = self._parameters.ENV_DC
# redispatching data
self._target_dispatch: np.ndarray = None
self._already_modified_gen: np.ndarray = None
self._actual_dispatch: np.ndarray = None
self._gen_uptime: np.ndarray = None
self._gen_downtime: np.ndarray = None
self._gen_activeprod_t: np.ndarray = None
self._gen_activeprod_t_redisp: np.ndarray = None
self._thermal_limit_a: np.ndarray = thermal_limit_a
self._disc_lines: np.ndarray = None
# store environment modifications
self._injection = None
self._maintenance = None
self._hazards = None
self._env_modification = None
# to use the data
self.done = False
self.current_reward = None
self._helper_action_env: ActionSpace = None
self.chronics_handler: ChronicsHandler = None
self._game_rules = None
self._action_space: ActionSpace = None
self._rewardClass: type = None
self._actionClass: type = None
self._observationClass: type = None
self._legalActClass: type = None
self._observation_space: ObservationSpace = None
self._names_chronics_to_backend: dict = None
self._reward_helper = None
# gym compatibility
self.reward_range = None, None
self._viewer = None
self.viewer_fig = None
# other rewards
self.other_rewards = {}
for k, v in other_rewards.items():
if isinstance(v, type):
if not issubclass(v, BaseReward):
raise Grid2OpException(
'All values of "rewards" key word argument should be classes that inherit '
'from "grid2op.BaseReward"'
)
else:
if not isinstance(v, BaseReward):
raise Grid2OpException(
'All values of "rewards" key word argument should be classes that inherit '
'from "grid2op.BaseReward"'
)
if not isinstance(k, str):
raise Grid2OpException(
'All keys of "rewards" should be of string type.'
)
self.other_rewards[k] = RewardHelper(v, self.logger)
# opponent
self._opponent_action_class = (
opponent_action_class # class of the action of the opponent
)
self._opponent_space_type = opponent_space_type # type of the opponent action space
self._opponent_class = opponent_class # class of the opponent
self._opponent_init_budget = dt_float(opponent_init_budget)
self._opponent_attack_duration = dt_int(opponent_attack_duration)
self._opponent_attack_cooldown = dt_int(opponent_attack_cooldown)
self._opponent_budget_per_ts = dt_float(opponent_budget_per_ts)
self._kwargs_opponent = kwargs_opponent
self._opponent_budget_class = opponent_budget_class
# below initialized by _create_env, above: need to be called
self._opponent_action_space = None
self._compute_opp_budget = None
self._opponent = None
self._oppSpace = None
# voltage
self._voltagecontrolerClass = voltagecontrolerClass
self._voltage_controler = None
# backend action
self._backend_action_class: type = None
self._backend_action: _BackendAction = None
# specific to Basic Env, do not change
self.backend: Backend = None
self.__is_init = False
self.debug_dispatch = False
# to change the parameters
self.__new_param = None
self.__new_forecast_param = None
self.__new_reward_func = None
# storage units
# TODO storage: what to do when self.storage_Emin >0. and self.storage_loss > 0.
# TODO and we have self._storage_current_charge - self.storage_loss < self.storage_Emin
self._storage_current_charge = None # the current storage charge
self._storage_previous_charge = None # the previous storage charge
self._action_storage = None # the storage action performed
self._amount_storage = None # total amount of storage to be dispatched
self._amount_storage_prev = None
self._storage_power = None
self._storage_power_prev = None
# curtailment
self._limit_curtailment = None
self._limit_curtailment_prev = None
self._gen_before_curtailment = None
self._sum_curtailment_mw = None
self._sum_curtailment_mw_prev = None
self._limited_before = 0.0 # TODO curt
# attention budget
self._has_attention_budget = has_attention_budget
self._attention_budget = None
self._attention_budget_cls = attention_budget_cls
self._is_alarm_illegal = False
self._is_alarm_used_in_reward = False
# alert infos
self._is_alert_illegal = False
self._is_alert_used_in_reward = False
self._kwargs_attention_budget = copy.deepcopy(kwargs_attention_budget)
# to ensure self.get_obs() has a reproducible behaviour
self._last_obs = None
# to retrieve previous result (before 1.6.5 the seeding of the
# action space or observation space was not done at each reset)
self._has_just_been_seeded = False
if kwargs_observation is not None:
self._kwargs_observation = copy.deepcopy(kwargs_observation)
else:
self._kwargs_observation = {}
if init_env_path is not None:
self._init_env_path = os.path.abspath(init_env_path)
else:
self._init_env_path = None
# time-dependent attributes for the "forecast env"
if _init_obs is not None:
self._init_obs = _init_obs.copy()
self._init_obs._obs_env = None
else:
self._init_obs = None
self._observation_bk_class = observation_bk_class
self._observation_bk_kwargs = observation_bk_kwargs
if highres_sim_counter is not None:
self._highres_sim_counter = highres_sim_counter
else:
self._highres_sim_counter = HighResSimCounter()
self._update_obs_after_reward = update_obs_after_reward
# alert
self._last_alert = None
self._time_since_last_alert = None
self._alert_duration = None
self._total_number_of_alert = 0
self._time_since_last_attack = None
self._was_alert_used_after_attack = None
self._attack_under_alert = None
self._is_already_attacked = None
# general things that can be used by the reward
self._reward_to_obs = {}
@property
def highres_sim_counter(self):
return self._highres_sim_counter
@property
def nb_highres_called(self):
return self._highres_sim_counter.nb_highres_called
def _custom_deepcopy_for_copy(self, new_obj, dict_=None):
if self.__closed:
raise RuntimeError("Impossible to make a copy of a closed environment !")
if hasattr(self.backend, "_can_be_copied"):
if not self.backend._can_be_copied:
# introduced later on, might not be copied perfectly for some older backends
raise RuntimeError("Impossible to copy your environment: the backend "
"class you used cannot be copied.")
# for earlier backends it is not possible to check this so I ignore it.
RandomObject._custom_deepcopy_for_copy(self, new_obj)
new_obj.name = self.name
if dict_ is None:
dict_ = {}
new_obj._n_busbar = self._n_busbar
new_obj._init_grid_path = copy.deepcopy(self._init_grid_path)
new_obj._init_env_path = copy.deepcopy(self._init_env_path)
new_obj._local_dir_cls = None # a copy of an env is not the "main" env. TODO
new_obj._do_not_erase_local_dir_cls = self._do_not_erase_local_dir_cls
new_obj._read_from_local_dir = self._read_from_local_dir
new_obj._raw_backend_class = self._raw_backend_class
new_obj._DEBUG = self._DEBUG
new_obj._parameters = copy.deepcopy(self._parameters)
new_obj.with_forecast = self.with_forecast
new_obj._forecasts = copy.deepcopy(self._forecasts)
# some timers
new_obj._time_apply_act = self._time_apply_act
new_obj._time_powerflow = self._time_powerflow
new_obj._time_extract_obs = self._time_extract_obs
new_obj._time_create_bk_act = self._time_create_bk_act
new_obj._time_opponent = self._time_opponent
new_obj._time_redisp = self._time_redisp
new_obj._time_step = self._time_step
# data relative to interpolation
new_obj._epsilon_poly = self._epsilon_poly
new_obj._tol_poly = self._tol_poly
#
new_obj._complete_action_cls = self._complete_action_cls # const
# define logger
new_obj.logger = copy.deepcopy(self.logger) # TODO does that make any sense ?
# class used for the action spaces
new_obj._helper_action_class = self._helper_action_class # const
new_obj._helper_observation_class = self._helper_observation_class # const
# and calendar data
new_obj.time_stamp = self.time_stamp
new_obj.nb_time_step = self.nb_time_step
new_obj.delta_time_seconds = self.delta_time_seconds
# backend
# backend action
new_obj._backend_action_class = self._backend_action_class # const
new_obj._backend_action = copy.deepcopy(self._backend_action)
# specific to Basic Env, do not change
new_obj.backend = self.backend.copy()
if self._thermal_limit_a is not None:
new_obj.backend.set_thermal_limit(self._thermal_limit_a)
new_obj._thermal_limit_a = copy.deepcopy(self._thermal_limit_a)
new_obj.__is_init = self.__is_init
new_obj.__closed = self.__closed
new_obj.debug_dispatch = self.debug_dispatch
new_obj._line_status = copy.deepcopy(self._line_status)
new_obj._ignore_min_up_down_times = self._ignore_min_up_down_times
new_obj._forbid_dispatch_off = self._forbid_dispatch_off
# type of power flow to play
# if True, then it will not disconnect lines above their thermal limits
new_obj._no_overflow_disconnection = self._no_overflow_disconnection
new_obj._timestep_overflow = copy.deepcopy(self._timestep_overflow)
new_obj._nb_timestep_overflow_allowed = copy.deepcopy(
self._nb_timestep_overflow_allowed
)
new_obj._hard_overflow_threshold = copy.deepcopy(self._hard_overflow_threshold)
# store actions "cooldown"
new_obj._times_before_line_status_actionable = copy.deepcopy(
self._times_before_line_status_actionable
)
new_obj._max_timestep_line_status_deactivated = (
self._max_timestep_line_status_deactivated
)
new_obj._times_before_topology_actionable = copy.deepcopy(
self._times_before_topology_actionable
)
new_obj._max_timestep_topology_deactivated = (
self._max_timestep_topology_deactivated
)
new_obj._nb_ts_reco = self._nb_ts_reco
# for maintenance operation
new_obj._time_next_maintenance = copy.deepcopy(self._time_next_maintenance)
new_obj._duration_next_maintenance = copy.deepcopy(
self._duration_next_maintenance
)
# hazard (not used outside of this class, information is given in `times_before_line_status_actionable`)
new_obj._hazard_duration = copy.deepcopy(self._hazard_duration)
new_obj._env_dc = self._env_dc
# redispatching data
new_obj._target_dispatch = copy.deepcopy(self._target_dispatch)
new_obj._already_modified_gen = copy.deepcopy(self._already_modified_gen)
new_obj._actual_dispatch = copy.deepcopy(self._actual_dispatch)
new_obj._gen_uptime = copy.deepcopy(self._gen_uptime)
new_obj._gen_downtime = copy.deepcopy(self._gen_downtime)
new_obj._gen_activeprod_t = copy.deepcopy(self._gen_activeprod_t)
new_obj._gen_activeprod_t_redisp = copy.deepcopy(self._gen_activeprod_t_redisp)
new_obj._disc_lines = copy.deepcopy(self._disc_lines)
# store environment modifications
new_obj._injection = copy.deepcopy(self._injection)
new_obj._maintenance = copy.deepcopy(self._maintenance)
new_obj._hazards = copy.deepcopy(self._hazards)
new_obj._env_modification = copy.deepcopy(self._env_modification)
# action space used by the environment
new_obj._game_rules = copy.deepcopy(self._game_rules)
new_obj._helper_action_env = self._helper_action_env.copy()
new_obj._helper_action_env.legal_action = new_obj._game_rules.legal_action
# to use the data
new_obj.done = self.done
new_obj.current_reward = copy.deepcopy(self.current_reward)
new_obj.chronics_handler = copy.deepcopy(self.chronics_handler)
# retrieve the "pointer" to the new_obj action space (for initializing the grid)
new_obj.chronics_handler.cleanup_action_space()
new_obj.chronics_handler.action_space = new_obj._helper_action_env
# action space
new_obj._action_space = self._action_space.copy()
new_obj._action_space.legal_action = new_obj._game_rules.legal_action
new_obj._rewardClass = self._rewardClass
new_obj._actionClass = self._actionClass
new_obj._actionClass_orig = self._actionClass_orig
new_obj._observationClass = self._observationClass
new_obj._observationClass_orig = self._observationClass_orig
new_obj._legalActClass = self._legalActClass
new_obj._names_chronics_to_backend = self._names_chronics_to_backend # cst
# other rewards
new_obj.other_rewards = {k: copy.deepcopy(v) for k, v in self.other_rewards.items()}
for extra_reward in new_obj.other_rewards.values():
extra_reward.reset(new_obj)
# voltage
new_obj._voltagecontrolerClass = self._voltagecontrolerClass
if self._voltage_controler is not None:
new_obj._voltage_controler = self._voltage_controler.copy()
else:
new_obj._voltage_controler = None
# needed for the "Environment.get_kwargs(env, False, False)" (used in the observation_space)
new_obj._attention_budget_cls = self._attention_budget_cls # const
new_obj._kwargs_attention_budget = copy.deepcopy(self._kwargs_attention_budget)
new_obj._has_attention_budget = self._has_attention_budget
# opponent
new_obj._opponent_space_type = self._opponent_space_type
new_obj._opponent_action_class = self._opponent_action_class # const
new_obj._opponent_class = self._opponent_class # const
new_obj._opponent_init_budget = self._opponent_init_budget
new_obj._opponent_attack_duration = self._opponent_attack_duration
new_obj._opponent_attack_cooldown = self._opponent_attack_cooldown
new_obj._opponent_budget_per_ts = self._opponent_budget_per_ts
new_obj._kwargs_opponent = copy.deepcopy(self._kwargs_opponent)
new_obj._opponent_budget_class = copy.deepcopy(
self._opponent_budget_class
) # const
new_obj._opponent_action_space = self._opponent_action_space # const
new_obj._compute_opp_budget = self._opponent_budget_class(
self._opponent_action_space
)
new_obj._observation_bk_class = self._observation_bk_class
new_obj._observation_bk_kwargs = self._observation_bk_kwargs
# do not copy it.
new_obj._highres_sim_counter = self._highres_sim_counter
# observation space (might depend on the previous things)
# at this stage the function "Environment.get_kwargs(env, False, False)" should run
new_obj._kwargs_observation = copy.deepcopy(self._kwargs_observation)
new_obj._observation_space = self._observation_space.copy(copy_backend=True, env=new_obj)
new_obj._observation_space._legal_action = (
new_obj._game_rules.legal_action
) # TODO this does not respect SOLID principles at all !
new_obj._observation_space._ptr_kwargs_observation = new_obj._kwargs_observation
new_obj._reward_helper = copy.deepcopy(self._reward_helper)
# gym compatibility
new_obj.reward_range = copy.deepcopy(self.reward_range)
new_obj._viewer = copy.deepcopy(self._viewer)
new_obj.viewer_fig = copy.deepcopy(self.viewer_fig)
# init the opponent
new_obj._opponent = new_obj._opponent_class.__new__(new_obj._opponent_class)
self._opponent._custom_deepcopy_for_copy(
new_obj._opponent, {"partial_env": new_obj, **new_obj._kwargs_opponent}
)
new_obj._oppSpace = new_obj._opponent_space_type(
compute_budget=new_obj._compute_opp_budget,
init_budget=new_obj._opponent_init_budget,
attack_duration=new_obj._opponent_attack_duration,
attack_cooldown=new_obj._opponent_attack_cooldown,
budget_per_timestep=new_obj._opponent_budget_per_ts,
opponent=new_obj._opponent
)
state_me, state_opp = self._oppSpace._get_state()
new_obj._oppSpace._set_state(state_me)
# to change the parameters
new_obj.__new_param = copy.deepcopy(self.__new_param)
new_obj.__new_forecast_param = copy.deepcopy(self.__new_forecast_param)
new_obj.__new_reward_func = copy.deepcopy(self.__new_reward_func)
# storage units
new_obj._storage_current_charge = copy.deepcopy(self._storage_current_charge)
new_obj._storage_previous_charge = copy.deepcopy(self._storage_previous_charge)
new_obj._action_storage = copy.deepcopy(self._action_storage)
new_obj._amount_storage = copy.deepcopy(self._amount_storage)
new_obj._amount_storage_prev = copy.deepcopy(self._amount_storage_prev)
new_obj._storage_power = copy.deepcopy(self._storage_power)
new_obj._storage_power_prev = copy.deepcopy(self._storage_power_prev)
# curtailment
new_obj._limit_curtailment = copy.deepcopy(self._limit_curtailment)
new_obj._limit_curtailment_prev = copy.deepcopy(self._limit_curtailment_prev)
new_obj._gen_before_curtailment = copy.deepcopy(self._gen_before_curtailment)
new_obj._sum_curtailment_mw = copy.deepcopy(self._sum_curtailment_mw)
new_obj._sum_curtailment_mw_prev = copy.deepcopy(self._sum_curtailment_mw_prev)
new_obj._limited_before = copy.deepcopy(self._limited_before)
# attention budget
new_obj._attention_budget = copy.deepcopy(self._attention_budget)
new_obj._is_alarm_illegal = copy.deepcopy(self._is_alarm_illegal)
new_obj._is_alarm_used_in_reward = copy.deepcopy(self._is_alarm_used_in_reward)
# alert
new_obj._is_alert_illegal = copy.deepcopy(self._is_alert_illegal)
new_obj._is_alert_used_in_reward = copy.deepcopy(self._is_alert_used_in_reward)
new_obj._has_just_been_seeded = self._has_just_been_seeded
# extra things used by the reward to pass to the obs
new_obj._reward_to_obs = copy.deepcopy(self._reward_to_obs)
# time-dependent attributes for the "forecast env"
if self._init_obs is None:
new_obj._init_obs = None
else:
new_obj._init_obs = self._init_obs.copy()
# do not forget !
new_obj._is_test = self._is_test
# alert
new_obj._last_alert = copy.deepcopy(self._last_alert)
new_obj._time_since_last_alert = copy.deepcopy(self._time_since_last_alert)
new_obj._alert_duration = copy.deepcopy(self._alert_duration)
new_obj._total_number_of_alert = self._total_number_of_alert
new_obj._time_since_last_attack = copy.deepcopy(self._time_since_last_attack)
new_obj._is_already_attacked = copy.deepcopy(self._is_already_attacked)
new_obj._attack_under_alert = copy.deepcopy(self._attack_under_alert)
new_obj._was_alert_used_after_attack = copy.deepcopy(self._was_alert_used_after_attack)
new_obj._update_obs_after_reward = copy.deepcopy(self._update_obs_after_reward)
if self._last_obs is not None:
new_obj._last_obs = self._last_obs.copy(env=new_obj)
else:
new_obj._last_obs = None
# observation
from grid2op.Environment._obsEnv import _ObsEnv
if self.current_obs is not None and not isinstance(self, _ObsEnv):
# breaks for some versions of lightsim2grid... (a powerflow needs to be run to retrieve the observation)
new_obj.current_obs = new_obj.get_obs()
def get_path_env(self):
"""
Get the path that can be used to create this environment.
It can be used for example in :func:`grid2op.utils.EpisodeStatistics`
to save the information directly inside
the environment data.
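Examples
---------
A minimal usage sketch (assuming the standard "l2rpn_case14_sandbox" test environment):
.. code-block:: python
import grid2op
env = grid2op.make("l2rpn_case14_sandbox")
env_path = env.get_path_env() # the local folder this environment was created from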
"""
if self.__closed:
raise EnvError("This environment is closed, you cannot get its path.")
res = self._init_env_path if self._init_env_path is not None else ""
return res
def _check_alarm_file_consistent(self, dict_):
if (self.ALERT_KEY not in dict_) and (self.ALARM_KEY not in dict_):
raise EnvError(
f'One of {self.ALERT_KEY} or {self.ALARM_KEY} should be present in the alarm data json, for now.'
)
def _set_no_alarm(self):
bk_cls = type(self.backend)
bk_cls.dim_alarms = 0
bk_cls.alarms_area_names = []
bk_cls.alarms_lines_area = {}
bk_cls.alarms_area_lines = []
def load_alarm_data(self):
"""
Internal
.. warning::
/!\\\\ Only valid with "l2rpn_icaps_2021" environment /!\\\\
Notes
------
This is called when the environment class is not created, so I need to read the data of the grid from the
backend.
I cannot use "self.name_line" for example.
This function updates the backend INSTANCE. The backend class is then updated in the
:func:`BaseEnv._init_backend`
function with a call to `self.backend.assert_grid_correct()`
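Judging from the parsing below, the file maps each alarm "area" name to the list of
powerline names it contains. An illustrative sketch of the json content (hypothetical names):
.. code-block:: python
# content of alerts_info.json for the alarm feature (illustrative only):
# {"fixed": {"area_1": ["line_a", "line_b"], "area_2": ["line_c"]}}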
Returns
-------
"""
file_alarms = os.path.join(self.get_path_env(), BaseEnv.ALARM_FILE_NAME)
if os.path.exists(file_alarms) and os.path.isfile(file_alarms):
with open(file_alarms, mode="r", encoding="utf-8") as f:
dict_alarm = json.load(f)
self._check_alarm_file_consistent(dict_alarm)
if self.ALARM_KEY not in dict_alarm:
# not an alarm but an alert
self._set_no_alarm()
return # TODO update grid in this case !
nb_areas = len(dict_alarm[self.ALARM_KEY]) # need to be remembered
line_names = {
el: [] for el in self.backend.name_line
} # need to be remembered
area_names = sorted(dict_alarm[self.ALARM_KEY].keys()) # need to be remembered
area_lines = [[] for _ in range(nb_areas)] # need to be remembered
for area_id, area_name in enumerate(area_names):
# check that: all lines in files are in the grid
area = dict_alarm[self.ALARM_KEY][area_name]
for line in area:
if line not in line_names:
raise EnvError(
f"You provided a description of the area of the grid for the alarms, but a "
f'line named "{line}" is present in your file but not in the grid. Please '
f"check the file {file_alarms} and make sure it contains only the line named "
f"{sorted(self.backend.name_line)}."
)
# update the list and dictionary that remembers everything
line_names[line].append(area_name)
area_lines[area_id].append(line)
for line, li_area in line_names.items():
# check that all lines in the grid are in at least one area
if not li_area:
raise EnvError(
f"Line (on the grid) named {line} is not in any areas. This is not supported at "
f"the moment"
)
# every check passed, I update the backend class
bk_cls = type(self.backend)
bk_cls.tell_dim_alarm(nb_areas)
bk_cls.alarms_area_names = copy.deepcopy(area_names)
bk_cls.alarms_lines_area = copy.deepcopy(line_names)
bk_cls.alarms_area_lines = copy.deepcopy(area_lines)
else:
self._set_no_alarm()
def _set_no_alert(self):
bk_cls = type(self.backend)
bk_cls.tell_dim_alert(0)
bk_cls.alertable_line_names = []
bk_cls.alertable_line_ids = np.array([], dtype=dt_int)
def load_alert_data(self):
"""
Internal
Notes
------
This is called to get the alertable lines when the warning is raised "by line"
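Judging from the checks below, the expected content of ``alerts_info.json`` for this mode is
something like ``{"by_line": "opponent"}``: the alertable lines are then read from the
``lines_attacked`` key of the opponent kwargs.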
Returns
-------
"""
file_alarms = os.path.join(self.get_path_env(), BaseEnv.ALERT_FILE_NAME)
if os.path.exists(file_alarms) and os.path.isfile(file_alarms):
with open(file_alarms, mode="r", encoding="utf-8") as f:
dict_alert = json.load(f)
self._check_alarm_file_consistent(dict_alert)
if self.ALERT_KEY not in dict_alert:
# not an alert but an alarm
self._set_no_alert()
return
if dict_alert[self.ALERT_KEY] != "opponent":
raise EnvError('You can only define alert from the opponent for now.')
if "lines_attacked" in self._kwargs_opponent:
lines_attacked = copy.deepcopy(self._kwargs_opponent["lines_attacked"])
if isinstance(lines_attacked[0], list):
lines_attacked = sum(lines_attacked, start=[])
else:
lines_attacked = []
warnings.warn("The kwargs \"lines_attacked\" is not present in the description of your opponent "
"yet you want to use alert. Know that in this case no alert will be defined...")
alertable_line_names = copy.deepcopy(lines_attacked)
alertable_line_ids = np.empty(len(alertable_line_names), dtype=dt_int)
for i, el in enumerate(alertable_line_names):
indx = (self.backend.name_line == el).nonzero()[0]
if not len(indx):
raise Grid2OpException(f"Attacked line {el} is not found in the grid.")
alertable_line_ids[i] = indx[0]
nb_lines = len(alertable_line_ids)
bk_cls = type(self.backend)
bk_cls.tell_dim_alert(nb_lines)
bk_cls.alertable_line_names = copy.deepcopy(alertable_line_names)
bk_cls.alertable_line_ids = np.array(alertable_line_ids).astype(dt_int)
else:
self._set_no_alert()
@property
def action_space(self) -> ActionSpace:
"""this represent a view on the action space"""
return self._action_space
@action_space.setter
def action_space(self, other):
raise EnvError(
"Impossible to modify the action space of the environment. You probably want to modify "
"the action with which the agent is interacting. You can do that with a converter, or "
"using the GymEnv. Please consult the documentation."
)
@property
def observation_space(self) -> ObservationSpace:
"""this represent a view on the action space"""
return self._observation_space
@observation_space.setter
def observation_space(self, other):
raise EnvError(
"Impossible to modify the observation space of the environment. You probably want to modify "
"the observation with which the agent is interacting. You can do that with a converter, or "
"using the GymEnv. Please consult the documentation."
)
def change_parameters(self, new_parameters):
"""
Allows to change the parameters of an environment.
Notes
------
This only affects the environment AFTER `env.reset()` has been called.
This only affects the environment and NOT the forecast.
Parameters
----------
new_parameters: :class:`grid2op.Parameters.Parameters`
The new parameters you want the environment to get.
Examples
---------
You can use this function like:
.. code-block:: python
import grid2op
from grid2op.Parameters import Parameters
env_name = "l2rpn_case14_sandbox" # or any other name
env = grid2op.make(env_name)
env.parameters.NO_OVERFLOW_DISCONNECTION # -> False
new_param = Parameters()
new_param.A_MEMBER = A_VALUE # eg new_param.NO_OVERFLOW_DISCONNECTION = True
env.change_parameters(new_param)
obs = env.reset()
env.parameters.NO_OVERFLOW_DISCONNECTION # -> True
"""
if self.__closed:
raise EnvError(
"This environment is closed, you cannot change its parameters."
)
if not isinstance(new_parameters, Parameters):
raise EnvError(
'The new parameters "new_parameters" should be an instance of '
"grid2op.Parameters.Parameters. "
)
new_parameters.check_valid() # check the provided parameters are valid
self.__new_param = new_parameters
def change_forecast_parameters(self, new_parameters):
"""
Allows to change the parameters of a "forecast environment" that is for
the method :func:`grid2op.Observation.BaseObservation.simulate` and
:func:`grid2op.Observation.BaseObservation.get_forecast_env`
Notes
------
This only affects the environment AFTER `env.reset()` has been called.
This only affects the "forecast env" and NOT the env itself.
Parameters
----------
new_parameters: :class:`grid2op.Parameters.Parameters`
The new parameters you want the environment to get.
Examples
--------
This can be used like:
.. code-block:: python
import grid2op
env_name = "l2rpn_case14_sandbox" # or any other name
env = grid2op.make(env_name)
param = env.parameters
param.NO_OVERFLOW_DISCONNECTION = True # or any other properties of the environment
env.change_forecast_parameters(param)
# at this point this has no impact.
obs = env.reset()
# now, after the reset, the right parameters are used
sim_obs, sim_reward, sim_done, sim_info = obs.simulate(env.action_space())
# the new parameters `param` are used for this
# and also for
forecasted_env = obs.get_forecast_env()
"""
if self.__closed:
raise EnvError(
"This environment is closed, you cannot change its parameters (for the forecast / simulate)."
)
if not isinstance(new_parameters, Parameters):
raise EnvError(
'The new parameters "new_parameters" should be an instance of '
"grid2op.Parameters.Parameters."
)
new_parameters.check_valid() # check the provided parameters are valid
self.__new_forecast_param = new_parameters
def _create_attention_budget(self, **kwargs):
if not self.__is_init:
raise EnvError(
"Impossible to create an attention budget with a non initialized environment!"
)
if self._has_attention_budget:
if type(self).assistant_warning_type == "zonal":
self._attention_budget = self._attention_budget_cls()
try:
self._kwargs_attention_budget.update(kwargs)
self._attention_budget.init(
partial_env=self, **self._kwargs_attention_budget
)
except TypeError as exc_:
raise EnvError(
"Impossible to create the attention budget with the provided argument. Please "
'change the content of the argument "kwargs_attention_budget".'
) from exc_
elif type(self).assistant_warning_type == "by_line":
self._has_attention_budget = False
def _create_opponent(self):
if not self.__is_init:
raise EnvError(
"Impossible to create an opponent with a non initialized environment!"
)
if not issubclass(self._opponent_action_class, BaseAction):
raise EnvError(
"Impossible to make an environment with an opponent action class not derived from BaseAction"
)
try:
self._opponent_init_budget = dt_float(self._opponent_init_budget)
except Exception as e:
raise EnvError(
'Impossible to convert "opponent_init_budget" to a float with error {}'.format(
e
)
) from e
if self._opponent_init_budget < 0.0:
raise EnvError(
"If you want to deactivate the opponent, please don't set its budget to a negative number."
'Prefer the use of the DontAct action type ("opponent_action_class=DontAct" '
"and / or set its budget to 0."
)
if not issubclass(self._opponent_class, BaseOpponent):
raise EnvError(
"Impossible to make an opponent with a type that does not inherit from BaseOpponent."
)
self._opponent_action_class._add_shunt_data()
self._opponent_action_class._update_value_set()
self._opponent_action_space = self._helper_action_class(
gridobj=type(self.backend),
legal_action=AlwaysLegal,
actionClass=self._opponent_action_class,
_local_dir_cls=self._local_dir_cls
)
self._compute_opp_budget = self._opponent_budget_class(
self._opponent_action_space
)
self._opponent = self._opponent_class(self._opponent_action_space)
self._oppSpace = self._opponent_space_type(
compute_budget=self._compute_opp_budget,
init_budget=self._opponent_init_budget,
attack_duration=self._opponent_attack_duration,
attack_cooldown=self._opponent_attack_cooldown,
budget_per_timestep=self._opponent_budget_per_ts,
opponent=self._opponent,
_local_dir_cls=self._local_dir_cls,
)
self._oppSpace.init_opponent(partial_env=self, **self._kwargs_opponent)
self._oppSpace.reset()
def _init_myclass(self):
if self._backend_action_class is not None:
# the class has already been initialized
return
# remember the original grid2op class
orig_cls = type(self)
# be careful here: you need to initialize from the class, and not from the object
bk_type = type(self.backend)
# create the proper environment class for this specific environment
new_cls = type(self).init_grid(bk_type, _local_dir_cls=self._local_dir_cls)
# assign the right initial grid class
if orig_cls._INIT_GRID_CLS is None:
new_cls._INIT_GRID_CLS = orig_cls
else:
new_cls._INIT_GRID_CLS = orig_cls._INIT_GRID_CLS
self.__class__ = new_cls
def _has_been_initialized(self):
# type of power flow to play
# if True, then it will not disconnect lines above their thermal limits
self._init_myclass()
bk_type = type(self.backend)
if np.min([self.n_line, self.n_gen, self.n_load, self.n_sub]) <= 0:
raise EnvironmentError("Environment has not been initialized properly")
self._backend_action_class = _BackendAction.init_grid(bk_type, _local_dir_cls=self._local_dir_cls)
self._backend_action = self._backend_action_class()
# initialize maintenance / hazards
self._time_next_maintenance = np.full(self.n_line, -1, dtype=dt_int)
self._duration_next_maintenance = np.zeros(shape=(self.n_line,), dtype=dt_int)
self._times_before_line_status_actionable = np.full(
shape=(self.n_line,), fill_value=0, dtype=dt_int
)
# create the vector to the proper shape
self._target_dispatch = np.zeros(self.n_gen, dtype=dt_float)
self._already_modified_gen = np.zeros(self.n_gen, dtype=dt_bool)
self._actual_dispatch = np.zeros(self.n_gen, dtype=dt_float)
self._gen_uptime = np.zeros(self.n_gen, dtype=dt_int)
self._gen_downtime = np.zeros(self.n_gen, dtype=dt_int)
self._gen_activeprod_t = np.zeros(self.n_gen, dtype=dt_float)
self._gen_activeprod_t_redisp = np.zeros(self.n_gen, dtype=dt_float)
self._max_timestep_line_status_deactivated = (
self._parameters.NB_TIMESTEP_COOLDOWN_LINE
)
self._times_before_line_status_actionable = np.zeros(
shape=(self.n_line,), dtype=dt_int
)
self._times_before_topology_actionable = np.zeros(
shape=(self.n_sub,), dtype=dt_int
)
self._nb_timestep_overflow_allowed = np.full(
shape=(self.n_line,),
fill_value=self._parameters.NB_TIMESTEP_OVERFLOW_ALLOWED,
dtype=dt_int,
)
self._hard_overflow_threshold = np.full(
shape=(self.n_line,),
fill_value=self._parameters.HARD_OVERFLOW_THRESHOLD,
dtype=dt_float,
)
self._timestep_overflow = np.zeros(shape=(self.n_line,), dtype=dt_int)
# update the parameters
self.__new_param = self._parameters # small hack to have it working as expected
self._update_parameters()
self._reset_redispatching()
# storage
self._storage_current_charge = np.zeros(self.n_storage, dtype=dt_float)
self._storage_previous_charge = np.zeros(self.n_storage, dtype=dt_float)
self._action_storage = np.zeros(self.n_storage, dtype=dt_float)
self._storage_power = np.zeros(self.n_storage, dtype=dt_float)
self._storage_power_prev = np.zeros(self.n_storage, dtype=dt_float)
self._amount_storage = 0.0
self._amount_storage_prev = 0.0
# curtailment
self._limit_curtailment = np.ones(
self.n_gen, dtype=dt_float
) # in ratio of pmax
self._limit_curtailment_prev = np.ones(
self.n_gen, dtype=dt_float
) # in ratio of pmax
self._gen_before_curtailment = np.zeros(self.n_gen, dtype=dt_float) # in MW
self._sum_curtailment_mw = dt_float(0.0)
self._sum_curtailment_mw_prev = dt_float(0.0)
self._reset_curtailment()
# register this is properly initialized
self.__is_init = True
def _update_parameters(self):
"""update value for the new parameters"""
self._parameters = self.__new_param
self._ignore_min_up_down_times = self._parameters.IGNORE_MIN_UP_DOWN_TIME
self._forbid_dispatch_off = not self._parameters.ALLOW_DISPATCH_GEN_SWITCH_OFF
# type of power flow to play
# if True, then it will not disconnect lines above their thermal limits
self._no_overflow_disconnection = self._parameters.NO_OVERFLOW_DISCONNECTION
# store actions "cooldown"
self._max_timestep_line_status_deactivated = (
self._parameters.NB_TIMESTEP_COOLDOWN_LINE
)
self._max_timestep_topology_deactivated = (
self._parameters.NB_TIMESTEP_COOLDOWN_SUB
)
self._nb_ts_reco = self._parameters.NB_TIMESTEP_RECONNECTION
self._nb_timestep_overflow_allowed[:] = self._parameters.NB_TIMESTEP_OVERFLOW_ALLOWED
self._hard_overflow_threshold[:] = self._parameters.HARD_OVERFLOW_THRESHOLD
# hard overflow part
self._env_dc = self._parameters.ENV_DC
self.__new_param = None
def set_id(self, id_: Union[int, str]) -> None:
# nothing to do in general, overloaded for real Environment
pass
def reset(self,
*,
seed: Union[int, None] = None,
options: RESET_OPTIONS_TYPING = None):
"""
Reset the base environment (set the appropriate variables to correct initialization).
It is (and must be) overloaded in other :class:`grid2op.Environment`
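Examples
---------
A minimal sketch, assuming the standard "l2rpn_case14_sandbox" test environment (the accepted
option keys are listed in :attr:`BaseEnv.KEYS_RESET_OPTIONS`):
.. code-block:: python
import grid2op
env = grid2op.make("l2rpn_case14_sandbox")
obs = env.reset(seed=0, options={"time serie id": 0})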
"""
if self.__closed:
raise EnvError("This environment is closed. You cannot use it anymore.")
if options is not None:
for el in options:
if el not in type(self).KEYS_RESET_OPTIONS:
raise EnvError(f"You tried to customize the `reset` call with some "
f"`options` using the key `{el}` which is invalid. "
f"Only keys in {sorted(list(type(self).KEYS_RESET_OPTIONS))} "
f"can be used.")
self.__is_init = True
# current_obs = None is an indicator that this is the first step of the environment
# so don't change the setting of current_obs = None unless you are willing to change that
self.current_obs = None
self._line_status[:] = True
if self.__new_param is not None:
self._update_parameters() # reset __new_param to None too
if self.__new_forecast_param is not None:
self._observation_space._change_parameters(self.__new_forecast_param)
self.__new_forecast_param = None
if self.__new_reward_func is not None:
self._reward_helper.change_reward(self.__new_reward_func)
self._reward_helper.initialize(self)
self.reward_range = self._reward_helper.range()
# change also the reward used in simulate
self._observation_space.change_reward(self._reward_helper.template_reward)
self.__new_reward_func = None
self._last_obs = None
if options is not None and "time serie id" in options:
self.set_id(options["time serie id"])
if seed is not None:
self.seed(seed)
elif self.seed_used is not None and not self._has_just_been_seeded:
# seeds (so that next episode does not depend on what happened in previous episode)
self.seed(None, _seed_me=False)
self._reset_storage()
self._reset_curtailment()
self._reset_alert()
self._reward_to_obs = {}
self._has_just_been_seeded = False
def _reset_alert(self):
self._last_alert[:] = False
self._is_already_attacked[:] = False
self._time_since_last_alert[:] = -1
self._alert_duration[:] = 0
self._total_number_of_alert = 0
self._time_since_last_attack[:] = -1
self._was_alert_used_after_attack[:] = 0
self._attack_under_alert[:] = 0
def _reset_storage(self):
"""reset storage capacity at the beginning of new environment if needed"""
if self.n_storage > 0:
tmp = self._parameters.INIT_STORAGE_CAPACITY * self.storage_Emax
if self._parameters.ACTIVATE_STORAGE_LOSS:
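# (presumably) pre-charge by one step of losses so the charge observed after the
# first step matches INIT_STORAGE_CAPACITY * storage_Emax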
tmp += self.storage_loss * self.delta_time_seconds / 3600.0
self._storage_previous_charge[:] = tmp # might not be needed, but it does not cost much
self._storage_current_charge[:] = tmp
self._storage_power[:] = 0.0
self._storage_power_prev[:] = 0.0
self._amount_storage = 0.0
self._amount_storage_prev = 0.0
# TODO storage: check in simulate too!
def _reset_curtailment(self):
self._limit_curtailment[self.gen_renewable] = 1.0
self._limit_curtailment_prev[self.gen_renewable] = 1.0
self._gen_before_curtailment[:] = 0.0
self._sum_curtailment_mw = dt_float(0.0)
self._sum_curtailment_mw_prev = dt_float(0.0)
self._limited_before = dt_float(0.0)
def seed(self, seed=None, _seed_me=True):
"""
Set the seed of this :class:`Environment` for a better control and to ease reproducible experiments.
.. seealso::
function :func:`Environment.reset` for extra information
.. versionchanged:: 1.9.8
Starting from version 1.9.8 you can directly set the seed when calling
reset.
.. warning::
It is preferable to call this function `just before` a call to `env.reset()` otherwise
the seeding might not work properly (especially if some non standard "time serie generators"
*aka* chronics are used)
Parameters
----------
seed: ``int``
The seed to set.
_seed_me: ``bool``
Whether to seed this instance or just the other things. Used internally only.
Returns
---------
seed: ``tuple``
The seed used to set the prng (pseudo random number generator) for the environment
seed_chron: ``tuple``
The seed used to set the prng for the chronics_handler (if any), otherwise ``None``
seed_obs: ``tuple``
The seed used to set the prng for the observation space (if any), otherwise ``None``
seed_action_space: ``tuple``
The seed used to set the prng for the action space (if any), otherwise ``None``
seed_env_modif: ``tuple``
The seed used to set the prng for the modification of the environment (if any, otherwise ``None``)
seed_volt_cont: ``tuple``
The seed used to set the prng for the voltage controller (if any, otherwise ``None``)
seed_opponent: ``tuple``
The seed used to set the prng for the opponent (if any, otherwise ``None``)
Examples
---------
Seeding an environment should be done with:
.. code-block:: python
import grid2op
env = grid2op.make("l2rpn_case14_sandbox")
env.seed(0)
obs = env.reset()
As long as the environment instance (variable `env` in the above code) is not `reset` the `env.seed` has no
real effect (but can have side effects).
For a full control on the seed mechanism it is more than advised to reset it after it has been seeded.
"""
if self.__closed:
raise EnvError("This environment is closed. You cannot use it anymore.")
seed_init = None
seed_chron = None
seed_obs = None
seed_action_space = None
seed_env_modif = None
seed_volt_cont = None
seed_opponent = None
if _seed_me:
max_int = np.iinfo(dt_int).max
if seed > max_int:
raise Grid2OpException("Seed is too big. Max value is {}, provided value is {}".format(max_int, seed))
try:
seed = np.array(seed).astype(dt_int)
except Exception as exc_:
raise Grid2OpException(
"Impossible to seed with the seed provided. Make sure it can be converted to a"
"numpy 32 bits integer."
)
# example from gym
# self.np_random, seed = seeding.np_random(seed)
# inspiration from @ https://github.com/openai/gym/tree/master/gym/utils
seed_init = seed
super().seed(seed_init)
max_seed = np.iinfo(dt_int).max # 2**31 - 1 (dt_int is a 32-bit signed integer)
if self.chronics_handler is not None:
seed = self.space_prng.randint(max_seed)
seed_chron = self.chronics_handler.seed(seed)
if self._observation_space is not None:
seed = self.space_prng.randint(max_seed)
seed_obs = self._observation_space.seed(seed)
if self._action_space is not None:
seed = self.space_prng.randint(max_seed)
seed_action_space = self._action_space.seed(seed)
if self._helper_action_env is not None:
seed = self.space_prng.randint(max_seed)
seed_env_modif = self._helper_action_env.seed(seed)
if self._voltage_controler is not None:
seed = self.space_prng.randint(max_seed)
seed_volt_cont = self._voltage_controler.seed(seed)
if self._opponent is not None:
seed = self.space_prng.randint(max_seed)
seed_opponent = self._opponent.seed(seed)
self._has_just_been_seeded = True
return (
seed_init,
seed_chron,
seed_obs,
seed_action_space,
seed_env_modif,
seed_volt_cont,
seed_opponent,
)
def deactivate_forecast(self):
"""
This function will have the effect to deactivate the `obs.simulate`: the forecasts will not be updated
in the observation space.
This will most likely lead to some performance increase (~10-15% faster) if you don't use the
`obs.simulate` function.
Notes
------
If you really don't want to use the `obs.simulate` functionality, you should rather disable it at the creation
of the environment. For example, if you use the recommended `make` function, you can pass an argument
that will skip reading the forecasts from the time series (using `GridStateFromFile` instead of
`GridStateFromFileWithForecast` for example). This would give something like:
.. code-block:: python
import grid2op
from grid2op.Chronics import GridStateFromFile
# tell grid2op not to read the "forecast"
env = grid2op.make("l2rpn_case14_sandbox", data_feeding_kwargs={"gridvalueClass": GridStateFromFile})
do_nothing_action = env.action_space()
# improve speed ups to not even try to use forecast
env.deactivate_forecast()
# this is normal behavior
obs = env.reset()
# but this will make the program stop working
# obs.simulate(do_nothing_action) # DO NOT RUN IT RAISES AN ERROR
"""
if self.__closed:
raise EnvError("This environment is closed, you cannot use it.")
if self._observation_space is not None:
self._observation_space.with_forecast = False
self.with_forecast = False
[docs] def reactivate_forecast(self):
"""
This function reactivates `obs.simulate`: the forecasts will be updated again
in the observation space.
This will most likely lead to some performance decrease, but you will be able to use the `obs.simulate` function.
.. warning::
Forecast are deactivated by default (and cannot be reactivated) if the
backend cannot be copied.
.. warning::
You need to call 'env.reset()' for this function to work properly. It is NOT recommended
to reactivate forecasts in the middle of an episode.
Notes
------
You can use this function as followed:
.. code-block:: python
import grid2op
from grid2op.Chronics import GridStateFromFile
# tell grid2op not to read the "forecast"
env = grid2op.make("l2rpn_case14_sandbox", data_feeding_kwargs={"gridvalueClass": GridStateFromFile})
do_nothing_action = env.action_space()
# improve speed ups to not even try to use forecast
env.deactivate_forecast()
# this is normal behavior
obs = env.reset()
# but this will make the program stop working
# obs.simulate(do_nothing_action) # DO NOT RUN IT RAISES AN ERROR
env.reactivate_forecast()
obs = env.reset() # you need to reset the env for this function to have any effects
obs, reward, done, info = env.step(do_nothing_action)
# and now forecast are available again
simobs, sim_r, sim_d, sim_info = obs.simulate(do_nothing_action)
"""
if self.__closed:
raise EnvError("This environment is closed, you cannot use it.")
if not self.backend._can_be_copied:
raise EnvError("Impossible to activate the forecasts with a "
"backend that cannot be copied.")
if self._observation_space is not None:
self._observation_space.reactivate_forecast(self)
self.with_forecast = True
def _init_alert_data(self):
cls = type(self)
self._last_alert = np.full(cls.dim_alerts,
dtype=dt_bool, fill_value=False)
self._is_already_attacked = np.full(cls.dim_alerts, dtype=dt_bool,
fill_value=False)
self._time_since_last_alert = np.full(cls.dim_alerts,
dtype=dt_int, fill_value=-1)
self._alert_duration = np.full(cls.dim_alerts,
dtype=dt_int, fill_value=0)
self._total_number_of_alert = 0
self._time_since_last_attack = np.full(cls.dim_alerts,
dtype=dt_int, fill_value=-1)
self._was_alert_used_after_attack = np.full(cls.dim_alerts,
dtype=dt_int, fill_value=0)
self._attack_under_alert = np.full(cls.dim_alerts,
dtype=dt_int, fill_value=0)
@abstractmethod
def _init_backend(
self,
chronics_handler,
backend,
names_chronics_to_backend,
actionClass,
observationClass,
rewardClass,
legalActClass,
):
"""
INTERNAL
.. warning:: /!\\\\ Internal, do not use unless you know what you are doing /!\\\\
This method is used for Environment specific implementation. Only use it if you know exactly what
you are doing.
"""
pass
[docs] def set_thermal_limit(self, thermal_limit):
"""
Set the thermal limit effectively.
Parameters
----------
thermal_limit: ``numpy.ndarray``
The new thermal limit. It must be a numpy ndarray vector (or convertible to it). For each powerline it
gives the new thermal limit.
Alternatively, this can be a dictionary mapping the line names (keys) to its thermal limits (values). In
that case, all thermal limits for all powerlines should be specified (this is a safety measure
to reduce the odds of misuse).
Examples
---------
This function can be used like this:
.. code-block:: python
import grid2op
# I create an environment
env = grid2op.make("l2rpn_case14_sandbox", test=True)
# i set the thermal limit of each powerline to 20000 amps
env.set_thermal_limit([20000 for _ in range(env.n_line)])
Notes
-----
As of grid2op > 1.5.0, it is possible to set the thermal limit by using a dictionary with the keys being
the name of the powerline and the values the thermal limits.
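For example (a minimal sketch setting the same limit for every powerline, keyed by its name):
.. code-block:: python
import grid2op
env = grid2op.make("l2rpn_case14_sandbox", test=True)
# build a dictionary with one entry per powerline (all lines must be specified)
env.set_thermal_limit({nm: 20000.0 for nm in env.name_line})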
"""
if self.__closed:
raise EnvError("This environment is closed, you cannot use it.")
if not self.__is_init:
raise Grid2OpException(
"Impossible to set the thermal limit to a non initialized Environment. "
"Have you called `env.reset()` after last game over ?"
)
if isinstance(thermal_limit, dict):
tmp = np.full(self.n_line, fill_value=np.NaN, dtype=dt_float)
for key, val in thermal_limit.items():
if key not in self.name_line:
raise Grid2OpException(
f"When setting a thermal limit with a dictionary, the keys should be line "
f"names. We found: {key} which is not a line name. The names of the "
f"powerlines are {self.name_line}"
)
ind_line = (self.name_line == key).nonzero()[0][0]
if np.isfinite(tmp[ind_line]):
raise Grid2OpException(
f"Humm, there is a really strange bug, some lines are set twice."
)
try:
val_fl = float(val)
except Exception as exc_:
raise Grid2OpException(
f"When setting the thermal limit with a dictionary, the values should be "
f"the thermal limits (in amps). You provided something that "
f'cannot be converted to a float. Error was "{exc_}".'
) from exc_
tmp[ind_line] = val_fl
elif isinstance(thermal_limit, (np.ndarray, list)):
try:
tmp = np.array(thermal_limit).flatten().astype(dt_float)
except Exception as exc_:
raise Grid2OpException(
f"Impossible to convert the vector as input into a 1d numpy float array. "
f"Error was: \n {exc_}"
) from exc_
if tmp.shape[0] != self.n_line:
raise Grid2OpException(
"Attempt to set thermal limit on {} powerlines while there are {}"
"on the grid".format(tmp.shape[0], self.n_line)
)
if (~np.isfinite(tmp)).any():
raise Grid2OpException(
"Impossible to use non finite value for thermal limits."
)
else:
raise Grid2OpException(
f"You can only set the thermal limits of the environment with a dictionary (in that "
f"case the keys are the line names, and the values the thermal limits) or with "
f"a numpy array that has as many components of the number of powerlines on "
f'the grid. You provided something with type "{type(thermal_limit)}" which '
f"is not supported."
)
self._thermal_limit_a[:] = tmp
self.backend.set_thermal_limit(self._thermal_limit_a)
self.observation_space.set_thermal_limit(self._thermal_limit_a)
def _reset_redispatching(self):
# redispatching
self._target_dispatch[:] = 0.0
self._already_modified_gen[:] = False
self._actual_dispatch[:] = 0.0
self._gen_uptime[:] = 0
self._gen_downtime[:] = 0
self._gen_activeprod_t[:] = 0.0
self._gen_activeprod_t_redisp[:] = 0.0
def _get_new_prod_setpoint(self, action):
"""
NB this is overridden in _ObsEnv where the data are read from the action to set this environment
instead
"""
# get the modification of generator active setpoint from the action
new_p = 1.0 * self._gen_activeprod_t
if "prod_p" in action._dict_inj:
tmp = action._dict_inj["prod_p"]
indx_ok = np.isfinite(tmp)
new_p[indx_ok] = tmp[indx_ok]
# modification of the environment always override the modification of the agents (if any)
# TODO have a flag there if this is the case.
if "prod_p" in self._env_modification._dict_inj:
# modification of the production setpoint value
tmp = self._env_modification._dict_inj["prod_p"]
indx_ok = np.isfinite(tmp)
new_p[indx_ok] = tmp[indx_ok]
return new_p
def _get_already_modified_gen(self, action):
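# redispatching targets are cumulative: generators already modified in a
# previous step see the new redispatch added to their target, while generators
# modified for the first time start from their current actual dispatch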
redisp_act_orig = 1.0 * action._redispatch
self._target_dispatch[self._already_modified_gen] += redisp_act_orig[self._already_modified_gen]
first_modified = (~self._already_modified_gen) & (redisp_act_orig != 0)
self._target_dispatch[first_modified] = (
self._actual_dispatch[first_modified] + redisp_act_orig[first_modified]
)
self._already_modified_gen[redisp_act_orig != 0] = True
return self._already_modified_gen
def _prepare_redisp(self, action, new_p, already_modified_gen):
# trying with an optimization method
except_ = None
info_ = []
valid = True
# get the redispatching action (if any)
redisp_act_orig = 1.0 * action._redispatch
if (
np.all(np.abs(redisp_act_orig) <= 1e-7)
and np.all(np.abs(self._target_dispatch) <= 1e-7)
and np.all(np.abs(self._actual_dispatch) <= 1e-7)
):
return valid, except_, info_
# check that everything is consistent with pmin, pmax:
if (self._target_dispatch > self.gen_pmax - self.gen_pmin).any():
# action is invalid, the target redispatching would be above pmax for at least a generator
cond_invalid = self._target_dispatch > self.gen_pmax - self.gen_pmin
except_ = InvalidRedispatching(
"You cannot ask for a dispatch higher than pmax - pmin [it would be always "
"invalid because, even if the sepoint is pmin, this dispatch would set it "
"to a number higher than pmax, which is impossible]. Invalid dispatch for "
"generator(s): "
"{}".format((cond_invalid).nonzero()[0])
)
self._target_dispatch -= redisp_act_orig
return valid, except_, info_
if (self._target_dispatch < self.gen_pmin - self.gen_pmax).any():
# action is invalid, the target redispatching would be below pmin for at least a generator
cond_invalid = self._target_dispatch < self.gen_pmin - self.gen_pmax
except_ = InvalidRedispatching(
"You cannot ask for a dispatch lower than pmin - pmax [it would be always "
"invalid because, even if the sepoint is pmax, this dispatch would set it "
"to a number bellow pmin, which is impossible]. Invalid dispatch for "
"generator(s): "
"{}".format((cond_invalid).nonzero()[0])
)
self._target_dispatch -= redisp_act_orig
return valid, except_, info_
# i can't redispatch turned off generators [turned off generators need to be turned on before redispatching]
if (redisp_act_orig[np.abs(new_p) <= 1e-7]).any() and self._forbid_dispatch_off:
# action is invalid, a generator has been redispatched, but it's turned off
except_ = InvalidRedispatching(
"Impossible to dispatch a turned off generator"
)
self._target_dispatch -= redisp_act_orig
return valid, except_, info_
if self._forbid_dispatch_off is True:
redisp_act_orig_cut = 1.0 * redisp_act_orig
redisp_act_orig_cut[np.abs(new_p) <= 1e-7] = 0.0
if (redisp_act_orig_cut != redisp_act_orig).any():
info_.append(
{
"INFO: redispatching cut because generator will be turned_off": (
redisp_act_orig_cut != redisp_act_orig
).nonzero()[0]
}
)
return valid, except_, info_
def _make_redisp(self, already_modified_gen, new_p):
"""this computes the redispaching vector, taking into account the storage units"""
except_ = None
valid = True
mismatch = self._actual_dispatch - self._target_dispatch
mismatch = np.abs(mismatch)
if (
np.abs((self._actual_dispatch).sum()) >= self._tol_poly
or np.max(mismatch) >= self._tol_poly
or np.abs(self._amount_storage) >= self._tol_poly
or np.abs(self._sum_curtailment_mw) >= self._tol_poly
):
except_ = self._compute_dispatch_vect(already_modified_gen, new_p)
valid = except_ is None
return valid, except_
def _compute_dispatch_vect(self, already_modified_gen, new_p):
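# This function solves (with scipy's SLSQP, see `f` below) the quadratic program:
#   minimize   sum_g w_g * (x_g - target_g)^2
#   subject to sum_g x_g = storage - curtailment   (power balance, "sum to 0")
#              min_disp_g <= x_g <= max_disp_g     (pmin / pmax and ramp limits)
# where x_g is the extra dispatch assigned to each participating generator and
# w_g weights each generator by the inverse of its total ramping capability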
except_ = None
# handle the case where there are storage or redispatching
# action or curtailment action on the "init state"
# of the grid
if self.nb_time_step == 0:
self._gen_activeprod_t_redisp[:] = new_p
# first i define the participating generators
# these are the generators that will be adjusted for redispatching
gen_participating = (
(new_p > 0.0)
| (np.abs(self._actual_dispatch) >= 1e-7)
| (self._target_dispatch != self._actual_dispatch)
)
gen_participating[~self.gen_redispatchable] = False
incr_in_chronics = new_p - (
self._gen_activeprod_t_redisp - self._actual_dispatch
)
# check if the constraints are violated
## total available "juice" to go down (incl ramp and pmin / pmax)
p_min_down = (
self.gen_pmin[gen_participating]
- self._gen_activeprod_t_redisp[gen_participating]
)
avail_down = np.maximum(p_min_down, -self.gen_max_ramp_down[gen_participating])
## total available "juice" to go up (incl. ramp and pmin / pmax)
p_max_up = (
self.gen_pmax[gen_participating]
- self._gen_activeprod_t_redisp[gen_participating]
)
avail_up = np.minimum(p_max_up, self.gen_max_ramp_up[gen_participating])
except_ = self._detect_infeasible_dispatch(
incr_in_chronics[gen_participating], avail_down, avail_up
)
if except_ is not None:
# try to force the turn on of turned off generators (if parameters allow it)
if (
self._parameters.IGNORE_MIN_UP_DOWN_TIME
and self._parameters.ALLOW_DISPATCH_GEN_SWITCH_OFF
):
gen_participating_tmp = self.gen_redispatchable
p_min_down_tmp = (
self.gen_pmin[gen_participating_tmp]
- self._gen_activeprod_t_redisp[gen_participating_tmp]
)
avail_down_tmp = np.maximum(
p_min_down_tmp, -self.gen_max_ramp_down[gen_participating_tmp]
)
p_max_up_tmp = (
self.gen_pmax[gen_participating_tmp]
- self._gen_activeprod_t_redisp[gen_participating_tmp]
)
avail_up_tmp = np.minimum(
p_max_up_tmp, self.gen_max_ramp_up[gen_participating_tmp]
)
except_tmp = self._detect_infeasible_dispatch(
incr_in_chronics[gen_participating_tmp],
avail_down_tmp,
avail_up_tmp,
)
if except_tmp is None:
# I can "save" the situation by turning on all generators, I do it
# TODO logger here
gen_participating = gen_participating_tmp
except_ = None
else:
return except_tmp
else:
return except_
# define the objective value
target_vals = (
self._target_dispatch[gen_participating]
- self._actual_dispatch[gen_participating]
)
already_modified_gen_me = already_modified_gen[gen_participating]
target_vals_me = target_vals[already_modified_gen_me]
nb_dispatchable = gen_participating.sum()
tmp_zeros = np.zeros((1, nb_dispatchable), dtype=dt_float)
coeffs = 1.0 / (
self.gen_max_ramp_up + self.gen_max_ramp_down + self._epsilon_poly
)
weights = np.ones(nb_dispatchable) * coeffs[gen_participating]
weights /= weights.sum()
if target_vals_me.shape[0] == 0:
# no dispatch means all dispatchable, otherwise i will never get to 0
already_modified_gen_me[:] = True
target_vals_me = target_vals[already_modified_gen_me]
# for numeric stability
# to scale the input also:
# see https://stackoverflow.com/questions/11155721/positive-directional-derivative-for-linesearch
scale_x = max(np.max(np.abs(self._actual_dispatch)), 1.0)
scale_x = dt_float(scale_x)
target_vals_me_optim = 1.0 * (target_vals_me / scale_x)
target_vals_me_optim = target_vals_me_optim.astype(dt_float)
# see https://stackoverflow.com/questions/11155721/positive-directional-derivative-for-linesearch
# where they advised to scale the function
scale_objective = max(0.5 * np.abs(target_vals_me_optim).sum() ** 2, 1.0)
scale_objective = np.round(scale_objective, decimals=4)
scale_objective = dt_float(scale_objective)
# add the "sum to 0"
mat_sum_0_no_turn_on = np.ones((1, nb_dispatchable), dtype=dt_float)
# this is where the storage is taken into account
# storages are "load convention" this means that i need to sum the amount of production to sum of storage
# hence the "+ self._amount_storage" below
# self._sum_curtailment_mw is "generator convention" hence the "-" there
const_sum_0_no_turn_on = (
np.zeros(1, dtype=dt_float)
+ self._amount_storage
- self._sum_curtailment_mw
)
# gen increase in the chronics
new_p_th = new_p[gen_participating] + self._actual_dispatch[gen_participating]
# minimum value available for disp
## first limit delta because of pmin
p_min_const = self.gen_pmin[gen_participating] - new_p_th
## second limit delta because of ramps
ramp_down_const = (
-self.gen_max_ramp_down[gen_participating]
- incr_in_chronics[gen_participating]
)
## take max of the 2
min_disp = np.maximum(p_min_const, ramp_down_const)
min_disp = min_disp.astype(dt_float)
# maximum value available for disp
## first limit delta because of pmin
p_max_const = self.gen_pmax[gen_participating] - new_p_th
## second limit delta because of ramps
ramp_up_const = (
self.gen_max_ramp_up[gen_participating]
- incr_in_chronics[gen_participating]
)
## take min of the 2
max_disp = np.minimum(p_max_const, ramp_up_const)
max_disp = max_disp.astype(dt_float)
# add everything into a linear constraint object
# equality
added = 0.5 * self._epsilon_poly
equality_const = LinearConstraint(
mat_sum_0_no_turn_on, # do the sum
(const_sum_0_no_turn_on) / scale_x, # lower bound
(const_sum_0_no_turn_on) / scale_x, # upper bound
)
mat_pmin_max_ramps = np.eye(nb_dispatchable)
ineq_const = LinearConstraint(
mat_pmin_max_ramps,
(min_disp - added) / scale_x,
(max_disp + added) / scale_x,
)
# choose a good initial point (close to the solution)
# the idea here is to choose an initial point that would be close to the
# desired solution (split the (sum of the) dispatch to the available generators)
x0 = np.zeros(gen_participating.sum())
if (np.abs(self._target_dispatch) >= 1e-7).any() or already_modified_gen.any():
gen_for_x0 = np.abs(self._target_dispatch[gen_participating]) >= 1e-7
gen_for_x0 |= already_modified_gen[gen_participating]
x0[gen_for_x0] = (
self._target_dispatch[gen_participating][gen_for_x0]
- self._actual_dispatch[gen_participating][gen_for_x0]
) / scale_x
# at this point x0 is made of the difference between the target and the
# actual dispatch for all generators that have a
# target dispatch non 0.
# in this "if" block I set the other component of x0 to
# their "right" value
can_adjust = (np.abs(x0) <= 1e-7)
if can_adjust.any():
init_sum = x0.sum()
denom_adjust = (1.0 / weights[can_adjust]).sum()
if denom_adjust <= 1e-2:
# i don't want to divide by something too close to 0.
denom_adjust = 1.0
x0[can_adjust] = -init_sum / (weights[can_adjust] * denom_adjust)
else:
# to "force" the exact reset to 0.0 for all components
x0 -= self._actual_dispatch[gen_participating] / scale_x
def target(actual_dispatchable):
# define my real objective
quad_ = (
actual_dispatchable[already_modified_gen_me] - target_vals_me_optim
) ** 2
coeffs_quads = weights[already_modified_gen_me] * quad_
coeffs_quads_const = coeffs_quads.sum()
coeffs_quads_const /= scale_objective # scaling the function
return coeffs_quads_const
def jac(actual_dispatchable):
res_jac = 1.0 * tmp_zeros
res_jac[0, already_modified_gen_me] = (
2.0
* weights[already_modified_gen_me]
* (actual_dispatchable[already_modified_gen_me] - target_vals_me_optim)
)
res_jac /= scale_objective # scaling the function
return res_jac
# objective function
def f(init):
this_res = minimize(
target,
init,
method="SLSQP",
constraints=[equality_const, ineq_const],
options={
"eps": max(self._epsilon_poly / scale_x, 1e-6),
"ftol": max(self._epsilon_poly / scale_x, 1e-6),
"disp": False,
},
jac=jac
# hess=hess # not used for SLSQP
)
return this_res
res = f(x0)
if res.success:
self._actual_dispatch[gen_participating] += res.x * scale_x
else:
# check if constraints are "approximately" met
mat_const = np.concatenate((mat_sum_0_no_turn_on, mat_pmin_max_ramps))
downs = np.concatenate(
(const_sum_0_no_turn_on / scale_x, (min_disp - added) / scale_x)
)
ups = np.concatenate(
(const_sum_0_no_turn_on / scale_x, (max_disp + added) / scale_x)
)
vals = np.matmul(mat_const, res.x)
ok_down = np.all(
vals - downs >= -self._tol_poly
) # i don't violate "down" constraints
ok_up = np.all(vals - ups <= self._tol_poly)
if ok_up and ok_down:
# it's ok i can tolerate "small" perturbations
self._actual_dispatch[gen_participating] += res.x * scale_x
else:
# TODO try with another method here, maybe
error_dispatch = (
"Redispatching automaton terminated with error (no more information available "
'at this point):\n"{}"'.format(res.message)
)
except_ = InvalidRedispatching(error_dispatch)
return except_
def _detect_infeasible_dispatch(self, incr_in_chronics, avail_down, avail_up):
"""This function is an attempt to give more detailed log by detecting infeasible dispatch"""
except_ = None
sum_move = (
incr_in_chronics.sum() + self._amount_storage - self._sum_curtailment_mw
)
avail_down_sum = avail_down.sum()
avail_up_sum = avail_up.sum()
gen_setpoint = self._gen_activeprod_t_redisp[self.gen_redispatchable]
if sum_move > avail_up_sum:
# infeasible because too much is asked
msg = DETAILED_REDISP_ERR_MSG.format(
sum_move=sum_move,
avail_up_sum=avail_up_sum,
gen_setpoint=np.round(gen_setpoint, decimals=2),
ramp_up=self.gen_max_ramp_up[self.gen_redispatchable],
gen_pmax=self.gen_pmax[self.gen_redispatchable],
avail_up=np.round(avail_up, decimals=2),
increase="increase",
decrease="decrease",
maximum="maximum",
pmax="pmax",
max_ramp_up="max_ramp_up",
)
except_ = InvalidRedispatching(msg)
elif sum_move < avail_down_sum:
# infeasible because too much decrease is asked
msg = DETAILED_REDISP_ERR_MSG.format(
sum_move=sum_move,
avail_up_sum=avail_down_sum,
gen_setpoint=np.round(gen_setpoint, decimals=2),
ramp_up=self.gen_max_ramp_down[self.gen_redispatchable],
gen_pmax=self.gen_pmin[self.gen_redispatchable],
avail_up=np.round(avail_down, decimals=2),
increase="decrease",
decrease="increase",
maximum="minimum",
pmax="pmin",
max_ramp_up="max_ramp_down",
)
except_ = InvalidRedispatching(msg)
return except_
def _update_actions(self):
"""
INTERNAL
.. warning:: /!\\\\ Internal, do not use unless you know what you are doing /!\\\\
Retrieve the actions to perform the update of the underlying powergrid represented by
the :class:`grid2op.Backend` in the next time step.
A call to this function will also read the next state of :attr:`chronics_handler`, so it must be called only
once per time step.
Returns
--------
res: :class:`grid2op.Action.Action`
The action representing the modification of the powergrid induced by the Backend.
"""
(
timestamp,
tmp,
maintenance_time,
maintenance_duration,
hazard_duration,
prod_v,
) = self.chronics_handler.next_time_step()
if "injection" in tmp:
self._injection = tmp["injection"]
else:
self._injection = None
if "maintenance" in tmp:
self._maintenance = tmp["maintenance"]
else:
self._maintenance = None
if "hazards" in tmp:
self._hazards = tmp["hazards"]
else:
self._hazards = None
self.time_stamp = timestamp
self._duration_next_maintenance = maintenance_duration
self._time_next_maintenance = maintenance_time
self._hazard_duration = hazard_duration
act = self._helper_action_env(
{
"injection": self._injection,
"maintenance": self._maintenance,
"hazards": self._hazards,
}
)
return act, prod_v
def _update_time_reconnection_hazards_maintenance(self):
"""
INTERNAL
.. warning:: /!\\\\ Internal, do not use unless you know what you are doing /!\\\\
This supposes that :attr:`Environment.times_before_line_status_actionable` is already updated
with the cascading failure, soft overflow and hard overflow.
It also supposes that :func:`Environment._update_actions` has been called, so that the vectors
:attr:`Environment.duration_next_maintenance`, :attr:`Environment._time_next_maintenance` and
:attr:`Environment._hazard_duration` are updated with the most recent values.
Finally the Environment supposes that this method is called before calling :func:`Environment.get_obs`
This function integrates the hazards and maintenance in the
:attr:`Environment.times_before_line_status_actionable` vector.
For example, if a powerline `i` has no problem
of overflow, but is affected by a hazard, :attr:`Environment.times_before_line_status_actionable`
should be updated with the duration of this hazard (stored in one of the three vectors mentioned in the
above paragraph)
For this Environment, we suppose that the maximum of the 3 values is taken into account. The reality would
be more complicated.
"""
first_time_maintenance = self._time_next_maintenance == 0
self._times_before_line_status_actionable[first_time_maintenance] = np.maximum(
self._times_before_line_status_actionable[first_time_maintenance],
self._duration_next_maintenance[first_time_maintenance],
)
self._times_before_line_status_actionable[:] = np.maximum(
self._times_before_line_status_actionable, self._hazard_duration
)
def _voltage_control(self, agent_action, prod_v_chronics):
"""
INTERNAL
.. warning:: /!\\\\ Internal, do not use unless you know what you are doing /!\\\\
Update the environment action "action_env" given a possibly new voltage setpoint for the generators. This
function can be overridden for a more complex handling of the voltages.
It must update (if needed) the voltages of the environment action :attr:`BaseEnv.env_modification`
Parameters
----------
agent_action: :class:`grid2op.Action.Action`
The action performed by the player (or do nothing if the player action was not legal or was ambiguous)
prod_v_chronics: ``numpy.ndarray`` or ``None``
The voltages that have been specified in the chronics
"""
res = self._helper_action_env()
if prod_v_chronics is not None:
res.update({"injection": {"prod_v": prod_v_chronics}})
return res
def _handle_updown_times(self, gen_up_before, redisp_act):
"""
INTERNAL
.. warning:: /!\\\\ Internal, do not use unless you know what you are doing /!\\\\
Handles the up and down times for the generators.
"""
# get the generators that are not connected after the action
except_ = None
# computes which generator will be turned on after the action
gen_up_after = 1.0 * self._gen_activeprod_t
if "prod_p" in self._env_modification._dict_inj:
tmp = self._env_modification._dict_inj["prod_p"]
indx_ok = np.isfinite(tmp)
gen_up_after[indx_ok] = self._env_modification._dict_inj["prod_p"][indx_ok]
gen_up_after += redisp_act
gen_up_after = gen_up_after > 0.0
# update min down time, min up time etc.
gen_disconnected_this = gen_up_before & (~gen_up_after)
gen_connected_this_timestep = (~gen_up_before) & (gen_up_after)
gen_still_connected = gen_up_before & gen_up_after
gen_still_disconnected = (~gen_up_before) & (~gen_up_after)
if ((
self._gen_downtime[gen_connected_this_timestep]
< self.gen_min_downtime[gen_connected_this_timestep]
).any()
and not self._ignore_min_up_down_times
):
# i reconnected a generator before the minimum time allowed
id_gen = (
self._gen_downtime[gen_connected_this_timestep]
< self.gen_min_downtime[gen_connected_this_timestep]
)
id_gen = (id_gen).nonzero()[0]
id_gen = (gen_connected_this_timestep[id_gen]).nonzero()[0]
except_ = GeneratorTurnedOnTooSoon(
"Some generator has been connected too early ({})".format(id_gen)
)
return except_
else:
self._gen_downtime[gen_connected_this_timestep] = -1
self._gen_uptime[gen_connected_this_timestep] = 1
if (
(
self._gen_uptime[gen_disconnected_this]
< self.gen_min_uptime[gen_disconnected_this]
).any()
and not self._ignore_min_up_down_times
):
# i disconnected a generator before the minimum time allowed
id_gen = (
self._gen_uptime[gen_disconnected_this]
< self.gen_min_uptime[gen_disconnected_this]
)
id_gen = (id_gen).nonzero()[0]
id_gen = (gen_disconnected_this[id_gen]).nonzero()[0]
except_ = GeneratorTurnedOffTooSoon(
"Some generator has been disconnected too early ({})".format(id_gen)
)
return except_
else:
self._gen_downtime[gen_connected_this_timestep] = 0
self._gen_uptime[gen_connected_this_timestep] = 1
self._gen_uptime[gen_still_connected] += 1
self._gen_downtime[gen_still_disconnected] += 1
return except_
[docs] def get_obs(self, _update_state=True, _do_copy=True):
"""
Return the observation of the current environment, as given to the :class:`grid2op.Agent.BaseAgent`.
.. note::
This function is called twice when the env is reset, otherwise once per step
Parameters
----------
_do_copy: ``bool``
.. versionadded:: 1.9.2
Whether or not to make a copy of the returned observation. By default it will make one. Be aware that
this might cause trouble if used incorrectly.
Returns
-------
res: :class:`grid2op.Observation.BaseObservation`
The current observation usually given to the :class:`grid2op.Agent.BaseAgent` / bot / controller.
Examples
---------
This function can be used at any moment, even if the current observation is not available.
.. code-block:: python
import grid2op
# I create an environment
env = grid2op.make("l2rpn_case14_sandbox")
obs = env.reset()
# have a big piece of code
obs2 = env.get_obs()
# obs2 and obs are identical.
"""
if self.__closed:
raise EnvError("This environment is closed. You cannot use it anymore.")
if not self.__is_init:
raise EnvError(
"This environment is not initialized. You cannot retrieve its observation. "
"Have you called `env.reset()` after last game over ?"
)
if self._last_obs is None:
self._last_obs = self._observation_space(
env=self, _update_state=_update_state
)
if _do_copy:
return copy.deepcopy(self._last_obs)
else:
return self._last_obs
[docs] def get_thermal_limit(self):
"""
Get the current thermal limit in amps registered for the environment.
Examples
---------
It can be used like this:
.. code-block:: python
import grid2op
# I create an environment
env = grid2op.make("l2rpn_case14_sandbox")
thermal_limits = env.get_thermal_limit()
"""
if self.__closed:
raise EnvError("This environment is closed, you cannot use it.")
if not self.__is_init:
raise EnvError(
"This environment is not initialized. It has no thermal limits. "
"Have you called `env.reset()` after last game over ?"
)
return 1.0 * self._thermal_limit_a
def _withdraw_storage_losses(self):
"""
empty the energy in the storage units depending on the `storage_loss`
NB this is a loss, this is not seen grid side, so `storage_discharging_efficiency` has no impact on this
"""
# NB this should be done AFTER the computation of self._amount_storage, because this energy is dissipated
# in the storage units, thus NOT seen as power from the grid.
if self._parameters.ACTIVATE_STORAGE_LOSS:
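# storage_loss is a power (MW); multiplied by the step duration in hours,
# it gives the energy (MWh) dissipated by each unit during this step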
tmp_ = self.storage_loss * self.delta_time_seconds / 3600.0
self._storage_current_charge -= tmp_
# charge cannot be negative, but it can be below Emin if there are some uncompensated losses
self._storage_current_charge[:] = np.maximum(
self._storage_current_charge, 0.0
)
def _aux_remove_power_too_high(self, delta_, indx_too_high):
"""
delta_ is given in energy (and NOT power)
handles self._storage_power in
case we need to cut the storage action because the power would be too high
"""
coeff_p_to_E = (
self.delta_time_seconds / 3600.0
) # TODO optim this is const for all time steps
tmp_ = 1.0 / coeff_p_to_E * delta_
if self._parameters.ACTIVATE_STORAGE_LOSS:
# from the storage i need to reduce of tmp_ MW (to compensate the delta_ MWh)
# but when it's "transferred" to the grid i don't have the same amount (due to inefficiencies)
# it's a "/" because i need more energy from the grid than what the actual charge will be
tmp_ /= self.storage_charging_efficiency[indx_too_high]
self._storage_power[indx_too_high] -= tmp_
def _aux_remove_power_too_low(self, delta_, indx_too_low):
"""
delta_ is given in energy (and NOT power)
handles self._storage_power in
case we need to cut the storage action because the power would be too low
"""
coeff_p_to_E = (
self.delta_time_seconds / 3600.0
) # TODO optim this is const for all time steps
tmp_ = 1.0 / coeff_p_to_E * delta_
if self._parameters.ACTIVATE_STORAGE_LOSS:
# from the storage i need to increase of tmp_ MW (to compensate the delta_ MWh)
# but when it's "transferred" to the grid i don't have the same amount (due to inefficiencies)
# it's a "*" because i have less power on the grid than what is removed from the battery
tmp_ *= self.storage_discharging_efficiency[indx_too_low]
self._storage_power[indx_too_low] -= tmp_
def _compute_storage(self, action_storage_power):
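# reminder: storage units use the "load convention": a positive power setpoint
# charges the unit (power drawn from the grid), a negative one discharges it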
self._storage_previous_charge[:] = self._storage_current_charge
storage_act = np.isfinite(action_storage_power) & (np.abs(action_storage_power) >= 1e-7)
self._action_storage[:] = 0.0
self._storage_power[:] = 0.0
modif = False
coeff_p_to_E = (
self.delta_time_seconds / 3600.0
) # TODO optim this is const for all time steps
if storage_act.any():
modif = True
this_act_stor = action_storage_power[storage_act]
eff_ = np.ones(storage_act.sum())
if self._parameters.ACTIVATE_STORAGE_LOSS:
fill_storage = (
this_act_stor > 0.0
) # index of storages that sees their charge increasing
unfill_storage = (
this_act_stor < 0.0
) # index of storages that sees their charge decreasing
eff_[fill_storage] *= self.storage_charging_efficiency[storage_act][
fill_storage
]
eff_[unfill_storage] /= self.storage_discharging_efficiency[
storage_act
][unfill_storage]
self._storage_current_charge[storage_act] += (
this_act_stor * coeff_p_to_E * eff_
)
self._action_storage[storage_act] += action_storage_power[storage_act]
self._storage_power[storage_act] = this_act_stor
if modif:
# indx when there is too much energy on the battery
indx_too_high = self._storage_current_charge > self.storage_Emax
if indx_too_high.any():
delta_ = (
self._storage_current_charge[indx_too_high]
- self.storage_Emax[indx_too_high]
)
self._aux_remove_power_too_high(delta_, indx_too_high)
self._storage_current_charge[indx_too_high] = self.storage_Emax[
indx_too_high
]
# indx when there is not enough energy on the battery
indx_too_low = self._storage_current_charge < self.storage_Emin
if indx_too_low.any():
delta_ = (
self._storage_current_charge[indx_too_low]
- self.storage_Emin[indx_too_low]
)
self._aux_remove_power_too_low(delta_, indx_too_low)
self._storage_current_charge[indx_too_low] = self.storage_Emin[
indx_too_low
]
self._storage_current_charge[:] = np.maximum(
self._storage_current_charge, self.storage_Emin
)
# storage is "load convention", dispatch is "generator convention"
# i need the generator to have the same sign as the action on the batteries
self._amount_storage = self._storage_power.sum()
else:
# battery effect should be removed, so i multiply it by -1.
self._amount_storage = 0.0
tmp = self._amount_storage
self._amount_storage -= self._amount_storage_prev
self._amount_storage_prev = tmp
# dissipated energy, it's not seen on the grid, just lost in the storage unit.
# this is why it should not be taken into account in self._amount_storage
# and NOT absorbed by the generators either
# NB loss in the storage unit can make it go below Emin in energy, but never below 0.
self._withdraw_storage_losses()
# end storage
def _compute_max_ramp_this_step(self, new_p):
"""
compute the total "power" that can be added or removed this step, taking into account
the generators ramps and pmin / pmax.
new_p: array of the (temporary) new productions that the chronics would apply
"""
# TODO
# maximum value it can take
th_max = np.minimum(
self._gen_activeprod_t_redisp[self.gen_redispatchable]
+ self.gen_max_ramp_up[self.gen_redispatchable],
self.gen_pmax[self.gen_redispatchable],
)
# minimum value it can take
th_min = np.maximum(
self._gen_activeprod_t_redisp[self.gen_redispatchable]
- self.gen_max_ramp_down[self.gen_redispatchable],
self.gen_pmin[self.gen_redispatchable],
)
max_total_up = (th_max - new_p[self.gen_redispatchable]).sum()
max_total_down = (
th_min - new_p[self.gen_redispatchable]
).sum() # TODO is that it ?
return max_total_down, max_total_up
def _aux_update_curtail_env_act(self, new_p):
if "prod_p" in self._env_modification._dict_inj:
self._env_modification._dict_inj["prod_p"][:] = new_p
else:
self._env_modification._dict_inj["prod_p"] = 1.0 * new_p
self._env_modification._modif_inj = True
def _aux_update_curtailment_act(self, action):
curtailment_act = 1.0 * action._curtail
ind_curtailed_in_act = (curtailment_act != -1.0) & self.gen_renewable
self._limit_curtailment_prev[:] = self._limit_curtailment
self._limit_curtailment[ind_curtailed_in_act] = curtailment_act[
ind_curtailed_in_act
]
def _aux_compute_new_p_curtailment(self, new_p, curtailment_vect):
"""modifies the new_p argument !!!!"""
gen_curtailed = (
np.abs(curtailment_vect - 1.) >= 1e-7
) # curtailed either right now, or in a previous action
max_action = self.gen_pmax[gen_curtailed] * curtailment_vect[gen_curtailed]
new_p[gen_curtailed] = np.minimum(max_action, new_p[gen_curtailed])
return gen_curtailed
def _aux_handle_curtailment_without_limit(self, action, new_p):
"""modifies the new_p argument !!!! (but not the action)"""
if self.redispatching_unit_commitment_availble and (
action._modif_curtailment or (np.abs(self._limit_curtailment - 1.) >= 1e-7).any()
):
self._aux_update_curtailment_act(action)
gen_curtailed = self._aux_compute_new_p_curtailment(
new_p, self._limit_curtailment
)
tmp_sum_curtailment_mw = dt_float(
new_p[gen_curtailed].sum()
- self._gen_before_curtailment[gen_curtailed].sum()
)
self._sum_curtailment_mw = (
tmp_sum_curtailment_mw - self._sum_curtailment_mw_prev
)
self._sum_curtailment_mw_prev = tmp_sum_curtailment_mw
self._aux_update_curtail_env_act(new_p)
else:
self._sum_curtailment_mw = -self._sum_curtailment_mw_prev
self._sum_curtailment_mw_prev = dt_float(0.0)
gen_curtailed = np.abs(self._limit_curtailment - 1.) >= 1e-7
return gen_curtailed
def _aux_readjust_curtailment_after_limiting(
self, total_curtailment, new_p_th, new_p
):
self._sum_curtailment_mw += total_curtailment
self._sum_curtailment_mw_prev += total_curtailment
if total_curtailment > self._tol_poly:
# in this case, the curtailment is too strong, I need to make it less strong
curtailed = new_p_th - new_p
else:
# in this case, the curtailment is too low, this can happen, for example when there is a
# "strong" curtailment but afterwards you ask to set everything to 1. (so no curtailment)
# I cannot reuse the previous case (too_much > self._tol_poly) because the
# curtailment is already computed there...
new_p_with_previous_curtailment = 1.0 * new_p_th
self._aux_compute_new_p_curtailment(
new_p_with_previous_curtailment, self._limit_curtailment_prev
)
curtailed = new_p_th - new_p_with_previous_curtailment
curt_sum = curtailed.sum()
if abs(curt_sum) > self._tol_poly:
curtailed[~self.gen_renewable] = 0.0
curtailed *= total_curtailment / curt_sum
new_p[self.gen_renewable] += curtailed[self.gen_renewable]
def _aux_readjust_storage_after_limiting(self, total_storage):
new_act_storage = 1.0 * self._storage_power
sum_this_step = new_act_storage.sum()
if abs(total_storage) < abs(sum_this_step):
# i can modify the current action
modif_storage = new_act_storage * total_storage / sum_this_step
else:
# i need to retrieve what I did in a previous action
# because the current action is not enough (the previous actions
# cause a problem right now)
new_act_storage = 1.0 * self._storage_power_prev
sum_this_step = new_act_storage.sum()
if abs(sum_this_step) > 1e-1:
modif_storage = new_act_storage * total_storage / sum_this_step
else:
# TODO: this is not covered by any test :-(
# it happens when you do an action that is too strong, then a do nothing,
# then you decrease the limit too rapidly
# (game over would happen after at least one do nothing)
# In this case I reset it completely, or do I? I don't really
# know what to do!
modif_storage = new_act_storage # or self._storage_power ???
# handle self._storage_power and self._storage_current_charge
coeff_p_to_E = (
self.delta_time_seconds / 3600.0
) # TODO optim this is const for all time steps
self._storage_power -= modif_storage
# now compute the state of charge of the storage units (with efficiencies)
is_discharging = self._storage_power < 0.0
is_charging = self._storage_power > 0.0
modif_storage[is_discharging] /= type(self).storage_discharging_efficiency[
is_discharging
]
modif_storage[is_charging] *= type(self).storage_charging_efficiency[
is_charging
]
self._storage_current_charge -= coeff_p_to_E * modif_storage
# inform the grid that the storage is reduced
self._amount_storage -= total_storage
self._amount_storage_prev -= total_storage
def _aux_limit_curtail_storage_if_needed(self, new_p, new_p_th, gen_curtailed):
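# if the combined storage + curtailment action asks for more (or less) power
# than the dispatchable generators can compensate within their ramps, the
# storage and curtailment contributions are scaled back proportionally so
# that the redispatching problem stays feasible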
gen_redisp = self.gen_redispatchable
normal_increase = new_p - (
self._gen_activeprod_t_redisp - self._actual_dispatch
)
normal_increase = normal_increase[gen_redisp]
p_min_down = (
self.gen_pmin[gen_redisp] - self._gen_activeprod_t_redisp[gen_redisp]
)
avail_down = np.maximum(p_min_down, -self.gen_max_ramp_down[gen_redisp])
p_max_up = self.gen_pmax[gen_redisp] - self._gen_activeprod_t_redisp[gen_redisp]
avail_up = np.minimum(p_max_up, self.gen_max_ramp_up[gen_redisp])
sum_move = (
normal_increase.sum() + self._amount_storage - self._sum_curtailment_mw
)
total_storage_curtail = self._amount_storage - self._sum_curtailment_mw
update_env_act = False
if abs(total_storage_curtail) >= self._tol_poly:
# if there is an impact on the curtailment / storage (otherwise I cannot fix anything)
too_much = 0.0
if sum_move > avail_up.sum():
# I need to limit curtailment (not enough ramps up available)
too_much = dt_float(sum_move - avail_up.sum() + self._tol_poly)
self._limited_before = too_much
elif sum_move < avail_down.sum():
# I need to limit storage unit (not enough ramps down available)
too_much = dt_float(sum_move - avail_down.sum() - self._tol_poly)
self._limited_before = too_much
elif np.abs(self._limited_before) >= self._tol_poly:
# adjust the "mess" I did before by not curtailing enough
# max_action = self.gen_pmax[gen_curtailed] * self._limit_curtailment[gen_curtailed]
update_env_act = True
too_much = min(avail_up.sum() - self._tol_poly, self._limited_before)
self._limited_before -= too_much
too_much = self._limited_before
if abs(too_much) > self._tol_poly:
total_curtailment = (
-self._sum_curtailment_mw / total_storage_curtail * too_much
)
total_storage = (
self._amount_storage / total_storage_curtail * too_much
) # TODO !!!
update_env_act = True
# TODO "log" the total_curtailment and total_storage somewhere (in the info part of the step function)
if np.sign(total_curtailment) != np.sign(total_storage):
# curtailment goes up, storage down, i only "limit" the one that
# has the same sign as too much
total_curtailment = (
too_much
if np.sign(total_curtailment) == np.sign(too_much)
else 0.0
)
total_storage = (
too_much if np.sign(total_storage) == np.sign(too_much) else 0.0
)
# NB i can directly assign all the "curtailment" to the maximum because in this case, too_much will
# necessarily be > than total_curtail (or total_storage) because the other
# one is of opposite sign
# fix curtailment
self._aux_readjust_curtailment_after_limiting(
total_curtailment, new_p_th, new_p
)
# fix storage
self._aux_readjust_storage_after_limiting(total_storage)
if update_env_act:
self._aux_update_curtail_env_act(new_p)
def _aux_handle_act_inj(self, action: BaseAction):
for inj_key in ["load_p", "prod_p", "load_q"]:
# modification of the injections in the action, this erases the actions in the environment
if inj_key in action._dict_inj:
if inj_key in self._env_modification._dict_inj:
this_p_load = 1.0 * self._env_modification._dict_inj[inj_key]
act_modif = action._dict_inj[inj_key]
this_p_load[np.isfinite(act_modif)] = act_modif[
np.isfinite(act_modif)
]
self._env_modification._dict_inj[inj_key][:] = this_p_load
else:
self._env_modification._dict_inj[inj_key] = (
1.0 * action._dict_inj[inj_key]
)
self._env_modification._modif_inj = True
def _aux_handle_attack(self, action: BaseAction):
# TODO code the opponent part here and split more the timings! here "opponent time" is
# TODO included in time_apply_act
lines_attacked, subs_attacked = None, None
attack, attack_duration = self._oppSpace.attack(
observation=self.current_obs,
agent_action=action,
env_action=self._env_modification,
)
if attack is not None:
# the opponent choose to attack
# i update the "cooldown" on these things
lines_attacked, subs_attacked = attack.get_topological_impact()
self._times_before_line_status_actionable[lines_attacked] = np.maximum(
attack_duration,
self._times_before_line_status_actionable[lines_attacked],
)
self._times_before_topology_actionable[subs_attacked] = np.maximum(
attack_duration, self._times_before_topology_actionable[subs_attacked]
)
self._backend_action += attack
return lines_attacked, subs_attacked, attack_duration
def _aux_apply_redisp(self, action, new_p, new_p_th, gen_curtailed, except_):
is_illegal_redisp = False
is_done = False
is_illegal_reco = False
# remember generator that were "up" before the action
gen_up_before = self._gen_activeprod_t > 0.0
# compute the redispatching and the new productions active setpoint
already_modified_gen = self._get_already_modified_gen(action)
valid_disp, except_tmp, info_ = self._prepare_redisp(
action, new_p, already_modified_gen
)
if except_tmp is not None:
orig_action = action
action = self._action_space({})
if type(self).dim_alerts:
action.raise_alert = orig_action.raise_alert
is_illegal_redisp = True
except_.append(except_tmp)
if type(self).n_storage > 0:
# TODO curtailment: cancel it here too !
self._storage_current_charge[:] = self._storage_previous_charge
self._amount_storage -= self._amount_storage_prev
# dissipated energy, it's not seen on the grid, just lost in the storage unit.
# this is why it should not be taken into account in self._amount_storage
# and NOT absorbed by the generators either
self._withdraw_storage_losses()
# end storage
# fix redispatching for curtailment storage
if (
self.redispatching_unit_commitment_availble
and self._parameters.LIMIT_INFEASIBLE_CURTAILMENT_STORAGE_ACTION
):
# limit the curtailment / storage in case of infeasible redispatching
self._aux_limit_curtail_storage_if_needed(new_p, new_p_th, gen_curtailed)
self._storage_power_prev[:] = self._storage_power
# case where the action modifies load (TODO maybe make a different env for that...)
self._aux_handle_act_inj(action)
valid_disp, except_tmp = self._make_redisp(already_modified_gen, new_p)
if not valid_disp or except_tmp is not None:
# game over case (divergence of the scipy routine to compute redispatching)
res_action = self._action_space({})
if type(self).dim_alerts:
res_action.raise_alert = action.raise_alert
is_illegal_redisp = True
except_.append(except_tmp)
is_done = True
except_.append(
InvalidRedispatching(
"Game over due to infeasible redispatching state. "
'The routine used to compute the "next state" has diverged. '
"This means that there is no way to compute a physically valid generator state "
"(one that meets all pmin / pmax - ramp min / ramp max with the information "
"provided. As one of the physical constraints would be violated, this means that "
"a generator would be damaged in real life. This is a game over."
)
)
return res_action, is_illegal_redisp, is_illegal_reco, is_done
# check the validity of min downtime and max uptime
except_tmp = self._handle_updown_times(gen_up_before, self._actual_dispatch)
if except_tmp is not None:
is_illegal_reco = True
res_action = self._action_space({})
if type(self).dim_alerts:
res_action.raise_alert = action.raise_alert
except_.append(except_tmp)
else:
res_action = action
return res_action, is_illegal_redisp, is_illegal_reco, is_done
def _aux_update_backend_action(self,
action: BaseAction,
action_storage_power: np.ndarray,
init_disp: np.ndarray):
# make sure the dispatching action is not implemented "as is" by the backend.
# the environment must make sure it's a zero-sum action.
# same kind of limit for the storage
res_exc_ = None
action._redispatch[:] = 0.0
action._storage_power[:] = self._storage_power
self._backend_action += action
action._storage_power[:] = action_storage_power
action._redispatch[:] = init_disp
# TODO storage: check the original action, even when replaced by do nothing is not modified
self._backend_action += self._env_modification
self._backend_action.set_redispatch(self._actual_dispatch)
return res_exc_
def _update_alert_properties(self, action, lines_attacked, subs_attacked):
# update the environment with the alert information from the
# action (if env supports it)
if type(self).dim_alerts == 0:
return
self._last_alert[:] = action.raise_alert
self._time_since_last_alert[~self._last_alert & (self._time_since_last_alert != -1)] += 1
self._time_since_last_alert[self._last_alert] = 0
self._alert_duration[self._last_alert] += 1
self._alert_duration[~self._last_alert] = 0
self._total_number_of_alert += self._last_alert.sum()
if lines_attacked is not None:
lines_attacked_al = lines_attacked[type(self).alertable_line_ids]
mask_first_ts_attack = lines_attacked_al & (~self._is_already_attacked)
self._time_since_last_attack[mask_first_ts_attack] = 0
self._time_since_last_attack[~mask_first_ts_attack & (self._time_since_last_attack != -1)] += 1
# update the time already attacked
self._is_already_attacked[lines_attacked_al] = True
else:
self._time_since_last_attack[self._time_since_last_attack != -1] += 1
self._is_already_attacked[:] = False
mask_new_attack = self._time_since_last_attack == 0
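# encode whether each new attack was covered by an alert: +1 if an alert was
# raised on the attacked line, -1 otherwise (0 means no recent attack)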
self._attack_under_alert[mask_new_attack] = 2 * self._last_alert[mask_new_attack] - 1
mask_attack_too_old = self._time_since_last_attack > self._parameters.ALERT_TIME_WINDOW
self._attack_under_alert[mask_attack_too_old] = 0
# TODO more complicated (will do it in update_after_reward)
# self._was_alert_used_after_attack[:] = XXX
# TODO after alert budget will be implemented !
# self._is_alert_illegal
def _aux_register_env_converged(self, disc_lines, action, init_line_status, new_p):
beg_res = time.perf_counter()
self.backend.update_thermal_limit(
self
) # update the thermal limit, for DLR for example
overflow_lines = self.backend.get_line_overflow()
# save the current topology as "last" topology (for connected powerlines)
# and update the state of the disconnected powerline due to cascading failure
self._backend_action.update_state(disc_lines)
# one timestep passed, i can maybe reconnect some lines
self._times_before_line_status_actionable[
self._times_before_line_status_actionable > 0
] -= 1
# update the vector for lines that have been disconnected
self._times_before_line_status_actionable[disc_lines >= 0] = int(
self._nb_ts_reco
)
self._update_time_reconnection_hazards_maintenance()
# for the powerline that are on overflow, increase this time step
self._timestep_overflow[overflow_lines] += 1
# set to 0 the number of timestep for lines that are not on overflow
self._timestep_overflow[~overflow_lines] = 0
# build the topological action "cooldown"
aff_lines, aff_subs = action.get_topological_impact(init_line_status)
if self._max_timestep_line_status_deactivated > 0:
# i update the cooldown only when this does not impact the line disconnected for the
# opponent or by maintenance for example
cond = aff_lines # powerlines i modified
# powerlines that are not affected by any other "forced disconnection"
cond &= (
self._times_before_line_status_actionable
< self._max_timestep_line_status_deactivated
)
self._times_before_line_status_actionable[
cond
] = self._max_timestep_line_status_deactivated
if self._max_timestep_topology_deactivated > 0:
self._times_before_topology_actionable[
self._times_before_topology_actionable > 0
] -= 1
self._times_before_topology_actionable[
aff_subs
] = self._max_timestep_topology_deactivated
# extract production active value at this time step (should be independent of action class)
self._gen_activeprod_t[:], *_ = self.backend.generators_info()
# problem with the gen_activeprod_t above, is that the slack bus absorbs alone all the losses
# of the system. So basically, when it's too high (higher than the ramp) it can
# mess up the rest of the environment
self._gen_activeprod_t_redisp[:] = new_p + self._actual_dispatch
# set the line status
self._line_status[:] = copy.deepcopy(self.backend.get_line_status())
# finally, build the observation (it's a different one at each step, we cannot reuse the same one)
# THIS SHOULD BE DONE AFTER EVERYTHING IS INITIALIZED !
self.current_obs = self.get_obs(_do_copy=False)
# TODO storage: get back the result of the storage ! with the illegal action when a storage unit
# TODO is non zero and disconnected, this should be ok.
self._time_extract_obs += time.perf_counter() - beg_res
def _backend_next_grid_state(self):
"""overlaoded in MaskedEnv"""
return self.backend.next_grid_state(env=self, is_dc=self._env_dc)
def _aux_run_pf_after_state_properly_set(
self, action, init_line_status, new_p, except_
):
has_error = True
detailed_info = None
try:
# compute the next _grid state
beg_pf = time.perf_counter()
disc_lines, detailed_info, conv_ = self._backend_next_grid_state()
self._disc_lines[:] = disc_lines
self._time_powerflow += time.perf_counter() - beg_pf
if conv_ is None:
# everything went well, so i register what is needed
self._aux_register_env_converged(
disc_lines, action, init_line_status, new_p
)
has_error = False
else:
except_.append(conv_)
except Grid2OpException as exc_:
except_.append(exc_)
if self.logger is not None:
self.logger.error(
'Impossible to compute next grid state with error "{}"'.format(exc_)
)
return detailed_info, has_error
[docs] def step(self, action: BaseAction) -> Tuple[BaseObservation,
float,
bool,
STEP_INFO_TYPING]:
"""
Run one timestep of the environment's dynamics. When end of
episode is reached, you are responsible for calling `reset()`
to reset this environment's state.
Accepts an action and returns a tuple (observation, reward, done, info).
If the :class:`grid2op.BaseAction.BaseAction` is illegal or ambiguous, the step is performed, but the action is
replaced with a "do nothing" action.
Parameters
----------
action: :class:`grid2op.Action.Action`
an action provided by the agent that is applied on the underlying power grid through the backend.
Returns
-------
observation: :class:`grid2op.Observation.Observation`
agent's observation of the current environment
reward: ``float``
amount of reward returned after previous action
done: ``bool``
whether the episode has ended, in which case further step() calls will return undefined results
info: ``dict``
contains auxiliary diagnostic information (helpful for debugging, and sometimes learning). It is a
dictionary with keys:
- "disc_lines": a numpy array (or ``None``) saying, for each powerline if it has been disconnected
due to overflow (if not disconnected it will be -1, otherwise it will be a
positive integer: 0 meaning that is one of the cause of the cascading failure, 1 means
that it is disconnected just after, 2 that it's disconnected just after etc.)
- "is_illegal" (``bool``) whether the action given as input was illegal
- "is_ambiguous" (``bool``) whether the action given as input was ambiguous.
- "is_dispatching_illegal" (``bool``) was the action illegal due to redispatching
- "is_illegal_reco" (``bool``) was the action illegal due to a powerline reconnection
- "reason_alarm_illegal" (``None`` or ``Exception``) reason for which the alarm is illegal
(it's None if no alarm are raised or if the alarm feature is not used)
- "reason_alert_illegal" (``None`` or ``Exception``) reason for which the alert is illegal
(it's None if no alert are raised or if the alert feature is not used)
- "opponent_attack_line" (``np.ndarray``, ``bool``) for each powerline, say if the opponent
attacked it (``True``) or not (``False``).
- "opponent_attack_sub" (``np.ndarray``, ``bool``) for each substation, say if the opponent
attacked it (``True``) or not (``False``).
- "opponent_attack_duration" (``int``) the duration of the current attack (if any)
- "exception" (``list`` of :class:`Exceptions.Exceptions.Grid2OpException` if an exception was
raised or ``[]`` if everything was fine.)
- "detailed_infos_for_cascading_failures" (optional, only if the backend has been create with
`detailed_infos_for_cascading_failures=True`) the list of the intermediate steps computed during
the simulation of the "cascading failures".
- "rewards": dictionary of all "other_rewards" provided when the env was built.
- "time_series_id": id of the time series used (if any, similar to a call to `env.chronics_handler.get_id()`)
Examples
---------
This is used like:
.. code-block:: python
import grid2op
from grid2op.Agent import RandomAgent
# I create an environment
env = grid2op.make("l2rpn_case14_sandbox")
# define an agent here, this is an example
agent = RandomAgent(env.action_space)
# environment need to be "reset" before usage:
obs = env.reset()
reward = env.reward_range[0]
done = False
# now run through each steps like this
while not done:
action = agent.act(obs, reward, done)
obs, reward, done, info = env.step(action)
Notes
-----
If the flag `done=True` is raised (*ie* this is the end of the episode) then the observation is NOT properly
updated and should not be used at all.
Actually, it will be in a "game over" state (see :class:`grid2op.Observation.BaseObservation.set_game_over`).
"""
if self.__closed:
raise EnvError("This environment is closed. You cannot use it anymore.")
if not self.__is_init:
raise Grid2OpException(
"Impossible to make a step with a non initialized backend. Have you called "
'"env.reset()" after the last game over ?'
)
# I did something after calling "env.seed()" which is
# somehow "env.step()" or "env.reset()"
self._has_just_been_seeded = False
cls = type(self)
has_error = True
is_done = False
is_illegal = False
is_ambiguous = False
is_illegal_redisp = False
is_illegal_reco = False
reason_alarm_illegal = None
self._is_alarm_illegal = False
self._is_alarm_used_in_reward = False
reason_alert_illegal = None
self._is_alert_illegal = False
self._is_alert_used_in_reward = False
except_ = []
detailed_info = []
init_disp = 1.0 * action._redispatch # dispatching action
init_alert = None
if cls.dim_alerts > 0:
init_alert = copy.deepcopy(action._raise_alert)
action_storage_power = 1.0 * action._storage_power # battery information
attack_duration = 0
lines_attacked, subs_attacked = None, None
conv_ = None
init_line_status = copy.deepcopy(self.backend.get_line_status())
self.nb_time_step += 1
self._disc_lines[:] = -1
beg_step = time.perf_counter()
self._last_obs : Optional[BaseObservation] = None
self._forecasts = None # force reading the forecast from the time series
try:
beg_ = time.perf_counter()
ambiguous, except_tmp = action.is_ambiguous()
if ambiguous:
# the action is replaced by "do nothing"
action = self._action_space({})
init_disp = 1.0 * action._redispatch # dispatching action
action_storage_power = (
1.0 * action._storage_power
) # battery information
is_ambiguous = True
if cls.dim_alerts > 0:
# keep the alert even if the rest is ambiguous (if alert is non ambiguous)
is_ambiguous_alert = isinstance(except_tmp, AmbiguousActionRaiseAlert)
if is_ambiguous_alert:
# reset the alert
init_alert = np.zeros(cls.dim_alerts, dtype=dt_bool)
else:
action.raise_alert = init_alert
except_.append(except_tmp)
is_legal, reason = self._game_rules(action=action, env=self)
if not is_legal:
# the action is replaced by "do nothing"
action = self._action_space({})
init_disp = 1.0 * action._redispatch # dispatching action
action_storage_power = (
1.0 * action._storage_power
) # battery information
except_.append(reason)
if cls.dim_alerts > 0:
# keep the alert even if the rest is illegal
action.raise_alert = init_alert
is_illegal = True
if self._has_attention_budget:
if cls.assistant_warning_type == "zonal":
# this feature is implemented, so I do it
reason_alarm_illegal = self._attention_budget.register_action(
self, action, is_illegal, is_ambiguous
)
self._is_alarm_illegal = reason_alarm_illegal is not None
# get the modification of generator active setpoint from the environment
self._env_modification, prod_v_chronics = self._update_actions()
self._env_modification._single_act = (
False # because it absorbs all redispatching actions
)
new_p = self._get_new_prod_setpoint(action)
new_p_th = 1.0 * new_p
# storage unit
if cls.n_storage > 0:
# limiting the storage units is done in `_aux_apply_redisp`
# this only ensures the Emin / Emax constraints and applies the actions
self._compute_storage(action_storage_power)
# curtailment (does not attempt to "limit" the curtailment to make sure
# it is feasible)
self._gen_before_curtailment[self.gen_renewable] = new_p[self.gen_renewable]
gen_curtailed = self._aux_handle_curtailment_without_limit(action, new_p)
beg__redisp = time.perf_counter()
if cls.redispatching_unit_commitment_availble or cls.n_storage > 0.0:
# this computes the "optimal" redispatching
# and it is also in this function that the limiting of the curtailment / storage actions
# is performed to make the state "feasible"
res_disp = self._aux_apply_redisp(
action, new_p, new_p_th, gen_curtailed, except_
)
action, is_illegal_redisp, is_illegal_reco, is_done = res_disp
self._time_redisp += time.perf_counter() - beg__redisp
if not is_done:
self._aux_update_backend_action(action, action_storage_power, init_disp)
# now get the new generator voltage setpoint
voltage_control_act = self._voltage_control(action, prod_v_chronics)
self._backend_action += voltage_control_act
# handle the opponent here
tick = time.perf_counter()
lines_attacked, subs_attacked, attack_duration = self._aux_handle_attack(
action
)
tock = time.perf_counter()
self._time_opponent += tock - tick
self._time_create_bk_act += tock - beg_
try:
self.backend.apply_action(self._backend_action)
except ImpossibleTopology as exc_:
has_error = True
except_.append(exc_)
is_done = True
# TODO in this case: cancel the topological action of the agent
# and continue instead of "game over"
self._time_apply_act += time.perf_counter() - beg_
# now it's time to run the powerflow properly
# and to update the time-dependent properties
if not is_done:
self._update_alert_properties(action, lines_attacked, subs_attacked)
detailed_info, has_error = self._aux_run_pf_after_state_properly_set(
action, init_line_status, new_p, except_
)
else:
has_error = True
except StopIteration:
# episode is over
is_done = True
self._backend_action.reset()
end_step = time.perf_counter()
self._time_step += end_step - beg_step
if conv_ is not None:
except_.append(conv_)
with warnings.catch_warnings():
warnings.filterwarnings("ignore")
chron_id = self.chronics_handler.get_id()
if chron_id == "":
chron_id = None
self.infos = {
"disc_lines": self._disc_lines,
"is_illegal": is_illegal,
"is_ambiguous": is_ambiguous,
"is_dispatching_illegal": is_illegal_redisp,
"is_illegal_reco": is_illegal_reco,
"reason_alarm_illegal": reason_alarm_illegal,
"reason_alert_illegal": reason_alert_illegal,
"opponent_attack_line": lines_attacked,
"opponent_attack_sub": subs_attacked,
"opponent_attack_duration": attack_duration,
"exception": except_,
"time_series_id": chron_id
}
if self.backend.detailed_infos_for_cascading_failures:
self.infos["detailed_infos_for_cascading_failures"] = detailed_info
self.done = self._is_done(has_error, is_done)
self.current_reward, other_reward = self._get_reward(
action,
has_error,
self.done, # is_done
is_illegal or is_illegal_redisp or is_illegal_reco,
is_ambiguous,
)
self.infos["rewards"] = other_reward
if has_error and self.current_obs is not None:
# forward to the observation whether an alarm is used or not
if hasattr(self._reward_helper.template_reward, "has_alarm_component"):
self._is_alarm_used_in_reward = (
self._reward_helper.template_reward.is_alarm_used
)
if hasattr(self._reward_helper.template_reward, "has_alert_component"):
self._is_alert_used_in_reward = (
self._reward_helper.template_reward.is_alert_used
)
self.current_obs = self.get_obs(_update_state=False, _do_copy=False)
# update the observation so when it's plotted everything is "shutdown"
self.current_obs.set_game_over(self)
if self._update_obs_after_reward and self.current_obs is not None:
# transfer some information computed in the reward into the obs (if any)
self.current_obs.update_after_reward(self)
# TODO documentation on all the possible way to be illegal now
if self.done:
self.__is_init = False
return self.current_obs, self.current_reward, self.done, self.infos
def _get_reward(self, action, has_error, is_done, is_illegal, is_ambiguous):
res = self._reward_helper(
action, self, has_error, is_done, is_illegal, is_ambiguous
)
other_rewards = {
k: v(action, self, has_error, is_done, is_illegal, is_ambiguous)
for k, v in self.other_rewards.items()
}
return res, other_rewards
def get_reward_instance(self):
"""
INTERNAL
.. warning:: /!\\\\ Internal, do not use unless you know what you are doing /!\\\\
Returns the instance of the object that is used to compute the reward.
"""
if self.__closed:
raise EnvError("This environment is closed, you cannot use it.")
return self._reward_helper.template_reward
def _is_done(self, has_error, is_done):
no_more_data = self.chronics_handler.done()
return has_error or is_done or no_more_data
def _reset_vectors_and_timings(self):
"""
INTERNAL
.. warning:: /!\\\\ Internal, do not use unless you know what you are doing /!\\\\
Maintenances are not reset, otherwise the data would not be read properly (the first time step would be skipped)
"""
self._no_overflow_disconnection = self._parameters.NO_OVERFLOW_DISCONNECTION
self._timestep_overflow[:] = 0
self._nb_timestep_overflow_allowed[
:
] = self._parameters.NB_TIMESTEP_OVERFLOW_ALLOWED
self.nb_time_step = 0 # to have the first step at 0
self._hard_overflow_threshold[:] = self._parameters.HARD_OVERFLOW_THRESHOLD
self._env_dc = self._parameters.ENV_DC
self._times_before_line_status_actionable[:] = 0
self._max_timestep_line_status_deactivated = (
self._parameters.NB_TIMESTEP_COOLDOWN_LINE
)
self._times_before_topology_actionable[:] = 0
self._max_timestep_topology_deactivated = (
self._parameters.NB_TIMESTEP_COOLDOWN_SUB
)
# reset timings
self._time_apply_act = dt_float(0.0)
self._time_powerflow = dt_float(0.0)
self._time_extract_obs = dt_float(0.0)
self._time_opponent = dt_float(0.0)
self._time_create_bk_act = dt_float(0.0)
self._time_redisp = dt_float(0.0)
self._time_step = dt_float(0.0)
if self._has_attention_budget:
self._attention_budget.reset()
# reward and others
self.current_reward = self.reward_range[0]
self.done = False
def _reset_maintenance(self):
self._time_next_maintenance[:] = -1
self._duration_next_maintenance[:] = 0
def __enter__(self):
"""
Support *with-statement* for the environment.
Examples
--------
.. code-block:: python
import grid2op
from grid2op.Agent import DoNothingAgent
with grid2op.make("l2rpn_case14_sandbox") as env:
    agent = DoNothingAgent(env.action_space)
    act = env.action_space()
    obs, r, done, info = env.step(act)
    act = agent.act(obs, r, done)
    obs, r, done, info = env.step(act)
"""
return self
def __exit__(self, *args):
"""
Support *with-statement* for the environment.
"""
self.close()
# propagate exception
return False
def close(self):
"""close an environment: this will attempt to free as much memory as possible.
Note that after an environment is closed, you will not be able to use it anymore.
Any attempt to use a closed environment might result in non-deterministic behaviour.
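Examples
--------
A minimal sketch:
.. code-block:: python
    import grid2op
    env = grid2op.make("l2rpn_case14_sandbox")
    obs = env.reset()
    # ... interact with the environment ...
    env.close()  # attempts to free the backend, observation space, etc.
    # any further use of `env` will raise an EnvError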
"""
if self.__closed:
raise EnvError(
f"This environment {id(self)} {self} is closed already, you cannot close it a second time."
)
# todo there might be some side effect
if hasattr(self, "_viewer") and self._viewer is not None:
self._viewer = None
self.viewer_fig = None
if hasattr(self, "backend") and self.backend is not None:
self.backend.close()
del self.backend
self.backend :Backend = None
if hasattr(self, "_observation_space") and self._observation_space is not None:
# do not forget to close the backend of the observation (used for simulate)
self._observation_space.close()
self._observation_space = None
if hasattr(self, "_voltage_controler") and self._voltage_controler is not None:
# in case there is a backend in the voltage controler
self._voltage_controler.close()
self._voltage_controler = None
if hasattr(self, "_oppSpace") and self._oppSpace is not None:
# in case there is a backend in the opponent space
self._oppSpace.close()
self._oppSpace = None
if hasattr(self, "_helper_action_env") and self._helper_action_env is not None:
# close the action helper
self._helper_action_env.close()
self._helper_action_env = None
if hasattr(self, "_action_space") and self._action_space is not None:
# close the action space if needed
self._action_space.close()
self._action_space = None
if hasattr(self, "_reward_helper") and self._reward_helper is not None:
# close the reward if needed
self._reward_helper.close()
self._reward_helper = None
if hasattr(self, "other_rewards") and self.other_rewards is not None:
for el, reward in self.other_rewards.items():
# close the "other rewards"
reward.close()
self.other_rewards = None
self.backend : Backend = None
self.__is_init = False
self.__closed = True
# clean all the attributes
for attr_nm in [
"logger",
"_init_grid_path",
"_DEBUG",
"_complete_action_cls",
"_parameters",
"with_forecast",
"_time_apply_act",
"_time_powerflow",
"_time_extract_obs",
"_time_create_bk_act",
"_time_opponent",
"_time_redisp",
"_time_step",
"_epsilon_poly",
"_helper_action_class",
"_helper_observation_class",
"time_stamp",
"nb_time_step",
"delta_time_seconds",
"current_obs",
"_line_status",
"_ignore_min_up_down_times",
"_forbid_dispatch_off",
"_no_overflow_disconnection",
"_timestep_overflow",
"_nb_timestep_overflow_allowed",
"_hard_overflow_threshold",
"_times_before_line_status_actionable",
"_max_timestep_line_status_deactivated",
"_times_before_topology_actionable",
"_nb_ts_reco",
"_time_next_maintenance",
"_duration_next_maintenance",
"_hazard_duration",
"_env_dc",
"_target_dispatch",
"_actual_dispatch",
"_gen_uptime",
"_gen_downtime",
"_gen_activeprod_t",
"_gen_activeprod_t_redisp",
"_thermal_limit_a",
"_disc_lines",
"_injection",
"_maintenance",
"_hazards",
"_env_modification",
"done",
"current_reward",
"_helper_action_env",
"chronics_handler",
"_game_rules",
"_action_space",
"_rewardClass",
"_actionClass",
"_observationClass",
"_legalActClass",
"_observation_space",
"_names_chronics_to_backend",
"_reward_helper",
"reward_range",
"_viewer",
"viewer_fig",
"other_rewards",
"_opponent_action_class",
"_opponent_class",
"_opponent_init_budget",
"_opponent_attack_duration",
"_opponent_attack_cooldown",
"_opponent_budget_per_ts",
"_kwargs_opponent",
"_opponent_budget_class",
"_opponent_action_space",
"_compute_opp_budget",
"_opponent",
"_oppSpace",
"_voltagecontrolerClass",
"_voltage_controler",
"_backend_action_class",
"_backend_action",
"backend",
"debug_dispatch",
# "__new_param", "__new_forecast_param", "__new_reward_func",
"_storage_current_charge",
"_storage_previous_charge",
"_action_storage",
"_amount_storage",
"_amount_storage_prev",
"_storage_power",
"_storage_power_prev",
"_limit_curtailment",
"_limit_curtailment_prev",
"_gen_before_curtailment",
"_sum_curtailment_mw",
"_sum_curtailment_mw_prev",
"_has_attention_budget",
"_attentiong_budget",
"_attention_budget_cls",
"_is_alarm_illegal",
"_is_alarm_used_in_reward",
"_is_alert_illegal",
"_is_alert_used_in_reward",
"_kwargs_attention_budget",
"_limited_before",
]:
if hasattr(self, attr_nm):
delattr(self, attr_nm)
setattr(self, attr_nm, None)
if self._do_not_erase_local_dir_cls:
# The resources are not held by this env, so
# I do not remove them
# (case for ObsEnv or ForecastedEnv)
return
self._aux_close_local_dir_cls()
def _aux_close_local_dir_cls(self):
if self._local_dir_cls is not None:
# I am the "keeper" of the temporary directory
# deleting this env should also delete the temporary directory
if not (hasattr(self._local_dir_cls, "_RUNNER_DO_NOT_ERASE") and not self._local_dir_cls._RUNNER_DO_NOT_ERASE):
# BUT if a runner uses it, then I should not delete it !
self._local_dir_cls.cleanup()
self._local_dir_cls = None
# In this case it's likely that the OS will clean it for grid2op with a warning...
def attach_layout(self, grid_layout):
"""
Compared to the method of the base class, this one performs a check.
This method must be called after initialization.
Parameters
----------
grid_layout: ``dict``
The layout of the grid (*i.e.* the coordinates (x,y) of all substations). The keys
should be the substation names, and the values a tuple (of two floats) representing
the coordinates of the substation.
Examples
---------
Here is an example on how to attach a layout for an environment:
.. code-block:: python
import grid2op
# create the environment
env = grid2op.make("l2rpn_case14_sandbox")
# assign coordinates (0., 0.) to all substations (this is a dummy thing to do here!)
layout = {sub_name: (0., 0.) for sub_name in env.name_sub}
env.attach_layout(layout)
"""
if self.__closed:
raise EnvError("This environment is closed, you cannot use it.")
if isinstance(grid_layout, dict):
pass
elif isinstance(grid_layout, list):
grid_layout = {k: v for k, v in zip(self.name_sub, grid_layout)}
else:
raise EnvError(
"Attempt to set a layout from something different than a dictionary or a list. "
"This is for now not supported."
)
if self.__is_init:
res = {}
for el in self.name_sub:
if el not in grid_layout:
raise EnvError(
'The substation "{}" is not present in grid_layout while in the powergrid.'
"".format(el)
)
tmp = grid_layout[el]
try:
x, y = tmp
x = dt_float(x)
y = dt_float(y)
res[el] = (x, y)
except Exception as e_:
raise EnvError(
'attach_layout: impossible to convert the value of "{}" to a pair of float '
'that will be used for the grid layout. The error is: "{}"'
"".format(el, e_)
)
super().attach_layout(res)
if self._action_space is not None:
self._action_space.attach_layout(res)
if self._helper_action_env is not None:
self._helper_action_env.attach_layout(res)
if self._observation_space is not None:
self._observation_space.attach_layout(res)
if self._voltage_controler is not None:
self._voltage_controler.attach_layout(res)
if self._opponent_action_space is not None:
self._opponent_action_space.attach_layout(res)
def fast_forward_chronics(self, nb_timestep):
"""
This method allows you to skip some time steps at the beginning of the chronics.
This is useful at the beginning of the training, if you want your agent to learn on more diverse scenarios.
Indeed, the data provided in the chronics usually always starts at the same date and time (for example Jan 1st at
00:00). This can lead to suboptimal exploration, as during this phase, only a few time steps are managed by
the agent, so in general these few time steps will correspond to grid states around Jan 1st at 00:00.
.. seealso::
From grid2op version 1.10.3, a similar objective can be
obtained directly by calling :func:`grid2op.Environment.Environment.reset` with `"init ts"`
as option, for example like `obs = env.reset(options={"init ts": 12})`
.. danger::
The usage of both :func:`BaseEnv.fast_forward_chronics` and :func:`Environment.set_max_iter`
is not recommended at all and might not behave correctly. Please use `env.reset` with
`obs = env.reset(options={"max step": xxx, "init ts": yyy})` for a correct behaviour.
Parameters
----------
nb_timestep: ``int``
Number of time step to "fast forward"
Examples
---------
From grid2op version 1.10.3 we recommend not to use this function (which will be deprecated)
but to use the :func:`grid2op.Environment.Environment.reset` functon with the `"init ts"`
option.
.. code-block:: python
import grid2op
env_name = "l2rpn_case14_sandbox"
env = grid2op.make(env_name)
obs = env.reset(options={"init ts": 123})
For the legacy usage, it can be used like this:
.. code-block:: python
import grid2op
# create the environment
env = grid2op.make("l2rpn_case14_sandbox")
# skip the first 150 steps of the chronics
env.fast_forward_chronics(150)
done = env.is_done
if not done:
    obs = env.get_obs()
    # do something
else:
    # there was a "game over"
    # you need to reset the env (which will "cancel" the fast_forward)
    pass
# do something else
Notes
-----
This method can put the environment in a "game over" state (`done=True`), for example if the
chronics last `xxx` time steps and you ask to "fast forward" more than `xxx` steps. This is why we advise to
check the state of the environment after the call to this method if you use it (see the "Examples" paragraph).
"""
if self.__closed:
raise EnvError("This environment is closed, you cannot use it.")
if not self.__is_init:
raise EnvError("This environment is not intialized. "
"Have you called `env.reset()` after last game over ?")
nb_timestep = int(nb_timestep)
# Go to the timestep requested minus one
nb_timestep = max(1, nb_timestep - 1)
self.chronics_handler.fast_forward(nb_timestep)
self.nb_time_step += nb_timestep
# Update the timing vectors
min_time_line_reco = np.zeros(self.n_line, dtype=dt_int)
min_time_topo = np.zeros(self.n_sub, dtype=dt_int)
ff_time_line_act = self._times_before_line_status_actionable - nb_timestep
ff_time_topo_act = self._times_before_topology_actionable - nb_timestep
self._times_before_line_status_actionable[:] = np.maximum(
ff_time_line_act, min_time_line_reco
)
self._times_before_topology_actionable[:] = np.maximum(
ff_time_topo_act, min_time_topo
)
# Update to the fast forward state using a do nothing action
self.step(self._action_space({}))
def get_current_line_status(self):
"""
INTERNAL
.. warning:: /!\\\\ Internal, do not use unless you know what you are doing /!\\\\
prefer using :attr:`grid2op.Observation.BaseObservation.line_status`
This method allows you to retrieve the line status.
"""
if self.current_obs is not None:
powerline_status = self._line_status
else:
# at first time step, every powerline is connected
powerline_status = np.full(self.n_line, fill_value=True, dtype=dt_bool)
# powerline_status = self._line_status
return powerline_status
@property
def parameters(self):
"""
Return a deepcopy of the parameters used by the environment.
It is a deepcopy, so modifying it will have absolutely no effect on the environment.
If you want to change the parameters of an environment, please use either
:func:`grid2op.Environment.BaseEnv.change_parameters` to change the parameters of this environment or
:func:`grid2op.Environment.BaseEnv.change_forecast_parameters` to change the parameter of the environment
used by :func:`grid2op.Observation.BaseObservation.simulate` or
:func:`grid2op.Observation.BaseObservation.get_forecast_env`
.. danger::
To modify the environment parameters you need to do:
.. code-block:: python
params = env.parameters
params.WHATEVER = NEW_VALUE
env.change_parameters(params)
env.reset()
If you simply do:
.. code-block:: python
env.parameters.WHATEVER = NEW_VALUE  # no effect!
This will have absolutely no impact.
"""
if self.__closed:
raise EnvError("This environment is closed, you cannot use it.")
res = copy.deepcopy(self._parameters)
# res.read_only = True # TODO at some point !
return res
@parameters.setter
def parameters(self, value):
raise RuntimeError(
"Use the env.change_parameters(new_parameters) to change the parameters. "
"NB: it will only have an effect AFTER the env is reset."
)
def change_reward(self, new_reward_func):
"""
Change the reward function used for the environment.
An example is provided in the "Examples" section below.
Parameters
----------
new_reward_func:
Either an object of class BaseReward, or a subclass of BaseReward: the new reward function to use
Notes
------
This only affects the environment AFTER `env.reset()` has been called.
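Examples
---------
A minimal sketch (shown here with :class:`grid2op.Reward.L2RPNReward`; any other
:class:`grid2op.Reward.BaseReward` subclass works the same way):
.. code-block:: python
    import grid2op
    from grid2op.Reward import L2RPNReward
    env = grid2op.make("l2rpn_case14_sandbox")
    env.change_reward(L2RPNReward)
    obs = env.reset()  # the new reward function is used only from this reset on
    obs, reward, done, info = env.step(env.action_space({}))
    # `reward` is now computed by L2RPNReward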
"""
if self.__closed:
raise EnvError("This environment is closed, you cannot use it.")
is_ok = isinstance(new_reward_func, BaseReward) or issubclass(
new_reward_func, BaseReward
)
if not is_ok:
raise EnvError(
f"Impossible to change the reward function with type {type(new_reward_func)}. "
f"It should be an object from a class that inherit grid2op.Reward.BaseReward "
f"or a subclass of grid2op.Reward.BaseReward"
)
self.__new_reward_func = new_reward_func
@staticmethod
def _aux_gen_classes(cls_other, sys_path, _add_class_output=False):
if not isinstance(cls_other, type):
raise RuntimeError(f"cls_other should be a type and not an object !: {cls_other}")
if not issubclass(cls_other, GridObjects):
raise RuntimeError(f"cls_other should inherit from GridObjects: {cls_other}")
from pathlib import Path
path_env = cls_other._PATH_GRID_CLASSES
# cls_other._PATH_GRID_CLASSES = str(Path(self.get_path_env()).as_posix())
cls_other._PATH_GRID_CLASSES = str(Path(sys_path).as_posix())
res = cls_other._get_full_cls_str()
cls_other._PATH_GRID_CLASSES = path_env
output_file = os.path.join(sys_path, f"{cls_other.__name__}_file.py")
if not os.path.exists(output_file):
# if the file is not already saved, I save it and add it to the __init__ file
with open(output_file, "w", encoding="utf-8") as f:
f.write(res)
str_import = f"\nfrom .{cls_other.__name__}_file import {cls_other.__name__}"
else:
# if the file exists, I check it's the same
from grid2op.MakeEnv.UpdateEnv import _aux_hash_file, _aux_update_hash_text
hash_saved = _aux_hash_file(output_file)
my_hash = _aux_update_hash_text(res)
if hash_saved.hexdigest() != my_hash.hexdigest():
raise EnvError(f"It appears some classes have been modified between what was saved on the hard drive "
f"and the current state of the grid. This should not have happened. "
f"Check class {cls_other.__name__}")
str_import = None
if not _add_class_output:
return str_import
# NB: these imports needs to be consistent with what is done in
# griobj.init_grid(...)
package_path, nm_ = os.path.split(output_file)
nm_, ext = os.path.splitext(nm_)
sub_repo, tmp_nm = os.path.split(package_path)
if sub_repo not in sys.path:
sys.path.append(sub_repo)
sub_repo_mod = None
if tmp_nm == "_grid2op_classes":
# legacy "experimental_read_from_local_dir"
# issue was the module "_grid2op_classes" had the same name
# regardless of the environment, so grid2op was "confused"
path_init = os.path.join(sub_repo, "__init__.py")
if not os.path.exists(path_init):
try:
with open(path_init, "w", encoding='utf-8') as f:
f.write("# DO NOT REMOVE, automatically generated by grid2op")
except FileExistsError:
pass
env_path, env_nm = os.path.split(sub_repo)
if env_path not in sys.path:
sys.path.append(env_path)
if package_path not in sys.path:
sys.path.append(package_path)
super_supermodule = importlib.import_module(env_nm)
nm_ = f"{tmp_nm}.{nm_}"
tmp_nm = env_nm
super_module = importlib.import_module(tmp_nm, package=sub_repo_mod)
add_sys_path = os.path.dirname(super_module.__file__)
if add_sys_path not in sys.path:
sys.path.append(add_sys_path)
if f"{tmp_nm}.{nm_}" in sys.modules:
cls_res = getattr(sys.modules[f"{tmp_nm}.{nm_}"], cls_other.__name__)
return str_import, cls_res
try:
module = importlib.import_module(f".{nm_}", package=tmp_nm)
except ModuleNotFoundError as exc_:
# invalidate the cache and reload the package in this case
importlib.invalidate_caches()
importlib.reload(super_module)
module = importlib.import_module(f".{nm_}", package=tmp_nm)
cls_res = getattr(module, cls_other.__name__)
return str_import, cls_res
def generate_classes(self, *, local_dir_id=None, _guard=None, _is_base_env__=True, sys_path=None):
"""
Use with care, but can be incredibly useful!
If you get into trouble like:
.. code-block:: none
AttributeError: Can't get attribute 'ActionSpace_l2rpn_icaps_2021_small'
on <module 'grid2op.Space.GridObjects' from
/home/user/Documents/grid2op_dev/grid2op/Space/GridObjects.py'>
You might want to call this function and that MIGHT solve your problem.
This function will create a subdirectory in the env directory,
that will be accessed when loading the classes
used for the environment.
The default behaviour is to build the classes on the fly, which can cause some
issues when using `pickle` or `multiprocessing` for example.
Examples
--------
Here is how to best leverage this functionality:
First step: generate the classes once and for all.
.. warning::
You need to redo this step each time
you customize the environment. This customization includes, but is not limited to:
- change the backend type: `grid2op.make(..., backend=...)`
- change the action class: `grid2op.make(..., action_class=...)`
- change observation class: `grid2op.make(..., observation_class=...)`
- change the `voltagecontroler_class`
- change the `grid_path`
- change the `opponent_action_class`
- etc.
.. code-block:: python
import grid2op
env_name = "l2rpn_case14_sandbox" # or any other name
env = grid2op.make(env_name, ...) # again: redo this step each time you customize "..."
# for example if you change the `action_class` or the `backend` etc.
env.generate_classes()
Then, next time you want to use the SAME environment, you can do:
.. code-block:: python
import grid2op
env_name = SAME NAME AS ABOVE
env = grid2op.make(env_name,
experimental_read_from_local_dir=True,
SAME ENV CUSTOMIZATION AS ABOVE)
And it should (this is experimental for now, and we expect feedback on the matter) solve
the issues involving pickle.
Again, if you customize your environment (see above for more information) you'll have to redo this step !
"""
if self.__closed:
return
# create the folder
if _guard is not None:
raise RuntimeError("use `env.generate_classes()` with no arguments !")
if type(self)._PATH_GRID_CLASSES is not None:
raise RuntimeError(
"This function should only be called ONCE without specifying that the classes "
"need to be read from disk (class attribute type(self)._PATH_GRID_CLASSES should be None)"
)
import shutil
if sys_path is None:
if not _is_base_env__:
raise RuntimeError("Cannot generate file from a \"sub env\" "
"(eg no the top level env) if I don't know the path of "
"the top level environment.")
if local_dir_id is not None:
sys_path = os.path.join(self.get_path_env(), "_grid2op_classes", local_dir_id)
else:
sys_path = os.path.join(self.get_path_env(), "_grid2op_classes")
if _is_base_env__:
if os.path.exists(sys_path):
shutil.rmtree(sys_path)
os.mkdir(sys_path)
with open(os.path.join(sys_path, "__init__.py"), "w", encoding="utf-8") as f:
f.write(BASE_TXT_COPYRIGHT)
# initialize the "__init__" file
_init_txt = ""
mode = "a"
# generate the classes
# for the environment
txt_ = self._aux_gen_classes(type(self), sys_path)
if txt_ is not None:
_init_txt += txt_
# for the forecast env (we do this even if it's not used)
from grid2op.Environment._forecast_env import _ForecastEnv
for_env_cls = _ForecastEnv.init_grid(type(self.backend), _local_dir_cls=self._local_dir_cls)
txt_ = self._aux_gen_classes(for_env_cls, sys_path, _add_class_output=False)
if txt_ is not None:
_init_txt += txt_
# for the backend
txt_, cls_res_bk = self._aux_gen_classes(type(self.backend), sys_path, _add_class_output=True)
if txt_ is not None:
_init_txt += txt_
old_bk_cls = self.backend.__class__
self.backend.__class__ = cls_res_bk
txt_, cls_res_complete_act = self._aux_gen_classes(
old_bk_cls._complete_action_class, sys_path, _add_class_output=True
)
if txt_ is not None:
_init_txt += txt_
self.backend.__class__._complete_action_class = cls_res_complete_act
txt_, cls_res_bk_act = self._aux_gen_classes(self._backend_action_class, sys_path, _add_class_output=True)
if txt_ is not None:
_init_txt += txt_
self._backend_action_class = cls_res_bk_act
self.backend.__class__.my_bk_act_class = cls_res_bk_act
# for the other class
txt_ = self._aux_gen_classes(type(self.action_space), sys_path)
if txt_ is not None:
_init_txt += txt_
txt_ = self._aux_gen_classes(self._actionClass, sys_path)
if txt_ is not None:
_init_txt += txt_
txt_ = self._aux_gen_classes(self._complete_action_cls, sys_path)
if txt_ is not None:
_init_txt += txt_
txt_ = self._aux_gen_classes(type(self.observation_space), sys_path)
if txt_ is not None:
_init_txt += txt_
txt_ = self._aux_gen_classes(self._observationClass, sys_path)
if txt_ is not None:
_init_txt += txt_
txt_ = self._aux_gen_classes(
self._opponent_action_space.subtype, sys_path
)
if txt_ is not None:
_init_txt += txt_
# now do the same for the obs_env
if _is_base_env__:
txt_ = self._aux_gen_classes(
self._voltage_controler.action_space.subtype, sys_path
)
if txt_ is not None:
_init_txt += txt_
init_grid_tmp = self._observation_space.obs_env._init_grid_path
self._observation_space.obs_env._init_grid_path = self._init_grid_path
self._observation_space.obs_env.generate_classes(local_dir_id=local_dir_id,
_is_base_env__=False,
sys_path=sys_path)
self._observation_space.obs_env._init_grid_path = init_grid_tmp
# now write the __init__ file
_init_txt += "\n"
with open(os.path.join(sys_path, "__init__.py"), mode, encoding="utf-8") as f:
f.write(_init_txt)
def __del__(self):
"""when the environment is garbage collected, free all the memory, including cross reference to itself in the observation space."""
if hasattr(self, "_BaseEnv__closed") and not self.__closed:
self.close()
def _update_vector_with_timestep(self, horizon, is_overflow):
"""
INTERNAL
.. warning:: /!\\\\ Internal, do not use unless you know what you are doing /!\\\\
update the value of the "time dependent" attributes, used mainly for the "_ObsEnv" (simulate) or
the "Forecasted env" (obs.get_forecast_env())
"""
cls = type(self)
# update the cooldowns
self._times_before_line_status_actionable[:] = np.maximum(
self._times_before_line_status_actionable - (horizon - 1), 0
)
self._times_before_topology_actionable[:] = np.maximum(
self._times_before_topology_actionable - (horizon - 1), 0
)
# update the maintenance
tnm_orig = 1 * self._time_next_maintenance
dnm_orig = 1 * self._duration_next_maintenance
has_maint = self._time_next_maintenance != -1
reconnected = np.full(cls.n_line, fill_value=False)
maint_started = np.full(cls.n_line, fill_value=False)
maint_over = np.full(cls.n_line, fill_value=False)
maint_started[has_maint] = (tnm_orig[has_maint] <= horizon)
maint_over[has_maint] = (tnm_orig[has_maint] + dnm_orig[has_maint]
<= horizon)
reconnected[has_maint] = tnm_orig[has_maint] + dnm_orig[has_maint] == horizon
first_ts_maintenance = tnm_orig == horizon
still_in_maintenance = maint_started & (~maint_over) & (~first_ts_maintenance)
# count down time next maintenance
self._time_next_maintenance[:] = np.maximum(
self._time_next_maintenance - horizon, -1
)
# powerlines that are still in maintenance at this time step
self._time_next_maintenance[still_in_maintenance] = 0
self._duration_next_maintenance[still_in_maintenance] -= (horizon - tnm_orig[still_in_maintenance])
# powerlines that will enter maintenance at this time step
self._time_next_maintenance[first_ts_maintenance] = 0
# powerlines whose maintenance is over at this time step
self._time_next_maintenance[reconnected | maint_over] = -1
self._duration_next_maintenance[reconnected | maint_over] = 0
# soft overflow
# this is tricky here because I have no model to predict the future...
# As I cannot do better, I simply do "if I am in overflow now, I will be later"
self._timestep_overflow[is_overflow] += (horizon - 1)
return still_in_maintenance, reconnected, first_ts_maintenance
def _reset_to_orig_state(self, obs):
"""
INTERNAL
.. warning:: /!\\\\ Internal, do not use unless you know what you are doing /!\\\\
reset this "environment" to the state it should be
update the value of the "time dependent" attributes, used mainly for the "_ObsEnv" (simulate) or
the "Forecasted env" (obs.get_forecast_env())
"""
self.backend.set_thermal_limit(obs._thermal_limit)
if "opp_space_state" in obs._env_internal_params:
self._oppSpace._set_state(obs._env_internal_params["opp_space_state"],
obs._env_internal_params["opp_state"])
# storage unit
self._storage_current_charge[:] = obs.storage_charge
self._storage_previous_charge[:] = obs._env_internal_params["_storage_previous_charge"]
self._action_storage[:] = obs.storage_power_target
self._storage_power[:] = obs.storage_power
self._amount_storage = obs._env_internal_params["_amount_storage"]
self._amount_storage_prev = obs._env_internal_params["_amount_storage_prev"]
# curtailment
self._limit_curtailment[:] = obs.curtailment_limit
self._gen_before_curtailment[:] = obs.gen_p_before_curtail
self._sum_curtailment_mw = obs._env_internal_params["_sum_curtailment_mw"]
self._sum_curtailment_mw_prev = obs._env_internal_params["_sum_curtailment_mw_prev"]
# line status
self._line_status[:] = obs._env_internal_params["_line_status_env"] == 1
# attention budget
if self._has_attention_budget:
self._attention_budget.set_state(obs._env_internal_params["_attention_budget_state"])
# cooldown
self._times_before_line_status_actionable[
:
] = obs.time_before_cooldown_line
self._times_before_topology_actionable[
:
] = obs.time_before_cooldown_sub
# maintenance
self._time_next_maintenance[:] = obs.time_next_maintenance
self._duration_next_maintenance[:] = obs.duration_next_maintenance
# redisp
self._target_dispatch[:] = obs.target_dispatch
self._actual_dispatch[:] = obs.actual_dispatch
self._already_modified_gen[:] = obs._env_internal_params["_already_modified_gen"]
self._gen_activeprod_t[:] = obs._env_internal_params["_gen_activeprod_t"]
self._gen_activeprod_t_redisp[:] = obs._env_internal_params["_gen_activeprod_t_redisp"]
# current step
self.nb_time_step = obs.current_step
self.delta_time_seconds = 60. * obs.delta_time
# soft overflow
self._timestep_overflow[:] = obs.timestep_overflow
def forecasts(self):
# ensure that the "env.chronics_handler.forecasts" is called at most once per step
# this should NOT be called if forecasts are deactivated (i.e. if `self.with_forecast` is False)
if not self.with_forecast:
raise Grid2OpException("Attempt to retrieve the forecasts when they are not available.")
if self._forecasts is None:
self._forecasts = self.chronics_handler.forecasts()
return self._forecasts
@staticmethod
def _check_rules_correct(legalActClass):
if isinstance(legalActClass, type):
# raise Grid2OpException(
# 'Parameter "legalActClass" used to build the Environment should be a type '
# "(a class) and not an object (an instance of a class). "
# 'It is currently "{}"'.format(type(legalActClass))
# )
if not issubclass(legalActClass, BaseRules):
raise Grid2OpException(
'Parameter "legalActClass" used to build the Environment should derived form the '
'grid2op.BaseRules class, type provided is "{}"'.format(
type(legalActClass)
)
)
else:
if not isinstance(legalActClass, BaseRules):
raise Grid2OpException(
'Parameter "legalActClass" used to build the Environment should be an instance of the '
'grid2op.BaseRules class, type provided is "{}"'.format(
type(legalActClass)
)
)
def classes_are_in_files(self) -> bool:
"""
Whether the classes created when this environment has been made are
stored on the hard drive (in which case this returns `True`) or not.
.. note::
This will become the default behaviour in future grid2op versions.
See :ref:`troubleshoot_pickle` for more information.
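A minimal sketch (whether this returns `True` depends on how the environment was made,
for example with `experimental_read_from_local_dir=True` after a call to
:func:`BaseEnv.generate_classes`):
.. code-block:: python
    import grid2op
    env = grid2op.make("l2rpn_case14_sandbox")
    if env.classes_are_in_files():
        print("classes are read from the hard drive")
    else:
        print("classes are generated on the fly")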
"""
return self._read_from_local_dir is not None