Source code for grid2op.utils.l2rpn_2020_scores

# Copyright (c) 2019-2020, RTE (https://www.rte-france.com)
# See AUTHORS.txt
# This Source Code Form is subject to the terms of the Mozilla Public License, version 2.0.
# If a copy of the Mozilla Public License, version 2.0 was not distributed with this file,
# you can obtain one at http://mozilla.org/MPL/2.0/.
# SPDX-License-Identifier: MPL-2.0
# This file is part of Grid2Op, Grid2Op a testbed platform to model sequential decision making in power systems.
import os
import numpy as np
import json
import copy
import tempfile

from grid2op.dtypes import dt_float
from grid2op.Reward import L2RPNSandBoxScore
from grid2op.Agent import RecoPowerlineAgent
from grid2op.utils.underlying_statistics import EpisodeStatistics
from grid2op.Episode import EpisodeData

import re


class ScoreL2RPN2020(object):
    """
    This class allows to compute the same score as the one computed for the L2RPN 2020 competitions.

    It uses some "EpisodeStatistics" of the environment to compute these scores. These statistics, if not
    available, are computed at the initialization.

    When using it a second time this information is reused.

    Examples
    ---------
    This class can be used as follow:

    .. code-block:: python

        import grid2op
        from grid2op.utils import ScoreL2RPN2020
        from grid2op.Agent import DoNothingAgent

        env = grid2op.make("l2rpn_case14_sandbox")
        nb_scenario = 2
        my_score = ScoreL2RPN2020(env,
                                  nb_scenario=nb_scenario,
                                  env_seeds=[0 for _ in range(nb_scenario)],
                                  agent_seeds=[0 for _ in range(nb_scenario)]
                                  )

        my_agent = DoNothingAgent(env.action_space)
        print(my_score.get(my_agent))

    Notes
    -------
    To prevent overfitting, we strongly recommend you to use the
    :func:`grid2op.Environment.Environment.train_val_split` and use this function on the built validation
    set only.

    Also note that computing the statistics, and evaluating an agent on a whole dataset of multiple GB can
    take a really long time and a lot of memory. This fact, again, pleads in favor of using this function
    only on a validation set.

    We also strongly recommend to set the seeds of your agent (agent_seeds) and of the environment
    (env_seeds) if you want to use this feature.

    Reproducibility is really important if you want to make progress.

    .. warning::
        The triggering (or not) of the recomputation of the statistics is not perfect for now.
        We recommend you to always use the same seeds (`env_seeds` and `agent_seeds` key word arguments
        of this function) and the same parameters (`env.parameters`) when using a given environment.

        You might need to clean it manually if you change one of these things by calling the
        :func:`ScoreL2RPN2020.clear_all()` function.

    """

    # names under which the two reference statistics are stored
    NAME_DN = "l2rpn_dn"
    NAME_RP_NO_OVERFLOW = "l2rpn_no_overflow_reco"

    def __init__(
        self,
        env,
        env_seeds=None,
        agent_seeds=None,
        nb_scenario=16,
        min_losses_ratio=0.8,
        verbose=0,
        max_step=-1,
        nb_process_stats=1,
        scores_func=L2RPNSandBoxScore,
        score_names=None,
        add_nb_highres_sim=False,
    ):
        """
        Build the scorer and make sure the two reference statistics it relies on are available.

        Parameters
        ----------
        env:
            The environment on which the agents are evaluated.
        env_seeds:
            Seeds forwarded to the statistics computation and to the agent evaluation (environment side).
        agent_seeds:
            Seeds forwarded to the statistics computation and to the agent evaluation (agent side).
        nb_scenario: ``int``
            Number of scenarios used for the statistics and for the evaluation.
        min_losses_ratio: ``float``
            Factor applied to the summed reference losses to obtain the cost mapped to the score 100.
        verbose: ``int``
            Messages are printed when ``>= 1``; progress bars are requested when ``>= 2``.
        max_step: ``int``
            Maximum number of steps forwarded to the statistics / evaluation. ``-1`` is treated as
            "the whole dataset" when deciding whether stored statistics can be reused.
        nb_process_stats: ``int``
            Number of processes used when the reference statistics have to be computed.
        scores_func:
            Score function forwarded to the statistics computation and to the evaluation.
        score_names:
            Names of the stored scores; the first one is used to check which scenarios were already
            computed. Defaults to ``[EpisodeStatistics.SCORES]``.
        add_nb_highres_sim: ``bool``
            Forwarded to ``EpisodeStatistics.run_env``; when ``True``, :func:`ScoreL2RPN2020.get` returns
            its result as a fourth element.
        """
        self.env = env
        self.nb_scenario = nb_scenario
        self.env_seeds = env_seeds
        self.agent_seeds = agent_seeds
        self.min_losses_ratio = min_losses_ratio
        self.verbose = verbose
        self.max_step = max_step

        computed_scenarios = [el[1] for el in EpisodeStatistics.list_stats(self.env)]
        self.scores_func = scores_func

        # check if i need to compute stat for do nothing
        self.stat_dn = EpisodeStatistics(self.env, self.NAME_DN)
        self._recomputed_dn = self._init_stat(
            self.stat_dn,
            self.NAME_DN,
            computed_scenarios,
            nb_process_stats=nb_process_stats,
            score_names=score_names,
        )

        # parameters used for the "no overflow disconnection" reference run
        param_no_overflow = copy.deepcopy(env.parameters)
        param_no_overflow.NO_OVERFLOW_DISCONNECTION = True

        # check if i need to compute that for reco powerline without overflow disconnection
        self.stat_no_overflow_rp = EpisodeStatistics(self.env, self.NAME_RP_NO_OVERFLOW)
        agent_reco = RecoPowerlineAgent(self.env.action_space)
        self._recomputed_no_ov_rp = self._init_stat(
            self.stat_no_overflow_rp,
            self.NAME_RP_NO_OVERFLOW,
            computed_scenarios,
            parameters=param_no_overflow,
            nb_process_stats=nb_process_stats,
            agent=agent_reco,
            score_names=score_names,
        )

        self.add_nb_highres_sim = add_nb_highres_sim
        # set to True by `clear_all`, after which `get` refuses to run
        self.__cleared = False

    def _init_stat(
        self,
        stat,
        stat_name,
        computed_scenarios,
        parameters=None,
        nb_process_stats=1,
        agent=None,
        score_names=None,
    ):
        """
        Check whether the statistics `stat` need to be (re)computed, and compute them if so.

        Parameters
        ----------
        stat:
            The statistics object to check (and possibly compute).
        stat_name: ``str``
            Name of these statistics, looked up in `computed_scenarios`.
        computed_scenarios: ``list``
            Names of the statistics already available for this environment.
        parameters:
            Parameters forwarded to ``stat.compute``.
        nb_process_stats: ``int``
            Number of processes forwarded to ``stat.compute``.
        agent:
            Agent forwarded to ``stat.compute``.
        score_names:
            Names of the stored scores, defaults to ``[EpisodeStatistics.SCORES]``.

        Returns
        -------
        need_recompute:
            Whether the statistics have been recomputed by this call.
        """
        need_recompute = True
        if score_names is None:
            score_names = [EpisodeStatistics.SCORES]

        if EpisodeStatistics.get_name_dir(stat_name) in computed_scenarios:
            # the things have been computed i check if the number of scenarios is big enough
            _, ids_ = stat.get(score_names[0])
            metadata = stat.get_metadata()
            max_id = np.max(ids_)

            # i need to recompute if i did not compute enough scenarios
            need_recompute = max_id < self.nb_scenario - 1

            computed_step = int(metadata["max_step"])
            if computed_step > 0:
                # the stored statistics were computed on a limited number of steps
                if self.max_step == -1:
                    # i need to compute now all the dataset, so yes i have to recompute it
                    need_recompute = True

                # i need to recompute only if i ask more steps than what was computed
                # (compare against the value cast to int, consistently with the test above)
                need_recompute = need_recompute or self.max_step > computed_step

            # TODO check for the seeds here too
            # TODO and check for the class of the scores too
            # TODO check for the parameters too...

        if need_recompute:
            # i need to compute it
            if self.verbose >= 1:
                print(
                    "I need to recompute the statistics for this environment. This will take a while"
                )  # TODO logger
            stat.compute(
                nb_scenario=self.nb_scenario,
                pbar=self.verbose >= 2,
                env_seeds=self.env_seeds,
                agent_seeds=self.agent_seeds,
                scores_func=self.scores_func,
                max_step=self.max_step,
                parameters=parameters,
                nb_process=nb_process_stats,
                agent=agent,
            )
            stat.clear_episode_data()
        return need_recompute

    def _compute_episode_score(
        self,
        ep_id,  # the ID here, which is an integer and is not the ID from chronics
        meta,
        other_rewards,
        dn_metadata,
        no_ov_metadata,
        score_file_to_use=None,
    ):
        """
        Performs the rescaling of the score given the information stored in the "statistics" of this
        environment.

        Parameters
        ----------
        ep_id: ``int``
            Index of the episode (not the ID from the chronics).
        meta: ``dict``
            Metadata of the evaluated episode; ``"nb_timestep_played"`` is read from it.
        other_rewards: ``list``
            Per-step "other rewards" of the evaluated episode (one dict per step).
        dn_metadata: ``dict``
            Metadata of the "do nothing" statistics.
        no_ov_metadata: ``dict``
            Metadata of the "reco powerline, no overflow disconnection" statistics.
        score_file_to_use:
            Name of the stored score to use, defaults to ``EpisodeStatistics.SCORES``.

        Returns
        -------
        ep_score: ``float``
            Score of the episode, interpolated between -100 and 100.
        n_played: ``int``
            Number of steps played by the agent.
        total_ts: ``int``
            Number of steps of the reference (no overflow disconnection) episode.
        """
        load_p_rp, ids_rp = self.stat_no_overflow_rp.get("load_p")
        # the productions are needed to derive the losses (production - load)
        prod_p_rp, _ = self.stat_no_overflow_rp.get("prod_p")

        if score_file_to_use is None:
            score_file_to_use = EpisodeStatistics.SCORES
            key_score_file = EpisodeStatistics.KEY_SCORE
        else:
            # should match underlying_statistics.run_env `dict_kwg["other_rewards"][XXX] = ...`
            # XXX is right now f"{EpisodeStatistics.KEY_SCORE}_{nm}" [this should match the XXX]
            real_nm = EpisodeStatistics._nm_score_from_attr_name(score_file_to_use)
            key_score_file = f"{EpisodeStatistics.KEY_SCORE}_{real_nm}"

        scores_dn, ids_dn_sc = self.stat_dn.get(score_file_to_use)
        scores_no_ov_rp, ids_noov_sc_rp = self.stat_no_overflow_rp.get(
            score_file_to_use
        )

        # reshape to have 1 dim array
        ids = ids_rp.reshape(-1)
        ids_dn_sc = ids_dn_sc.reshape(-1)
        ids_noov_sc_rp = ids_noov_sc_rp.reshape(-1)

        # there is an ugly "1" at the end of each scores due to the "game over" (or end of game),
        # so i remove it
        scores_dn = scores_dn[ids_dn_sc == ep_id][:-1]
        scores_no_ov_rp = scores_no_ov_rp[ids_noov_sc_rp == ep_id][:-1]

        dn_this = dn_metadata[f"{ep_id}"]
        no_ov_this = no_ov_metadata[f"{ep_id}"]

        n_played = int(meta["nb_timestep_played"])
        dn_step_played = dn_this["nb_step"] - 1
        total_ts = no_ov_this["nb_step"] - 1
        ep_marginal_cost = np.max(self.env.gen_cost_per_MW).astype(dt_float)
        min_losses_ratio = self.min_losses_ratio

        # remember that first observation do not count (it's generated by the environment)
        ep_loads = load_p_rp[ids == ep_id, :].sum(axis=1)[1:]
        ep_losses = prod_p_rp[ids == ep_id, :].sum(axis=1)[1:] - ep_loads

        if self.max_step > 0:
            scores_dn = scores_dn[: self.max_step]
            scores_no_ov_rp = scores_no_ov_rp[: self.max_step]
            ep_loads = ep_loads[: self.max_step]
            ep_losses = ep_losses[: self.max_step]

        # do nothing operational cost: the steps not played are charged at the marginal cost
        ep_do_nothing_operat_cost = scores_dn.sum()
        ep_do_nothing_operat_cost += (
            ep_loads[dn_step_played:].sum() * ep_marginal_cost
        )

        # no overflow disconnection cost
        ep_do_nothing_nodisc_cost = scores_no_ov_rp.sum()

        # this agent cumulated operational cost
        ep_cost = np.array([el[key_score_file] for el in other_rewards]).astype(
            dt_float
        )
        if dn_metadata["max_step"] == self.max_step:
            # same as above: i remove the last element which corresponds to the last state,
            # so irrelevant
            ep_cost = ep_cost[:-1]
        ep_cost = ep_cost.sum()
        ep_cost += ep_loads[n_played:].sum() * ep_marginal_cost

        # Compute ranges
        # operational cost corresponding to the min score
        worst_operat_cost = ep_loads.sum() * ep_marginal_cost
        zero_operat_score = ep_do_nothing_operat_cost
        nodisc_operat_cost = ep_do_nothing_nodisc_cost
        # operational cost corresponding to the max score
        best_score = ep_losses.sum() * min_losses_ratio

        # Linear interp episode reward to codalab score
        if zero_operat_score != nodisc_operat_cost:
            # DoNothing agent doesnt complete the scenario
            reward_range = [
                best_score,
                nodisc_operat_cost,
                zero_operat_score,
                worst_operat_cost,
            ]
            score_range = [100.0, 80.0, 0.0, -100.0]
        else:
            # DoNothing agent can complete the scenario
            reward_range = [best_score, zero_operat_score, worst_operat_cost]
            score_range = [100.0, 0.0, -100.0]

        ep_score = np.interp(ep_cost, reward_range, score_range)
        return ep_score, n_played, total_ts

    def clear_all(self):
        """
        Has side effects

        .. warning::
            Be careful!

        Clear the whole statistics directory for the two computed statistics used for the score
        (the "do nothing" one and the "reco powerline without overflow disconnection" one).

        It will remove the previously computed statistics.

        Once done, this cannot be undone. After this call, :func:`ScoreL2RPN2020.get` raises a
        ``RuntimeError``.
        """
        self.stat_no_overflow_rp.clear_all()
        self.stat_dn.clear_all()
        self.__cleared = True

    def get(self, agent, path_save=None, nb_process=1):
        """
        Get the score of the agent depending on what has been computed.

        TODO The plots will be done later.

        Parameters
        ----------
        agent: :class:`grid2op.Agent.BaseAgent`
            The agent you want to score

        path_save: ``str``
            the path where you want to store the logs of your agent. If ``None`` a temporary directory
            is used and removed before returning.

        nb_process: ``int``
            Number of process to use for the evaluation

        Returns
        -------
        all_scores: ``list``
            List of the score of your agent per scenarios

        ts_survived: ``list``
            List of the number of step your agent successfully managed for each scenario

        total_ts: ``list``
            Total number of step for each scenario

        nb_highres_sim:
            Only returned (as a fourth element) when `add_nb_highres_sim` was set at construction.

        Raises
        ------
        RuntimeError
            If :func:`ScoreL2RPN2020.clear_all` has been called on this object.
        """
        if self.__cleared:
            raise RuntimeError(EpisodeStatistics.ERROR_MSG_CLEANED)

        dir_tmp = None  # only set when the logs go to a temporary directory
        if path_save is not None:
            path_save = os.path.abspath(path_save)
        else:
            dir_tmp = tempfile.TemporaryDirectory()
            path_save = dir_tmp.name
            if self.verbose >= 1:
                print("Using a temp directory to store the intermediate data.")  # TODO logger

        try:
            if self.verbose >= 1:
                print("Starts the evaluation of the agent")  # TODO logger
            # NB nb_highres_sim is None if self.add_nb_highres_sim is False !
            nb_highres_sim = EpisodeStatistics.run_env(
                self.env,
                env_seeds=self.env_seeds,
                agent_seeds=self.agent_seeds,
                path_save=path_save,
                parameters=self.env.parameters,
                scores_func=self.scores_func,
                agent=agent,
                max_step=self.max_step,
                nb_scenario=self.nb_scenario,
                pbar=self.verbose >= 2,
                nb_process=nb_process,
                add_nb_highres_sim=self.add_nb_highres_sim,
            )

            if self.verbose >= 1:
                print("Start the evaluation of the scores")  # TODO logger

            meta_data_dn = self.stat_dn.get_metadata()
            no_ov_metadata = self.stat_no_overflow_rp.get_metadata()

            all_scores = []
            ts_survived = []
            total_ts = []
            for ep_id in range(self.nb_scenario):
                # the logs of each episode are stored in a directory named after the scenario
                this_ep_nm = meta_data_dn[f"{ep_id}"]["scenario_name"]
                with open(
                    os.path.join(path_save, this_ep_nm, EpisodeData.META),
                    "r",
                    encoding="utf-8",
                ) as f:
                    this_epi_meta = json.load(f)
                with open(
                    os.path.join(path_save, this_ep_nm, EpisodeData.OTHER_REWARDS),
                    "r",
                    encoding="utf-8",
                ) as f:
                    this_epi_scores = json.load(f)

                score_this_ep, nb_ts_survived, total_ts_tmp = self._compute_episode_score(
                    ep_id,
                    meta=this_epi_meta,
                    other_rewards=this_epi_scores,
                    dn_metadata=meta_data_dn,
                    no_ov_metadata=no_ov_metadata,
                )
                all_scores.append(score_this_ep)
                ts_survived.append(nb_ts_survived)
                total_ts.append(total_ts_tmp)
        finally:
            # remove the temporary directory even if the evaluation failed
            if dir_tmp is not None:
                dir_tmp.cleanup()

        res = all_scores, ts_survived, total_ts
        if self.add_nb_highres_sim:
            res = all_scores, ts_survived, total_ts, nb_highres_sim
        return res
if __name__ == "__main__":
    # Small demo: score a "do nothing" agent on the sandbox environment.
    import grid2op
    from lightsim2grid import LightSimBackend
    from grid2op.Agent import DoNothingAgent

    env = grid2op.make("l2rpn_case14_sandbox", backend=LightSimBackend())
    nb_scenario = 16
    my_score = ScoreL2RPN2020(
        env,
        nb_scenario=nb_scenario,
        env_seeds=[0 for _ in range(nb_scenario)],
        agent_seeds=[0 for _ in range(nb_scenario)],
    )

    my_agent = DoNothingAgent(env.action_space)
    print(my_score.get(my_agent))