# Copyright (c) 2019-2020, RTE (https://www.rte-france.com)
# See AUTHORS.txt
# This Source Code Form is subject to the terms of the Mozilla Public License, version 2.0.
# If a copy of the Mozilla Public License, version 2.0 was not distributed with this file,
# you can obtain one at http://mozilla.org/MPL/2.0/.
# SPDX-License-Identifier: MPL-2.0
# This file is part of Grid2Op, Grid2Op a testbed platform to model sequential decision making in power systems.

import numpy as np

from grid2op.Reward.baseReward import BaseReward
from grid2op.dtypes import dt_float


class L2RPNSandBoxScore(BaseReward):
    """
    INTERNAL

    .. warning:: /!\\\\ Internal, do not use unless you know what you are doing /!\\\\

        It **must not** serve as a reward. This score needs to be **MINIMIZED**,
        whereas a reward needs to be maximized! Also, this "reward" is not scaled
        or normalized in any way. Use it at your own risk.

    Implemented as a reward to make it easier to use in the context of the L2RPN
    competitions, this "reward" computes the "grid operation cost". It should not
    be used to train an agent. The reward closest to this score is given by the
    :class:`RedispReward` class.
    """

    def __init__(self,
                 alpha_redisp=1.0,
                 alpha_loss=1.0,
                 alpha_storage=1.0,
                 alpha_curtailment=1.0,
                 reward_max=1000.,
                 logger=None):
        BaseReward.__init__(self, logger=logger)
        self.reward_min = dt_float(1.0)  # careful here between min and max...
        self.reward_max = dt_float(reward_max)
        self.alpha_redisp = dt_float(alpha_redisp)
        self.alpha_loss = dt_float(alpha_loss)
        self.alpha_storage = dt_float(alpha_storage)
        self.alpha_curtailment = dt_float(alpha_curtailment)

    def initialize(self, env):
        # TODO compute reward max!
        return super().initialize(env)

    def _get_load_p(self, env):
        load_p, *_ = env.backend.loads_info()
        return load_p

    def _get_gen_p(self, env):
        gen_p, *_ = env.backend.generators_info()
        return gen_p

    def _get_losses(self, env, gen_p, load_p):
        # losses in MW times the step duration (in hours) gives energy in MWh
        return (gen_p.sum(dtype=dt_float) - load_p.sum(dtype=dt_float)) * env.delta_time_seconds / 3600.0

    def _get_marginal_cost(self, env):
        gen_activeprod_t = env._gen_activeprod_t
        # marginal cost: the cost of the most expensive unit currently producing.
        # This price is per MWh: be sure to convert the MW (of losses and
        # generation) to MWh before multiplying by this cost.
        p_t = np.max(env.gen_cost_per_MW[gen_activeprod_t > 0.0]).astype(dt_float)
        return p_t

    def _get_redisp_cost(self, env, p_t):
        actual_dispatch = env._actual_dispatch
        c_redispatching = (
            np.abs(actual_dispatch).sum() * p_t * env.delta_time_seconds / 3600.0
        )
        return c_redispatching

    def _get_curtail_cost(self, env, p_t):
        curtailment_mw = -env._sum_curtailment_mw  # curtailment is always negative in the env
        c_curtailment = curtailment_mw * p_t * env.delta_time_seconds / 3600.0
        return c_curtailment

    def _get_loss_cost(self, env, p_t):
        gen_p = self._get_gen_p(env)
        load_p = self._get_load_p(env)
        losses = self._get_losses(env, gen_p, load_p)
        c_loss = losses * p_t
        return c_loss

    def _get_storage_cost(self, env, p_t):
        c_storage = np.abs(env._storage_power).sum() * p_t * env.delta_time_seconds / 3600.0
        return c_storage

    def __call__(self, action, env, has_error, is_done, is_illegal, is_ambiguous):
        if has_error:
            # DO SOMETHING IN THIS CASE
            return self.reward_min

        # compute the marginal cost
        p_t = self._get_marginal_cost(env)

        # redispatching amount
        c_redispatching = self._get_redisp_cost(env, p_t)

        # curtailment amount
        c_curtailment = self._get_curtail_cost(env, p_t)

        # cost of losses
        c_loss = self._get_loss_cost(env, p_t)

        # storage units
        c_storage = self._get_storage_cost(env, p_t)

        # total "operational cost"
        c_operations = dt_float(
            self.alpha_loss * c_loss
            + self.alpha_redisp * c_redispatching
            + self.alpha_storage * c_storage
            + self.alpha_curtailment * c_curtailment
        )
        return c_operations
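

# A worked numeric sketch of the cost computation above. The figures are
# illustrative only (they come from no grid2op dataset): p_t is the per-MWh
# cost of the most expensive unit currently producing, and every MW quantity
# is converted to MWh over the step before being priced.
#
#     gen_cost_per_MW  = [10., 40., 25.]   # per-MWh cost of each generator
#     gen_activeprod_t = [30.,  0., 12.]   # only units 0 and 2 are producing
#     p_t = max(10., 25.) = 25.            # per MWh
#
#     delta_time_seconds = 300.            # a 5-minute step
#     losses of 5. MW  -> 5. * 300. / 3600. = 0.4166... MWh
#     c_loss = 0.4166... * 25. ~ 10.42     # in the dataset's currency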
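

# A minimal usage sketch. It assumes the "l2rpn_case14_sandbox" dataset can be
# fetched by grid2op.make and relies on grid2op's `other_rewards` keyword:
# because this score must be MINIMIZED, it is exposed through `other_rewards`
# and read back from the `info` dict, never used as the training reward itself.
if __name__ == "__main__":
    import grid2op

    env = grid2op.make(
        "l2rpn_case14_sandbox",
        other_rewards={"sandbox_score": L2RPNSandBoxScore},
    )
    obs = env.reset()
    # take a "do nothing" action
    obs, reward, done, info = env.step(env.action_space())
    # the grid operation cost computed by this class (to be minimized)
    print(info["rewards"]["sandbox_score"])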