You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

274 lines
9.8 KiB

from __future__ import annotations
from minigrid.core.constants import COLOR_NAMES
from minigrid.core.grid import Grid
from minigrid.core.mission import MissionSpace
from minigrid.core.world_object import (
Ball,
Box,
Key,
Slippery,
SlipperyEast,
SlipperySouth,
SlipperyNorth,
SlipperyWest,
Lava,
Goal,
Point
)
from minigrid.minigrid_env import MiniGridEnv, is_slippery
import numpy as np
class LavaSlipperyEnv(MiniGridEnv):
def __init__(self,
randomize_start=True, size=12,
width=None,
height=None,
probability_intended=8/9,
probability_turn_intended=8/9,
obstacle_type=Lava,
goal_reward=1,
failure_penalty=-1,
per_step_penalty=0,
dense_rewards=False,
**kwargs):
self.obstacle_type = obstacle_type
self.size = size
self.probability_intended = probability_intended
self.probability_turn_intended = probability_turn_intended
if width is not None and height is not None:
self.width = width
self.height = height
elif size is not None:
self.width = size
self.height = size
else:
raise ValueError(f"Please define either width and height or a size for square environments. The set values are width: {width}, height: {height}, size: {size}.")
if obstacle_type == Lava:
mission_space = MissionSpace(
mission_func=lambda: "avoid the lava and get to the green goal square"
)
else:
mission_space = MissionSpace(
mission_func=lambda: "find the opening and get to the green goal square"
)
super().__init__(
mission_space=mission_space,
width=self.width,
height=self.height,
max_steps=200,
# Set this to True for maximum speed
see_through_walls=False,
**kwargs
)
self.randomize_start = randomize_start
self.goal_reward = goal_reward
self.failure_penalty = failure_penalty
self.dense_rewards = dense_rewards
self.per_step_penalty = per_step_penalty
def _place_slippery_lava(self, x, y):
self.put_obj(Lava(), x, y)
self.put_obj(self._create_slippery_north(), x, y - 1)
self.put_obj(self._create_slippery_south(), x, y + 1)
self.put_obj(self._create_slippery_east(), x + 1, y)
self.put_obj(self._create_slippery_west(), x - 1, y)
def create_slippery_lava_line(self, y, x_start, x_end, no_slippery_left=False, no_slippery_right=False):
if not no_slippery_left:
self.put_obj(self._create_slippery_west(), x_start - 1, y)
if not no_slippery_right:
self.put_obj(self._create_slippery_east(), x_end + 1 , y)
for x in range(x_start, x_end + 1):
self.put_obj(Lava(), x, y)
def _gen_grid(self, width, height):
assert width >= 5 and height >= 5
# Create an empty grid
self.grid = Grid(width, height)
# Generate the surrounding walls
self.grid.horz_wall(0, 0)
self.grid.horz_wall(0, height - 1)
self.grid.vert_wall(0, 0)
self.grid.vert_wall(width - 1, 0)
self.mission = (
"avoid the lava and get to the green goal square"
if self.obstacle_type == Lava
else "find the opening and get to the green goal square"
)
def disable_random_start(self):
self.randomize_start = False
def place_agent(self, spawn_on_slippery=False, agent_pos=None, agent_dir=0):
max_tries = 10_000
num_tries = 0
if self.randomize_start == True:
while True:
num_tries += 1
if num_tries > max_tries:
raise RecursionError("rejection sampling failed in place_obj")
x = np.random.randint(0, self.width)
y = np.random.randint(0, self.height)
cell = self.grid.get(*(x,y))
if cell is None or (cell.can_overlap() and not isinstance(cell, Lava) and not isinstance(cell, Goal) and (spawn_on_slippery or not is_slippery(cell))):
self.agent_pos = np.array((x, y))
self.agent_dir = np.random.randint(0, 4)
break
elif agent_dir is None:
self.agent_pos = np.array((1, 1))
self.agent_dir = 0
else:
self.agent_pos = agent_pos
self.agent_dir = agent_dir
def place_goal(self, goal_pos):
self.goal_pos = goal_pos
self.put_obj(Goal(), *self.goal_pos)
def run_bfs(self):
self.bfs_reward = self.run_BFS_reward()
self.bfs_reward = [rew * 0.1 for rew in self.bfs_reward]
def printGrid(self, init=False):
grid = super().printGrid(init)
properties_str = ""
properties_str += F"ProbTurnIntended:{self.probability_turn_intended}\n"
properties_str += F"ProbForwardIntended:{self.probability_intended}\n"
return grid + properties_str
def step(self, action):
obs, reward, terminated, truncated, info = super().step(action)
return obs, reward - self.per_step_penalty, terminated, truncated, info
class LavaSlipperyEnv1(LavaSlipperyEnv):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
def _gen_grid(self, width, height):
super()._gen_grid(width, height)
w_mid = width // 2
h_mid = height // 2
self.put_obj(Lava(), w_mid - 1, h_mid - 1)
self.put_obj(Lava(), w_mid, h_mid - 1)
self.put_obj(Lava(), w_mid - 1, h_mid)
self.put_obj(Lava(), w_mid, h_mid)
self.put_obj(self._create_slippery_east(), w_mid - 2, h_mid - 1)
self.put_obj(self._create_slippery_east(), w_mid - 2, h_mid)
self.put_obj(self._create_slippery_west(), w_mid + 1, h_mid - 1)
self.put_obj(self._create_slippery_west(), w_mid + 1, h_mid)
self.put_obj(Lava(), w_mid - 1, 1)
self.put_obj(Lava(), w_mid, 1)
self.put_obj(self._create_slippery_north(), w_mid - 1, 2)
self.put_obj(self._create_slippery_north(), w_mid, 2)
self.put_obj(self._create_slippery_east(), w_mid - 2, 1)
self.put_obj(self._create_slippery_west(), w_mid + 1, 1)
self.put_obj(Lava(), w_mid - 1, height - 2)
self.put_obj(Lava(), w_mid, height - 2)
self.put_obj(self._create_slippery_south(), w_mid - 1, height - 3)
self.put_obj(self._create_slippery_south(), w_mid, height - 3)
self.put_obj(self._create_slippery_east(), w_mid - 2, height - 2)
self.put_obj(self._create_slippery_west(), w_mid + 1, height - 2)
self.place_agent(agent_pos=np.array((1, 1)), agent_dir=0)
self.place_goal(np.array((width - 2, height - 2)))
if self.dense_rewards: self.run_bfs()
class LavaSlipperyCliff(LavaSlipperyEnv):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
def _gen_grid(self, width, height):
super()._gen_grid(width, height)
for i in range(1,5):
self.grid.horz_wall(3, i, width - 6, Lava)
for i in range(5,height - 3):
self.grid.horz_wall(3, i, width - 6, SlipperyNorth(probability_intended=self.probability_intended, probability_turn_intended=self.probability_turn_intended))
self.place_agent(agent_pos=np.array((1, 1)), agent_dir=0)
self.place_goal(np.array((width - 2, 1)))
if self.dense_rewards: self.run_bfs()
class LavaSlipperyHill(LavaSlipperyEnv):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
def _gen_grid(self, width, height):
super()._gen_grid(width, height)
for i in range(1,height - 1):
self.grid.horz_wall(1, i, width - 2, SlipperyNorth)
for i in range(1,5):
self.grid.horz_wall(3, i, width - 6, Lava)
self.place_agent(agent_pos=np.array((1, 1)), agent_dir=0, spawn_on_slippery=True)
self.place_goal(np.array((width - 2, 1)))
if self.dense_rewards: self.run_bfs()
class LavaSlipperyMaze(LavaSlipperyEnv):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
def _gen_grid(self, width, height):
super()._gen_grid(width, height)
slippery_tile = SlipperySouth(probability_intended=self.probability_intended, probability_turn_intended=self.probability_turn_intended)
self.grid.horz_wall(1, 3, 5, Lava)
self.grid.horz_wall(1, 4, 5, Lava)
self.grid.horz_wall(6, 3, 3, slippery_tile)
self.grid.horz_wall(6, 4, 3, slippery_tile)
self.grid.horz_wall(9, 3, 7, Lava)
self.grid.horz_wall(9, 4, 7, Lava)
self.grid.horz_wall(4, 7, 4, Lava)
self.grid.horz_wall(4, 8, 4, Lava)
self.grid.horz_wall(13, 7, 6, Lava)
self.grid.horz_wall(13, 8, 6, Lava)
self.grid.horz_wall(1, 11, 6, Lava)
self.grid.horz_wall(1, 12, 6, Lava)
self.grid.horz_wall(7, 11, 3, slippery_tile)
self.grid.horz_wall(7, 12, 3, slippery_tile)
self.grid.horz_wall(10, 11, 7, Lava)
self.grid.horz_wall(10, 12, 7, Lava)
self.grid.horz_wall(1, 15, 4, Lava)
self.grid.horz_wall(1, 16, 4, Lava)
self.grid.horz_wall(10, 15, 9, Lava)
self.grid.horz_wall(10, 16, 9, Lava)
self.place_agent(agent_pos=np.array((1, 1)), agent_dir=0)
self.place_goal(np.array((width - 2, height - 2)))
def step(self, action):
obs, reward, terminated, truncated, info = super().step(action)
if self.dense_rewards:
reward -= 0.0001 * (self.height - self.agent_pos[1])
return obs, reward, terminated, truncated, info