Source code for grid2op.Agent.alertAgent

# Copyright (c) 2023, RTE (
# See AUTHORS.txt
# This Source Code Form is subject to the terms of the Mozilla Public License, version 2.0.
# If a copy of the Mozilla Public License, version 2.0 was not distributed with this file,
# you can obtain one at
# SPDX-License-Identifier: MPL-2.0
# This file is part of Grid2Op, Grid2Op a testbed platform to model sequential decision making in power systems.

import numpy as np
from grid2op.Action import BaseAction
from grid2op.Agent.recoPowerlineAgent import RecoPowerlineAgent
from grid2op.Agent.baseAgent import BaseAgent
from grid2op.Observation import BaseObservation
from grid2op.dtypes import dt_int

[docs]class AlertAgent(BaseAgent): """ This is a :class:`AlertAgent` example, which will attempt to reconnect powerlines and send alerts on the worst possible attacks: for each disconnected powerline that can be reconnected, it will simulate the effect of reconnecting it. And reconnect the one that lead to the highest simulated reward. It will also simulate the effect of having a line disconnection on attackable lines and raise alerts for the worst ones """ def __init__(self, action_space, grid_controler=RecoPowerlineAgent, percentage_alert=30, simu_step=1, threshold=0.99): super().__init__(action_space) if isinstance(grid_controler, type): self.grid_controler = grid_controler(action_space) else: self.grid_controler = grid_controler self.percentage_alert = percentage_alert self.simu_step = simu_step self.threshold = threshold # if the max flow after a line disconnection is below threshold, then the alert is not raised # store the result of the simulation of powerline disconnection self.alertable_line_ids = type(action_space).alertable_line_ids self.n_alertable_lines = len(self.alertable_line_ids) self.nb_overloads = np.zeros(self.n_alertable_lines, dtype=dt_int) self.rho_max_N_1 = np.zeros(self.n_alertable_lines) self.N_1_actions = [self.action_space({"set_line_status": [(id_, -1)]}) for id_ in self.alertable_line_ids] self._first_k = np.zeros(self.n_alertable_lines, dtype=bool) self._first_k[:int(self.percentage_alert / 100. * self.n_alertable_lines)] = True
[docs] def act(self, observation: BaseObservation, reward: float, done: bool = False) -> BaseAction: action = self.grid_controler.act(observation, reward, done) self.nb_overloads[:] = 0 self.rho_max_N_1[:] = 0. # test which backend to know which method to call for i, tmp_act in enumerate(self.N_1_actions): # only simulate if the line is connected if observation.line_status[self.alertable_line_ids[i]]: action_to_simulate = tmp_act action_to_simulate += action action_to_simulate.remove_line_status_from_topo(observation) ( simul_obs, simul_reward, simul_done, simul_info, ) = observation.simulate(action_to_simulate, time_step=self.simu_step) rho_simu = simul_obs.rho if not simul_done: self.nb_overloads[i] = (rho_simu >= 1).sum() self.rho_max_N_1[i] = (rho_simu).max() else: self.nb_overloads[i] = type(observation).n_line self.rho_max_N_1[i] = 5. # sort the index by nb_overloads and, if nb_overloads is equal, sort by rho_max ind = (self.nb_overloads * 1000. + self.rho_max_N_1).argsort() ind = ind[::-1] # send alerts when the powerline is among the top k (not to send too many alerts) and # the max rho after the powerline disconnection is too high (above threshold) indices_to_keep = ind[self._first_k & (self.rho_max_N_1[ind] >= self.threshold)] action.raise_alert = [i for i in indices_to_keep] return action