(WIP) Creating Environments: Action Masking#
In many environments, it is natural for some actions to be invalid at certain times. For example, in a game of chess, it is impossible to move a pawn forward if it is already at the front of the board. In PettingZoo, we can use action masking to prevent invalid actions from being taken.
Action masking is a more natural way of handling invalid actions than having an action have no effect, which was how we handled bumping into walls in the previous tutorial.
import functools
import random
from copy import copy
import numpy as np
from gymnasium.spaces import Discrete, MultiDiscrete
from pettingzoo.utils.env import ParallelEnv
class CustomEnvironment(ParallelEnv):
def __init__(self):
self.escape_y = None
self.escape_x = None
self.guard_y = None
self.guard_x = None
self.prisoner_y = None
self.prisoner_x = None
self.timestep = None
self.possible_agents = ["prisoner", "guard"]
def reset(self, seed=None, return_info=False, options=None):
self.agents = copy(self.possible_agents)
self.timestep = 0
self.prisoner_x = 0
self.prisoner_y = 0
self.guard_x = 7
self.guard_y = 7
self.escape_x = random.randint(2, 5)
self.escape_y = random.randint(2, 5)
observation = (
self.prisoner_x + 7 * self.prisoner_y,
self.guard_x + 7 * self.guard_y,
self.escape_x + 7 * self.escape_y,
observations = {
"prisoner": {"observation": observation, "action_mask": [0, 1, 1, 0]},
"guard": {"observation": observation, "action_mask": [1, 0, 0, 1]},
return observations
def step(self, actions):
# Execute actions
prisoner_action = actions["prisoner"]
guard_action = actions["guard"]
if prisoner_action == 0 and self.prisoner_x > 0:
self.prisoner_x -= 1
elif prisoner_action == 1 and self.prisoner_x < 6:
self.prisoner_x += 1
elif prisoner_action == 2 and self.prisoner_y > 0:
self.prisoner_y -= 1
elif prisoner_action == 3 and self.prisoner_y < 6:
self.prisoner_y += 1
if guard_action == 0 and self.guard_x > 0:
self.guard_x -= 1
elif guard_action == 1 and self.guard_x < 6:
self.guard_x += 1
elif guard_action == 2 and self.guard_y > 0:
self.guard_y -= 1
elif guard_action == 3 and self.guard_y < 6:
self.guard_y += 1
# Generate action masks
prisoner_action_mask = np.ones(4)
if self.prisoner_x == 0:
prisoner_action_mask[0] = 0 # Block left movement
elif self.prisoner_x == 6:
prisoner_action_mask[1] = 0 # Block right movement
if self.prisoner_y == 0:
prisoner_action_mask[2] = 0 # Block down movement
elif self.prisoner_y == 6:
prisoner_action_mask[3] = 0 # Block up movement
guard_action_mask = np.ones(4)
if self.guard_x == 0:
guard_action_mask[0] = 0
elif self.guard_x == 6:
guard_action_mask[1] = 0
if self.guard_y == 0:
guard_action_mask[2] = 0
elif self.guard_y == 6:
guard_action_mask[3] = 0
if self.guard_x - 1 == self.escape_x:
guard_action_mask[0] = 0
elif self.guard_x + 1 == self.escape_x:
guard_action_mask[1] = 0
if self.guard_y - 1 == self.escape_y:
guard_action_mask[2] = 0
elif self.guard_y + 1 == self.escape_y:
guard_action_mask[3] = 0
# Check termination conditions
terminations = {a: False for a in self.agents}
rewards = {a: 0 for a in self.agents}
if self.prisoner_x == self.guard_x and self.prisoner_y == self.guard_y:
rewards = {"prisoner": -1, "guard": 1}
terminations = {a: True for a in self.agents}
self.agents = []
elif self.prisoner_x == self.escape_x and self.prisoner_y == self.escape_y:
rewards = {"prisoner": 1, "guard": -1}
terminations = {a: True for a in self.agents}
self.agents = []
# Check truncation conditions (overwrites termination conditions)
truncations = {"prisoner": False, "guard": False}
if self.timestep > 100:
rewards = {"prisoner": 0, "guard": 0}
truncations = {"prisoner": True, "guard": True}
self.agents = []
self.timestep += 1
# Get observations
observation = (
self.prisoner_x + 7 * self.prisoner_y,
self.guard_x + 7 * self.guard_y,
self.escape_x + 7 * self.escape_y,
observations = {
"prisoner": {
"observation": observation,
"action_mask": prisoner_action_mask,
"guard": {"observation": observation, "action_mask": guard_action_mask},
# Get dummy infos (not used in this example)
infos = {"prisoner": {}, "guard": {}}
return observations, rewards, terminations, truncations, infos
def render(self):
grid = np.zeros((7, 7))
grid[self.prisoner_y, self.prisoner_x] = "P"
grid[self.guard_y, self.guard_x] = "G"
grid[self.escape_y, self.escape_x] = "E"
print(f"{grid} \n")
def observation_space(self, agent):