Source code for grid2op.Converter.AnalogStateConverter

# Copyright (c) 2019-2020, RTE (https://www.rte-france.com)
# See AUTHORS.txt
# This Source Code Form is subject to the terms of the Mozilla Public License, version 2.0.
# If a copy of the Mozilla Public License, version 2.0 was not distributed with this file,
# you can obtain one at http://mozilla.org/MPL/2.0/.
# SPDX-License-Identifier: MPL-2.0
# This file is part of Grid2Op, Grid2Op a testbed platform to model sequential decision making in power systems.
import numpy as np
import math

from grid2op.Converter import Converter


class AnalogStateConverter(Converter):
    """
    Converter that can be used with analog representation of the grid state.
    Details are provided in convert_obs and convert_act

    The grid2op observation is converted into a 1d normalized array
    The grid2op action is created from a set of real valued arrays

    It can not yet be converted to / from gym space. If this feature is
    interesting for you, you can reply to the issue posted at
    https://github.com/rte-france/Grid2Op/issues/16

    """

    def __init__(self, action_space, bias=0.0):
        """
        Parameters
        ----------
        action_space:
            The action space forwarded to the parent converter and to
            ``init_grid``.

        bias: ``float``
            Constant added to every entry of the vector returned by
            :func:`AnalogStateConverter.convert_obs`.
        """
        super().__init__(action_space)
        self.__class__ = AnalogStateConverter.init_grid(action_space)
        # Keep the bias requested by the caller (it used to be silently
        # discarded and replaced by 0.0).
        self.__bias = bias

    @staticmethod
    def to_norm_vect(inputv, pad_v=0.0, scale_v=1.0):
        """
        Divide ``inputv`` by ``scale_v``, replace NaN / +inf / -inf by
        ``pad_v`` and return the result as a ``np.float32`` array.
        """
        v = np.asarray(inputv)
        v = v / scale_v
        vsafe = np.nan_to_num(v, nan=pad_v, posinf=pad_v, neginf=pad_v)
        return vsafe.astype(np.float32)

    @staticmethod
    def _bus_norm_vect(topo, pos, n_elem):
        """
        Read the bus of the first ``n_elem`` elements located at ``pos`` in
        ``topo``, clamp non positive values to 0 and normalize the result.
        """
        idx = np.asarray(pos, dtype=int)[:n_elem]
        buses = np.asarray(topo)[idx]
        # Values <= 0 are all mapped to 0
        buses = np.maximum(buses, 0).astype(np.float64)
        return AnalogStateConverter.to_norm_vect(buses, pad_v=-1.0, scale_v=3.0)

    def convert_obs(self, obs):
        """
        This converter will convert the observation into a 1D vector,
        with all values normalized, plus bias (if provided)

        The observation is also stored, as it is needed by
        :func:`AnalogStateConverter.convert_act`.

        Parameters
        ----------
        obs: :class:`grid2op.Observation.Observation`
            The input observation.

        Returns
        -------
        ``np.array``
            1D array of np.float32 normalized values

        """
        # Store the obs for action conversion
        self.__obs = obs

        # Store some shortcuts
        topo = obs.topo_vect
        g_pos = obs.gen_pos_topo_vect
        l_pos = obs.load_pos_topo_vect
        lor_pos = obs.line_or_pos_topo_vect
        lex_pos = obs.line_ex_pos_topo_vect

        # Get time data
        time_li = [
            obs.month / 12.0,
            obs.day / 31.0,
            obs.day_of_week / 7.0,
            obs.hour_of_day / 24.0,
            obs.minute_of_hour / 60.0,
        ]
        time_v = self.to_norm_vect(time_li)
        time_line_cd = self.to_norm_vect(
            obs.time_before_cooldown_line, pad_v=-1.0, scale_v=10.0
        )
        time_line_nm = self.to_norm_vect(obs.time_next_maintenance, scale_v=10.0)
        time_sub_cd = self.to_norm_vect(
            obs.time_before_cooldown_sub, pad_v=-1.0, scale_v=10.0
        )

        # Get generators info
        g_p = self.to_norm_vect(obs.prod_p, scale_v=1000.0)
        g_q = self.to_norm_vect(obs.prod_q, scale_v=1000.0)
        g_v = self.to_norm_vect(obs.prod_v, scale_v=1000.0)
        g_tr = self.to_norm_vect(obs.target_dispatch, scale_v=150.0)
        g_ar = self.to_norm_vect(obs.actual_dispatch, scale_v=150.0)
        g_cost = self.to_norm_vect(obs.gen_cost_per_MW, pad_v=0.0, scale_v=1.0)
        g_bus = self._bus_norm_vect(topo, g_pos, obs.n_gen)

        # Get loads info
        l_p = self.to_norm_vect(obs.load_p, scale_v=1000.0)
        l_q = self.to_norm_vect(obs.load_q, scale_v=1000.0)
        l_v = self.to_norm_vect(obs.load_v, scale_v=1000.0)
        l_bus = self._bus_norm_vect(topo, l_pos, obs.n_load)

        # Get lines origin info
        or_p = self.to_norm_vect(obs.p_or, scale_v=1000.0)
        or_q = self.to_norm_vect(obs.q_or, scale_v=1000.0)
        or_v = self.to_norm_vect(obs.v_or, scale_v=1000.0)
        or_bus = self._bus_norm_vect(topo, lor_pos, obs.n_line)
        or_rho = self.to_norm_vect(obs.rho, pad_v=-1.0)

        # Get lines extremity info
        ex_p = self.to_norm_vect(obs.p_ex, scale_v=1000.0)
        ex_q = self.to_norm_vect(obs.q_ex, scale_v=1000.0)
        ex_v = self.to_norm_vect(obs.v_ex, scale_v=1000.0)
        ex_bus = self._bus_norm_vect(topo, lex_pos, obs.n_line)
        # rho is repeated on purpose: the vector layout (see size_obs)
        # reserves 5 values per line for both sides.
        ex_rho = self.to_norm_vect(obs.rho, pad_v=-1.0)

        res = np.concatenate(
            [
                # Time
                time_v,
                time_line_cd,
                time_sub_cd,
                time_line_nm,
                # Gens
                g_p,
                g_q,
                g_v,
                g_ar,
                g_tr,
                g_bus,
                g_cost,
                # Loads
                l_p,
                l_q,
                l_v,
                l_bus,
                # Origins
                or_p,
                or_q,
                or_v,
                or_bus,
                or_rho,
                # Extremities
                ex_p,
                ex_q,
                ex_v,
                ex_bus,
                ex_rho,
            ]
        )
        return res + self.__bias

    def convert_act(self, netstate):
        """
        Create a grid2op action based on the last
        observation and the real valued state vectors in parameters

        The last observation is the one stored by
        :func:`AnalogStateConverter.convert_obs`, which must therefore have
        been called beforehand.

        Parameters
        ----------
        netstate: ``tuple``
            A tuple containing the following (3) elements:

            - netbus: ``np.array``
              A numpy array of dimension n_bus(2) x dim_topo
              and range [0.0; 1.0].
              Where the first axis represent the bus,
              the second the elements.
              Then, for element i, netbus[bus_index][i] represent
              the probability element i should be on bus_index + 1.
              The buses are then picked using argmax across dimension 0

            - netline: ``np.array``
              A numpy array of dimension n_line and range [0.0; 1.0]
              Each element representing a line status:
              0 meaning disconnected and > 0.0 connected

            - netdisp: ``np.array``
              A numpy array of dimension n_gen and range[-1.0;1.0]
              Each generator redispatch setpoint is then
              rescaled to the range [-rdown;+rup].
              This is cumulative over time, as per grid2op convention.

        Returns
        -------
        res: :class:`grid2op.Action.Action`
            An action that will change the last observation (current state)
            To the state described in parameters

        """
        netbus = netstate[0]
        netline = netstate[1]
        netdisp = netstate[2]
        act_setbus = self.netbus_to_act_setbus(self.__obs, netbus)
        act_setstatus = self.netline_to_act_setstatus(self.__obs, netline)
        act_redispatch = self.netdisp_to_act_redispatch(self.__obs, netdisp)
        act = self.__call__(
            {
                "set_bus": act_setbus,
                "set_line_status": act_setstatus,
                "redispatch": act_redispatch,
            }
        )
        return act

    @staticmethod
    def size_obs(obs):
        """
        Return the number of values in the vector built by ``convert_obs``
        for an observation with the dimensions of ``obs``.
        """
        dims = np.array(
            [
                # Time
                5,  # Timestamp
                2 * obs.n_line,
                obs.n_sub,
                # Gen
                obs.n_gen * 7,
                # Load
                obs.n_load * 4,
                # Line origins
                obs.n_line * 5,
                # Line extremities
                obs.n_line * 5,
            ]
        )
        return np.sum(dims)

    @staticmethod
    def netbus_to_act_setbus(obs, net_bus):
        """
        Convert per-bus scores into a "set_bus" vector.

        n_bus x dim_topo x p([0.0; 1.0]) ->
        -> dim_topo x [0 unchanged; 1: bus_1; 2 bus_2 ]
        """
        # Pick the buses
        act_setbus = np.argmax(net_bus, axis=0) + 1
        # Don't set disconnected elements
        act_setbus[obs.topo_vect <= 0] = 0
        # Don't set elements already on the correct bus
        act_setbus[act_setbus == obs.topo_vect] = 0
        return act_setbus

    @staticmethod
    def netline_to_act_setstatus(obs, net_line):
        """
        Convert requested line status into a "set_line_status" vector.

        [0.0 Disconnect; > 0.0 Connect] ->
        -> [0.0 Unchanged; -1.0 Disconnect; 1.0 Connect]

        Only the lines whose requested status differs from
        ``obs.line_status`` are set. The result has the dtype of
        ``net_line``.
        """
        net_line = np.asarray(net_line)
        connected = np.asarray(obs.line_status, dtype=bool)
        want_disconnect = net_line <= 0.0
        want_connect = net_line > 0.0

        # Everything is "unchanged" unless it differs from the current state
        act_setstatus = np.zeros_like(net_line)
        # Disconnect lines that are currently connected
        act_setstatus[connected & want_disconnect] = -1
        # Reconnect lines that are currently disconnected
        act_setstatus[~connected & want_connect] = 1
        return act_setstatus

    @staticmethod
    def netdisp_to_act_redispatch(obs, net_disp):
        """
        Convert normalized redispatch setpoints into a "redispatch" vector.

        [-1.0;1.0] -> [-ramp_down;+ramp_up]

        Entries equal to 0.0 are skipped (no redispatch), the others are
        linearly interpolated then rounded.
        """
        act_redispatch = np.zeros(obs.n_gen)
        for i, d in enumerate(net_disp):
            if math.isclose(d, 0.0):  # Skip if 0.0
                continue
            rmin = obs.gen_max_ramp_down[i]
            rmax = obs.gen_max_ramp_up[i]
            r = np.interp(d, [-1.0, 1.0], [-rmin, rmax])
            act_redispatch[i] = round(r)  # Round at 1MW
        return act_redispatch

    # Helpers to generate random actions
    @staticmethod
    def netbus_rnd(obs, n_bus=2):
        """
        Build a random ``netbus`` array (n_bus x dim_topo): the current
        topology, with a random subset of the elements of one random
        substation moved to random buses.
        """
        # Copy obs state, for every bus (not only the first two)
        rnd_topo = np.zeros((n_bus, obs.dim_topo))
        for bus_idx in range(n_bus):
            rnd_topo[bus_idx][obs.topo_vect == bus_idx + 1] = 1.0

        # Pick a random substation
        rnd_sub = np.random.randint(obs.n_sub)
        n_elem = obs.sub_info[rnd_sub]
        # Pick a random number of elements to change
        rnd_n_changes = np.random.randint(n_elem + 1)
        # Pick the elements to change at random
        rnd_sub_elems = np.random.randint(0, n_elem, rnd_n_changes)
        # Set the topo vect
        sub_topo_pos = np.sum(obs.sub_info[0:rnd_sub])
        for elem_pos in rnd_sub_elems:
            rnd_bus = np.random.randint(n_bus)
            topo_pos = sub_topo_pos + elem_pos
            # Clear the element on every bus, then put it on the chosen one
            rnd_topo[:, topo_pos] = 0.0
            rnd_topo[rnd_bus][topo_pos] = 1.0
        return rnd_topo

    @staticmethod
    def netline_rnd(obs):
        """
        Build a random ``netline`` array: the current line status with the
        status of one random line flipped.
        """
        rnd_lines = obs.line_status.astype(np.float32)
        rnd_lineid = np.random.randint(obs.n_line)
        rnd_linestatus = not obs.line_status[rnd_lineid]
        rnd_lines[rnd_lineid] = np.int32(rnd_linestatus)
        return rnd_lines

    @staticmethod
    def netdisp_rnd(obs):
        """
        Build a random ``netdisp`` array: all zeros, except one random
        generator that receives a random value drawn in [-1.0; 1.0).
        """
        disp_rnd = np.zeros(obs.n_gen)
        # Take random gen to disp
        rnd_gen = np.random.randint(obs.n_gen)
        # Take a random disp
        rnd_ramp = np.random.uniform(-1.0, 1.0)
        disp_rnd[rnd_gen] = rnd_ramp
        return disp_rnd