# Copyright (c) 2019-2020, RTE (https://www.rte-france.com)
# See AUTHORS.txt
# This Source Code Form is subject to the terms of the Mozilla Public License, version 2.0.
# If a copy of the Mozilla Public License, version 2.0 was not distributed with this file,
# you can obtain one at http://mozilla.org/MPL/2.0/.
# SPDX-License-Identifier: MPL-2.0
# This file is part of Grid2Op, Grid2Op a testbed platform to model sequential decision making in power systems.
import os
import copy
import warnings
import numpy as np
import re
from typing import Optional, Union, Literal
import grid2op
from grid2op.Opponent import OpponentSpace
from grid2op.dtypes import dt_float, dt_bool, dt_int
from grid2op.Action import (
ActionSpace,
BaseAction,
TopologyAction,
DontAct,
CompleteAction,
)
from grid2op.Exceptions import *
from grid2op.Observation import CompleteObservation, ObservationSpace, BaseObservation
from grid2op.Reward import FlatReward, RewardHelper, BaseReward
from grid2op.Rules import RulesChecker, AlwaysLegal, BaseRules
from grid2op.Backend import Backend
from grid2op.Chronics import ChronicsHandler
from grid2op.VoltageControler import ControlVoltageFromFile, BaseVoltageController
from grid2op.Environment.baseEnv import BaseEnv
from grid2op.Opponent import BaseOpponent, NeverAttackBudget
from grid2op.operator_attention import LinearAttentionBudget
from grid2op.Space import DEFAULT_N_BUSBAR_PER_SUB
from grid2op.typing_variables import RESET_OPTIONS_TYPING, N_BUSBAR_PER_SUB_TYPING
from grid2op.MakeEnv.PathUtils import USE_CLASS_IN_FILE
class Environment(BaseEnv):
"""
This class is the grid2op implementation of the "Environment" entity in the RL framework.
.. danger::
Long story short, once a environment is deleted, you cannot use anything it "holds" including,
but not limited to the capacity to perform `obs.simulate(...)` even if the `obs` is still
referenced.
See :ref:`danger-env-ownership` (first danger block).
Attributes
----------
name: ``str``
The name of the environment
action_space: :class:`grid2op.Action.ActionSpace`
Another name for :attr:`Environment.helper_action_player` for gym compatibility.
observation_space: :class:`grid2op.Observation.ObservationSpace`
Another name for :attr:`Environment.helper_observation` for gym compatibility.
reward_range: ``(float, float)``
The range of the reward function
metadata: ``dict``
For gym compatibility, do not use
spec: ``None``
For Gym compatibility, do not use
_viewer: ``object``
Used to display the powergrid. Currently properly supported.
"""
REGEX_SPLIT = r"^[a-zA-Z0-9_\\.]*$"
def __init__(
    self,
    init_env_path: str,
    init_grid_path: str,
    chronics_handler,
    backend,
    parameters,
    name="unknown",
    n_busbar: N_BUSBAR_PER_SUB_TYPING = DEFAULT_N_BUSBAR_PER_SUB,
    names_chronics_to_backend=None,
    actionClass=TopologyAction,
    observationClass=CompleteObservation,
    rewardClass=FlatReward,
    legalActClass=AlwaysLegal,
    voltagecontrolerClass=ControlVoltageFromFile,
    # NOTE(review): the {} defaults below are mutable default arguments shared
    # across calls; they appear to only be forwarded to BaseEnv here, but this
    # should be confirmed before relying on it.
    other_rewards={},
    thermal_limit_a=None,
    with_forecast=True,
    epsilon_poly=1e-4,  # precision of the redispatching algorithm; going above 1e-4 is not recommended
    tol_poly=1e-2,  # a redispatching is computed if actual values deviate by "more than tol_poly" from their targets
    opponent_space_type=OpponentSpace,
    opponent_action_class=DontAct,
    opponent_class=BaseOpponent,
    opponent_init_budget=0.0,
    opponent_budget_per_ts=0.0,
    opponent_budget_class=NeverAttackBudget,
    opponent_attack_duration=0,
    opponent_attack_cooldown=99999,
    kwargs_opponent={},
    attention_budget_cls=LinearAttentionBudget,
    kwargs_attention_budget={},
    has_attention_budget=False,
    logger=None,
    kwargs_observation=None,
    observation_bk_class=None,
    observation_bk_kwargs=None,
    highres_sim_counter=None,
    _update_obs_after_reward=True,
    _init_obs=None,
    _raw_backend_class=None,
    _compat_glop_version=None,
    _read_from_local_dir=None,
    _is_test=False,
    _allow_loaded_backend=False,
    _local_dir_cls=None,  # only set at the first call to `make(...)`; afterwards it should be false
    _overload_name_multimix=None,
):
    """
    Build the environment.

    The generic setup (rewards, opponent, attention budget, forecasts, ...)
    is delegated to :class:`BaseEnv`; then :func:`Environment._init_backend`
    loads the grid and the time series and creates the action / observation
    spaces.
    """
    # generic initialization: everything that does not depend on the backend
    BaseEnv.__init__(
        self,
        init_env_path=init_env_path,
        init_grid_path=init_grid_path,
        parameters=parameters,
        thermal_limit_a=thermal_limit_a,
        epsilon_poly=epsilon_poly,
        tol_poly=tol_poly,
        other_rewards=other_rewards,
        with_forecast=with_forecast,
        voltagecontrolerClass=voltagecontrolerClass,
        opponent_space_type=opponent_space_type,
        opponent_action_class=opponent_action_class,
        opponent_class=opponent_class,
        opponent_budget_class=opponent_budget_class,
        opponent_init_budget=opponent_init_budget,
        opponent_budget_per_ts=opponent_budget_per_ts,
        opponent_attack_duration=opponent_attack_duration,
        opponent_attack_cooldown=opponent_attack_cooldown,
        kwargs_opponent=kwargs_opponent,
        has_attention_budget=has_attention_budget,
        attention_budget_cls=attention_budget_cls,
        kwargs_attention_budget=kwargs_attention_budget,
        # scope the logger under a dedicated child name when one is provided
        logger=logger.getChild("grid2op_Environment")
        if logger is not None
        else None,
        kwargs_observation=kwargs_observation,
        observation_bk_class=observation_bk_class,
        observation_bk_kwargs=observation_bk_kwargs,
        highres_sim_counter=highres_sim_counter,
        update_obs_after_reward=_update_obs_after_reward,
        n_busbar=n_busbar,  # TODO n_busbar_per_sub different num per substations: read from a config file maybe (if not provided by the user)
        name=name,
        _raw_backend_class=_raw_backend_class if _raw_backend_class is not None else type(backend),
        _init_obs=_init_obs,
        _is_test=_is_test,  # is this created with "test=True" # TODO not implemented !!
        _local_dir_cls=_local_dir_cls,
        _read_from_local_dir=_read_from_local_dir,
    )
    if name == "unknown":
        warnings.warn(
            'It is NOT recommended to create an environment without "make" and EVEN LESS '
            "to use an environment without a name..."
        )
    if _overload_name_multimix is not None:
        # this means that the "make" call is issued from the
        # creation of a MultiMix.
        # So I use the base name instead.
        self.name = "".join(_overload_name_multimix[2:])
        self.multimix_mix_name = name
        self._overload_name_multimix = _overload_name_multimix
    else:
        self.name = name
        self._overload_name_multimix = None
        self.multimix_mix_name = None
    # to remember if the user specified a "max_iter" at some point
    self._max_iter = chronics_handler.max_iter  # for all episodes, set in the chronics_handler or by a call to `env.set_max_iter`
    self._max_step = None  # for the current episode only
    #: starting grid2op 1.11 classes are stored on the disk when an environment is created
    #: so the "environment" is created twice (once to generate the classes, then properly to load them)
    self._allow_loaded_backend: bool = _allow_loaded_backend
    # for gym compatibility (action_space and observation_space initialized below)
    self.reward_range = None
    self._viewer = None
    self.metadata = None
    self.spec = None
    self._compat_glop_version = _compat_glop_version
    # needs to be done before "_init_backend" otherwise observationClass is not defined in the
    # observation space (real_env_kwargs)
    self._observationClass_orig = observationClass
    # for plotting
    self._init_backend(
        chronics_handler,
        backend,
        names_chronics_to_backend,
        actionClass,
        observationClass,
        rewardClass,
        legalActClass,
    )
def _init_backend(
    self,
    chronics_handler,
    backend,
    names_chronics_to_backend,
    actionClass,
    observationClass,
    rewardClass,
    legalActClass,
):
    """
    INTERNAL

    .. warning:: /!\\\\ Internal, do not use unless you know what you are doing /!\\\\

    Create a proper and valid environment.

    Validates every class/object provided, loads the grid through the
    backend, wires the chronics, builds the action / observation spaces,
    the voltage controler, the opponent and the attention budget, and
    finally performs one "do nothing" step so the first observation is
    available.
    """
    # --- reward class validation: accept either a BaseReward subclass or an instance
    if isinstance(rewardClass, type):
        if not issubclass(rewardClass, BaseReward):
            raise Grid2OpException(
                'Parameter "rewardClass" used to build the Environment should derived form '
                'the grid2op.BaseReward class, type provided is "{}"'.format(
                    type(rewardClass)
                )
            )
    else:
        if not isinstance(rewardClass, BaseReward):
            raise Grid2OpException(
                'Parameter "rewardClass" used to build the Environment should derived form '
                'the grid2op.BaseReward class, type provided is "{}"'.format(
                    type(rewardClass)
                )
            )
    # --- backend validation
    if not isinstance(backend, Backend):
        raise Grid2OpException(
            'Parameter "backend" used to build the Environment should derived form the '
            'grid2op.Backend class, type provided is "{}"'.format(type(backend))
        )
    self.backend = backend
    # a backend instance cannot be shared between two environments
    # (unless it comes from an observation or classes were pre-generated)
    if self.backend.is_loaded and self._init_obs is None and not self._allow_loaded_backend:
        raise EnvError(
            "Impossible to use the same backend twice. Please create your environment with a "
            "new backend instance (new object)."
        )
    self._actionClass_orig = actionClass
    need_process_backend = False
    if not self.backend.is_loaded:
        if hasattr(self.backend, "init_pp_backend") and self.backend.init_pp_backend is not None:
            # hack for lightsim2grid: clear class attributes of the wrapped
            # pandapower backend before loading a new grid
            if type(self.backend.init_pp_backend)._INIT_GRID_CLS is not None:
                type(self.backend.init_pp_backend)._INIT_GRID_CLS._clear_grid_dependant_class_attributes()
            type(self.backend.init_pp_backend)._clear_grid_dependant_class_attributes()
        # usual case: the backend is not loaded
        # NB it is loaded when the backend comes from an observation for
        # example
        if self._read_from_local_dir is not None:
            # test to support pickle conveniently
            self.backend._PATH_GRID_CLASSES = self._read_from_local_dir
        # all the above should be done in this exact order, otherwise some weird behaviour might occur
        # this is due to the class attribute
        type(self.backend).set_env_name(self.name)
        type(self.backend).set_n_busbar_per_sub(self._n_busbar)
        if self._compat_glop_version is not None:
            type(self.backend).glop_version = self._compat_glop_version
        self.backend.load_grid(
            self._init_grid_path
        )  # the real powergrid of the environment
        self.backend.load_storage_data(self.get_path_env())
        self.backend._fill_names_obj()
        try:
            # NB "redispacthing" is the (misspelled) public backend API name
            self.backend.load_redispacthing_data(self.get_path_env())
        except BackendError as exc_:
            # best effort: redispatching data is optional
            self.backend.redispatching_unit_commitment_availble = False
            warnings.warn(f"Impossible to load redispatching data. This is not an error but you will not be able "
                          f"to use all grid2op functionalities. "
                          f"The error was: \"{exc_}\"")
        # layout is optional too: only needed for plotting
        exc_ = self.backend.load_grid_layout(self.get_path_env())
        if exc_ is not None:
            warnings.warn(
                f"No layout have been found for you grid (or the layout provided was corrupted). You will "
                f'not be able to use the renderer, plot the grid etc. The error was "{exc_}"'
            )
        # alarm / alert set up
        self.load_alarm_data()
        self.load_alert_data()
        # to force the initialization of the backend to the proper type
        self.backend.assert_grid_correct(
            _local_dir_cls=self._local_dir_cls)
        self.backend.is_loaded = True
        need_process_backend = True
    self._handle_compat_glop_version(need_process_backend)
    self._has_been_initialized()  # really important to include this piece of code! and just here after the
    # backend has loaded everything
    self._line_status = np.ones(shape=self.n_line, dtype=dt_bool)
    self._disc_lines = np.zeros(shape=self.n_line, dtype=dt_int) - 1  # -1 means "not disconnected"
    # thermal limits: take them from the backend if not provided, push them otherwise
    if self._thermal_limit_a is None:
        self._thermal_limit_a = self.backend.thermal_limit_a.astype(dt_float)
    else:
        self.backend.set_thermal_limit(self._thermal_limit_a.astype(dt_float))
    *_, tmp = self.backend.generators_info()
    # rules of the game
    self._check_rules_correct(legalActClass)
    self._game_rules = RulesChecker(legalActClass=legalActClass)
    self._game_rules.initialize(self)
    self._legalActClass = legalActClass
    # action helper
    if not isinstance(actionClass, type):
        # NOTE(review): the message below formats type(legalActClass) while the
        # check (and the text) concern actionClass — most likely a copy/paste slip.
        raise Grid2OpException(
            'Parameter "actionClass" used to build the Environment should be a type (a class) '
            "and not an object (an instance of a class). "
            'It is currently "{}"'.format(type(legalActClass))
        )
    if not issubclass(actionClass, BaseAction):
        raise Grid2OpException(
            'Parameter "actionClass" used to build the Environment should derived form the '
            'grid2op.BaseAction class, type provided is "{}"'.format(
                type(actionClass)
            )
        )
    if not isinstance(observationClass, type):
        raise Grid2OpException(
            f'Parameter "observationClass" used to build the Environment should be a type (a class) '
            f"and not an object (an instance of a class). "
            f'It is currently : {observationClass} (type "{type(observationClass)}")'
        )
    if not issubclass(observationClass, BaseObservation):
        raise Grid2OpException(
            f'Parameter "observationClass" used to build the Environment should derived form the '
            f'grid2op.BaseObservation class, type provided is "{type(observationClass)}"'
        )
    # action affecting the grid that will be made by the agent
    # be careful here: you need to initialize from the class, and not from the object
    bk_type = type(self.backend)
    self._rewardClass = rewardClass
    self._actionClass = actionClass.init_grid(gridobj=bk_type, _local_dir_cls=self._local_dir_cls)
    self._actionClass._add_shunt_data()
    self._actionClass._update_value_set()
    self._observationClass = observationClass.init_grid(gridobj=bk_type, _local_dir_cls=self._local_dir_cls)
    self._complete_action_cls = CompleteAction.init_grid(gridobj=bk_type, _local_dir_cls=self._local_dir_cls)
    self._helper_action_class = ActionSpace.init_grid(gridobj=bk_type, _local_dir_cls=self._local_dir_cls)
    # space for the actions of the agent
    self._action_space = self._helper_action_class(
        gridobj=bk_type,
        actionClass=actionClass,
        legal_action=self._game_rules.legal_action,
        _local_dir_cls=self._local_dir_cls
    )
    # action that affect the grid made by the environment.
    self._helper_action_env = self._helper_action_class(
        gridobj=bk_type,
        actionClass=CompleteAction,
        legal_action=self._game_rules.legal_action,
        _local_dir_cls=self._local_dir_cls,
    )
    # handles input data
    if not isinstance(chronics_handler, ChronicsHandler):
        raise Grid2OpException(
            'Parameter "chronics_handler" used to build the Environment should derived form the '
            'grid2op.ChronicsHandler class, type provided is "{}"'.format(
                type(chronics_handler)
            )
        )
    # backend converters know how to map their own names to the source grid names
    if names_chronics_to_backend is None and type(self.backend).IS_BK_CONVERTER:
        names_chronics_to_backend = self.backend.names_target_to_source
    self.chronics_handler = chronics_handler
    self.chronics_handler.initialize(
        self.name_load,
        self.name_gen,
        self.name_line,
        self.name_sub,
        names_chronics_to_backend=names_chronics_to_backend,
    )
    # new in grid2op 1.10.2: the chronics handler needs the env action space
    self.chronics_handler.action_space = self._helper_action_env
    self._names_chronics_to_backend = names_chronics_to_backend
    self.delta_time_seconds = dt_float(self.chronics_handler.time_interval.seconds)
    # this needs to be done after the chronics handler: rewards might need information
    # about the chronics to work properly.
    self._helper_observation_class = ObservationSpace.init_grid(gridobj=bk_type, _local_dir_cls=self._local_dir_cls)
    # FYI: this tries to copy the backend; if it fails it will modify the backend
    # and the environment to force the deactivation of the
    # forecasts
    self._observation_space = self._helper_observation_class(
        gridobj=bk_type,
        observationClass=observationClass,
        actionClass=actionClass,
        rewardClass=rewardClass,
        env=self,
        kwargs_observation=self._kwargs_observation,
        observation_bk_class=self._observation_bk_class,
        observation_bk_kwargs=self._observation_bk_kwargs,
        _local_dir_cls=self._local_dir_cls
    )
    # test to make sure the backend is consistent with the chronics generator
    self.chronics_handler.check_validity(self.backend)
    self._reset_storage()  # this should be called after the self.delta_time_seconds is set
    # reward function
    self._reward_helper = RewardHelper(self._rewardClass, logger=self.logger)
    self._reward_helper.initialize(self)
    for k, v in self.other_rewards.items():
        v.initialize(self)
    # controller for voltage
    if not issubclass(self._voltagecontrolerClass, BaseVoltageController):
        raise Grid2OpException(
            'Parameter "voltagecontrolClass" should derive from "ControlVoltageFromFile".'
        )
    self._voltage_controler = self._voltagecontrolerClass(
        gridobj=bk_type,
        controler_backend=self.backend,
        actionSpace_cls=self._helper_action_class,
        _local_dir_cls=self._local_dir_cls
    )
    # create the opponent
    # At least the 3 following attributes should be set before calling _create_opponent
    self._create_opponent()
    # create the attention budget
    self._create_attention_budget()
    # init the alert related attributes
    self._init_alert_data()
    # performs one step to load the environment properly (first action need to be taken at first time step after
    # first injections given)
    self._reset_maintenance()
    self._reset_redispatching()
    self._reward_to_obs = {}
    do_nothing = self._helper_action_env({})
    # needs to be done at the end, but before the first "step" is called
    self._observation_space.set_real_env_kwargs(self)
    # see issue https://github.com/rte-france/Grid2Op/issues/617
    # thermal limits are set AFTER this initial step, so temporarily
    # disable overflow disconnection for it
    _no_overflow_disconnection = self._no_overflow_disconnection
    self._no_overflow_disconnection = True
    *_, fail_to_start, info = self.step(do_nothing)
    self._no_overflow_disconnection = _no_overflow_disconnection
    if fail_to_start:
        raise Grid2OpException(
            "Impossible to initialize the powergrid, the powerflow diverge at iteration 0. "
            "Available information are: {}".format(info)
        ) from info["exception"][0]
    # test the backend returns object of the proper size
    if need_process_backend:
        # hack to fix an issue with lightsim2grid...
        # (base class is not reset correctly, will be fixed ASAP)
        base_cls_ls = None
        if hasattr(self.backend, "init_pp_backend") and self.backend.init_pp_backend is not None:
            base_cls_ls = type(self.backend.init_pp_backend)
        self.backend.assert_grid_correct_after_powerflow()
        # hack to fix an issue with lightsim2grid...
        # (base class is not reset correctly, will be fixed ASAP)
        if hasattr(self.backend, "init_pp_backend") and self.backend.init_pp_backend is not None:
            if self.backend._INIT_GRID_CLS is not None:
                # the init grid class has already been properly computed
                self.backend._INIT_GRID_CLS._clear_grid_dependant_class_attributes()
            elif base_cls_ls is not None:
                # we need to clear the class of the original type as it has not been properly computed
                base_cls_ls._clear_grid_dependant_class_attributes()
    # for gym compatibility
    self.reward_range = self._reward_helper.range()
    self._viewer = None
    self.viewer_fig = None
    self.metadata = {"render.modes": ["rgb_array"]}
    self.spec = None
    self.current_reward = self.reward_range[0]
    self.done = False
    # reset everything to be consistent
    self._reset_vectors_and_timings()
[docs] def max_episode_duration(self):
"""
Return the maximum duration (in number of steps) of the current episode.
Notes
-----
For possibly infinite episode, the duration is returned as `np.iinfo(np.int32).max` which corresponds
to the maximum 32 bit integer (usually `2147483647`)
"""
if self._max_step is not None:
return self._max_step
tmp = dt_int(self.chronics_handler.max_episode_duration())
if tmp < 0:
tmp = dt_int(np.iinfo(dt_int).max)
return tmp
def _aux_check_max_iter(self, max_iter):
try:
max_iter_int = int(max_iter)
except ValueError as exc_:
raise EnvError("Impossible to set 'max_iter' by providing something that is not an integer.") from exc_
if max_iter_int != max_iter:
raise EnvError("Impossible to set 'max_iter' by providing something that is not an integer.")
if max_iter_int < 1 and max_iter_int != -1:
raise EnvError("'max_iter' should be an int >= 1 or -1")
return max_iter_int
[docs] def set_max_iter(self, max_iter):
"""
Set the maximum duration of an episode for all the next episodes.
.. seealso::
The option `max step` when calling the :func:`Environment.reset` function
used like `obs = env.reset(options={"max step": 288})` (see examples of
`env.reset` for more information)
.. note::
The real maximum duration of a duration depends on this parameter but also on the
size of the time series used. For example, if you use an environment with
time series lasting 8064 steps and you call `env.set_max_iter(9000)`
the maximum number of iteration will still be 8064.
.. warning::
It only has an impact on future episode. Said differently it also has an impact AFTER
`env.reset` has been called.
.. danger::
The usage of both :func:`BaseEnv.fast_forward_chronics` and :func:`Environment.set_max_iter`
is not recommended at all and might not behave correctly. Please use `env.reset` with
`obs = env.reset(options={"max step": xxx, "init ts": yyy})` for a correct behaviour.
Parameters
----------
max_iter: ``int``
The maximum number of iterations you can do before reaching the end of the episode. Set it to "-1" for
possibly infinite episode duration.
Examples
--------
It can be used like this:
.. code-block:: python
import grid2op
env_name = "l2rpn_case14_sandbox"
env = grid2op.make(env_name)
obs = env.reset()
obs.max_step == 8064 # default for this environment
env.set_max_iter(288)
# no impact here
obs = env.reset()
obs.max_step == 288
# the limitation still applies to the next episode
obs = env.reset()
obs.max_step == 288
If you want to "unset" your limitation, you can do:
.. code-block:: python
env.set_max_iter(-1)
obs = env.reset()
obs.max_step == 8064
Finally, you cannot limit it to something larger than the duration
of the time series of the environment:
.. code-block:: python
env.set_max_iter(9000)
obs = env.reset()
obs.max_step == 8064
# the call to env.set_max_iter has no impact here
Notes
-------
Maximum length of the episode can depend on the chronics used. See :attr:`Environment.chronics_handler` for
more information
"""
max_iter_int = self._aux_check_max_iter(max_iter)
self._max_iter = max_iter_int
self.chronics_handler._set_max_iter(max_iter_int)
@property
def _helper_observation(self):
    # Legacy alias kept for backward compatibility: older grid2op code accessed
    # the observation space under this name (see the class docstring).
    return self._observation_space
@property
def _helper_action_player(self):
    # Legacy alias kept for backward compatibility: older grid2op code accessed
    # the (player) action space under this name (see the class docstring).
    return self._action_space
def _handle_compat_glop_version(self, need_process_backend):
if (
self._compat_glop_version is not None
and self._compat_glop_version != grid2op.__version__
):
warnings.warn(
'You are using a grid2op "compatibility" environment. This means that some '
"feature will not be available. This feature is absolutely NOT recommended except to "
"read back data (for example with EpisodeData) that were stored with previous "
"grid2op version."
)
if need_process_backend:
# the following line must be called BEFORE "self.backend.assert_grid_correct()" !
self.backend.storage_deact_for_backward_comaptibility()
def _voltage_control(self, agent_action, prod_v_chronics):
"""
INTERNAL
.. warning:: /!\\\\ Internal, do not use unless you know what you are doing /!\\\\
Update the environment action "action_env" given a possibly new voltage setpoint for the generators. This
function can be overide for a more complex handling of the voltages.
It must update (if needed) the voltages of the environment action :attr:`BaseEnv.env_modification`
Parameters
----------
agent_action: :class:`grid2op.Action.Action`
The action performed by the player (or do nothing is player action were not legal or ambiguous)
prod_v_chronics: ``numpy.ndarray`` or ``None``
The voltages that has been specified in the chronics
"""
volt_control_act = self._voltage_controler.fix_voltage(
self.current_obs, agent_action, self._env_modification, prod_v_chronics
)
return volt_control_act
[docs] def set_chunk_size(self, new_chunk_size):
"""
For an efficient data pipeline, it can be usefull to not read all part of the input data
(for example for load_p, prod_p, load_q, prod_v). Grid2Op support the reading of large chronics by "chunk"
of given size.
Reading data in chunk can also reduce the memory footprint, useful in case of multiprocessing environment while
large chronics.
It is critical to set a small chunk_size in case of training machine learning algorithm (reinforcement
learning agent) at the beginning when the agent performs poorly, the software might spend most of its time
loading the data.
**NB** this has no effect if the chronics does not support this feature.
**NB** The environment need to be **reset** for this to take effect (it won't affect the chronics already
loaded)
Parameters
----------
new_chunk_size: ``int`` or ``None``
The new chunk size (positive integer)
Examples
---------
Here is an example on how to use this function
.. code-block:: python
import grid2op
# I create an environment
env = grid2op.make("l2rpn_case14_sandbox", test=True)
env.set_chunk_size(100)
env.reset() # otherwise chunk size has no effect !
# and now data will be read from the hard drive 100 time steps per 100 time steps
# instead of the whole episode at once.
"""
if new_chunk_size is None:
self.chronics_handler.set_chunk_size(new_chunk_size)
return
try:
new_chunk_size = int(new_chunk_size)
except Exception as exc_:
raise Grid2OpException(
"Impossible to set the chunk size. It should be convertible a integer, and not"
'{}. The error was: \n"{}"'.format(new_chunk_size, exc_)
)
if new_chunk_size <= 0:
raise Grid2OpException(
'Impossible to read less than 1 data at a time. Please make sure "new_chunk_size"'
"is a positive integer."
)
self.chronics_handler.set_chunk_size(new_chunk_size)
[docs] def simulate(self, action):
"""
Another method to call `obs.simulate` to ensure compatibility between multi environment and
regular one.
Parameters
----------
action:
A grid2op action
Returns
-------
Same return type as :func:`grid2op.Environment.BaseEnv.step` or
:func:`grid2op.Observation.BaseObservation.simulate`
Notes
-----
Prefer using `obs.simulate` if possible, it will be faster than this function.
"""
return self.get_obs().simulate(action)
[docs] def set_id(self, id_: Union[int, str]) -> None:
"""
Set the id that will be used at the next call to :func:`Environment.reset`.
**NB** this has no effect if the chronics does not support this feature.
**NB** The environment need to be **reset** for this to take effect.
.. versionchanged:: 1.6.4
`id_` can now be a string instead of an integer. You can call something like
`env.set_id("0000")` or `env.set_id("Scenario_april_000")`
or `env.set_id("2050-01-03_0")` (depending on your environment)
to use the right time series.
.. seealso::
function :func:`Environment.reset` for extra information
.. versionchanged:: 1.9.8
Starting from version 1.9.8 you can directly set the time serie id when calling
reset.
.. warning::
If the "time serie generator" you use is on standard (*eg* it is random in some sense)
and if you want fully reproducible results, you should first call `env.set_id(...)` and
then call `env.seed(...)` (and of course `env.reset()`)
Calling `env.seed(...)` and then `env.set_id(...)` might not behave the way you want.
In this case, it is much better to use the function
`reset(seed=..., options={"time serie id": ...})` directly.
Parameters
----------
id_: ``int``
the id of the chronics used.
Examples
--------
Here an example that will loop 10 times through the same chronics (always using the same injection then):
.. code-block:: python
import grid2op
from grid2op import make
from grid2op.BaseAgent import DoNothingAgent
env = make("l2rpn_case14_sandbox") # create an environment
agent = DoNothingAgent(env.action_space) # create an BaseAgent
for i in range(10):
env.set_id(0) # tell the environment you simply want to use the chronics with ID 0
obs = env.reset() # it is necessary to perform a reset
reward = env.reward_range[0]
done = False
while not done:
act = agent.act(obs, reward, done)
obs, reward, done, info = env.step(act)
And here you have an example on how you can loop through the scenarios in a given order:
.. code-block:: python
import grid2op
from grid2op import make
from grid2op.BaseAgent import DoNothingAgent
env = make("l2rpn_case14_sandbox") # create an environment
agent = DoNothingAgent(env.action_space) # create an BaseAgent
scenario_order = [1,2,3,4,5,10,8,6,5,7,78, 8]
for id_ in scenario_order:
env.set_id(id_) # tell the environment you simply want to use the chronics with ID 0
obs = env.reset() # it is necessary to perform a reset
reward = env.reward_range[0]
done = False
while not done:
act = agent.act(obs, reward, done)
obs, reward, done, info = env.step(act)
"""
if isinstance(id_, str):
# new in grid2op 1.6.4
self.chronics_handler.tell_id(id_, previous=True)
return
try:
id_ = int(id_)
except Exception as exc_:
raise EnvError(
'the "id_" parameters should be convertible to integer and not be of type {}'
'with error \n"{}"'.format(type(id_), exc_)
)
self.chronics_handler.tell_id(id_ - 1)
[docs] def attach_renderer(self, graph_layout=None):
"""
This function will attach a renderer, necessary to use for plotting capabilities.
Parameters
----------
graph_layout: ``dict``
Here for backward compatibility. Currently not used.
If you want to set a specific layout call :func:`BaseEnv.attach_layout`
If ``None`` this class will use the default substations layout provided when the environment was created.
Otherwise it will use the data provided.
Examples
---------
Here is how to use the function
.. code-block:: python
import grid2op
# create the environment
env = grid2op.make("l2rpn_case14_sandbox")
if False:
# if you want to change the default layout of the powergrid
# assign coordinates (0., 0.) to all substations (this is a dummy thing to do here!)
layout = {sub_name: (0., 0.) for sub_name in env.name_sub}
env.attach_layout(layout)
# NB again, this code will make everything look super ugly !!!! Don't change the
# default layout unless you have a reason to.
# and if you want to use the renderer
env.attach_renderer()
# and now you can "render" (plot) the state of the grid
obs = env.reset()
done = False
reward = env.reward_range[0]
while not done:
env.render()
action = agent.act(obs, reward, done)
obs, reward, done, info = env.step(action)
"""
# Viewer already exists: skip
if self._viewer is not None:
return
# Do we have the dependency
try:
from grid2op.PlotGrid import PlotMatplot
except ImportError:
err_msg = (
"Cannot attach renderer: missing dependency\n"
"Please install matplotlib or run pip install grid2op[optional]"
)
raise Grid2OpException(err_msg) from None
self._viewer = PlotMatplot(self._observation_space)
self.viewer_fig = None
# Set renderer modes
self.metadata = {"render.modes": ["silent", "rgb_array"]} # "human",
def __str__(self):
return "<{} instance named {}>".format(type(self).__name__, self.name)
# TODO be closer to original gym implementation
def reset_grid(self,
               init_act_opt: Optional[BaseAction] = None,
               method: Literal["combine", "ignore"] = "combine"):
    """
    INTERNAL

    .. warning:: /!\\\\ Internal, do not use unless you know what you are doing /!\\\\

    This is automatically called when using `env.reset`.

    Reset the backend to a clean state by reloading the powergrid from the
    hard drive. This might take some time.

    If the thermal limits have been modified, they are also set in the new
    backend.

    Parameters
    ----------
    init_act_opt:
        Optional action (typically from the "init state" reset option) used
        to modify the initial state of the grid.
    method:
        How `init_act_opt` interacts with the initial action read from the
        time series: "combine" merges both, "ignore" discards the time
        series one entirely.
    """
    self.backend.reset(
        self._init_grid_path,
    )  # the real powergrid of the environment
    if self._thermal_limit_a is not None:
        self.backend.set_thermal_limit(self._thermal_limit_a.astype(dt_float))
    self._backend_action = self._backend_action_class()
    # to have init obs at step 1 (and to prevent the 'setting to proper state' "action" to be illegal)
    self.nb_time_step = -1
    init_action = None
    if not self._parameters.IGNORE_INITIAL_STATE_TIME_SERIE:
        # load the initial state from the time series (default)
        # TODO logger: log that
        init_action: BaseAction = self.chronics_handler.get_init_action(self._names_chronics_to_backend)
    else:
        # do as if everything was connected to busbar 1
        # TODO logger: log that
        init_action = self._helper_action_env({"set_bus": np.ones(type(self).dim_topo, dtype=dt_int)})
        if type(self).shunts_data_available:
            init_action += self._helper_action_env({"shunt": {"set_bus": np.ones(type(self).n_shunt, dtype=dt_int)}})
    if init_action is None:
        # default behaviour for grid2op < 1.10.2
        init_action = self._helper_action_env({})
    else:
        # remove the "change part" of the action
        init_action.remove_change()
    if init_act_opt is not None:
        init_act_opt.remove_change()
        if method == "combine":
            init_action._add_act_and_remove_line_status_only_set(init_act_opt)
        elif method == "ignore":
            init_action = init_act_opt
        else:
            raise Grid2OpException(f"kwargs `method` used to set the initial state of the grid "
                                   f"is not understood (use one of `combine` or `ignore` and "
                                   f"not `{method}`)")
    # NOTE: a stray `init_action._set_topo_vect.nonzero()` statement (a pure
    # no-op whose result was discarded) has been removed here.
    *_, fail_to_start, info = self.step(init_action)
    if fail_to_start:
        raise Grid2OpException(
            "Impossible to initialize the powergrid, the powerflow diverge at iteration 0. "
            "Available information are: {}".format(info)
        )
    if info["exception"] and init_action.can_affect_something():
        # the literal `{'init state': XXX, ...}` is kept out of the f-string:
        # inside an f-string it would be parsed as a replacement field with an
        # invalid format spec and raise ValueError instead of this exception
        raise Grid2OpException("There has been an error at the initialization, most likely due to a "
                               "incorrect 'init state'. You need to change either the time series used (chronics, chronics_handler, "
                               "gridvalue, etc.) or the 'init state' option provided in "
                               "`env.reset(..., options={'init state': XXX, ...})`. "
                               f"Error was: {info['exception']}")
    # the observation space needs the (new) environment kwargs
    self._observation_space.set_real_env_kwargs(self)
[docs] def add_text_logger(self, logger=None):
"""
Add a text logger to this :class:`Environment`
Logging is for now an incomplete feature, really incomplete (not used)
Parameters
----------
logger:
The logger to use
"""
self.logger = logger
return self
def _aux_get_skip_ts(self, options):
skip_ts = None
if options is not None and "init ts" in options:
try:
skip_ts = int(options["init ts"])
except ValueError as exc_:
raise Grid2OpException("In `env.reset` the kwargs `init ts` should be convertible to an int") from exc_
if skip_ts != options["init ts"]:
raise Grid2OpException(f"In `env.reset` the kwargs `init ts` should be convertible to an int, found {options['init ts']}")
return skip_ts
[docs] def reset(self,
*,
seed: Union[int, None] = None,
options: RESET_OPTIONS_TYPING = None) -> BaseObservation:
"""
Reset the environment to a clean state.
It will reload the next chronics if any. And reset the grid to a clean state.
This triggers a full reloading of both the chronics (if they are stored as files) and of the powergrid,
to ensure the episode is fully over.
This method should be called only at the end of an episode.
Parameters
----------
seed: int
The seed to used (new in version 1.9.8), see examples for more details. Ignored if not set (meaning no seeds will
be used, experiments might not be reproducible)
options: dict
Some options to "customize" the reset call. For example specifying the "time serie id" (grid2op >= 1.9.8) to use
or the "initial state of the grid" (grid2op >= 1.10.2) or to
start the episode at some specific time in the time series (grid2op >= 1.10.3) with the
"init ts" key.
See examples for more information about this. Ignored if
not set.
Examples
--------
The standard "gym loop" can be done with the following code:
.. code-block:: python
import grid2op
# create the environment
env_name = "l2rpn_case14_sandbox"
env = grid2op.make(env_name)
# start a new episode
obs = env.reset()
done = False
reward = env.reward_range[0]
while not done:
action = agent.act(obs, reward, done)
obs, reward, done, info = env.step(action)
.. versionadded:: 1.9.8
It is now possible to set the seed and the time series you want to use at the new
episode by calling `env.reset(seed=..., options={"time serie id": ...})`
Before version 1.9.8, if you wanted to use a fixed seed, you would need to (see
doc of :func:`Environment.seed` ):
.. code-block:: python
seed = ...
env.seed(seed)
obs = env.reset()
...
Starting from version 1.9.8 you can do this in one call:
.. code-block:: python
seed = ...
obs = env.reset(seed=seed)
For the "time series id" it is the same concept. Before you would need to do (see
doc of :func:`Environment.set_id` for more information ):
.. code-block:: python
time_serie_id = ...
env.set_id(time_serie_id)
obs = env.reset()
...
And now (from version 1.9.8) you can more simply do:
.. code-block:: python
time_serie_id = ...
obs = env.reset(options={"time serie id": time_serie_id})
...
.. versionadded:: 1.10.2
Another feature has been added in version 1.10.2, which is the possibility to set the
grid to a given "topological" state at the first observation (before this version,
you could only retrieve an observation with everything connected together).
In grid2op 1.10.2, you can do that by using the keys `"init state"` in the "options" kwargs of
the reset function. The value associated to this key should be dictionnary that can be
converted to a non ambiguous grid2op action using an "action space".
.. notes::
The "action space" used here is not the action space of the agent. It's an "action
space" that uses a :grid2op:`Action.Action.BaseAction` class meaning you can do any
type of action, on shunts, on topology, on line status etc. even if the agent is not
allowed to.
Likewise, nothing check if this action is legal or not.
You can use it like this:
.. code-block:: python
# to start an episode with a line disconnected, you can do:
init_state_dict = {"set_line_status": [(0, -1)]}
obs = env.reset(options={"init state": init_state_dict})
obs.line_status[0] is False
# to start an episode with a different topolovy
init_state_dict = {"set_bus": {"lines_or_id": [(0, 2)], "lines_ex_id": [(3, 2)]}}
obs = env.reset(options={"init state": init_state_dict})
.. note::
Since grid2op version 1.10.2, there is also the possibility to set the "initial state"
of the grid directly in the time series. The priority is always given to the
argument passed in the "options" value.
Concretely if, in the "time series" (formelly called "chronics") provides an action would change
the topology of substation 1 and 2 (for example) and you provide an action that disable the
line 6, then the initial state will see substation 1 and 2 changed (as in the time series)
and line 6 disconnected.
Another example in this case: if the action you provide would change topology of substation 2 and 4
then the initial state (after `env.reset`) will give:
- substation 1 as in the time serie
- substation 2 as in "options"
- substation 4 as in "options"
.. note::
Concerning the previously described behaviour, if you want to ignore the data in the
time series, you can add : `"method": "ignore"` in the dictionary describing the action.
In this case the action in the time series will be totally ignored and the initial
state will be fully set by the action passed in the "options" dict.
An example is:
.. code-block:: python
init_state_dict = {"set_line_status": [(0, -1)], "method": "force"}
obs = env.reset(options={"init state": init_state_dict})
obs.line_status[0] is False
.. versionadded:: 1.10.3
Another feature has been added in version 1.10.3, the possibility to skip the
some steps of the time series and starts at some given steps.
The time series often always start at a given day of the week (*eg* Monday)
and at a given time (*eg* midnight). But for some reason you notice that your
agent performs poorly on other day of the week or time of the day. This might be
because it has seen much more data from Monday at midnight that from any other
day and hour of the day.
To alleviate this issue, you can now easily reset an episode and ask grid2op
to start this episode after xxx steps have "passed".
Concretely, you can do it with:
.. code-block:: python
import grid2op
env_name = "l2rpn_case14_sandbox"
env = grid2op.make(env_name)
obs = env.reset(options={"init ts": 1})
Doing that your agent will start its episode not at midnight (which
is the case for this environment), but at 00:05
If you do:
.. code-block:: python
obs = env.reset(options={"init ts": 12})
In this case, you start the episode at 01:00 and not at midnight (you
start at what would have been the 12th steps)
If you want to start the "next day", you can do:
.. code-block:: python
obs = env.reset(options={"init ts": 288})
etc.
.. note::
On this feature, if a powerline is on soft overflow (meaning its flow is above
the limit but below the :attr:`grid2op.Parameters.Parameters.HARD_OVERFLOW_THRESHOLD` * `the limit`)
then it is still connected (of course) and the counter
:attr:`grid2op.Observation.BaseObservation.timestep_overflow` is at 0.
If a powerline is on "hard overflow" (meaning its flow would be above
:attr:`grid2op.Parameters.Parameters.HARD_OVERFLOW_THRESHOLD` * `the limit`), then, as it is
the case for a "normal" (without options) reset, this line is disconnected, but can be reconnected
directly (:attr:`grid2op.Observation.BaseObservation.time_before_cooldown_line` == 0)
.. seealso::
The function :func:`Environment.fast_forward_chronics` for an alternative usage (that will be
deprecated at some point)
Yet another feature has been added in grid2op version 1.10.3 in this `env.reset` function. It is
the capacity to limit the duration of an episode.
.. code-block:: python
import grid2op
env_name = "l2rpn_case14_sandbox"
env = grid2op.make(env_name)
obs = env.reset(options={"max step": 288})
This will limit the duration to 288 steps (1 day), meaning your agent
will have successfully managed the entire episode if it manages to keep
the grid in a safe state for a whole day (depending on the environment you are
using the default duration is either one week - roughly 2016 steps or 4 weeks)
.. note::
This option only affect the current episode. It will have no impact on the
next episode (after reset)
For example:
.. code-block:: python
obs = env.reset()
obs.max_step == 8064 # default for this environment
obs = env.reset(options={"max step": 288})
obs.max_step == 288 # specified by the option
obs = env.reset()
obs.max_step == 8064 # retrieve the default behaviour
.. seealso::
The function :func:`Environment.set_max_iter` for an alternative usage with the different
that `set_max_iter` is permenanent: it impacts all the future episodes and not only
the next one.
"""
# process the "options" kwargs
# (if there is an init state then I need to process it to remove the
# some keys)
self._max_step = None
method = "combine"
init_state = None
skip_ts = self._aux_get_skip_ts(options)
max_iter_int = None
if options is not None and "init state" in options:
act_as_dict = options["init state"]
if isinstance(act_as_dict, dict):
if "method" in act_as_dict:
method = act_as_dict["method"]
del act_as_dict["method"]
init_state : BaseAction = self._helper_action_env(act_as_dict)
elif isinstance(act_as_dict, BaseAction):
init_state = act_as_dict
else:
raise Grid2OpException("`init state` kwargs in `env.reset(, options=XXX) should either be a "
"grid2op action (instance of grid2op.Action.BaseAction) or a dictionaray "
f"representing an action. You provided {act_as_dict} which is a {type(act_as_dict)}")
ambiguous, except_tmp = init_state.is_ambiguous()
if ambiguous:
raise Grid2OpException("You provided an invalid (ambiguous) action to set the 'init state'") from except_tmp
init_state.remove_change()
super().reset(seed=seed, options=options)
if options is not None and "max step" in options:
# use the "max iter" provided in the options
max_iter_int = self._aux_check_max_iter(options["max step"])
if skip_ts is not None:
max_iter_chron = max_iter_int + skip_ts
else:
max_iter_chron = max_iter_int
self.chronics_handler._set_max_iter(max_iter_chron)
else:
# reset previous max iter to value set with `env.set_max_iter(...)` (or -1 by default)
self.chronics_handler._set_max_iter(self._max_iter)
self.chronics_handler.next_chronics()
self.chronics_handler.initialize(
self.backend.name_load,
self.backend.name_gen,
self.backend.name_line,
self.backend.name_sub,
names_chronics_to_backend=self._names_chronics_to_backend,
)
if max_iter_int is not None:
self._max_step = min(max_iter_int, self.chronics_handler.real_data.max_iter - (skip_ts if skip_ts is not None else 0))
else:
self._max_step = None
self._env_modification = None
self._reset_maintenance()
self._reset_redispatching()
self._reset_vectors_and_timings() # it need to be done BEFORE to prevent cascading failure when there has been
self.reset_grid(init_state, method)
if self.viewer_fig is not None:
del self.viewer_fig
self.viewer_fig = None
if skip_ts is not None:
self._reset_vectors_and_timings()
if skip_ts < 1:
raise Grid2OpException(f"In `env.reset` the kwargs `init ts` should be an int >= 1, found {options['init ts']}")
if skip_ts == 1:
self._init_obs = None
self.step(self.action_space())
elif skip_ts == 2:
self.fast_forward_chronics(1)
else:
self.fast_forward_chronics(skip_ts)
# if True, then it will not disconnect lines above their thermal limits
self._reset_vectors_and_timings() # and it needs to be done AFTER to have proper timings at tbe beginning
# the attention budget is reset above
# reset the opponent
self._oppSpace.reset()
# reset, if need, reward and other rewards
self._reward_helper.reset(self)
for extra_reward in self.other_rewards.values():
extra_reward.reset(self)
# and reset also the "simulated env" in the observation space
self._observation_space.reset(self)
self._observation_space.set_real_env_kwargs(self)
self._last_obs = None # force the first observation to be generated properly
if self._init_obs is not None:
self._reset_to_orig_state(self._init_obs)
return self.get_obs()
[docs] def render(self, mode="rgb_array"):
"""
Render the state of the environment on the screen, using matplotlib
Also returns the Matplotlib figure
Examples
--------
Rendering need first to define a "renderer" which can be done with the following code:
.. code-block:: python
import grid2op
# create the environment
env = grid2op.make("l2rpn_case14_sandbox")
# if you want to use the renderer
env.attach_renderer()
# and now you can "render" (plot) the state of the grid
obs = env.reset()
done = False
reward = env.reward_range[0]
while not done:
env.render() # this piece of code plot the grid
action = agent.act(obs, reward, done)
obs, reward, done, info = env.step(action)
"""
# Try to create a plotter instance
# Does nothing if viewer exists
# Raises if matplot is not installed
self.attach_renderer()
# Check mode is correct
if mode not in self.metadata["render.modes"]:
err_msg = 'Renderer mode "{}" not supported. Available modes are {}.'
raise Grid2OpException(err_msg.format(mode, self.metadata["render.modes"]))
# Render the current observation
fig = self._viewer.plot_obs(
self.current_obs, figure=self.viewer_fig, redraw=True
)
# First time show for human mode
if self.viewer_fig is None and mode == "human":
fig.show()
else: # Update the figure content
fig.canvas.draw()
# Store to re-use the figure
self.viewer_fig = fig
# Return the rgb array
rgb_array = np.frombuffer(fig.canvas.tostring_rgb(), dtype=np.uint8).reshape(self._viewer.height, self._viewer.width, 3)
return rgb_array
def _custom_deepcopy_for_copy(self, new_obj):
new_obj.metadata = copy.deepcopy(self.metadata)
new_obj.spec = copy.deepcopy(self.spec)
new_obj._compat_glop_version = self._compat_glop_version
new_obj._max_iter = self._max_iter
new_obj._max_step = self._max_step
new_obj._overload_name_multimix = self._overload_name_multimix
new_obj.multimix_mix_name = self.multimix_mix_name
super()._custom_deepcopy_for_copy(new_obj)
[docs] def copy(self) -> "Environment":
"""
Performs a deep copy of the environment
Unless you have a reason to, it is not advised to make copy of an Environment.
Examples
--------
It should be used as follow:
.. code-block:: python
import grid2op
env = grid2op.make("l2rpn_case14_sandbox")
cpy_of_env = env.copy()
"""
# res = copy.deepcopy(self) # painfully slow...
# create an empty "me"
my_cls = type(self)
res = my_cls.__new__(my_cls)
# fill its attribute
self._custom_deepcopy_for_copy(res)
return res
[docs] def get_kwargs(self,
with_backend=True,
with_chronics_handler=True,
with_backend_kwargs=False):
"""
This function allows to make another Environment with the same parameters as the one that have been used
to make this one.
This is useful especially in cases where Environment is not pickable (for example if some non pickable c++
code are used) but you still want to make parallel processing using "MultiProcessing" module. In that case,
you can send this dictionary to each child process, and have each child process make a copy of ``self``
**NB** This function should not be used to make a copy of an environment. Prefer using :func:`Environment.copy`
for such purpose.
Returns
-------
res: ``dict``
A dictionary that helps build an environment like ``self`` (which is NOT a copy of self) but rather
an instance of an environment with the same properties.
Examples
--------
It should be used as follow:
.. code-block:: python
import grid2op
from grid2op.Environment import Environment
env = grid2op.make("l2rpn_case14_sandbox") # create the environment of your choice
copy_of_env = Environment(**env.get_kwargs())
# And you can use this one as you would any other environment.
# NB this is not a "proper" copy. for example it will not be at the same step, it will be possible
# seeded with a different seed.
# use `env.copy()` to make a proper copy of an environment.
"""
res = {}
res["n_busbar"] = self._n_busbar
res["init_env_path"] = self._init_env_path
res["init_grid_path"] = self._init_grid_path
if with_chronics_handler:
res["chronics_handler"] = copy.deepcopy(self.chronics_handler)
res["chronics_handler"].cleanup_action_space()
# deals with the backend
if with_backend:
if not self.backend._can_be_copied:
raise RuntimeError("Impossible to get the kwargs for this "
"environment, the backend cannot be copied.")
res["backend"] = self.backend.copy()
res["backend"]._is_loaded = False # i can reload a copy of an environment
res["parameters"] = copy.deepcopy(self._parameters)
res["names_chronics_to_backend"] = copy.deepcopy(
self._names_chronics_to_backend
)
res["actionClass"] = self._actionClass_orig
res["observationClass"] = self._observationClass_orig
res["rewardClass"] = self._rewardClass
res["legalActClass"] = self._legalActClass
res["epsilon_poly"] = self._epsilon_poly
res["tol_poly"] = self._tol_poly
res["thermal_limit_a"] = self._thermal_limit_a
res["voltagecontrolerClass"] = self._voltagecontrolerClass
res["other_rewards"] = {k: v.rewardClass for k, v in self.other_rewards.items()}
res["name"] = self.name
res["_raw_backend_class"] = self._raw_backend_class
if with_backend_kwargs:
# used for multi processing, to pass exactly the
# right things when building the backends
# in each sub process
res["_backend_kwargs"] = self.backend._my_kwargs
res["with_forecast"] = self.with_forecast
res["opponent_space_type"] = self._opponent_space_type
res["opponent_action_class"] = self._opponent_action_class
res["opponent_class"] = self._opponent_class
res["opponent_init_budget"] = self._opponent_init_budget
res["opponent_budget_per_ts"] = self._opponent_budget_per_ts
res["opponent_budget_class"] = self._opponent_budget_class
res["opponent_attack_duration"] = self._opponent_attack_duration
res["opponent_attack_cooldown"] = self._opponent_attack_cooldown
res["kwargs_opponent"] = self._kwargs_opponent
res["attention_budget_cls"] = self._attention_budget_cls
res["kwargs_attention_budget"] = copy.deepcopy(self._kwargs_attention_budget)
res["has_attention_budget"] = self._has_attention_budget
res["_read_from_local_dir"] = self._read_from_local_dir
res["kwargs_observation"] = copy.deepcopy(self._kwargs_observation)
res["logger"] = self.logger
res["observation_bk_class"] = self._observation_bk_class
res["observation_bk_kwargs"] = self._observation_bk_kwargs
return res
def _chronics_folder_name(self):
return "chronics"
    def train_val_split(
        self,
        val_scen_id,
        add_for_train="train",
        add_for_val="val",
        add_for_test=None,
        test_scen_id=None,
        remove_from_name=None,
        deep_copy=False,
    ):
        """
        This function is used as :func:`Environment.train_val_split_random`.

        Please refer to the help of :func:`Environment.train_val_split_random` for more information about
        this function.

        Parameters
        ----------
        val_scen_id: ``list``
            List of the scenario names that will be placed in the validation set

        test_scen_id: ``list``

            .. versionadded:: 2.6.5

            List of the scenario names that will be placed in the test set (only used
            if add_for_test is not None - and mandatory in this case)

        add_for_train: ``str``
            See :func:`Environment.train_val_split_random` for more information

        add_for_val: ``str``
            See :func:`Environment.train_val_split_random` for more information

        add_for_test: ``str``

            .. versionadded:: 2.6.5

            See :func:`Environment.train_val_split_random` for more information

        remove_from_name: ``str``
            See :func:`Environment.train_val_split_random` for more information

        deep_copy: ``bool``

            .. versionadded:: 2.6.5

            See :func:`Environment.train_val_split_random` for more information

        Returns
        -------
        nm_train: ``str``
            See :func:`Environment.train_val_split_random` for more information

        nm_val: ``str``
            See :func:`Environment.train_val_split_random` for more information

        nm_test: ``str``, optional

            .. versionadded:: 2.6.5

            See :func:`Environment.train_val_split_random` for more information

        Examples
        --------

        A full example on a training / validation / test split with explicit specification of which
        chronics goes in which scenarios is:

        .. code-block:: python

            import grid2op
            import os

            env_name = "l2rpn_case14_sandbox"  # or any other...
            env = grid2op.make(env_name)

            # retrieve the names of the chronics:
            full_path_data = env.chronics_handler.subpaths
            chron_names = [os.path.split(el)[-1] for el in full_path_data]

            # splitting into training / test, keeping the "last" 10 chronics to the test set
            nm_env_train, nm_env_val, nm_env_test = env.train_val_split(test_scen_id=chron_names[-10:],  # last 10 in test set
                                                                        add_for_test="test",
                                                                        val_scen_id=chron_names[-20:-10],  # last 20 to last 10 in val test
                                                                        )

            env_train = grid2op.make(env_name+"_train")
            env_val = grid2op.make(env_name+"_val")
            env_test = grid2op.make(env_name+"_test")

        For a more simple example, with less parametrization and with random assignment (recommended),
        please refer to the help of :func:`Environment.train_val_split_random`

        **NB** read the "Notes" of this section for possible "unexpected" behaviour of the code snippet above.

        On Some windows based platform, if you don't have an admin account nor a
        "developer" account (see https://docs.python.org/3/library/os.html#os.symlink)
        you might need to do:

        .. code-block:: python

            import grid2op
            import os

            env_name = "l2rpn_case14_sandbox"  # or any other...
            env = grid2op.make(env_name)

            # retrieve the names of the chronics:
            full_path_data = env.chronics_handler.subpaths
            chron_names = [os.path.split(el)[-1] for el in full_path_data]

            # splitting into training / test, keeping the "last" 10 chronics to the test set
            nm_env_train, nm_env_val, nm_env_test = env.train_val_split(test_scen_id=chron_names[-10:],  # last 10 in test set
                                                                        add_for_test="test",
                                                                        val_scen_id=chron_names[-20:-10],  # last 20 to last 10 in val test
                                                                        deep_copy=True)

        .. warning::
            The above code will use much more memory on your hard drive than the version using symbolic links.
            It will also be significantly slower !

        As an "historical curiosity", this is what you needed to do in grid2op version < 1.6.5:

        .. code-block:: python

            import grid2op
            import os

            env_name = "l2rpn_case14_sandbox"  # or any other...
            env = grid2op.make(env_name)

            # retrieve the names of the chronics:
            full_path_data = env.chronics_handler.subpaths
            chron_names = [os.path.split(el)[-1] for el in full_path_data]

            # splitting into training / test, keeping the "last" 10 chronics to the test set
            nm_env_trainval, nm_env_test = env.train_val_split(val_scen_id=chron_names[-10:],
                                                               add_for_val="test",
                                                               add_for_train="trainval")

            # now splitting again the training set into training and validation, keeping the last 10 chronics
            # of this environment for validation
            env_trainval = grid2op.make(nm_env_trainval)  # create the "trainval" environment
            full_path_data = env_trainval.chronics_handler.subpaths
            chron_names = [os.path.split(el)[-1] for el in full_path_data]
            nm_env_train, nm_env_val = env_trainval.train_val_split(val_scen_id=chron_names[-10:],
                                                                    remove_from_name="_trainval$")

            # and now you can use the following code to load the environments:
            env_train = grid2op.make(env_name+"_train")
            env_val = grid2op.make(env_name+"_val")
            env_test = grid2op.make(env_name+"_test")

        Notes
        ------
        We don't recommend you to use this function. It provides a great level of control on which
        scenarios goes into which dataset, which is nice, but
        "*with great power comes great responsibilities*".

        Keep in mind that scenarios might be "sorted" by having some "month" in their names.
        For example, the first k scenarios might be called "April_XXX"
        and the last k ones having names with "September_XXX".

        In general, we would not consider good practice to have all validation (or test) scenarios coming
        from the same months. Keep that in mind if you use the code snippet above.

        """
        # define all the locations
        # first: validate the suffixes against the class-level regex
        cls = type(self)
        if re.match(cls.REGEX_SPLIT, add_for_train) is None:
            raise EnvError(
                f"The suffixes you can use for training data (add_for_train) "
                f'should match the regex "{cls.REGEX_SPLIT}"'
            )
        if re.match(cls.REGEX_SPLIT, add_for_val) is None:
            raise EnvError(
                f"The suffixes you can use for validation data (add_for_val)"
                f'should match the regex "{cls.REGEX_SPLIT}"'
            )
        if add_for_test is not None:
            if re.match(cls.REGEX_SPLIT, add_for_test) is None:
                raise EnvError(
                    f"The suffixes you can use for test data (add_for_test)"
                    f'should match the regex "{cls.REGEX_SPLIT}"'
                )

        # `add_for_test` and `test_scen_id` must be given (or omitted) together
        if add_for_test is None and test_scen_id is not None:
            raise EnvError(f"add_for_test is None and test_scen_id is not None.")

        if add_for_test is not None and test_scen_id is None:
            raise EnvError(f"add_for_test is not None and test_scen_id is None.")

        # a split only makes sense when chronics live on disk as folders
        from grid2op.Chronics import MultifolderWithCache, Multifolder

        if not isinstance(
            self.chronics_handler.real_data, (MultifolderWithCache, Multifolder)
        ):
            raise EnvError(
                "It does not make sense to split a environment between training / validation "
                "if the chronics are not read from directories."
            )

        # compute the names and paths of the new environments
        my_path = self.get_path_env()
        path_train = os.path.split(my_path)
        my_name = path_train[1]
        if remove_from_name is not None:
            # only a restricted regex vocabulary is allowed for the removal pattern
            if re.match(r"^[a-zA-Z0-9\^\$_]*$", remove_from_name) is None:
                raise EnvError(
                    "The suffixes you can remove from the name of the environment (remove_from_name)"
                    'should match the regex "^[a-zA-Z0-9^$_]*$"'
                )
            my_name = re.sub(remove_from_name, "", my_name)
        nm_train = f"{my_name}_{add_for_train}"
        path_train = os.path.join(path_train[0], nm_train)
        path_val = os.path.split(my_path)
        nm_val = f"{my_name}_{add_for_val}"
        path_val = os.path.join(path_val[0], nm_val)
        nm_test = None
        path_test = None
        if add_for_test is not None:
            path_test = os.path.split(my_path)
            nm_test = f"{my_name}_{add_for_test}"
            path_test = os.path.join(path_test[0], nm_test)

        chronics_dir = self._chronics_folder_name()

        # create the folder (refuse to overwrite an existing environment)
        if os.path.exists(path_val):
            raise RuntimeError(
                f"Impossible to create the validation environment that should have the name "
                f'"{nm_val}" because an environment is already named this way. If you want to '
                f'continue either delete the folder "{path_val}" or name your validation environment '
                f"differently "
                f'using the "add_for_val" keyword argument of this function.'
            )
        if os.path.exists(path_train):
            raise RuntimeError(
                f"Impossible to create the training environment that should have the name "
                f'"{nm_train}" because an environment is already named this way. If you want to '
                f'continue either delete the folder "{path_train}" or name your training environment '
                f" differently "
                f'using the "add_for_train" keyword argument of this function.'
            )
        if nm_test is not None and os.path.exists(path_test):
            raise RuntimeError(
                f"Impossible to create the test environment that should have the name "
                f'"{nm_test}" because an environment is already named this way. If you want to '
                f'continue either delete the folder "{path_test}" or name your test environment '
                f" differently "
                f'using the "add_for_test" keyword argument of this function.'
            )
        os.mkdir(path_val)
        os.mkdir(path_train)
        if nm_test is not None:
            os.mkdir(path_test)

        # assign which chronics goes where
        chronics_path = os.path.join(my_path, chronics_dir)
        all_chron = sorted(os.listdir(chronics_path))
        to_val = set(val_scen_id)

        to_test = set()  # see https://github.com/rte-france/Grid2Op/issues/363
        if nm_test is not None:
            to_test = set(test_scen_id)

        # either materialize real copies or just create symbolic links
        if deep_copy:
            import shutil

            copy_file_fun = shutil.copy2
            copy_dir_fun = shutil.copytree
        else:
            copy_file_fun = os.symlink
            copy_dir_fun = os.symlink

        # "copy" the files: regular env files go everywhere, chronics are dispatched
        for el in os.listdir(my_path):
            tmp_path = os.path.join(my_path, el)
            if os.path.isfile(tmp_path):
                # this is a regular env file
                copy_file_fun(tmp_path, os.path.join(path_train, el))
                copy_file_fun(tmp_path, os.path.join(path_val, el))
                if nm_test is not None:
                    copy_file_fun(tmp_path, os.path.join(path_test, el))
            elif os.path.isdir(tmp_path):
                if el == chronics_dir:
                    # this is the chronics folder: dispatch each scenario to
                    # val / test / train according to the provided id lists
                    os.mkdir(os.path.join(path_train, chronics_dir))
                    os.mkdir(os.path.join(path_val, chronics_dir))
                    if nm_test is not None:
                        os.mkdir(os.path.join(path_test, chronics_dir))
                    for chron_name in all_chron:
                        tmp_path_chron = os.path.join(tmp_path, chron_name)
                        if chron_name in to_val:
                            copy_dir_fun(
                                tmp_path_chron,
                                os.path.join(path_val, chronics_dir, chron_name),
                            )
                        elif chron_name in to_test:
                            copy_dir_fun(
                                tmp_path_chron,
                                os.path.join(path_test, chronics_dir, chron_name),
                            )
                        else:
                            # anything not explicitly listed lands in the training set
                            copy_dir_fun(
                                tmp_path_chron,
                                os.path.join(path_train, chronics_dir, chron_name),
                            )

        # return 2 names (train, val) or 3 (train, val, test) depending on the split
        if add_for_test is None:
            res = nm_train, nm_val
        else:
            res = nm_train, nm_val, nm_test
        return res
[docs] def train_val_split_random(
self,
pct_val=10.0,
add_for_train="train",
add_for_val="val",
add_for_test=None,
pct_test=None,
remove_from_name=None,
deep_copy=False,
):
"""
By default a grid2op environment contains multiple "scenarios" containing values for all the producers
and consumers representing multiple days. In a "game like" environment, you can think of the scenarios as
being different "game levels": different mazes in pacman, different levels in mario etc.
We recommend to train your agent on some of these "chroncis" (aka levels) and test the performance of your
agent on some others, to avoid overfitting.
This function allows to easily split an environment into different part. This is most commonly used in machine
learning where part of a dataset is used for training and another part is used for assessing the performance
of the trained model.
This function rely on "symbolic link" and will not duplicate data.
New created environments will behave like regular grid2op environment and will be accessible with "make" just
like any others (see the examples section for more information).
This function will make the split at random. If you want more control on the which scenarios to use for
training and which for validation, use the :func:`Environment.train_val_split` that allows to specify
which scenarios goes in the validation environment (and the others go in the training environment).
Parameters
----------
pct_val: ``float``
Percentage of chronics that will go to the validation set.
For 10% of the chronics, set it to 10. and NOT to 0.1.
add_for_train: ``str``
Suffix that will be added to the name of the environment for the training set. We don't recommend to
modify the default value ("train")
add_for_val: ``str``
Suffix that will be added to the name of the environment for the validation set. We don't recommend to
modify the default value ("val")
add_for_test: ``str``, (optional)
.. versionadded:: 2.6.5
Suffix that will be added to the name of the environment for the test set. By default,
it only splits into training and validation, so this is ignored. We recommend
to assign it to "test" if you want to split into training / validation and test.
If it is set, then the `pct_test` must also be set.
pct_test: ``float``, (optional)
.. versionadded:: 2.6.5
Percentage of chronics that will go to the test set.
For 10% of the chronics, set it to 10. and NOT to 0.1.
(If you set it, you need to set the `add_for_test` argument.)
remove_from_name: ``str``
If you "split" an environment multiple times, this allows you to keep "short" names (for example
you will be able to call `grid2op.make(env_name+"_train")` instead of
`grid2op.make(env_name+"_train_train")`)
deep_copy: ``bool``
.. versionadded:: 2.6.5
A function to specify to "copy" the elements of the original
environment to the created one. By default it will save as
much memory as possible using symbolic links (rather than performing
copies). By default it does use symbolic links (`deep_copy=False`).
.. note::
If set to ``True`` the new environment will take much more space
on the hard drive, and the execution of this function will
be much slower !
.. warning::
On windows based system, you will most likely run into issues
if you don't set this parameters.
Indeed, Windows does not link symbolink links
(https://docs.python.org/3/library/os.html#os.symlink).
In this case, you can use the ``deep_copy=True`` and
it will work fine (examples in the function
:func:`Environment.train_val_split`)
Returns
-------
nm_train: ``str``
Complete name of the "training" environment
nm_val: ``str``
Complete name of the "validation" environment
nm_test: ``str``, optionnal
.. versionadded:: 2.6.5
Complete name of the "test" environment. It is only returned if
`add_for_test` and `pct_test` are not `None`.
Examples
--------
This function can be used like:
.. code-block:: python
import grid2op
env_name = "l2rpn_case14_sandbox" # or any other...
env = grid2op.make(env_name)
# extract 1% of the "chronics" to be used in the validation environment. The other 99% will
# be used for training
nm_env_train, nm_env_val = env.train_val_split_random(pct_val=1.)
# and now you can use the training set only to train your agent:
print(f"The name of the training environment is \\"{nm_env_train}\\"")
print(f"The name of the validation environment is \\"{nm_env_val}\\"")
env_train = grid2op.make(nm_env_train)
And even after you close the python session, you can still use this environment for training. If you used
the exact code above that will look like:
.. code-block:: python
import grid2op
env_name_train = "l2rpn_case14_sandbox_train" # depending on the option you passed above
env_train = grid2op.make(env_name_train)
.. versionadded:: 2.6.5
Possibility to create a training, validation AND test set.
If you have grid2op version >= 2.6.5, you can also use the following:
.. code-block:: python
import grid2op
env_name = "l2rpn_case14_sandbox" # or any other...
env = grid2op.make(env_name)
# extract 1% of the "chronics" to be used in the validation environment, and another
# 1% for the test environment. The other 98% will be used for training
nm_env_train, nm_env_val, nm_env_test = env.train_val_split_random(pct_val=1., pct_test=1.)
# and now you can use the training set only to train your agent:
print(f"The name of the training environment is \\"{nm_env_train}\\"")
print(f"The name of the validation environment is \\"{nm_env_val}\\"")
print(f"The name of the test environment is \\"{nm_env_test}\\"")
env_train = grid2op.make(nm_env_train)
.. warning::
In this case this function returns 3 elements and not 2 !
Notes
-----
This function will fail if an environment already exists with one of the name that would be given
to the training environment or the validation environment (or test environment).
"""
if re.match(self.REGEX_SPLIT, add_for_train) is None:
raise EnvError(
"The suffixes you can use for training data (add_for_train) "
'should match the regex "{self.REGEX_SPLIT}"'
)
if re.match(self.REGEX_SPLIT, add_for_val) is None:
raise EnvError(
"The suffixes you can use for validation data (add_for_val)"
'should match the regex "{self.REGEX_SPLIT}"'
)
if add_for_test is None and pct_test is not None:
raise EnvError(f"add_for_test is None and pct_test is not None.")
if add_for_test is not None and pct_test is None:
raise EnvError(f"add_for_test is not None and pct_test is None.")
my_path = self.get_path_env()
chronics_path = os.path.join(my_path, self._chronics_folder_name())
all_chron = sorted(os.listdir(chronics_path))
all_chron = [
el for el in all_chron if os.path.isdir(os.path.join(chronics_path, el))
]
nb_init = len(all_chron)
to_val = self.space_prng.choice(
all_chron, size=int(nb_init * pct_val * 0.01), replace=False
)
test_scen_id = None
if pct_test is not None:
all_chron = set(all_chron) - set(to_val)
all_chron = list(all_chron)
test_scen_id = self.space_prng.choice(
all_chron, size=int(nb_init * pct_test * 0.01), replace=False
)
return self.train_val_split(
to_val,
add_for_train=add_for_train,
add_for_val=add_for_val,
remove_from_name=remove_from_name,
add_for_test=add_for_test,
test_scen_id=test_scen_id,
deep_copy=deep_copy,
)
def get_params_for_runner(self):
    """
    This method is used to initialize a proper :class:`grid2op.Runner.Runner` to use this specific environment.

    Returns
    -------
    res: ``dict``
        The keyword arguments to pass to the :class:`grid2op.Runner.Runner`
        constructor so that the runner rebuilds an environment equivalent
        to this one.

    Examples
    --------
    It should be used as followed:

    .. code-block:: python

        import grid2op
        from grid2op.Runner import Runner
        from grid2op.Agent import DoNothingAgent  # for example

        env = grid2op.make("l2rpn_case14_sandbox")  # create the environment of your choice

        # create the proper runner
        runner = Runner(**env.get_params_for_runner(), agentClass=DoNothingAgent)

        # now you can run
        runner.run(nb_episode=1)  # run for 1 episode

    """
    res = {}
    res["init_env_path"] = self._init_env_path
    res["init_grid_path"] = self._init_grid_path
    res["path_chron"] = self.chronics_handler.path
    res["parameters_path"] = self._parameters.to_dict()
    res["names_chronics_to_backend"] = self._names_chronics_to_backend
    res["actionClass"] = self._actionClass_orig
    res["observationClass"] = self._observationClass_orig
    # deepcopy so the runner cannot mutate this environment's reward class
    res["rewardClass"] = copy.deepcopy(self._rewardClass)
    res["legalActClass"] = self._legalActClass
    res["envClass"] = Environment  # TODO !
    res["gridStateclass"] = self.chronics_handler.chronicsClass
    res["backendClass"] = self._raw_backend_class
    res["_overload_name_multimix"] = self._overload_name_multimix
    if hasattr(self.backend, "_my_kwargs"):
        res["backend_kwargs"] = self.backend._my_kwargs
    else:
        msg_ = ("You are probably using a legacy backend class that cannot "
                "be copied properly. Please upgrade your backend to the latest version.")
        # Logger.warn is a deprecated alias of Logger.warning
        self.logger.warning(msg_)
        warnings.warn(msg_)
        res["backend_kwargs"] = None
    res["verbose"] = False

    dict_ = copy.deepcopy(self.chronics_handler.kwargs)
    # path is handled elsewhere (through "path_chron")
    dict_.pop("path", None)
    if self.chronics_handler.max_iter is not None:
        res["max_iter"] = self.chronics_handler.max_iter
    res["gridStateclass_kwargs"] = dict_
    res["thermal_limit_a"] = self._thermal_limit_a
    res["voltageControlerClass"] = self._voltagecontrolerClass
    res["other_rewards"] = {k: v.rewardClass for k, v in self.other_rewards.items()}
    res["grid_layout"] = self.grid_layout
    res["name_env"] = self.name
    res["n_busbar"] = self._n_busbar
    res["opponent_space_type"] = self._opponent_space_type
    res["opponent_action_class"] = self._opponent_action_class
    res["opponent_class"] = self._opponent_class
    res["opponent_init_budget"] = self._opponent_init_budget
    res["opponent_budget_per_ts"] = self._opponent_budget_per_ts
    res["opponent_budget_class"] = self._opponent_budget_class
    res["opponent_attack_duration"] = self._opponent_attack_duration
    res["opponent_attack_cooldown"] = self._opponent_attack_cooldown
    res["opponent_kwargs"] = self._kwargs_opponent
    res["attention_budget_cls"] = self._attention_budget_cls
    res["kwargs_attention_budget"] = copy.deepcopy(self._kwargs_attention_budget)
    res["has_attention_budget"] = self._has_attention_budget
    res["_read_from_local_dir"] = self._read_from_local_dir
    res["_local_dir_cls"] = self._local_dir_cls  # should be transfered to the runner so that folder is not deleted while runner exists
    res["logger"] = self.logger
    res["kwargs_observation"] = copy.deepcopy(self._kwargs_observation)
    res["observation_bk_class"] = self._observation_bk_class
    res["observation_bk_kwargs"] = self._observation_bk_kwargs
    res["_is_test"] = self._is_test  # TODO not implemented !!
    return res
@classmethod
def init_obj_from_kwargs(cls,
                         *,
                         other_env_kwargs,
                         init_env_path,
                         init_grid_path,
                         chronics_handler,
                         backend,
                         parameters,
                         name,
                         names_chronics_to_backend,
                         actionClass,
                         observationClass,
                         rewardClass,
                         legalActClass,
                         voltagecontrolerClass,
                         other_rewards,
                         opponent_space_type,
                         opponent_action_class,
                         opponent_class,
                         opponent_init_budget,
                         opponent_budget_per_ts,
                         opponent_budget_class,
                         opponent_attack_duration,
                         opponent_attack_cooldown,
                         kwargs_opponent,
                         with_forecast,
                         attention_budget_cls,
                         kwargs_attention_budget,
                         has_attention_budget,
                         logger,
                         kwargs_observation,
                         observation_bk_class,
                         observation_bk_kwargs,
                         _raw_backend_class,
                         _read_from_local_dir,
                         _local_dir_cls,
                         _overload_name_multimix,
                         n_busbar=DEFAULT_N_BUSBAR_PER_SUB
                         ):
    """Alternate constructor: build an :class:`Environment` from keyword arguments.

    All keyword-only parameters are forwarded verbatim to ``cls(...)``, except
    ``n_busbar`` which is coerced to ``int`` first.  Note that
    ``other_env_kwargs`` is accepted for interface compatibility but is not
    forwarded to the constructor.
    """
    ctor_kwargs = {
        "init_env_path": init_env_path,
        "init_grid_path": init_grid_path,
        "chronics_handler": chronics_handler,
        "backend": backend,
        "parameters": parameters,
        "name": name,
        "names_chronics_to_backend": names_chronics_to_backend,
        "actionClass": actionClass,
        "observationClass": observationClass,
        "rewardClass": rewardClass,
        "legalActClass": legalActClass,
        "voltagecontrolerClass": voltagecontrolerClass,
        "other_rewards": other_rewards,
        "opponent_space_type": opponent_space_type,
        "opponent_action_class": opponent_action_class,
        "opponent_class": opponent_class,
        "opponent_init_budget": opponent_init_budget,
        "opponent_budget_per_ts": opponent_budget_per_ts,
        "opponent_budget_class": opponent_budget_class,
        "opponent_attack_duration": opponent_attack_duration,
        "opponent_attack_cooldown": opponent_attack_cooldown,
        "kwargs_opponent": kwargs_opponent,
        "with_forecast": with_forecast,
        "attention_budget_cls": attention_budget_cls,
        "kwargs_attention_budget": kwargs_attention_budget,
        "has_attention_budget": has_attention_budget,
        "logger": logger,
        "kwargs_observation": kwargs_observation,
        "observation_bk_class": observation_bk_class,
        "observation_bk_kwargs": observation_bk_kwargs,
        "n_busbar": int(n_busbar),
        "_raw_backend_class": _raw_backend_class,
        "_read_from_local_dir": _read_from_local_dir,
        "_local_dir_cls": _local_dir_cls,
        "_overload_name_multimix": _overload_name_multimix,
    }
    return cls(**ctor_kwargs)
def generate_data(self, nb_year=1, nb_core=1, seed=None, **kwargs):
    """This function uses the chronix2grid package to generate more data that will then
    be available locally. You need to install it independently (see https://github.com/BDonnot/ChroniX2Grid#installation
    for more information)

    It also requires the lightsim2grid simulator.

    This is only available for some environment (only the environment after 2022).

    Generating data takes some time (around 1 - 2 minutes to generate a weekly scenario) and this why we recommend
    to do it "offline" and then use the generated data for training or evaluation.

    .. warning::
        You should not start this function twice. Before starting a new run, make sure the previous one has terminated (otherwise you might
        erase some previously generated scenario)

    Examples
    ---------
    The recommended process when you want to use this function is to first generate some more data:

    .. code-block:: python

        import grid2op
        env = grid2op.make("l2rpn_wcci_2022")
        env.generate_data(nb_year=XXX)  # replace XXX by the amount of data you want. If you put 1 you will have 52 different
        # scenarios

    Then, later on, you can use it as you please, transparently:

    .. code-block:: python

        import grid2op
        env = grid2op.make("l2rpn_wcci_2022")

        obs = env.reset()  # obs might come from the data you have generated

    Parameters
    ----------
    nb_year : int, optional
        the number of "year" you want to generate. Each "year" is made of 52 weeks meaning that if you
        ask to generate one year, you have 52 more scenarios, by default 1
    nb_core : int, optional
        number of computer cores to use, by default 1.
    seed: int, optional
        If the same seed is given, then the same data will be generated.
    **kwargs:
        key word arguments passed to `add_data` function of `chronix2grid.grid2op_utils` module

    Raises
    ------
    ImportError
        If the chronix2grid package is not installed.
    """
    try:
        from chronix2grid.grid2op_utils import add_data
    except ImportError as exc_:
        # plain strings (no placeholders, so no f-prefix needed); a separator
        # was missing between the first two sentences of the message
        raise ImportError(
            "Chronix2grid package is not installed. Install it with `pip install grid2op[chronix2grid]`. "
            "Please visit https://github.com/bdonnot/chronix2grid#installation "
            "for further install instructions."
        ) from exc_

    # load the default `add_data` kwargs shipped with the environment (if any);
    # the file check stays under the not-None guard so os.path.* is never
    # called with None
    if self.get_path_env() is not None:
        pot_file = os.path.join(self.get_path_env(), "chronix2grid_adddata_kwargs.json")
        if os.path.isfile(pot_file):
            import json
            with open(pot_file, "r", encoding="utf-8") as f:
                kwargs_default = json.load(f)
            # explicit caller-provided kwargs take precedence over the on-disk defaults
            for el in kwargs_default:
                if el not in kwargs:
                    kwargs[el] = kwargs_default[el]

    # TODO logger here for the kwargs used (including seed=seed, nb_scenario=nb_year, nb_core=nb_core)
    add_data(
        env=self, seed=seed, nb_scenario=nb_year, nb_core=nb_core,
        **kwargs
    )
def _add_classes_in_files(self, sys_path, bk_type, are_classes_in_files):
    """Generate the grid-specialized classes for ``bk_type`` into ``sys_path``
    and append their definitions to that directory's ``__init__.py``.

    Parameters
    ----------
    sys_path:
        Directory holding the generated class files (must contain an ``__init__.py``).
    bk_type:
        The backend grid-object class; its ``_PATH_GRID_CLASSES`` attribute is
        temporarily set to ``None`` during generation, then restored.
    are_classes_in_files: ``bool``
        If ``False`` this method does nothing.
    """
    if are_classes_in_files:
        # then generate the proper classes
        _PATH_GRID_CLASSES = bk_type._PATH_GRID_CLASSES
        try:
            # NOTE(review): clearing _PATH_GRID_CLASSES before init_grid
            # presumably forces a fresh class to be generated rather than
            # one loaded from a previously written path — confirm
            bk_type._PATH_GRID_CLASSES = None
            my_type_tmp = type(self).init_grid(gridobj=bk_type, _local_dir_cls=None)
            # txt_ is the class source text; cls_res_me is the generated class
            # (unused here)
            txt_, cls_res_me = self._aux_gen_classes(my_type_tmp,
                                                     sys_path,
                                                     _add_class_output=True)
            # then add the class to the init file
            with open(os.path.join(sys_path, "__init__.py"), "a", encoding="utf-8") as f:
                f.write(txt_)
        finally:
            # make sure to put back the correct _PATH_GRID_CLASSES
            # even if generation or the file write raised
            bk_type._PATH_GRID_CLASSES = _PATH_GRID_CLASSES