You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
744 lines
36 KiB
744 lines
36 KiB
from __future__ import annotations
|
|
from minigrid.core.grid import Grid
|
|
from minigrid.core.mission import MissionSpace
|
|
from minigrid.core.world_object import (
|
|
SlipperyEast,
|
|
SlipperySouth,
|
|
SlipperyNorth,
|
|
SlipperyWest,
|
|
SlipperyNorthEast,
|
|
Lava,
|
|
Goal
|
|
)
|
|
from minigrid.envs.adversaries_base import AdversaryEnv
|
|
from minigrid.core.tasks import FollowAgent, DoRandom, GoTo
|
|
|
|
from minigrid.minigrid_env import MiniGridEnv, is_slippery
|
|
|
|
import numpy as np
|
|
from itertools import product
|
|
|
|
class WindyCityEnv(MiniGridEnv):
|
|
def __init__(self,
|
|
randomize_start=True, size=10,
|
|
width=24,
|
|
height=22,
|
|
probability_intended=8/9,
|
|
probability_turn_intended=8/9,
|
|
obstacle_type=Lava,
|
|
goal_reward=1,
|
|
failure_penalty=-1,
|
|
per_step_penalty=0,
|
|
dense_rewards=False,
|
|
**kwargs):
|
|
|
|
self.obstacle_type = obstacle_type
|
|
self.size = size
|
|
self.probability_intended = probability_intended
|
|
self.probability_turn_intended = probability_turn_intended
|
|
|
|
if width is not None and height is not None:
|
|
self.width = width
|
|
self.height = height
|
|
elif size is not None:
|
|
self.width = size
|
|
self.height = size
|
|
else:
|
|
raise ValueError(f"Please define either width and height or a size for square environments. The set values are width: {width}, height: {height}, size: {size}.")
|
|
|
|
mission_space = MissionSpace(mission_func=self._gen_mission)
|
|
super().__init__(
|
|
width=self.width,
|
|
height=self.height,
|
|
max_steps=200,
|
|
# Set this to True for maximum speed
|
|
see_through_walls=False,
|
|
mission_space = mission_space,
|
|
**kwargs
|
|
)
|
|
|
|
self.randomize_start = randomize_start
|
|
self.goal_reward = goal_reward
|
|
self.failure_penalty = failure_penalty
|
|
self.dense_rewards = dense_rewards
|
|
self.per_step_penalty = per_step_penalty
|
|
|
|
self.trajectory = list()
|
|
|
|
@staticmethod
|
|
def _gen_mission():
|
|
return "Finish your task while avoiding the adversaries"
|
|
|
|
def disable_random_start(self):
|
|
self.randomize_start = False
|
|
|
|
def place_agent(self, spawn_on_slippery=False, agent_pos=None, agent_dir=0):
|
|
max_tries = 10_000
|
|
num_tries = 0
|
|
|
|
if self.randomize_start == True:
|
|
while True:
|
|
num_tries += 1
|
|
if num_tries > max_tries:
|
|
raise RecursionError("rejection sampling failed in place_obj")
|
|
x = np.random.randint(0, self.width)
|
|
y = np.random.randint(0, self.height)
|
|
|
|
cell = self.grid.get(*(x,y))
|
|
if ( cell is None or
|
|
(cell.can_overlap() and
|
|
not isinstance(cell, Lava) and
|
|
not isinstance(cell, Goal) and
|
|
(spawn_on_slippery or not is_slippery(cell)) and
|
|
not (x in [7, 8, 9, 10] and y in [9, 10]))
|
|
):
|
|
self.agent_pos = np.array((x, y))
|
|
self.agent_dir = np.random.randint(0, 4)
|
|
break
|
|
elif agent_dir is None:
|
|
self.agent_pos = np.array((1, 1))
|
|
self.agent_dir = 0
|
|
else:
|
|
self.agent_pos = agent_pos
|
|
self.agent_dir = agent_dir
|
|
self.trajectory.append((self.agent_pos, self.agent_dir))
|
|
|
|
def place_goal(self, goal_pos):
|
|
self.goal_pos = goal_pos
|
|
self.put_obj(Goal(), *self.goal_pos)
|
|
|
|
|
|
def printGrid(self, init=False):
|
|
grid = super().printGrid(init)
|
|
|
|
properties_str = ""
|
|
|
|
properties_str += F"ProbTurnIntended:{self.probability_turn_intended}\n"
|
|
properties_str += F"ProbForwardIntended:{self.probability_intended}\n"
|
|
|
|
return grid + properties_str
|
|
|
|
def step(self, action):
|
|
obs, reward, terminated, truncated, info = super().step(action)
|
|
self.trajectory.append((action, self.agent_pos, self.agent_dir))
|
|
if truncated and info["ran_into_lava"]:
|
|
print(self.trajectory)
|
|
print("truncated: ", info)
|
|
self.trajectory = list()
|
|
if truncated and info["reached_goal"]:
|
|
print("truncated: ", info)
|
|
self.trajectory = list()
|
|
elif terminated and info["ran_into_lava"]:
|
|
print(self.trajectory)
|
|
print("terminated: ", info)
|
|
self.trajectory = list()
|
|
elif terminated:
|
|
print("terminated: ", info)
|
|
self.trajectory = list()
|
|
elif truncated:
|
|
print("truncated: ", info)
|
|
self.trajectory = list()
|
|
return obs, reward - self.per_step_penalty, terminated, truncated, info
|
|
|
|
def reset(self, **kwargs) -> tuple[ObsType, dict[str, Any]]:
|
|
return super().reset(**kwargs)
|
|
|
|
def _place_building(self, col, row, width, height, obj_type=Lava):
|
|
for i in range(col, width + col):
|
|
self.grid.vert_wall(i, row, height, obj_type=obj_type)
|
|
|
|
def _gen_grid(self, width, height):
|
|
super()._gen_grid(width, height)
|
|
self.grid = Grid(width, height)
|
|
|
|
# Generate the surrounding walls
|
|
self.grid.horz_wall(0, 0)
|
|
self.grid.horz_wall(0, height - 1)
|
|
self.grid.vert_wall(0, 0)
|
|
self.grid.vert_wall(width - 1, 0)
|
|
|
|
for i in range(1, height - 1):
|
|
self.grid.horz_wall(1, i, width-2, obj_type=SlipperyNorthEast("white", probability_intended=self.probability_intended, probability_turn_intended=self.probability_turn_intended))
|
|
|
|
self._place_building(13, 1, 4, 2)
|
|
self.grid.vert_wall(12, 1, 2, obj_type=SlipperyNorth("white", probability_intended=self.probability_intended, probability_turn_intended=self.probability_turn_intended))
|
|
self.grid.horz_wall(13, 3, 4, obj_type=SlipperyEast("white", probability_intended=self.probability_intended, probability_turn_intended=self.probability_turn_intended))
|
|
self.grid.vert_wall(17, 1, 2, obj_type=SlipperyNorth("white", probability_intended=self.probability_intended, probability_turn_intended=self.probability_turn_intended))
|
|
|
|
self._place_building(7, 3, 3, 4)
|
|
self.grid.vert_wall(6, 3, 4, obj_type=SlipperyNorth("white", probability_intended=self.probability_intended, probability_turn_intended=self.probability_turn_intended))
|
|
self.grid.vert_wall(10, 3, 4, obj_type=SlipperyNorth("white", probability_intended=self.probability_intended, probability_turn_intended=self.probability_turn_intended))
|
|
self.grid.horz_wall(7, 2, 3, obj_type=SlipperyEast("white", probability_intended=self.probability_intended, probability_turn_intended=self.probability_turn_intended))
|
|
self.grid.horz_wall(7, 7, 3, obj_type=SlipperyEast("white", probability_intended=self.probability_intended, probability_turn_intended=self.probability_turn_intended))
|
|
|
|
self._place_building(15, 7, 6, 4)
|
|
self.grid.vert_wall(14, 7, 4, obj_type=SlipperyNorth("white", probability_intended=self.probability_intended, probability_turn_intended=self.probability_turn_intended))
|
|
self.grid.vert_wall(14, 9, 2, obj_type=Lava)
|
|
self.grid.vert_wall(20, 7, 4, obj_type=SlipperyNorth("white", probability_intended=self.probability_intended, probability_turn_intended=self.probability_turn_intended))
|
|
self.grid.vert_wall(13, 9, 2, obj_type=SlipperyNorth("white", probability_intended=self.probability_intended, probability_turn_intended=self.probability_turn_intended))
|
|
self.grid.horz_wall(15, 6, 5, obj_type=SlipperyEast("white", probability_intended=self.probability_intended, probability_turn_intended=self.probability_turn_intended))
|
|
self.grid.horz_wall(14, 11, 6, obj_type=SlipperyEast("white", probability_intended=self.probability_intended, probability_turn_intended=self.probability_turn_intended))
|
|
|
|
|
|
self._place_building(5, 11, 5, 6)
|
|
self.grid.vert_wall(4, 11, 6, obj_type=SlipperyNorth("white", probability_intended=self.probability_intended, probability_turn_intended=self.probability_turn_intended))
|
|
self.grid.vert_wall(10, 11, 6, obj_type=SlipperyNorth("white", probability_intended=self.probability_intended, probability_turn_intended=self.probability_turn_intended))
|
|
self.grid.horz_wall(5, 17, 5, obj_type=SlipperyEast("white", probability_intended=self.probability_intended, probability_turn_intended=self.probability_turn_intended))
|
|
self.grid.horz_wall(5, 10, 5, obj_type=SlipperyWest("white", probability_intended=self.probability_intended, probability_turn_intended=self.probability_turn_intended))
|
|
self.grid.horz_wall(6, 9, 4, obj_type=SlipperyWest("white", probability_intended=self.probability_intended, probability_turn_intended=self.probability_turn_intended))
|
|
self.grid.vert_wall(9, 7, 4, obj_type=SlipperySouth("white", probability_intended=self.probability_intended, probability_turn_intended=self.probability_turn_intended))
|
|
|
|
self._place_building(21, 13, 2, 5)
|
|
self.grid.vert_wall(20, 13, 5, obj_type=SlipperyNorth("white", probability_intended=self.probability_intended, probability_turn_intended=self.probability_turn_intended))
|
|
self.grid.horz_wall(21, 12, 2, obj_type=SlipperyEast("white", probability_intended=self.probability_intended, probability_turn_intended=self.probability_turn_intended))
|
|
self.grid.horz_wall(21, 18, 2, obj_type=SlipperyEast("white", probability_intended=self.probability_intended, probability_turn_intended=self.probability_turn_intended))
|
|
|
|
|
|
|
|
self.place_agent(agent_pos=np.array((1, height -2)), agent_dir=0, spawn_on_slippery=True)
|
|
self.place_goal(np.array((width - 2, 1)))
|
|
if self.dense_rewards: self.run_bfs()
|
|
|
|
|
|
class WindyCityAdvEnv(AdversaryEnv):
|
|
def __init__(self,
|
|
randomize_start=True, size=10,
|
|
width=15,
|
|
height=15,
|
|
probability_intended=8/9,
|
|
probability_turn_intended=8/9,
|
|
obstacle_type=Lava,
|
|
goal_reward=1,
|
|
failure_penalty=-1,
|
|
per_step_penalty=0,
|
|
dense_rewards=False,
|
|
**kwargs):
|
|
|
|
self.obstacle_type = obstacle_type
|
|
self.size = size
|
|
self.probability_intended = probability_intended
|
|
self.probability_turn_intended = probability_turn_intended
|
|
|
|
if width is not None and height is not None:
|
|
self.width = width
|
|
self.height = height
|
|
elif size is not None:
|
|
self.width = size
|
|
self.height = size
|
|
else:
|
|
raise ValueError(f"Please define either width and height or a size for square environments. The set values are width: {width}, height: {height}, size: {size}.")
|
|
|
|
super().__init__(
|
|
width=self.width,
|
|
height=self.height,
|
|
max_steps=200,
|
|
# Set this to True for maximum speed
|
|
see_through_walls=False,
|
|
**kwargs
|
|
)
|
|
|
|
self.randomize_start = randomize_start
|
|
self.goal_reward = goal_reward
|
|
self.failure_penalty = failure_penalty
|
|
self.dense_rewards = dense_rewards
|
|
self.per_step_penalty = per_step_penalty
|
|
|
|
self.trajectory = list()
|
|
|
|
def disable_random_start(self):
|
|
self.randomize_start = False
|
|
|
|
def place_agent(self, spawn_on_slippery=False, agent_pos=None, agent_dir=0):
|
|
max_tries = 10_000
|
|
num_tries = 0
|
|
|
|
if self.randomize_start == True:
|
|
while True:
|
|
num_tries += 1
|
|
if num_tries > max_tries:
|
|
raise RecursionError("rejection sampling failed in place_obj")
|
|
x = np.random.randint(0, self.width)
|
|
y = np.random.randint(0, self.height)
|
|
|
|
cell = self.grid.get(*(x,y))
|
|
if ( cell is None or
|
|
(cell.can_overlap() and
|
|
not isinstance(cell, Lava) and
|
|
not isinstance(cell, Goal) and
|
|
(spawn_on_slippery or not is_slippery(cell)) and
|
|
not (x in [7, 8, 9, 10] and y in [9, 10]))
|
|
):
|
|
self.agent_pos = np.array((x, y))
|
|
self.agent_dir = np.random.randint(0, 4)
|
|
break
|
|
elif agent_dir is None:
|
|
self.agent_pos = np.array((1, 1))
|
|
self.agent_dir = 0
|
|
else:
|
|
self.agent_pos = agent_pos
|
|
self.agent_dir = agent_dir
|
|
self.trajectory.append((self.agent_pos, self.agent_dir))
|
|
|
|
def place_goal(self, goal_pos):
|
|
self.goal_pos = goal_pos
|
|
self.put_obj(Goal(), *self.goal_pos)
|
|
|
|
|
|
def printGrid(self, init=False):
|
|
grid = super().printGrid(init)
|
|
|
|
properties_str = ""
|
|
|
|
properties_str += F"ProbTurnIntended:{self.probability_turn_intended}\n"
|
|
properties_str += F"ProbForwardIntended:{self.probability_intended}\n"
|
|
|
|
return grid + properties_str
|
|
|
|
def step(self, action):
|
|
obs, reward, terminated, truncated, info = super().step(action)
|
|
self.trajectory.append((action, self.agent_pos, self.agent_dir, str(self.adversaries["blue"])))
|
|
if truncated and info["ran_into_lava"]:
|
|
print(self.trajectory)
|
|
print("truncated: ", info)
|
|
self.trajectory = list()
|
|
if truncated and info["reached_goal"]:
|
|
print("truncated: ", info)
|
|
self.trajectory = list()
|
|
elif terminated and info["ran_into_lava"]:
|
|
print(self.trajectory)
|
|
print("terminated: ", info)
|
|
self.trajectory = list()
|
|
elif terminated:
|
|
print("terminated: ", info)
|
|
self.trajectory = list()
|
|
elif truncated:
|
|
print("truncated: ", info)
|
|
self.trajectory = list()
|
|
return obs, reward - self.per_step_penalty, terminated, truncated, info
|
|
|
|
def reset(self, **kwargs) -> tuple[ObsType, dict[str, Any]]:
|
|
return super().reset(**kwargs)
|
|
|
|
def _place_building(self, col, row, width, height, obj_type=Lava):
|
|
for i in range(col, width + col):
|
|
self.grid.vert_wall(i, row, height, obj_type=obj_type)
|
|
|
|
def _gen_grid(self, width, height):
|
|
super()._gen_grid(width, height)
|
|
self.grid = Grid(width, height)
|
|
|
|
# Generate the surrounding walls
|
|
self.grid.horz_wall(0, 0)
|
|
self.grid.horz_wall(0, height - 1)
|
|
self.grid.vert_wall(0, 0)
|
|
self.grid.vert_wall(width - 1, 0)
|
|
|
|
for i in range(1, height - 1):
|
|
self.grid.horz_wall(1, i, width-2, obj_type=SlipperyNorthEast("white", probability_intended=self.probability_intended, probability_turn_intended=self.probability_turn_intended))
|
|
|
|
self._place_building(7, 1, 4, 1)
|
|
self.grid.vert_wall(6, 1, 1, obj_type=SlipperyNorth("white", probability_intended=self.probability_intended, probability_turn_intended=self.probability_turn_intended))
|
|
self.grid.horz_wall(7, 2, 4, obj_type=SlipperyEast("white", probability_intended=self.probability_intended, probability_turn_intended=self.probability_turn_intended))
|
|
self.grid.vert_wall(11, 1, 1, obj_type=SlipperyNorth("white", probability_intended=self.probability_intended, probability_turn_intended=self.probability_turn_intended))
|
|
|
|
self._place_building(4, 5, 2, 1)
|
|
self.grid.vert_wall(3, 5, 1, obj_type=SlipperyNorth("white", probability_intended=self.probability_intended, probability_turn_intended=self.probability_turn_intended))
|
|
self.grid.vert_wall(6, 5, 1, obj_type=SlipperyNorth("white", probability_intended=self.probability_intended, probability_turn_intended=self.probability_turn_intended))
|
|
self.grid.horz_wall(4, 4, 2, obj_type=SlipperyEast("white", probability_intended=self.probability_intended, probability_turn_intended=self.probability_turn_intended))
|
|
self.grid.horz_wall(4, 6, 2, obj_type=SlipperyEast("white", probability_intended=self.probability_intended, probability_turn_intended=self.probability_turn_intended))
|
|
|
|
self._place_building(12, 7, 2, 3)
|
|
self.grid.vert_wall(11, 7, 3, obj_type=SlipperyNorth("white", probability_intended=self.probability_intended, probability_turn_intended=self.probability_turn_intended))
|
|
self.grid.horz_wall(11, 6, 3, obj_type=SlipperyEast("white", probability_intended=self.probability_intended, probability_turn_intended=self.probability_turn_intended))
|
|
self.grid.horz_wall(11, 10, 3, obj_type=SlipperyEast("white", probability_intended=self.probability_intended, probability_turn_intended=self.probability_turn_intended))
|
|
|
|
|
|
self._place_building(4, 10, 2, 2)
|
|
self.grid.vert_wall(3, 10, 2, obj_type=SlipperyNorth("white", probability_intended=self.probability_intended, probability_turn_intended=self.probability_turn_intended))
|
|
self.grid.vert_wall(6, 10, 2, obj_type=SlipperyNorth("white", probability_intended=self.probability_intended, probability_turn_intended=self.probability_turn_intended))
|
|
self.grid.horz_wall(4, 12, 2, obj_type=SlipperyEast("white", probability_intended=self.probability_intended, probability_turn_intended=self.probability_turn_intended))
|
|
self.grid.horz_wall(4, 9, 2, obj_type=SlipperyWest("white", probability_intended=self.probability_intended, probability_turn_intended=self.probability_turn_intended))
|
|
self.grid.vert_wall(5, 7, 3, obj_type=SlipperySouth("white", probability_intended=self.probability_intended, probability_turn_intended=self.probability_turn_intended))
|
|
|
|
|
|
#should spawn randomly
|
|
x = np.random.choice([1,2,3,6,7,8,9])
|
|
y = np.random.choice([6,7,8])
|
|
self.add_adversary(x, y, "blue", direction=1, tasks=[FollowAgent("red", duration=2), DoRandom(duration=2)], repeating=True)
|
|
|
|
self.place_agent(agent_pos=np.array((1, height -2)), agent_dir=0, spawn_on_slippery=True)
|
|
self.place_goal(np.array((width - 2, 1)))
|
|
if self.dense_rewards: self.run_bfs()
|
|
|
|
class WindyCity2Env(MiniGridEnv):
|
|
def __init__(self,
|
|
randomize_start=True, size=10,
|
|
width=27,
|
|
height=22,
|
|
probability_intended=8/9,
|
|
probability_turn_intended=8/9,
|
|
obstacle_type=Lava,
|
|
goal_reward=1,
|
|
failure_penalty=-1,
|
|
per_step_penalty=0,
|
|
dense_rewards=False,
|
|
two_player_winning_region_start=False,
|
|
**kwargs):
|
|
|
|
self.obstacle_type = obstacle_type
|
|
self.size = size
|
|
self.probability_intended = probability_intended
|
|
self.probability_turn_intended = probability_turn_intended
|
|
|
|
if width is not None and height is not None:
|
|
self.width = width
|
|
self.height = height
|
|
elif size is not None:
|
|
self.width = size
|
|
self.height = size
|
|
else:
|
|
raise ValueError(f"Please define either width and height or a size for square environments. The set values are width: {width}, height: {height}, size: {size}.")
|
|
|
|
mission_space = MissionSpace(mission_func=self._gen_mission)
|
|
super().__init__(
|
|
width=self.width,
|
|
height=self.height,
|
|
max_steps=200,
|
|
# Set this to True for maximum speed
|
|
see_through_walls=False,
|
|
mission_space = mission_space,
|
|
**kwargs
|
|
)
|
|
|
|
self.randomize_start = randomize_start
|
|
self.two_player_winning_region_start = two_player_winning_region_start
|
|
self.goal_reward = goal_reward
|
|
self.failure_penalty = failure_penalty
|
|
self.dense_rewards = dense_rewards
|
|
self.per_step_penalty = per_step_penalty
|
|
|
|
self.trajectory = list()
|
|
|
|
@staticmethod
|
|
def _gen_mission():
|
|
return "Finish your task while avoiding the adversaries"
|
|
|
|
def disable_random_start(self):
|
|
self.randomize_start = False
|
|
|
|
def place_agent(self, spawn_on_slippery=False, agent_pos=None, agent_dir=0):
|
|
max_tries = 10_000
|
|
num_tries = 0
|
|
|
|
if self.two_player_winning_region_start == True:
|
|
winning_region = list()
|
|
winning_region += product([1,2,3,4], [y for y in range(1, self.height-1)])
|
|
winning_region += product([x for x in range(1,12)], [1])
|
|
winning_region += product([x for x in range(1,self.width-10)], [self.height-2])
|
|
winning_region += product([x for x in range(self.width-6, self.width-1)], [1,2,3,4])
|
|
winning_region += product([x for x in range(self.width-11, self.width-1)], [5])
|
|
x, y= winning_region[np.random.choice(len(winning_region), 1)[0]]
|
|
self.agent_pos = np.array((x,y))
|
|
self.agent_dir = np.random.randint(0, 4)
|
|
self.trajectory.append((self.agent_pos, self.agent_dir))
|
|
return
|
|
|
|
if self.randomize_start == True:
|
|
while True:
|
|
num_tries += 1
|
|
if num_tries > max_tries:
|
|
raise RecursionError("rejection sampling failed in place_obj")
|
|
x = np.random.randint(0, self.width)
|
|
y = np.random.randint(0, self.height)
|
|
|
|
cell = self.grid.get(*(x,y))
|
|
if ( cell is None or
|
|
(cell.can_overlap() and
|
|
not isinstance(cell, Lava) and
|
|
not isinstance(cell, Goal) and
|
|
(spawn_on_slippery or not is_slippery(cell)) and
|
|
not (x in [7, 8, 9, 10] and y in [9, 10]))
|
|
):
|
|
self.agent_pos = np.array((x, y))
|
|
self.agent_dir = np.random.randint(0, 4)
|
|
break
|
|
elif agent_dir is None:
|
|
self.agent_pos = np.array((1, 1))
|
|
self.agent_dir = 0
|
|
else:
|
|
self.agent_pos = agent_pos
|
|
self.agent_dir = agent_dir
|
|
self.trajectory.append((self.agent_pos, self.agent_dir))
|
|
|
|
def place_goal(self, goal_pos):
|
|
self.goal_pos = goal_pos
|
|
self.put_obj(Goal(), *self.goal_pos)
|
|
|
|
|
|
def printGrid(self, init=False):
|
|
grid = super().printGrid(init)
|
|
|
|
properties_str = ""
|
|
|
|
properties_str += F"ProbTurnIntended:{self.probability_turn_intended}\n"
|
|
properties_str += F"ProbForwardIntended:{self.probability_intended}\n"
|
|
|
|
return grid + properties_str
|
|
|
|
def step(self, action):
|
|
obs, reward, terminated, truncated, info = super().step(action)
|
|
self.trajectory.append((action, self.agent_pos, self.agent_dir))
|
|
if truncated and info["ran_into_lava"]:
|
|
print(self.trajectory)
|
|
print("truncated: ", info)
|
|
self.trajectory = list()
|
|
if truncated and info["reached_goal"]:
|
|
print("truncated: ", info)
|
|
self.trajectory = list()
|
|
elif terminated and info["ran_into_lava"]:
|
|
print(self.trajectory)
|
|
print("terminated: ", info)
|
|
self.trajectory = list()
|
|
elif terminated:
|
|
print("terminated: ", info)
|
|
self.trajectory = list()
|
|
elif truncated:
|
|
print("truncated: ", info)
|
|
self.trajectory = list()
|
|
return obs, reward - self.per_step_penalty, terminated, truncated, info
|
|
|
|
def reset(self, **kwargs) -> tuple[ObsType, dict[str, Any]]:
|
|
return super().reset(**kwargs)
|
|
|
|
def _place_building(self, col, row, width, height, obj_type=Lava):
|
|
for i in range(col, width + col):
|
|
self.grid.vert_wall(i, row, height, obj_type=obj_type)
|
|
|
|
def _gen_grid(self, width, height):
|
|
super()._gen_grid(width, height)
|
|
self.grid = Grid(width, height)
|
|
|
|
# Generate the surrounding walls
|
|
self.grid.horz_wall(0, 0)
|
|
self.grid.horz_wall(0, height - 1)
|
|
self.grid.vert_wall(0, 0)
|
|
self.grid.vert_wall(width - 1, 0)
|
|
|
|
for i in range(1, height - 1):
|
|
self.grid.horz_wall(1, i, width-2, obj_type=SlipperyNorthEast("white", probability_intended=self.probability_intended, probability_turn_intended=self.probability_turn_intended))
|
|
|
|
self.grid.horz_wall(1, 17, 15, obj_type=SlipperyEast("white", probability_intended=self.probability_intended, probability_turn_intended=self.probability_turn_intended))
|
|
self.grid.horz_wall(1, 18, 16, obj_type=SlipperyEast("white", probability_intended=self.probability_intended, probability_turn_intended=self.probability_turn_intended))
|
|
self.grid.horz_wall(1, 19, 17, obj_type=SlipperyEast("white", probability_intended=self.probability_intended, probability_turn_intended=self.probability_turn_intended))
|
|
self.grid.horz_wall(1, 20, 18, obj_type=SlipperyEast("white", probability_intended=self.probability_intended, probability_turn_intended=self.probability_turn_intended))
|
|
|
|
self.grid.horz_wall(1, 7, 9, obj_type=SlipperyEast("white", probability_intended=self.probability_intended, probability_turn_intended=self.probability_turn_intended))
|
|
self.grid.horz_wall(1, 8, 8, obj_type=SlipperyEast("white", probability_intended=self.probability_intended, probability_turn_intended=self.probability_turn_intended))
|
|
self.grid.horz_wall(1, 9, 8, obj_type=SlipperyEast("white", probability_intended=self.probability_intended, probability_turn_intended=self.probability_turn_intended))
|
|
self.grid.horz_wall(1, 10, 7, obj_type=SlipperyEast("white", probability_intended=self.probability_intended, probability_turn_intended=self.probability_turn_intended))
|
|
|
|
self._place_building(16, 1, 4, 2)
|
|
self.grid.vert_wall(15, 1, 2, obj_type=SlipperyNorth("white", probability_intended=self.probability_intended, probability_turn_intended=self.probability_turn_intended))
|
|
self.grid.horz_wall(16, 3, 4, obj_type=SlipperyEast("white", probability_intended=self.probability_intended, probability_turn_intended=self.probability_turn_intended))
|
|
self.grid.vert_wall(20, 1, 2, obj_type=SlipperyNorth("white", probability_intended=self.probability_intended, probability_turn_intended=self.probability_turn_intended))
|
|
|
|
self._place_building(10, 3, 3, 4)
|
|
#self.grid.vert_wall(9, 3, 4, obj_type=SlipperyNorth("white", probability_intended=self.probability_intended, probability_turn_intended=self.probability_turn_intended))
|
|
self.grid.vert_wall(13, 3, 4, obj_type=SlipperyNorth("white", probability_intended=self.probability_intended, probability_turn_intended=self.probability_turn_intended))
|
|
self.grid.horz_wall(10, 2, 3, obj_type=SlipperyEast("white", probability_intended=self.probability_intended, probability_turn_intended=self.probability_turn_intended))
|
|
self.grid.horz_wall(10, 7, 3, obj_type=SlipperyEast("white", probability_intended=self.probability_intended, probability_turn_intended=self.probability_turn_intended))
|
|
|
|
self._place_building(16, 7, 8, 5)
|
|
self.grid.vert_wall(15, 7, 4, obj_type=SlipperyNorth("white", probability_intended=self.probability_intended, probability_turn_intended=self.probability_turn_intended))
|
|
#self.grid.vert_wall(17, 9, 3, obj_type=Lava)
|
|
self.grid.vert_wall(24, 7, 5, obj_type=SlipperyNorth("white", probability_intended=self.probability_intended, probability_turn_intended=self.probability_turn_intended))
|
|
self.grid.vert_wall(15, 9, 3, obj_type=SlipperyNorth("white", probability_intended=self.probability_intended, probability_turn_intended=self.probability_turn_intended))
|
|
self.grid.horz_wall(16, 6, 7, obj_type=SlipperyEast("white", probability_intended=self.probability_intended, probability_turn_intended=self.probability_turn_intended))
|
|
self.grid.horz_wall(16, 12, 7, obj_type=SlipperyEast("white", probability_intended=self.probability_intended, probability_turn_intended=self.probability_turn_intended))
|
|
self.grid.vert_wall(22, 12, 1, obj_type=SlipperyNorthEast("white", probability_intended=self.probability_intended, probability_turn_intended=self.probability_turn_intended))
|
|
self.grid.vert_wall(23, 13, 1, obj_type=SlipperyNorthEast("white", probability_intended=self.probability_intended, probability_turn_intended=self.probability_turn_intended))
|
|
|
|
|
|
self._place_building(8, 11, 5, 6)
|
|
#self.grid.vert_wall(7, 11, 6, obj_type=SlipperyNorth("white", probability_intended=self.probability_intended, probability_turn_intended=self.probability_turn_intended))
|
|
self.grid.vert_wall(13, 11, 6, obj_type=SlipperyNorth("white", probability_intended=self.probability_intended, probability_turn_intended=self.probability_turn_intended))
|
|
self.grid.horz_wall(8, 17, 5, obj_type=SlipperyEast("white", probability_intended=self.probability_intended, probability_turn_intended=self.probability_turn_intended))
|
|
self.grid.horz_wall(9, 10, 4, obj_type=SlipperyWest("white", probability_intended=self.probability_intended, probability_turn_intended=self.probability_turn_intended))
|
|
self.grid.horz_wall(10, 9, 3, obj_type=SlipperyWest("white", probability_intended=self.probability_intended, probability_turn_intended=self.probability_turn_intended))
|
|
self.grid.vert_wall(12, 7, 4, obj_type=SlipperySouth("white", probability_intended=self.probability_intended, probability_turn_intended=self.probability_turn_intended))
|
|
|
|
self._place_building(22, 14, 4, 4)
|
|
self.grid.vert_wall(21, 14, 4, obj_type=SlipperyNorth("white", probability_intended=self.probability_intended, probability_turn_intended=self.probability_turn_intended))
|
|
self.grid.horz_wall(22, 13, 4, obj_type=SlipperyEast("white", probability_intended=self.probability_intended, probability_turn_intended=self.probability_turn_intended))
|
|
self.grid.horz_wall(22, 18, 4, obj_type=SlipperyEast("white", probability_intended=self.probability_intended, probability_turn_intended=self.probability_turn_intended))
|
|
|
|
|
|
#self.grid.vert_wall(22, 13, 1, obj_type=SlipperyNorthEast("white", probability_intended=self.probability_intended, probability_turn_intended=self.probability_turn_intended))
|
|
|
|
|
|
|
|
self.place_agent(agent_pos=np.array((18, height - 4)), agent_dir=3, spawn_on_slippery=True)
|
|
self.place_goal(np.array((width - 2, 10)))
|
|
if self.dense_rewards: self.run_bfs()
|
|
|
|
|
|
|
|
|
|
|
|
class WindyCitySmallAdv(AdversaryEnv):
|
|
def __init__(self,
|
|
randomize_start=True, size=10,
|
|
width=11,
|
|
height=9,
|
|
probability_intended=1,
|
|
probability_turn_intended=1,
|
|
obstacle_type=Lava,
|
|
goal_reward=1,
|
|
failure_penalty=-1,
|
|
per_step_penalty=0,
|
|
dense_rewards=False,
|
|
two_player_winning_region_start=False,
|
|
**kwargs):
|
|
|
|
self.obstacle_type = obstacle_type
|
|
self.size = size
|
|
self.probability_intended = probability_intended
|
|
self.probability_turn_intended = probability_turn_intended
|
|
|
|
if width is not None and height is not None:
|
|
self.width = width
|
|
self.height = height
|
|
elif size is not None:
|
|
self.width = size
|
|
self.height = size
|
|
else:
|
|
raise ValueError(f"Please define either width and height or a size for square environments. The set values are width: {width}, height: {height}, size: {size}.")
|
|
|
|
mission_space = MissionSpace(mission_func=self._gen_mission)
|
|
super().__init__(
|
|
width=self.width,
|
|
height=self.height,
|
|
max_steps=50,
|
|
# Set this to True for maximum speed
|
|
see_through_walls=False,
|
|
#mission_space = mission_space,
|
|
**kwargs
|
|
)
|
|
|
|
self.randomize_start = randomize_start
|
|
self.two_player_winning_region_start = two_player_winning_region_start
|
|
self.goal_reward = goal_reward
|
|
self.failure_penalty = failure_penalty
|
|
self.dense_rewards = dense_rewards
|
|
self.per_step_penalty = per_step_penalty
|
|
|
|
self.trajectory = list()
|
|
|
|
@staticmethod
|
|
def _gen_mission():
|
|
return "Finish your task while avoiding the adversaries"
|
|
|
|
def disable_random_start(self):
|
|
self.randomize_start = False
|
|
|
|
def place_agent(self, spawn_on_slippery=False, agent_pos=None, agent_dir=0):
|
|
max_tries = 10_000
|
|
num_tries = 0
|
|
|
|
if self.randomize_start == True:
|
|
while True:
|
|
num_tries += 1
|
|
if num_tries > max_tries:
|
|
raise RecursionError("rejection sampling failed in place_obj")
|
|
x = np.random.randint(0, self.width)
|
|
y = np.random.randint(5, self.height)
|
|
|
|
cell = self.grid.get(*(x,y))
|
|
if ( cell is None or
|
|
(cell.can_overlap() and
|
|
not isinstance(cell, Lava) and
|
|
not isinstance(cell, Goal) and
|
|
(spawn_on_slippery or not is_slippery(cell)) and
|
|
not (x in [7, 8, 9, 10] and y in [9, 10]))
|
|
):
|
|
self.agent_pos = np.array((x, y))
|
|
self.agent_dir = np.random.randint(0, 4)
|
|
break
|
|
elif agent_dir is None:
|
|
self.agent_pos = np.array((1, 1))
|
|
self.agent_dir = 0
|
|
else:
|
|
self.agent_pos = agent_pos
|
|
self.agent_dir = agent_dir
|
|
self.trajectory.append((self.agent_pos, self.agent_dir))
|
|
|
|
def place_goal(self, goal_pos):
|
|
self.goal_pos = goal_pos
|
|
self.put_obj(Goal(), *self.goal_pos)
|
|
|
|
|
|
def printGrid(self, init=False):
|
|
grid = super().printGrid(init)
|
|
|
|
properties_str = ""
|
|
|
|
properties_str += F"ProbTurnIntended:{self.probability_turn_intended}\n"
|
|
properties_str += F"ProbForwardIntended:{self.probability_intended}\n"
|
|
|
|
return grid + properties_str
|
|
|
|
def step(self, action):
|
|
obs, reward, terminated, truncated, info = super().step(action)
|
|
self.trajectory.append((action, self.agent_pos, self.agent_dir))
|
|
if truncated and info["ran_into_lava"]:
|
|
print(self.trajectory)
|
|
print("truncated: ", info)
|
|
self.trajectory = list()
|
|
if truncated and info["reached_goal"]:
|
|
print("truncated: ", info)
|
|
self.trajectory = list()
|
|
elif terminated and info["ran_into_lava"]:
|
|
print(self.trajectory)
|
|
print("terminated: ", info)
|
|
self.trajectory = list()
|
|
elif terminated and info["collision"]:
|
|
print(self.trajectory)
|
|
print("terminated: ", info)
|
|
self.trajectory = list()
|
|
elif terminated:
|
|
print("terminated: ", info)
|
|
self.trajectory = list()
|
|
elif truncated:
|
|
print("truncated: ", info)
|
|
self.trajectory = list()
|
|
return obs, reward - self.per_step_penalty, terminated, truncated, info
|
|
|
|
def reset(self, **kwargs) -> tuple[ObsType, dict[str, Any]]:
|
|
return super().reset(**kwargs)
|
|
|
|
def _place_building(self, col, row, width, height, obj_type=Lava):
|
|
for i in range(col, width + col):
|
|
self.grid.vert_wall(i, row, height, obj_type=obj_type)
|
|
|
|
def _gen_grid(self, width, height):
|
|
super()._gen_grid(width, height)
|
|
self.grid = Grid(width, height)
|
|
|
|
# Generate the surrounding walls
|
|
self.grid.horz_wall(0, 0)
|
|
self.grid.horz_wall(0, height - 1)
|
|
self.grid.vert_wall(0, 0)
|
|
self.grid.vert_wall(width - 1, 0)
|
|
|
|
|
|
self._place_building(3, 3, 5, 2)
|
|
blue_adv = self.add_adversary(2, 4, "blue", direction=3, tasks=
|
|
[GoTo((2,2)), GoTo((8,2)), GoTo((8,4)), GoTo((8,2)), GoTo((2,2)), GoTo((2,4))], repeating=True)
|
|
|
|
|
|
self.place_agent(agent_pos=np.array((5, 5)), agent_dir=3, spawn_on_slippery=True)
|
|
self.place_goal(np.array((width//2, 1)))
|
|
if self.dense_rewards: self.run_bfs()
|
|
|
|
|
|
|