8. Adding Reward Customization

The complete code for this part of the tutorial can be found here.

# file structure
- cutting_2d
    - main.py  # modified
    - env
        - core_env.py  # modified
        - inventory.py
        - maze_state.py
        - maze_action.py
        - renderer.py
        - maze_env.py  # modified
        - events.py
        - kpi_calculator.py
    - space_interfaces
        - dict_action_conversion.py
        - dict_observation_conversion.py
    - reward
        - default_reward.py  # new

8.1. Reward

In this part of the tutorial we show how to reuse the event system for reward shaping and customization via the RewardAggregatorInterface.

In Maze, reward aggregators usually calculate reward from the current environment state, from events that happened during the last step, or from a combination of both. Calculating reward from state is generally simpler, but not a good fit for this environment – here, the reward is more concerned with what happened (was an invalid cut attempted? was a new raw piece replenished?) than with the current state (i.e., the inventory state after the step). Hence, the reward calculation here is based on events, which is generally more flexible than using the environment state alone.
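For contrast, a purely state-based aggregator would implement summarize_reward using only the passed MazeState. The following is just an illustrative sketch (not part of the tutorial code); in particular, the attribute maze_state.inventory is an assumption about the state object:

from typing import Optional

from maze.core.env.maze_state import MazeStateType
from maze.core.env.reward import RewardAggregatorInterface


# illustrative sketch only: reward computed from state instead of events;
# `maze_state.inventory` is an assumed attribute, not taken from the tutorial code
class StateBasedRewardAggregator(RewardAggregatorInterface):
    """Penalizes a crowded inventory based on the current state alone."""

    def get_interfaces(self):
        """No event interfaces are needed when the reward depends on state only."""
        return []

    def summarize_reward(self, maze_state: Optional[MazeStateType] = None) -> float:
        """The fewer pieces currently in stock, the higher (less negative) the reward."""
        return -float(len(maze_state.inventory))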

The DefaultRewardAggregator does the following:

  • Requests the required event interfaces via get_interfaces (here CuttingEvents and InventoryEvents).

  • Collects rewards and penalties according to relevant events.

  • Aggregates the individual event rewards and penalties to a single scalar reward signal.

Note that the reward aggregator can take any form as long as it provides a scalar reward that can be used for training. This gives a lot of flexibility for shaping rewards without changing the actual implementation of the environment (more on this topic); a shaping variant is sketched right after the listing below.

reward/default_reward.py
from typing import List, Optional

from maze.core.annotations import override
from maze.core.env.maze_state import MazeStateType
from maze.core.env.reward import RewardAggregatorInterface
from ..env.events import CuttingEvents, InventoryEvents


class DefaultRewardAggregator(RewardAggregatorInterface):
    """Default reward scheme for the 2D cutting env.

    :param invalid_action_penalty: Negative reward assigned for an invalid cutting specification.
    :param raw_piece_usage_penalty: Negative reward assigned for starting a new raw inventory piece.
    """

    def __init__(self, invalid_action_penalty: float, raw_piece_usage_penalty: float):
        super().__init__()
        self.invalid_action_penalty = invalid_action_penalty
        self.raw_piece_usage_penalty = raw_piece_usage_penalty

    @override(RewardAggregatorInterface)
    def get_interfaces(self):
        """Specification of the event interfaces this subscriber wants to receive events from.
        Every subscriber must implement this configuration method.
        :return: A list of interface classes"""
        return [CuttingEvents, InventoryEvents]

    @override(RewardAggregatorInterface)
    def summarize_reward(self, maze_state: Optional[MazeStateType] = None) -> float:
        """Assign rewards and penalties according to respective events.

        :param maze_state: Not used by this reward aggregator.
:return: The aggregated scalar reward.
        """

        rewards: List[float] = []

        # penalty for starting a new raw inventory piece
        for _ in self.query_events(InventoryEvents.piece_replenished):
            rewards.append(self.raw_piece_usage_penalty)

        # penalty for selecting an invalid piece for cutting
        for _ in self.query_events(CuttingEvents.invalid_piece_selected):
            rewards.append(self.invalid_action_penalty)

        # penalty for specifying invalid cutting parameters
        for _ in self.query_events(CuttingEvents.invalid_cut):
            rewards.append(self.invalid_action_penalty)

        return sum(rewards)
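To illustrate the shaping flexibility mentioned above: because the aggregator only consumes events, a variant that additionally rewards successful cuts requires no change to the environment itself. The following sketch is not part of the tutorial code; if placed in reward/default_reward.py it merely extends DefaultRewardAggregator and relies on the CuttingEvents.valid_cut event that the CoreEnv already dispatches:

# illustrative sketch only: a shaped variant of the default reward scheme;
# it subscribes to the same event interfaces, so the environment stays untouched
class ShapedRewardAggregator(DefaultRewardAggregator):
    """Default penalties plus a positive reward for every valid cut."""

    def __init__(self, invalid_action_penalty: float, raw_piece_usage_penalty: float,
                 valid_cut_reward: float):
        super().__init__(invalid_action_penalty, raw_piece_usage_penalty)
        self.valid_cut_reward = valid_cut_reward

    @override(RewardAggregatorInterface)
    def summarize_reward(self, maze_state: Optional[MazeStateType] = None) -> float:
        """Adds a reward for each valid cut on top of the default penalties."""
        reward = super().summarize_reward(maze_state)
        for _ in self.query_events(CuttingEvents.valid_cut):
            reward += self.valid_cut_reward
        return reward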

8.2. Updating the Core- and MazeEnv

We also have to make a few modifications in the CoreEnv:

  • Initialize the reward aggregator in the constructor.

  • Instead of accumulating reward in the if-else branches of the step function, we now summarize it only once at the end of the step.

env/core_env.py
...

class Cutting2DCoreEnvironment(CoreEnv):
    """Environment for cutting 2D pieces based on the customer demand. Works as follows:
    ...
    :param reward_aggregator: Either an instantiated aggregator or a configuration dictionary.
    """

    def __init__(self, max_pieces_in_inventory: int, raw_piece_size: (int, int), static_demand: (int, int),
                 reward_aggregator: RewardAggregatorInterface):
        super().__init__()

        ...

        # init reward and register it with pubsub
        self.reward_aggregator = reward_aggregator
        self.pubsub.register_subscriber(self.reward_aggregator)

    def step(self, maze_action: Cutting2DMazeAction) -> Tuple[Cutting2DMazeState, np.array, bool, Dict[Any, Any]]:
        """Summary of the step (simplified, not necessarily respecting the actual order in the code):
        1. Check if the selected piece to cut is valid (i.e. in inventory, large enough etc.)
        2. Attempt the cutting
        3. Replenish a fresh piece if needed and return an appropriate reward

        :param maze_action: Cutting maze_action to take.
        :return: state, reward, done, info
        """

        info = {}
        replenishment_needed = False

        # check if valid piece id was selected
        if maze_action.piece_id >= self.inventory.size():
            self.cutting_events.invalid_piece_selected()
        # perform cutting
        else:
            piece_to_cut = self.inventory.pieces[maze_action.piece_id]

            # attempt the cut
            if self.inventory.cut(maze_action, self.current_demand):
                self.cutting_events.valid_cut(current_demand=self.current_demand, piece_to_cut=piece_to_cut,
                                              raw_piece_size=self.raw_piece_size)
                replenishment_needed = piece_to_cut == self.raw_piece_size
            else:
                # dispatch an event for the invalid cutting attempt (penalized via the reward aggregator)
                self.cutting_events.invalid_cut(current_demand=self.current_demand, piece_to_cut=piece_to_cut,
                                                raw_piece_size=self.raw_piece_size)

        # check if replenishment is required
        if replenishment_needed:
            self.inventory.replenish_piece()
            # (the replenishment penalty is assigned by the reward aggregator based on the corresponding event)

        # step execution finished, write step statistics
        self.inventory.log_step_statistics()

        # compile env state
        maze_state = self.get_maze_state()

        # aggregate reward from events
        reward = self.reward_aggregator.summarize_reward(maze_state)

        return maze_state, reward, False, info

Finally, we update the maze_env_factory function for instantiating the trainable MazeEnv, and we are all set up for training with event-based, customized rewards.

env/maze_env.py
...


def maze_env_factory(max_pieces_in_inventory: int, raw_piece_size: (int, int),
                     static_demand: (int, int)) -> Cutting2DEnvironment:
    """Convenience factory function that compiles a trainable maze environment.
    (for argument details see: Cutting2DCoreEnvironment)
    """

    # init reward aggregator
    reward_aggregator = DefaultRewardAggregator(invalid_action_penalty=-2, raw_piece_usage_penalty=-1)

    # init core environment
    core_env = Cutting2DCoreEnvironment(max_pieces_in_inventory=max_pieces_in_inventory,
                                        raw_piece_size=raw_piece_size,
                                        static_demand=static_demand,
                                        reward_aggregator=reward_aggregator)

    # init maze environment including observation and action interfaces
    action_conversion = ActionConversion(max_pieces_in_inventory=max_pieces_in_inventory)
    observation_conversion = ObservationConversion(raw_piece_size=raw_piece_size,
                                                   max_pieces_in_inventory=max_pieces_in_inventory)
    return Cutting2DEnvironment(core_env, action_conversion, observation_conversion)
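For a quick sanity check (not part of the tutorial files), the factory can be used for a short random rollout to inspect the event-based rewards; the constructor arguments below are assumed example values:

# illustrative sketch only: random rollout to inspect the event-based reward;
# the argument values are assumptions chosen for illustration
env = maze_env_factory(max_pieces_in_inventory=200,
                       raw_piece_size=(100, 100),
                       static_demand=(30, 15))

obs = env.reset()
for _ in range(10):
    action = env.action_space.sample()          # sample a random cutting action
    obs, reward, done, info = env.step(action)
    print(reward)                               # typically 0 or a negative penalty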

8.3. Where to Go Next

As the reward is implemented via a reward aggregator that is methodologically identical to the initial version, there is no need to retrain the model for now. However, we highly recommend proceeding with the more advanced tutorial on Structured Environments and Action Masking.