# Copyright (c) 2019-2022, RTE (https://www.rte-france.com)
# See AUTHORS.txt
# This Source Code Form is subject to the terms of the Mozilla Public License, version 2.0.
# If a copy of the Mozilla Public License, version 2.0 was not distributed with this file,
# you can obtain one at http://mozilla.org/MPL/2.0/.
# SPDX-License-Identifier: MPL-2.0
# This file is part of Grid2Op, Grid2Op a testbed platform to model sequential decision making in power systems.
import copy
from typing import Optional, Tuple
try:
from typing import Self
except ImportError:
from typing_extensions import Self
import numpy as np
import os
from scipy.optimize import minimize
from scipy.optimize import LinearConstraint
from grid2op.dtypes import dt_float
from grid2op.Environment import BaseEnv
from grid2op.Action import BaseAction
from grid2op.Backend import Backend
from grid2op.Observation.baseObservation import BaseObservation
from grid2op.Observation.highresSimCounter import HighResSimCounter
from grid2op.Exceptions import SimulatorError, InvalidRedispatching
[docs]class Simulator(object):
"""This class represents a "simulator". It allows to check the impact on this or that on th powergrid, quite
like what human operators have at their disposal in control rooms.
It behaves similarly to `env.step(...)` or `obs.simulate(...)` with a few key differences:
- you can "chain" the call to simulator: `simulator.predict(...).predict(...).predict(...)`
- it does not take into account the "time": no cooldown on lines nor substation, storage
"state of charge" (energy) does not decrease when you use them
- no automatic line disconnection: lines are not disconnected when they are above their limit
- no opponent will act on the grid
Please see the documentation for usage examples.
"""
[docs] def __init__(
self, backend: Optional[Backend], env: Optional[BaseEnv] = None, tol_redisp=1e-6,
_highres_sim_counter: Optional[HighResSimCounter] =None
):
# backend should be initiliazed !
if backend is not None:
if not isinstance(backend, Backend):
raise SimulatorError(
f'The "backend" argument should be an object '
f'of type "Backend" you provided {backend}'
)
if env is not None:
raise SimulatorError(
"When building a simulator with a grid2op backend "
'make sure you set the kwarg "env=None"'
)
if backend._can_be_copied:
self.backend: Backend = backend.copy()
else:
raise SimulatorError("Impossible to make a Simulator when you "
"cannot copy the backend.")
else:
if env is None:
raise SimulatorError(
"If you want to build a simulator with a blank / None "
'backend you should provide an environment (kwargs "env")'
)
if not isinstance(env, BaseEnv):
raise SimulatorError(
f"Make sure the environment you provided is "
f"a grid2op Environment (an object of a type "
f"inheriting from BaseEnv"
)
if env.backend._can_be_copied:
self.backend: Backend = env.backend.copy()
else:
raise SimulatorError("Impossible to make a Simulator when you "
"cannot copy the backend of the environment.")
self.current_obs: BaseObservation = None
self._converged: Optional[bool] = None
self._error: Optional[Exception] = None
self._tol_redisp: float = tol_redisp
if _highres_sim_counter is not None:
self._highres_sim_counter = _highres_sim_counter
else:
self._highres_sim_counter = HighResSimCounter()
@property
def converged(self) -> bool:
"""
Returns
-------
bool
Whether or not the powerflow has converged
"""
return self._converged
@converged.setter
def converged(self, values):
raise SimulatorError("Cannot set this property.")
[docs] def copy(self) -> Self:
"""Allows to perform a (deep) copy of the simulator.
Returns
-------
Simulator
A (deep) copy of the simulator you want to copy.
Raises
------
SimulatorError
In case the simulator is not initialized.
"""
if self.current_obs is None:
raise SimulatorError(
"Impossible to copy a non initialized Simulator. "
"Have you used `simulator.set_state(obs, ...)` with a valid observation before ?"
)
res = copy.copy(self)
res.backend = res.backend.copy()
res.current_obs = res.current_obs.copy()
# do not copy this !
res._highres_sim_counter = self._highres_sim_counter
return res
[docs] def change_backend(self, backend: Backend) -> None:
"""You can use this function in case you want to change the "solver" use to perform the computation.
For example, you could use a machine learning based model to do the computation (to accelerate them), provided
that you have at your disposal such an algorithm.
.. warning::
The backend you pass as argument should be initialized with the same grid as the one currently in use.
Notes
-----
Once changed, all the "simulator" that "derived" from this simulator will use the same backend types.
Parameters
----------
backend : Backend
Another grid2op backend you can use to perform the computation.
Raises
------
SimulatorError
When you do not pass a correct backend.
"""
if not isinstance(backend, Backend):
raise SimulatorError(
"when using change_backend function, the backend should"
" be an object (an not a class) of type backend"
)
self.backend.close()
self.backend = backend.copy() # backend_class.init_grid(type(self.backend))
self.set_state(obs=self.current_obs)
[docs] def change_backend_type(self, backend_type: type, grid_path: os.PathLike, **kwargs):
"""It allows to change the type of the backend used
Parameters
----------
backend_type : type
The new backend type
grid_path : os.PathLike
The path from where to load the powergrid
kwargs:
Extra arguments used to build the backend.
Notes
-----
Once changed, all the "simulator" that "derived" from this simulator will use the same backend types.
Raises
------
SimulatorError
if something went wrong (eg you do not pass a type, your type does not inherit from Backend, the file
located at `grid_path` does not exists etc.)
"""
if not isinstance(backend_type, type):
raise SimulatorError(
"when using change_backend_type function, the backend_type should"
" be a class an not an object"
)
if not issubclass(backend_type, Backend):
raise SimulatorError(
"when using change_backend_type function, the backend_type should"
" be subtype of class Backend"
)
if not os.path.exists(grid_path):
raise SimulatorError(
f'the supposed grid path "{grid_path}" does not exists'
)
if not os.path.isfile(grid_path):
raise SimulatorError(f'the supposed grid path "{grid_path}" if not a file')
if backend_type._IS_INIT:
backend_type_init = backend_type
else:
backend_type_init= backend_type.init_grid(type(self.backend))
tmp_backend = backend_type_init(**kwargs)
# load a forecasted grid if there are any
path_env, grid_name_with_ext = os.path.split(grid_path)
grid_name, ext = os.path.splitext(grid_name_with_ext)
grid_forecast_name = f"{grid_name}_forecast.{ext}"
if os.path.exists(os.path.join(path_env, grid_forecast_name)):
grid_path_loaded = os.path.join(path_env, grid_forecast_name)
else:
grid_path_loaded = grid_path
tmp_backend.load_grid(grid_path_loaded)
tmp_backend.assert_grid_correct()
tmp_backend.runpf()
tmp_backend.assert_grid_correct_after_powerflow()
tmp_backend.set_thermal_limit(self.backend.get_thermal_limit())
self.backend.close()
self.backend = tmp_backend
self.set_state(obs=self.current_obs)
[docs] def set_state(
self,
obs: Optional[BaseObservation] = None,
do_powerflow: bool = True,
new_gen_p: np.ndarray = None,
new_gen_v: np.ndarray = None,
new_load_p: np.ndarray = None,
new_load_q: np.ndarray = None,
update_thermal_limit: bool = True,
):
"""Set the state of the simulator to a given state described by an observation (and optionally some
new loads and generation)
Parameters
----------
obs : Optional[BaseObservation], optional
The observation to get the state from, by default None
do_powerflow : bool, optional
Whether to use the underlying backend to get a consistent state after
this modification or not, by default True
new_gen_p : np.ndarray, optional
new generator active setpoint, by default None
new_gen_v : np.ndarray, optional
new generator voltage setpoint, by default None
new_load_p : np.ndarray, optional
new load active consumption, by default None
new_load_q : np.ndarray, optional
new load reactive consumption, by default None
update_thermal_limit: bool, optional
Do you update the thermal limit of the backend (we recommend to leave it to `True`
otherwise some bugs can appear such as
https://github.com/rte-france/Grid2Op/issues/377)
Raises
------
SimulatorError
In case the current simulator is not initialized.
"""
if obs is not None:
self.current_obs = obs.copy()
if self.current_obs is None:
raise SimulatorError(
"The simulator is not initialized. Have you used `simulator.set_state(obs, ...)` with a valid observation before ?"
)
# you cannot use "simulate" of the observation in this class
self.current_obs._obs_env = None
self.current_obs._forecasted_inj = []
self.current_obs._forecasted_grid = []
# udpate the new state if needed
if new_load_p is not None:
self.current_obs.load_p[:] = new_load_p
if new_load_q is not None:
self.current_obs.load_q[:] = new_load_q
if new_gen_p is not None:
self.current_obs.gen_p[:] = new_gen_p
if new_gen_v is not None:
self.current_obs.gen_v[:] = new_gen_v
self._converged = None
self.error = None
self.backend.update_from_obs(self.current_obs, force_update=True)
if update_thermal_limit:
self.backend.update_thermal_limit_from_vect(self.current_obs.thermal_limit)
if do_powerflow:
self._do_powerflow()
def _do_powerflow(self):
self._highres_sim_counter.add_one()
self._converged, self._error = self.backend.runpf()
def _update_obs(self):
if self._converged:
self.current_obs._update_attr_backend(self.backend)
else:
self.current_obs.set_game_over()
def _adjust_controlable_gen(
self, new_gen_p: np.ndarray, target_dispatch: np.ndarray, sum_target: float
) -> Optional[float]:
nb_dispatchable = self.current_obs.gen_redispatchable.sum()
# which generators needs to be "optimized" -> the one where
# the target function matter
gen_in_target = np.abs(target_dispatch[self.current_obs.gen_redispatchable]) >= 1e-7
# compute the upper / lower bounds for the generators
dispatchable = new_gen_p[self.current_obs.gen_redispatchable]
val_min = (
self.current_obs.gen_pmin[self.current_obs.gen_redispatchable]
- dispatchable
)
val_max = (
self.current_obs.gen_pmax[self.current_obs.gen_redispatchable]
- dispatchable
)
# define the target function (things that will be minimized)
target_dispatch_redisp = target_dispatch[self.current_obs.gen_redispatchable]
coeffs = 1.0 / (
self.current_obs.gen_max_ramp_up
+ self.current_obs.gen_max_ramp_down
+ self._tol_redisp
)
weights = np.ones(nb_dispatchable) * coeffs[self.current_obs.gen_redispatchable]
weights /= weights.sum()
scale_objective = max(0.5 * np.abs(target_dispatch_redisp).sum() ** 2, 1.0)
scale_objective = np.round(scale_objective, decimals=4)
tmp_zeros = np.zeros((1, nb_dispatchable), dtype=float)
# wrap everything into the proper scipy form
def target(actual_dispatchable):
# define my real objective
quad_ = (
1e2
* (
actual_dispatchable[gen_in_target]
- target_dispatch_redisp[gen_in_target]
)
** 2
)
coeffs_quads = weights[gen_in_target] * quad_
coeffs_quads_const = coeffs_quads.sum()
coeffs_quads_const /= scale_objective # scaling the function
coeffs_quads_const += 1e-2 * (actual_dispatchable**2 * weights).sum()
return coeffs_quads_const
def jac(actual_dispatchable):
res_jac = 1.0 * tmp_zeros
res_jac[0, gen_in_target] = (
1e2
* 2.0
* weights[gen_in_target]
* (
actual_dispatchable[gen_in_target]
- target_dispatch_redisp[gen_in_target]
)
)
res_jac /= scale_objective # scaling the function
res_jac += 2e-2 * actual_dispatchable * weights
return res_jac
mat_sum_ok = np.ones((1, nb_dispatchable))
equality_const = LinearConstraint(
mat_sum_ok, sum_target - self._tol_redisp, sum_target + self._tol_redisp
)
ineq_const = LinearConstraint(np.eye(nb_dispatchable), lb=val_min, ub=val_max)
# objective function
def f(init):
this_res = minimize(
target,
init,
method="SLSQP",
constraints=[equality_const, ineq_const],
options={
"eps": self._tol_redisp,
"ftol": self._tol_redisp,
"disp": False,
},
jac=jac,
)
return this_res
# choose a good initial point (close to the solution)
# the idea here is to chose a initial point that would be close to the
# desired solution (split the (sum of the) dispatch to the available generators)
x0 = 1.0 * target_dispatch_redisp
can_adjust = np.abs(x0) <= 1e-7
if (can_adjust).any():
init_sum = x0.sum()
denom_adjust = (1.0 / weights[can_adjust]).sum()
if denom_adjust <= 1e-2:
# i don't want to divide by something too cloose to 0.
denom_adjust = 1.0
x0[can_adjust] = -init_sum / (weights[can_adjust] * denom_adjust)
res = f(x0.astype(float))
if res.success:
return res.x
else:
return None
def _amount_curtailed(
self, act: BaseAction, new_gen_p: np.ndarray
) -> Tuple[np.ndarray, float]:
curt_vect = 1.0 * act.curtail
curt_vect[curt_vect == -1.0] = 1.0
limit_curtail = curt_vect * act.gen_pmax
curtailed = np.maximum(new_gen_p - limit_curtail, 0.0)
curtailed[~act.gen_renewable] = 0.0
amount_curtail = curtailed.sum()
new_gen_p_after_curtail = 1.0 * new_gen_p
new_gen_p_after_curtail -= curtailed
return new_gen_p_after_curtail, amount_curtail
def _amount_storage(self, act: BaseAction) -> Tuple[float, np.ndarray]:
storage_act = 1.0 * act.storage_p
res = self.current_obs.storage_power_target.sum()
current_charge = 1.0 * self.current_obs.storage_charge
storage_power = np.zeros(act.n_storage)
if np.all(np.abs(storage_act) <= self._tol_redisp):
return -res, storage_power, current_charge
coeff_p_to_E = (
self.current_obs.delta_time / 60.0
) # obs.delta_time is in minutes
# convert power (action to energy)
storage_act_E = storage_act * coeff_p_to_E
# take into account the efficiencies
do_charge = storage_act_E < 0.0
do_discharge = storage_act_E > 0.0
storage_act_E[do_charge] /= act.storage_charging_efficiency[do_charge]
storage_act_E[do_discharge] *= act.storage_discharging_efficiency[do_discharge]
# make sure we don't go over / above Emin / Emax
min_down_E = act.storage_Emin - current_charge
min_up_E = act.storage_Emax - current_charge
storage_act_E = np.minimum(storage_act_E, min_up_E)
storage_act_E = np.maximum(storage_act_E, min_down_E)
current_charge += storage_act_E
# convert back to power (for the observation) the amount the grid got
storage_power = storage_act_E / coeff_p_to_E
storage_power[do_charge] *= act.storage_charging_efficiency[do_charge]
storage_power[do_discharge] /= act.storage_discharging_efficiency[do_discharge]
res += storage_power.sum()
return -res, storage_power, current_charge
def _fix_redisp_curtailment_storage(
self, act: BaseAction, new_gen_p: np.ndarray
) -> Tuple[bool, np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray,]:
"""This function emulates the "frequency control" of the
environment.
Its main goal is to ensure that the sum of injected power thanks to redispatching,
storage units and curtailment sum to 0.
It is a very rough simplification of what happens in the environment.
"""
new_gen_p_after_curtail, amount_curtail = self._amount_curtailed(act, new_gen_p)
amount_storage, storage_power, storage_charge = self._amount_storage(act)
sum_target = amount_curtail - amount_storage # TODO !
target_dispatch = self.current_obs.target_dispatch + act.redispatch
# if previous setpoint was say -2 and at this step I redispatch of
# say + 4 then the real setpoint should be +2 (and not +4)
new_vect_redisp = (np.abs(act.redispatch) >= 1e-7) & (
np.abs(self.current_obs.target_dispatch) <= 1e-7
)
target_dispatch[new_vect_redisp] += self.current_obs.actual_dispatch[
new_vect_redisp
]
if abs(target_dispatch.sum() - sum_target) >= self._tol_redisp:
adjust = self._adjust_controlable_gen(
new_gen_p_after_curtail, target_dispatch, sum_target
)
if adjust is None:
return True, None, None, None, None, None
else:
return (
True,
new_gen_p_after_curtail,
target_dispatch,
adjust,
storage_power,
storage_charge,
)
return False, None, None, None, None, None
[docs] def predict(
self,
act: BaseAction,
new_gen_p: np.ndarray = None,
new_gen_v: np.ndarray = None,
new_load_p: np.ndarray = None,
new_load_q: np.ndarray = None,
do_copy: bool = True,
) -> "Simulator":
"""Predict the state of the grid after a given action has been taken.
Parameters
----------
act : BaseAction
The action you want to take
new_gen_p : np.ndarray, optional
the new production active setpoint, by default None
new_gen_v : np.ndarray, optional
the new production voltage setpoint, by default None
new_load_p : np.ndarray, optional
the new consumption active values, by default None
new_load_q : np.ndarray, optional
the new consumption reactive values, by default None
do_copy : bool, optional
Whether to make a copy or not, by default True
Examples
---------
A possible example is:
.. code-block:: python
import grid2op
env_name = "l2rpn_case14_sandbox" # or any other name
env = grid2op.make(env_name)
obs = env.reset()
#### later in the code, for example in an Agent:
simulator = obs.get_simulator()
load_p_stressed = obs.load_p * 1.05
gen_p_stressed = obs.gen_p * 1.05
do_nothing = env.action_space()
simulator_stressed = simulator.predict(act=do_nothing,
new_gen_p=gen_p_stressed,
new_load_p=load_p_stressed)
if not simulator_stressed.converged:
# the solver fails to find a solution for this action
# you are likely to run into trouble if you use that...
... # do something
obs_stressed = simulator_stressed.current_obs
Returns
-------
Simulator
The new simulator representing the grid state after the simulation of the action.
"""
# init the result
if do_copy:
res = self.copy()
else:
res = self
this_act = act.copy()
if new_gen_p is None:
new_gen_p = 1.0 * self.current_obs.gen_p
res.set_state(
obs=None,
new_gen_p=new_gen_p,
new_gen_v=new_gen_v,
new_load_p=new_load_p,
new_load_q=new_load_q,
do_powerflow=False,
)
# "fix" the action for the redispatching / curtailment / storage part
(
has_adjusted,
new_gen_p_modif,
target_dispatch,
adjust,
storage_power,
storage_charge,
) = res._fix_redisp_curtailment_storage(this_act, new_gen_p)
if has_adjusted:
if target_dispatch is None:
res._converged = False
res.current_obs.set_game_over()
res._error = InvalidRedispatching("")
return res
redisp_modif = np.zeros(self.current_obs.n_gen)
redisp_modif[self.current_obs.gen_redispatchable] = adjust
# adjust the proper things in the observation
res.current_obs.target_dispatch = target_dispatch
this_act.redispatch = redisp_modif
res.current_obs.actual_dispatch[:] = redisp_modif
this_act._dict_inj["prod_p"] = 1.0 * new_gen_p_modif
this_act._modif_inj = True
# TODO : curtail, curtailment_limit (in observation)
res.current_obs.curtailment[:] = (
new_gen_p - new_gen_p_modif
) / act.gen_pmax
res.current_obs.curtailment_limit[:] = act.curtail
res.current_obs.curtailment_limit_effective[:] = act.curtail
res.current_obs.gen_p_before_curtail[:] = new_gen_p
res.current_obs.storage_power[:] = storage_power
res.current_obs.storage_charge[:] = storage_charge
else:
res.current_obs.storage_power[:] = 0.0
res.current_obs.actual_dispatch[:] = 0.0
# apply the action
bk_act = res.backend.my_bk_act_class()
bk_act += this_act
res.backend.apply_action(bk_act)
# run the powerflow
res._do_powerflow()
# update its observation
res._update_obs()
return res
[docs] def close(self):
"""close the underlying backend"""
if hasattr(self, "backend") and self.backend is not None:
self.backend.close()
self.backend = None
self.current_obs = None
self._converged = None
self._error = None
def __del__(self):
self.close()