(WIP) Creating Environments: Action Masking
Introduction
In many environments, it is natural for some actions to be invalid at certain times. For example, in chess, a pawn cannot move forward if another piece is directly in front of it. In PettingZoo, we can use action masking to prevent invalid actions from being taken.
Action masking is a more natural way of handling invalid actions than having the action simply do nothing, which is how we handled bumping into walls in the previous tutorial.
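Once an environment returns an action mask alongside each observation, a policy can restrict itself to the currently valid actions. The snippet below is a minimal sketch of how a random policy might do this; the helper sample_valid_action is our own illustration, not part of PettingZoo's API.

import numpy as np

def sample_valid_action(action_mask):
    # Indices where the mask is nonzero are the currently valid actions
    valid_actions = np.flatnonzero(action_mask)
    return int(np.random.choice(valid_actions))

# e.g. with the mask the prisoner receives at reset in the environment below:
print(sample_valid_action(np.array([0, 1, 1, 0])))  # prints 1 or 2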
Code
/custom-environment/env/custom_environment.py
import functools
import random
from copy import copy

import numpy as np
from gymnasium.spaces import Discrete, MultiDiscrete

from pettingzoo.utils.env import ParallelEnv


class CustomEnvironment(ParallelEnv):
    def __init__(self):
        self.escape_y = None
        self.escape_x = None
        self.guard_y = None
        self.guard_x = None
        self.prisoner_y = None
        self.prisoner_x = None
        self.timestep = None
        self.possible_agents = ["prisoner", "guard"]
    def reset(self, seed=None, return_info=False, options=None):
        self.agents = copy(self.possible_agents)
        self.timestep = 0

        self.prisoner_x = 0
        self.prisoner_y = 0

        # Guard starts in the opposite corner of the 7x7 grid (valid indices are 0-6)
        self.guard_x = 6
        self.guard_y = 6

        self.escape_x = random.randint(2, 5)
        self.escape_y = random.randint(2, 5)

        observation = (
            self.prisoner_x + 7 * self.prisoner_y,
            self.guard_x + 7 * self.guard_y,
            self.escape_x + 7 * self.escape_y,
        )
        observations = {
            "prisoner": {"observation": observation, "action_mask": [0, 1, 1, 0]},
            "guard": {"observation": observation, "action_mask": [1, 0, 0, 1]},
        }

        return observations
    def step(self, actions):
        # Execute actions
        prisoner_action = actions["prisoner"]
        guard_action = actions["guard"]

        if prisoner_action == 0 and self.prisoner_x > 0:
            self.prisoner_x -= 1
        elif prisoner_action == 1 and self.prisoner_x < 6:
            self.prisoner_x += 1
        elif prisoner_action == 2 and self.prisoner_y > 0:
            self.prisoner_y -= 1
        elif prisoner_action == 3 and self.prisoner_y < 6:
            self.prisoner_y += 1

        if guard_action == 0 and self.guard_x > 0:
            self.guard_x -= 1
        elif guard_action == 1 and self.guard_x < 6:
            self.guard_x += 1
        elif guard_action == 2 and self.guard_y > 0:
            self.guard_y -= 1
        elif guard_action == 3 and self.guard_y < 6:
            self.guard_y += 1
        # Generate action masks
        prisoner_action_mask = np.ones(4)
        if self.prisoner_x == 0:
            prisoner_action_mask[0] = 0  # Block left movement
        elif self.prisoner_x == 6:
            prisoner_action_mask[1] = 0  # Block right movement
        if self.prisoner_y == 0:
            prisoner_action_mask[2] = 0  # Block down movement
        elif self.prisoner_y == 6:
            prisoner_action_mask[3] = 0  # Block up movement

        guard_action_mask = np.ones(4)
        if self.guard_x == 0:
            guard_action_mask[0] = 0
        elif self.guard_x == 6:
            guard_action_mask[1] = 0
        if self.guard_y == 0:
            guard_action_mask[2] = 0
        elif self.guard_y == 6:
            guard_action_mask[3] = 0
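        # Mask guard moves toward the escape cell (each axis is checked independently)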
        if self.guard_x - 1 == self.escape_x:
            guard_action_mask[0] = 0
        elif self.guard_x + 1 == self.escape_x:
            guard_action_mask[1] = 0
        if self.guard_y - 1 == self.escape_y:
            guard_action_mask[2] = 0
        elif self.guard_y + 1 == self.escape_y:
            guard_action_mask[3] = 0
        # Check termination conditions
        terminations = {a: False for a in self.agents}
        rewards = {a: 0 for a in self.agents}
        if self.prisoner_x == self.guard_x and self.prisoner_y == self.guard_y:
            rewards = {"prisoner": -1, "guard": 1}
            terminations = {a: True for a in self.agents}
            self.agents = []
        elif self.prisoner_x == self.escape_x and self.prisoner_y == self.escape_y:
            rewards = {"prisoner": 1, "guard": -1}
            terminations = {a: True for a in self.agents}
            self.agents = []

        # Check truncation conditions (overwrites termination conditions)
        truncations = {"prisoner": False, "guard": False}
        if self.timestep > 100:
            rewards = {"prisoner": 0, "guard": 0}
            truncations = {"prisoner": True, "guard": True}
            self.agents = []
        self.timestep += 1
        # Get observations
        observation = (
            self.prisoner_x + 7 * self.prisoner_y,
            self.guard_x + 7 * self.guard_y,
            self.escape_x + 7 * self.escape_y,
        )
        observations = {
            "prisoner": {
                "observation": observation,
                "action_mask": prisoner_action_mask,
            },
            "guard": {"observation": observation, "action_mask": guard_action_mask},
        }

        # Get dummy infos (not used in this example)
        infos = {"prisoner": {}, "guard": {}}

        return observations, rewards, terminations, truncations, infos
    def render(self):
        # Use a string grid so the agent markers can be written into it
        grid = np.full((7, 7), " ")
        grid[self.prisoner_y, self.prisoner_x] = "P"
        grid[self.guard_y, self.guard_x] = "G"
        grid[self.escape_y, self.escape_x] = "E"
        print(f"{grid} \n")
    @functools.lru_cache(maxsize=None)
    def observation_space(self, agent):