6. Adding Events and KPIs
The complete code for this part of the tutorial can be found here
# file structure
- cutting_2d
    - main.py  # modified
    - env
        - core_env.py  # modified
        - inventory.py  # modified
        - maze_state.py
        - maze_action.py
        - renderer.py
        - maze_env.py
        - events.py  # new
        - kpi_calculator.py  # new
    - space_interfaces
        - dict_action_conversion.py
        - dict_observation_conversion.py
    - conf ...
6.1. Events
In the previous section we trained the initial version of our cutting environment and learned how to monitor the training process with command-line and Tensorboard logging. However, standard metrics such as reward or episode step count alone are not always informative about the agent's behaviour and the problem at hand.
For example, we might be interested in how often an agent selects an invalid cutting piece or specifies an invalid cutting setting. To address this and to enable better inspection and logging tools, we introduce an event system that will also be reused in the reward customization section of this tutorial.
In particular, we introduce two event types, one related to the cutting process and one to inventory management. For each event we can define which statistics are computed at which stage of the aggregation process (step, episode, epoch) via event decorators:
@define_step_stats(len): Events \(e_i\) are collected as a list of events \(\{e_i\}\). The len function counts how often such an event occurred in the current environment step: \(Stats_{Step}=|\{e_i\}|\).
@define_episode_stats(sum): Defines how the \(S\) step statistics should be aggregated to episode statistics by simply summing them up: \(Stats_{Episode}=\sum^S Stats_{Step}\).
@define_epoch_stats(np.mean, output_name="mean_episode_total"): A training epoch consists of \(N\) episodes. This decorator defines that epoch statistics should be the average of the contained episodes: \(Stats_{Epoch}=(\sum^N Stats_{Episode})/N\).
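The aggregation stages can be sketched in plain Python, independent of Maze. This only illustrates the arithmetic and is not the library's implementation: the data and variable names are hypothetical, and statistics.mean stands in for np.mean. The second half mirrors the decorator stack used for value-carrying events such as pieces_in_inventory in the events code of this section, under the assumption that @define_step_stats(None) forwards the reported values unchanged to the episode level.

```python
from statistics import mean

# --- Count events (len -> sum -> mean) ---------------------------------------
# Hypothetical data: one epoch with two episodes. Each episode is a list of steps,
# each step is the list of events of one type fired during that step.
count_epoch = [
    [["e"], [], ["e", "e"]],  # episode 1: 1 + 0 + 2 events
    [[], ["e"], []],          # episode 2: 0 + 1 + 0 events
]
# Step level: count events per step (role of `len`).
step_counts = [[len(events) for events in steps] for steps in count_epoch]
# Episode level: sum the step counts (role of `sum`).
episode_totals = [sum(counts) for counts in step_counts]
# Epoch level: average the episode totals (role of `np.mean`).
mean_episode_total = mean(episode_totals)

# --- Value events (no step function -> mean/max -> mean/max) -----------------
# Assumption: without a step-level function the raw values reach the episode level.
# Hypothetical inventory sizes reported once per step.
value_epoch = [
    [1, 2, 3],  # episode 1
    [2, 2, 5],  # episode 2
]
# Episode level: two named statistics per episode.
episode_stats = [{"step_mean": mean(v), "step_max": max(v)} for v in value_epoch]
# Epoch level: each episode statistic is aggregated on its own, selected by name.
epoch_stats = {
    "step_mean": mean(s["step_mean"] for s in episode_stats),
    "step_max": max(s["step_max"] for s in episode_stats),
}

print(episode_totals, mean_episode_total)
print(epoch_stats)
```

Naming the intermediate statistics is what allows one event to feed several epoch-level outputs, which is presumably the purpose of the input_name and output_name arguments.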
Below we will see that these statistics are now picked up by the logging system as InventoryEvents and CuttingEvents. For more details on event decorators and the underlying working principles, we refer to the dedicated section on event and KPI logging.
from abc import ABC
import numpy as np
from maze.core.log_stats.event_decorators import define_step_stats, define_episode_stats, define_epoch_stats


class CuttingEvents(ABC):
    """Events related to the cutting process."""

    @define_epoch_stats(np.mean, output_name="mean_episode_total")
    @define_episode_stats(sum)
    @define_step_stats(len)
    def invalid_piece_selected(self):
        """An invalid piece is selected for cutting."""

    @define_epoch_stats(np.mean, output_name="mean_episode_total")
    @define_episode_stats(sum)
    @define_step_stats(len)
    def valid_cut(self, current_demand: (int, int), piece_to_cut: (int, int), raw_piece_size: (int, int),
                  cutting_area: float):
        """A valid cut was performed."""

    @define_epoch_stats(np.mean, output_name="mean_episode_total")
    @define_episode_stats(sum)
    @define_step_stats(len)
    def invalid_cut(self, current_demand: (int, int), piece_to_cut: (int, int), raw_piece_size: (int, int)):
        """Invalid cutting parameters have been specified."""


class InventoryEvents(ABC):
    """Events related to inventory management."""

    @define_epoch_stats(np.mean, output_name="mean_episode_total")
    @define_episode_stats(sum)
    @define_step_stats(len)
    def piece_discarded(self, piece: (int, int)):
        """The inventory is full and a piece has been discarded."""

    @define_epoch_stats(np.mean, input_name="step_mean", output_name="step_mean")
    @define_epoch_stats(max, input_name="step_max", output_name="step_max")
    @define_episode_stats(np.mean, output_name="step_mean")
    @define_episode_stats(max, output_name="step_max")
    @define_step_stats(None)
    def pieces_in_inventory(self, value: int):
        """Reports the count of pieces currently in the inventory."""

    @define_epoch_stats(np.mean, output_name="mean_episode_total")
    @define_episode_stats(sum)
    @define_step_stats(len)
    def piece_replenished(self):
        """A new raw cutting piece has been replenished."""
6.2. KPI Calculator
The goal of the cutting 2d environment is to learn a cutting policy that requires as few raw inventory pieces as possible to fulfil upcoming customer demand. This metric is exactly what we define as the KPI to watch and optimize, i.e. the raw_piece_usage_per_step.
As you will see below, the logging system considers such KPIs and prints their statistics along with the remaining BaseEnvEvents.
from typing import Dict
from maze.core.env.maze_state import MazeStateType
from maze.core.log_events.kpi_calculator import KpiCalculator
from maze.core.log_events.episode_event_log import EpisodeEventLog
from .events import InventoryEvents


class Cutting2dKpiCalculator(KpiCalculator):
    """KPIs for 2D cutting environment.
    The following KPIs are available: Raw pieces used per step
    """

    def calculate_kpis(self, episode_event_log: EpisodeEventLog, last_maze_state: MazeStateType) -> Dict[str, float]:
        """Calculates the KPIs at the end of episode."""
        # get overall step count of episode
        step_count = len(episode_event_log.step_event_logs)
        # count raw inventory piece replenishment events
        raw_piece_usage = 0
        for _ in episode_event_log.query_events(InventoryEvents.piece_replenished):
            raw_piece_usage += 1
        # compute step normalized raw piece usage
        return {"raw_piece_usage_per_step": raw_piece_usage / step_count}
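The KPI itself is a simple ratio of replenishment events to environment steps. A minimal stand-alone sketch of that arithmetic follows; the function, the list-of-event-names representation of an episode, and the guard against an empty episode are assumptions made for illustration and are not part of the Maze API or of the calculator above.

```python
from typing import Dict, List


def raw_piece_usage_per_step(step_events: List[List[str]]) -> Dict[str, float]:
    """Compute the step-normalized raw piece usage for one episode.

    :param step_events: Hypothetical episode log, one list of event names per step.
    :return: Dictionary with the single KPI value.
    """
    step_count = len(step_events)
    # Illustrative guard (an addition): avoid dividing by zero for an empty episode.
    if step_count == 0:
        return {"raw_piece_usage_per_step": 0.0}
    # Count replenishment events over all steps of the episode.
    raw_piece_usage = sum(events.count("piece_replenished") for events in step_events)
    return {"raw_piece_usage_per_step": raw_piece_usage / step_count}


# Four steps, two of which replenished a raw piece.
episode = [["piece_replenished"], [], ["valid_cut", "piece_replenished"], []]
kpis = raw_piece_usage_per_step(episode)
print(kpis)
```

A lower value means the policy consumed fewer fresh raw pieces per step, which is the direction we want to optimize.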
6.3. Updating CoreEnv and Inventory
There are also a few changes we have to make in the CoreEnvironment:
initialize the Publisher-Subscriber (Pubsub) and the KPI calculator,
create the event topics for cutting and inventory events when setting up the environment,
trigger the respective events in the step function instead of writing them into the info dictionary.
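To make the last point concrete, here is a toy contrast between reporting an outcome through the info dictionary and triggering an event. ToyEventTopic is a hypothetical stand-in that merely records calls; it is not how Maze's Pubsub works internally, and the info key used in the "before" variant is likewise an assumption.

```python
from typing import Any, Callable, Dict, List, Tuple


class ToyEventTopic:
    """Hypothetical stand-in for an event topic: records every triggered event."""

    def __init__(self) -> None:
        self.records: List[Tuple[str, Dict[str, Any]]] = []

    def __getattr__(self, event_name: str) -> Callable[..., None]:
        # Only reached for attributes not found normally, i.e. event names.
        if event_name.startswith("_"):
            raise AttributeError(event_name)

        def trigger(**kwargs: Any) -> None:
            self.records.append((event_name, kwargs))

        return trigger


def step_with_info(piece_id: int, inventory_size: int) -> Dict[str, Any]:
    """Before: the outcome is written into the info dictionary (key is hypothetical)."""
    info: Dict[str, Any] = {}
    if piece_id >= inventory_size:
        info["error"] = "piece_id_out_of_bounds"
    return info


def step_with_events(piece_id: int, inventory_size: int, cutting_events: ToyEventTopic) -> None:
    """After: the outcome is reported by triggering a typed event."""
    if piece_id >= inventory_size:
        cutting_events.invalid_piece_selected()


topic = ToyEventTopic()
step_with_events(piece_id=5, inventory_size=3, cutting_events=topic)
step_with_events(piece_id=1, inventory_size=3, cutting_events=topic)
print(step_with_info(piece_id=5, inventory_size=3), topic.records)
```

The event variant keeps the step function free of bookkeeping, and any subscriber (statistics logging, KPI calculation, later on reward computation) can consume the same events.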
...
from maze.core.events.pubsub import Pubsub
from .events import CuttingEvents, InventoryEvents
from .kpi_calculator import Cutting2dKpiCalculator


class Cutting2DCoreEnvironment(CoreEnv):

    def __init__(self, max_pieces_in_inventory: int, raw_piece_size: (int, int), static_demand: (int, int)):
        super().__init__()
        ...
        # init pubsub for event to reward routing
        self.pubsub = Pubsub(self.context.event_service)
        # KPIs calculation
        self.kpi_calculator = Cutting2dKpiCalculator()

    def _setup_env(self):
        """Setup environment."""
        inventory_events = self.pubsub.create_event_topic(InventoryEvents)
        self.inventory = Inventory(self.max_pieces_in_inventory, self.raw_piece_size, inventory_events)
        self.inventory.replenish_piece()
        self.cutting_events = self.pubsub.create_event_topic(CuttingEvents)

    def step(self, maze_action: Cutting2DMazeAction) -> Tuple[Cutting2DMazeState, np.array, bool, Dict[Any, Any]]:
        """Summary of the step (simplified, not necessarily respecting the actual order in the code):
        1. Check if the selected piece to cut is valid (i.e. in inventory, large enough etc.)
        2. Attempt the cutting
        3. Replenish a fresh piece if needed and return an appropriate reward

        :param maze_action: Cutting MazeAction to take.
        :return: maze_state, reward, done, info
        """
        info, reward = {}, 0
        replenishment_needed = False
        # check if valid piece id was selected
        if maze_action.piece_id >= self.inventory.size():
            self.cutting_events.invalid_piece_selected()
        # perform cutting
        else:
            piece_to_cut = self.inventory.pieces[maze_action.piece_id]
            # attempt the cut
            if self.inventory.cut(maze_action, self.current_demand):
                self.cutting_events.valid_cut(current_demand=self.current_demand, piece_to_cut=piece_to_cut,
                                              raw_piece_size=self.raw_piece_size)
                replenishment_needed = piece_to_cut == self.raw_piece_size
            else:
                # assign a negative reward for invalid cutting attempts
                self.cutting_events.invalid_cut(current_demand=self.current_demand, piece_to_cut=piece_to_cut,
                                                raw_piece_size=self.raw_piece_size)
                reward = -2
        # check if replenishment is required
        if replenishment_needed:
            self.inventory.replenish_piece()
            # assign negative reward if a piece has to be replenished
            reward = -1
        # step execution finished, write step statistics
        self.inventory.log_step_statistics()
        # compile env state
        maze_state = self.get_maze_state()
        return maze_state, reward, False, info

    def get_kpi_calculator(self) -> Cutting2dKpiCalculator:
        """KPIs are supported."""
        return self.kpi_calculator
For the inventory we proceed analogously and also trigger the respective events.
...
from .events import InventoryEvents


class Inventory:
    """Holds the inventory of 2D pieces and performs cutting.

    :param max_pieces_in_inventory: Size of the inventory. If full, the oldest pieces get discarded.
    :param raw_piece_size: Size of a fresh raw (= full-size) piece.
    :param inventory_events: Inventory event dispatch proxy.
    """

    def __init__(self, max_pieces_in_inventory: int, raw_piece_size: (int, int),
                 inventory_events: InventoryEvents):
        ...
        self.inventory_events = inventory_events

    def store_piece(self, piece: (int, int)) -> None:
        """Store the given piece.

        :param piece: Piece to store.
        """
        # If we would run out of storage space, discard the oldest piece first
        if self.is_full():
            self.pieces.pop(0)
            self.inventory_events.piece_discarded(piece=piece)
        self.pieces.append(piece)

    def replenish_piece(self) -> None:
        """Add a fresh raw piece to inventory."""
        self.store_piece(self.raw_piece_size)
        self.inventory_events.piece_replenished()

    def log_step_statistics(self):
        """Log inventory statistics once per step"""
        self.inventory_events.pieces_in_inventory(self.size())
6.4. Test Script
The following snippet will instantiate the environment and run it for 15 steps.
To get access to event and KPI logging we need to wrap the environment with the LogStatsWrapper.
To simplify the statistics logging setup we rely on the SimpleStatsLoggingSetup helper class.
""" Test script CoreEnv """
from maze.utils.log_stats_utils import SimpleStatsLoggingSetup
from maze.core.wrappers.log_stats_wrapper import LogStatsWrapper
from tutorial_maze_env.part04_events.env.maze_env import maze_env_factory


def main():
    # init maze environment including observation and action interfaces
    env = maze_env_factory(max_pieces_in_inventory=200,
                           raw_piece_size=[100, 100],
                           static_demand=(30, 15))
    # wrap environment with logging wrapper
    env = LogStatsWrapper(env, logging_prefix="main")
    # register a console writer and connect the writer to the statistics logging system
    with SimpleStatsLoggingSetup(env):
        # reset environment
        obs = env.reset()
        # run interaction loop
        for i in range(15):
            # sample random action
            action = env.action_space.sample()
            # take actual environment step
            obs, reward, done, info = env.step(action)


if __name__ == "__main__":
    """ main """
    main()
When running the script you will get output similar to that shown below. Note that statistics for both events and KPIs are printed along with the default reward and action statistics.
step|path | value
=====|==========================================================================|====================
1|main DiscreteActionEvents action substep_0/order | [len:15, μ:0.5]
1|main DiscreteActionEvents action substep_0/piece_idx | [len:15, μ:82.3]
1|main DiscreteActionEvents action substep_0/rotation | [len:15, μ:0.7]
1|main BaseEnvEvents reward median_step_count | 15.000
1|main BaseEnvEvents reward mean_step_count | 15.000
1|main BaseEnvEvents reward total_step_count | 15.000
1|main BaseEnvEvents reward total_episode_count | 1.000
1|main BaseEnvEvents reward episode_count | 1.000
1|main BaseEnvEvents reward std | 0.000
1|main BaseEnvEvents reward mean | -29.000
1|main BaseEnvEvents reward min | -29.000
1|main BaseEnvEvents reward max | -29.000
1|main InventoryEvents piece_replenished mean_episode_total | 3.000
1|main InventoryEvents pieces_in_inventory step_max | 200.000
1|main InventoryEvents pieces_in_inventory step_mean | 200.000
1|main CuttingEvents invalid_cut mean_episode_total | 14.000
1|main InventoryEvents piece_discarded mean_episode_total | 2.000
1|main CuttingEvents valid_cut mean_episode_total | 1.000
1|main BaseEnvEvents kpi max/raw_piece_usage_..| 0.000
1|main BaseEnvEvents kpi min/raw_piece_usage_..| 0.000
1|main BaseEnvEvents kpi std/raw_piece_usage_..| 0.000
1|main BaseEnvEvents kpi mean/raw_piece_usage..| 0.000