6. Adding Events and KPIs

The complete code for this part of the tutorial can be found here

# file structure
- cutting_2d
    - main.py  # modified
    - env
        - core_env.py  # modified
        - inventory.py  # modified
        - maze_state.py
        - maze_action.py
        - renderer.py
        - maze_env.py
        - events.py  # new
        - kpi_calculator.py  # new
    - space_interfaces
        - dict_action_conversion.py
        - dict_observation_conversion.py
    - conf ...

6.1. Events

In the previous section we trained the initial version of our cutting environment and learned how to watch the training process with command line and Tensorboard logging. However, standard metrics such as reward or episode step count alone are often not very informative about the agent's behaviour and the problem at hand.

For example, we might be interested in how often an agent selects an invalid cutting piece or specifies an invalid cutting setting. To tackle this issue and to enable better inspection and logging tools, we introduce an event system that will also be reused in the reward customization section of this tutorial.

In particular, we introduce two event types, one related to the cutting process and one to inventory management. For each event we can define which statistics are computed at which stage of the aggregation process (step, episode, epoch) via event decorators:

  • @define_step_stats(len): Events \(e_i\) are collected as a list of events \(\{e_i\}\). The len function counts how often such an event occurred in the current environment step \(Stats_{Step}=|\{e_i\}|\).

  • @define_episode_stats(sum): Defines how the \(S\) step statistics should be aggregated to episode statistics by simply summing them up: \(Stats_{Episode}=\sum^S Stats_{Step}\)

  • @define_epoch_stats(np.mean, output_name="mean_episode_total"): A training epoch consists of N episodes. This decorator defines that epoch statistics should be the average of the contained episodes: \(Stats_{Epoch}=(\sum^N Stats_{Episode})/N\)

Below we will see that these statistics are now picked up by the logging system as InventoryEvents and CuttingEvents. For more details on event decorators and the underlying working principles we refer to the dedicated section on event and KPI logging.
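
The three aggregation stages described above can be traced by hand. The following is a minimal sketch in plain Python, not Maze code: the nested lists are hypothetical stand-ins for the events fired in each step, and statistics.mean is used in place of np.mean to keep the example free of dependencies.

```python
from statistics import mean

# Hypothetical event log: one inner list per environment step, holding the
# events fired in that step (the list contents are irrelevant, only the count matters).
episode_a = [["e"], [], ["e", "e"]]  # 3 steps, 3 events in total
episode_b = [[], ["e"], [], []]      # 4 steps, 1 event in total


def episode_total(steps):
    """Aggregate one episode the way the len/sum decorator pair would."""
    step_stats = [len(events) for events in steps]  # mirrors @define_step_stats(len)
    return sum(step_stats)                          # mirrors @define_episode_stats(sum)


# One epoch made of two episodes, mirrors @define_epoch_stats(np.mean, ...)
episode_stats = [episode_total(episode) for episode in (episode_a, episode_b)]
mean_episode_total = mean(episode_stats)

print(episode_stats, mean_episode_total)
```

The final value corresponds to what the decorators name mean_episode_total: the average number of times the event fired per episode.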

env/events.py
from abc import ABC

import numpy as np
from maze.core.log_stats.event_decorators import define_step_stats, define_episode_stats, define_epoch_stats


class CuttingEvents(ABC):
    """Events related to the cutting process."""

    @define_epoch_stats(np.mean, output_name="mean_episode_total")
    @define_episode_stats(sum)
    @define_step_stats(len)
    def invalid_piece_selected(self):
        """An invalid piece is selected for cutting."""

    @define_epoch_stats(np.mean, output_name="mean_episode_total")
    @define_episode_stats(sum)
    @define_step_stats(len)
    def valid_cut(self, current_demand: (int, int), piece_to_cut: (int, int), raw_piece_size: (int, int),
                  cutting_area: float):
        """A valid cut was performed."""

    @define_epoch_stats(np.mean, output_name="mean_episode_total")
    @define_episode_stats(sum)
    @define_step_stats(len)
    def invalid_cut(self, current_demand: (int, int), piece_to_cut: (int, int), raw_piece_size: (int, int)):
        """Invalid cutting parameters have been specified."""


class InventoryEvents(ABC):
    """Events related to inventory management."""

    @define_epoch_stats(np.mean, output_name="mean_episode_total")
    @define_episode_stats(sum)
    @define_step_stats(len)
    def piece_discarded(self, piece: (int, int)):
        """The inventory is full and a piece has been discarded."""

    @define_epoch_stats(np.mean, input_name="step_mean", output_name="step_mean")
    @define_epoch_stats(max, input_name="step_max", output_name="step_max")
    @define_episode_stats(np.mean, output_name="step_mean")
    @define_episode_stats(max, output_name="step_max")
    @define_step_stats(None)
    def pieces_in_inventory(self, value: int):
        """Reports the count of pieces currently in the inventory."""

    @define_epoch_stats(np.mean, output_name="mean_episode_total")
    @define_episode_stats(sum)
    @define_step_stats(len)
    def piece_replenished(self):
        """A new raw cutting piece has been replenished."""

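The pieces_in_inventory event in the listing above uses a different decorator stack than the counting events, because it reports a value rather than an occurrence. The sketch below traces that stack in plain Python under the assumption that @define_step_stats(None) forwards the reported value unchanged to the episode stage; the per-step values are made up for illustration.

```python
from statistics import mean

# Hypothetical inventory sizes reported once per step, for two episodes.
episodes = [[1, 2, 3], [2, 4]]

# Assumed behaviour of @define_step_stats(None): values pass through unchanged.
# Episode stage: mirrors @define_episode_stats(np.mean, output_name="step_mean")
# and @define_episode_stats(max, output_name="step_max").
episode_step_mean = [mean(values) for values in episodes]
episode_step_max = [max(values) for values in episodes]

# Epoch stage: input_name selects which episode statistic feeds each aggregation.
epoch_step_mean = mean(episode_step_mean)  # input_name="step_mean"
epoch_step_max = max(episode_step_max)     # input_name="step_max"

print(epoch_step_mean, epoch_step_max)
```

The input_name argument is what keeps the two branches apart: the epoch mean is computed over the episode means only, and the epoch maximum over the episode maxima only.
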
6.2. KPI Calculator

The goal of the cutting 2d environment is to learn a cutting policy that requires as few raw inventory pieces as possible for fulfilling upcoming customer demand. This metric is exactly what we define as the KPI to watch and optimize, i.e. the raw_piece_usage_per_step.

As you will see below, the logging system considers such KPIs and prints their statistics along with the remaining BaseEnvEvents.

env/kpi_calculator.py
from typing import Dict

from maze.core.env.maze_state import MazeStateType
from maze.core.log_events.kpi_calculator import KpiCalculator
from maze.core.log_events.episode_event_log import EpisodeEventLog
from .events import InventoryEvents


class Cutting2dKpiCalculator(KpiCalculator):
    """KPIs for 2D cutting environment.
    The following KPIs are available: Raw pieces used per step
    """

    def calculate_kpis(self, episode_event_log: EpisodeEventLog, last_maze_state: MazeStateType) -> Dict[str, float]:
        """Calculates the KPIs at the end of episode."""

        # get overall step count of episode
        step_count = len(episode_event_log.step_event_logs)
        # count raw inventory piece replenishment events
        raw_piece_usage = 0
        for _ in episode_event_log.query_events(InventoryEvents.piece_replenished):
            raw_piece_usage += 1
        # compute step normalized raw piece usage
        return {"raw_piece_usage_per_step": raw_piece_usage / step_count}
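
To make the KPI arithmetic concrete, here is a small worked example in plain Python. The list of event names per step is a hypothetical stand-in for the EpisodeEventLog, and the guard against an empty episode is an addition for the sketch, not part of the calculator above.

```python
# Hypothetical record of the event names fired in each step of one episode.
step_event_names = [
    ["piece_replenished"],
    [],
    ["piece_replenished", "piece_discarded"],
    [],
]

# overall step count of the episode
step_count = len(step_event_names)
# number of raw inventory piece replenishment events
raw_piece_usage = sum(names.count("piece_replenished") for names in step_event_names)

# step normalized raw piece usage (guarded against an empty episode in this sketch)
raw_piece_usage_per_step = raw_piece_usage / step_count if step_count else 0.0

print({"raw_piece_usage_per_step": raw_piece_usage_per_step})
```

Two replenishments over four steps give a usage of 0.5 raw pieces per step; a better cutting policy would push this value down.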

6.3. Updating CoreEnv and Inventory

There are also a few changes we have to make in the CoreEnvironment:

  • initialize the Publisher-Subscriber and the KPI Calculator

  • create the event topics for cutting and inventory events when setting up the environment

  • trigger the respective events in the step function instead of writing relevant events into the info dictionary.
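
The idea behind an event topic can be illustrated with a toy recorder. This is a hedged sketch of the general pattern only, not Maze's Pubsub implementation: ToyEventTopic, ToyCuttingEvents and event_log are hypothetical names introduced for illustration.

```python
from typing import Any, Dict, List, Tuple


class ToyCuttingEvents:
    """Hypothetical event interface: the methods only declare event signatures."""

    def invalid_piece_selected(self): ...

    def invalid_cut(self, current_demand, piece_to_cut, raw_piece_size): ...


class ToyEventTopic:
    """Records every event call as (event_name, kwargs). Not the Maze implementation."""

    def __init__(self, interface: type, log: List[Tuple[str, Dict[str, Any]]]):
        self._interface = interface
        self._log = log

    def __getattr__(self, name: str):
        # Only reached for attributes that are not regular instance attributes.
        if not hasattr(self._interface, name):
            raise AttributeError(f"{self._interface.__name__} defines no event '{name}'")

        def fire(**kwargs: Any) -> None:
            self._log.append((name, kwargs))

        return fire


event_log: List[Tuple[str, Dict[str, Any]]] = []
cutting_events = ToyEventTopic(ToyCuttingEvents, event_log)

# The environment code triggers events instead of filling the info dictionary.
cutting_events.invalid_piece_selected()
cutting_events.invalid_cut(current_demand=(30, 15), piece_to_cut=(20, 10), raw_piece_size=(100, 100))

print([name for name, _ in event_log])
```

The environment only calls methods on the topic object; what happens with the recorded events (statistics, KPIs, rewards) is decided by whoever consumes the log.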

env/core_env.py
...
from maze.core.events.pubsub import Pubsub
from .events import CuttingEvents, InventoryEvents
from .kpi_calculator import Cutting2dKpiCalculator


class Cutting2DCoreEnvironment(CoreEnv):

    def __init__(self, max_pieces_in_inventory: int, raw_piece_size: (int, int), static_demand: (int, int)):
        super().__init__()

        ...

        # init pubsub for event to reward routing
        self.pubsub = Pubsub(self.context.event_service)

        # KPIs calculation
        self.kpi_calculator = Cutting2dKpiCalculator()

    def _setup_env(self):
        """Setup environment."""
        inventory_events = self.pubsub.create_event_topic(InventoryEvents)
        self.inventory = Inventory(self.max_pieces_in_inventory, self.raw_piece_size, inventory_events)
        self.inventory.replenish_piece()

        self.cutting_events = self.pubsub.create_event_topic(CuttingEvents)

    def step(self, maze_action: Cutting2DMazeAction) -> Tuple[Cutting2DMazeState, np.array, bool, Dict[Any, Any]]:
        """Summary of the step (simplified, not necessarily respecting the actual order in the code):
        1. Check if the selected piece to cut is valid (i.e. in inventory, large enough etc.)
        2. Attempt the cutting
        3. Replenish a fresh piece if needed and return an appropriate reward

        :param maze_action: Cutting MazeAction to take.
        :return: maze_state, reward, done, info
        """

        info, reward = {}, 0
        replenishment_needed = False

        # check if valid piece id was selected
        if maze_action.piece_id >= self.inventory.size():
            self.cutting_events.invalid_piece_selected()
        # perform cutting
        else:
            piece_to_cut = self.inventory.pieces[maze_action.piece_id]

            # attempt the cut
            if self.inventory.cut(maze_action, self.current_demand):
                self.cutting_events.valid_cut(current_demand=self.current_demand, piece_to_cut=piece_to_cut,
                                              raw_piece_size=self.raw_piece_size)
                replenishment_needed = piece_to_cut == self.raw_piece_size
            else:
                # assign a negative reward for invalid cutting attempts
                self.cutting_events.invalid_cut(current_demand=self.current_demand, piece_to_cut=piece_to_cut,
                                                raw_piece_size=self.raw_piece_size)
                reward = -2

        # check if replenishment is required
        if replenishment_needed:
            self.inventory.replenish_piece()
            # assign negative reward if a piece has to be replenished
            reward = -1

        # step execution finished, write step statistics
        self.inventory.log_step_statistics()

        # compile env state
        maze_state = self.get_maze_state()

        return maze_state, reward, False, info

    def get_kpi_calculator(self) -> Cutting2dKpiCalculator:
        """KPIs are supported."""
        return self.kpi_calculator

For the inventory we proceed analogously and also trigger the respective events.

env/inventory.py
...
from .events import InventoryEvents


class Inventory:
    """Holds the inventory of 2D pieces and performs cutting.
    :param max_pieces_in_inventory: Size of the inventory. If full, the oldest pieces get discarded.
    :param raw_piece_size: Size of a fresh raw (= full-size) piece.
    :param inventory_events: Inventory event dispatch proxy.
    """

    def __init__(self, max_pieces_in_inventory: int, raw_piece_size: (int, int),
                 inventory_events: InventoryEvents):
        ...

        self.inventory_events = inventory_events

    def store_piece(self, piece: (int, int)) -> None:
        """Store the given piece.
        :param piece: Piece to store.
        """
        # If we would run out of storage space, discard the oldest piece first
        if self.is_full():
            # report the piece that was actually removed, not the one being stored
            discarded_piece = self.pieces.pop(0)
            self.inventory_events.piece_discarded(piece=discarded_piece)

        self.pieces.append(piece)

    def replenish_piece(self) -> None:
        """Add a fresh raw piece to inventory."""
        self.store_piece(self.raw_piece_size)
        self.inventory_events.piece_replenished()

    def log_step_statistics(self):
        """Log inventory statistics once per step"""
        self.inventory_events.pieces_in_inventory(self.size())
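
The discard-oldest behaviour of store_piece can be checked in isolation. The following sketch uses a hypothetical ToyInventory whose discarded list stands in for the piece_discarded events; it assumes the event is meant to carry the piece that was removed.

```python
from typing import List, Tuple

Piece = Tuple[int, int]


class ToyInventory:
    """Hypothetical stand-in for Inventory that keeps discarded pieces in a list."""

    def __init__(self, max_pieces: int):
        self.max_pieces = max_pieces
        self.pieces: List[Piece] = []
        self.discarded: List[Piece] = []  # stands in for piece_discarded events

    def store_piece(self, piece: Piece) -> None:
        # If the inventory is full, drop the oldest piece before storing the new one.
        if len(self.pieces) >= self.max_pieces:
            self.discarded.append(self.pieces.pop(0))
        self.pieces.append(piece)


inventory = ToyInventory(max_pieces=2)
for piece in [(100, 100), (70, 100), (30, 85)]:
    inventory.store_piece(piece)

print(inventory.pieces, inventory.discarded)
```

With room for two pieces, storing a third one pushes out the piece that was stored first.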

6.4. Test Script

The following snippet will instantiate the environment and run it for 15 steps.

To get access to event and KPI logging we need to wrap the environment with the LogStatsWrapper. To simplify the statistics logging setup we rely on the SimpleStatsLoggingSetup helper class.

main.py
""" Test script CoreEnv """
from maze.utils.log_stats_utils import SimpleStatsLoggingSetup
from maze.core.wrappers.log_stats_wrapper import LogStatsWrapper
from tutorial_maze_env.part04_events.env.maze_env import maze_env_factory


def main():
    # init maze environment including observation and action interfaces
    env = maze_env_factory(max_pieces_in_inventory=200,
                           raw_piece_size=[100, 100],
                           static_demand=(30, 15))

    # wrap environment with logging wrapper
    env = LogStatsWrapper(env, logging_prefix="main")

    # register a console writer and connect the writer to the statistics logging system
    with SimpleStatsLoggingSetup(env):
        # reset environment
        obs = env.reset()
        # run interaction loop
        for i in range(15):
            # sample random action
            action = env.action_space.sample()

            # take actual environment step
            obs, reward, done, info = env.step(action)


if __name__ == "__main__":
    """ main """
    main()

When running the script you will get an output as shown below. Note that statistics of both events and KPIs are printed along with the default reward and action statistics.

 step|path                                                                      |               value
=====|==========================================================================|====================
    1|main    DiscreteActionEvents  action                substep_0/order       |     [len:15, μ:0.5]
    1|main    DiscreteActionEvents  action                substep_0/piece_idx   |    [len:15, μ:82.3]
    1|main    DiscreteActionEvents  action                substep_0/rotation    |     [len:15, μ:0.7]
    1|main    BaseEnvEvents         reward                median_step_count     |              15.000
    1|main    BaseEnvEvents         reward                mean_step_count       |              15.000
    1|main    BaseEnvEvents         reward                total_step_count      |              15.000
    1|main    BaseEnvEvents         reward                total_episode_count   |               1.000
    1|main    BaseEnvEvents         reward                episode_count         |               1.000
    1|main    BaseEnvEvents         reward                std                   |               0.000
    1|main    BaseEnvEvents         reward                mean                  |             -29.000
    1|main    BaseEnvEvents         reward                min                   |             -29.000
    1|main    BaseEnvEvents         reward                max                   |             -29.000
    1|main    InventoryEvents       piece_replenished     mean_episode_total    |               3.000
    1|main    InventoryEvents       pieces_in_inventory   step_max              |             200.000
    1|main    InventoryEvents       pieces_in_inventory   step_mean             |             200.000
    1|main    CuttingEvents         invalid_cut           mean_episode_total    |              14.000
    1|main    InventoryEvents       piece_discarded       mean_episode_total    |               2.000
    1|main    CuttingEvents         valid_cut             mean_episode_total    |               1.000
    1|main    BaseEnvEvents         kpi                   max/raw_piece_usage_..|               0.000
    1|main    BaseEnvEvents         kpi                   min/raw_piece_usage_..|               0.000
    1|main    BaseEnvEvents         kpi                   std/raw_piece_usage_..|               0.000
    1|main    BaseEnvEvents         kpi                   mean/raw_piece_usage..|               0.000
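
One way to read the reward rows is to recompute the episode reward from the event counts and the reward values assigned in the step function. The sketch below does this under the assumption that the single valid cut consumed a raw piece and therefore triggered one replenishment during stepping; it does not attempt to explain the other inventory counts in the table.

```python
INVALID_CUT_REWARD = -2    # reward assigned in step() for an invalid cutting attempt
REPLENISHMENT_REWARD = -1  # reward assigned in step() when a raw piece is replenished

invalid_cuts = 14            # CuttingEvents invalid_cut, mean_episode_total
replenishments_in_steps = 1  # assumption: the one valid cut used up a raw piece

episode_reward = (invalid_cuts * INVALID_CUT_REWARD
                  + replenishments_in_steps * REPLENISHMENT_REWARD)

print(episode_reward)
```

Under this assumption the result matches the reward mean of -29.000 reported for the single episode.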