|
|
@ -9,11 +9,11 @@ import stormpy.examples |
|
|
|
import stormpy.examples.files |
|
|
|
|
|
|
|
|
|
|
|
from helpers import extract_doors, extract_keys |
|
|
|
from helpers import extract_doors, extract_keys, extract_adversaries |
|
|
|
from abc import ABC |
|
|
|
|
|
|
|
import os |
|
|
|
|
|
|
|
import time |
|
|
|
class Action(): |
|
|
|
def __init__(self, idx, prob=1, labels=[]) -> None: |
|
|
|
self.idx = idx |
|
|
@ -40,7 +40,7 @@ class MiniGridShieldHandler(ShieldHandler): |
|
|
|
|
|
|
|
|
|
|
|
def __create_prism(self): |
|
|
|
result = os.system(F"{self.grid_to_prism_path} -v 'agent' -i {self.grid_file} -o {self.prism_path}") |
|
|
|
result = os.system(F"{self.grid_to_prism_path} -v 'Agent,Blue' -i {self.grid_file} -o {self.prism_path}") |
|
|
|
|
|
|
|
assert result == 0, "Prism file could not be generated" |
|
|
|
|
|
|
@ -67,19 +67,18 @@ class MiniGridShieldHandler(ShieldHandler): |
|
|
|
assert result.has_shield |
|
|
|
shield = result.shield |
|
|
|
stormpy.shields.export_shield(model, shield, "Grid.shield") |
|
|
|
|
|
|
|
action_dictionary = {} |
|
|
|
shield_scheduler = shield.construct() |
|
|
|
state_valuations = model.state_valuations |
|
|
|
choice_labeling = model.choice_labeling |
|
|
|
|
|
|
|
for stateID in model.states: |
|
|
|
choice = shield_scheduler.get_choice(stateID) |
|
|
|
choices = choice.choice_map |
|
|
|
state_valuation = model.state_valuations.get_string(stateID) |
|
|
|
|
|
|
|
actions_to_be_executed = [Action(idx= choice[1], prob=choice[0], labels=model.choice_labeling.get_labels_of_choice(model.get_choice_index(stateID, choice[1]))) for choice in choices] |
|
|
|
|
|
|
|
state_valuation = state_valuations.get_string(stateID) |
|
|
|
actions_to_be_executed = [Action(idx= choice[1], prob=choice[0], labels=choice_labeling.get_labels_of_choice(model.get_choice_index(stateID, choice[1]))) for choice in choices] |
|
|
|
action_dictionary[state_valuation] = actions_to_be_executed |
|
|
|
|
|
|
|
|
|
|
|
return action_dictionary |
|
|
|
|
|
|
|
|
|
|
@ -93,15 +92,16 @@ class MiniGridShieldHandler(ShieldHandler): |
|
|
|
def create_shield_query(env): |
|
|
|
coordinates = env.env.agent_pos |
|
|
|
view_direction = env.env.agent_dir |
|
|
|
|
|
|
|
|
|
|
|
keys = extract_keys(env) |
|
|
|
doors = extract_doors(env) |
|
|
|
adversaries = extract_adversaries(env) |
|
|
|
|
|
|
|
|
|
|
|
if env.carrying: |
|
|
|
carrying = F"Agent_is_carrying_object\t" |
|
|
|
agent_carrying = F"Agent_is_carrying_object\t" |
|
|
|
else: |
|
|
|
carrying = "!Agent_is_carrying_object\t" |
|
|
|
agent_carrying = "!Agent_is_carrying_object\t" |
|
|
|
|
|
|
|
key_positions = [] |
|
|
|
agent_key_status = [] |
|
|
@ -110,7 +110,6 @@ def create_shield_query(env): |
|
|
|
key_color = key[0].color |
|
|
|
key_x = key[1] |
|
|
|
key_y = key[2] |
|
|
|
# '[!Agent_is_carrying_object\t& !Agent_has_yellow_key\t& !AgentDone\t& Dooryellowlocked\t& !Dooryellowopen\t& xAgent=1\t& yAgent=1\t& viewAgent=0\t& xKeyyellow=2\t& yKeyyellow=2]' |
|
|
|
if env.carrying and env.carrying.type == "key": |
|
|
|
agent_key_text = F"Agent_has_{env.carrying.color}_key\t& " |
|
|
|
key_position = F"xKey{key_color}={key_x}\t& yKey{key_color}={key_y}\t" |
|
|
@ -121,7 +120,8 @@ def create_shield_query(env): |
|
|
|
key_positions.append(key_position) |
|
|
|
agent_key_status.append(agent_key_text) |
|
|
|
|
|
|
|
key_positions[-1] = key_positions[-1].strip() |
|
|
|
if key_positions: |
|
|
|
key_positions[-1] = key_positions[-1].strip() |
|
|
|
|
|
|
|
door_status = [] |
|
|
|
for door in doors: |
|
|
@ -134,11 +134,46 @@ def create_shield_query(env): |
|
|
|
status = F"!Door{door.color}locked\t& !Door{door.color}open\t&" |
|
|
|
|
|
|
|
door_status.append(status) |
|
|
|
|
|
|
|
adv_status = [] |
|
|
|
adv_positions = [] |
|
|
|
|
|
|
|
for adversary in adversaries: |
|
|
|
status = "" |
|
|
|
position = "" |
|
|
|
|
|
|
|
if adversary.carrying: |
|
|
|
carrying = F"{adversary.name}_is_carrying_object\t" |
|
|
|
else: |
|
|
|
carrying = F"!{adversary.name}_is_carrying_object\t" |
|
|
|
|
|
|
|
status = F"{carrying}& !{adversary.name}Done\t& " |
|
|
|
position = F"x{adversary.name}={adversary.cur_pos[1]}\t& y{adversary.name}={adversary.cur_pos[0]}\t& view{adversary.name}={adversary.adversary_dir}" |
|
|
|
adv_status.append(status) |
|
|
|
adv_positions.append(position) |
|
|
|
|
|
|
|
door_status_text = "" |
|
|
|
|
|
|
|
if door_status: |
|
|
|
door_status_text = F"& {''.join(door_status)}\t" |
|
|
|
|
|
|
|
adv_status_text = "" |
|
|
|
|
|
|
|
if adv_status: |
|
|
|
adv_status_text = F"& {''.join(adv_status)}" |
|
|
|
|
|
|
|
adv_positions_text = "" |
|
|
|
|
|
|
|
if adv_positions: |
|
|
|
adv_positions_text = F"\t& {''.join(adv_positions)}" |
|
|
|
|
|
|
|
key_positions_text = "" |
|
|
|
|
|
|
|
if key_positions: |
|
|
|
key_positions_text = F"\t& {''.join(key_positions)}" |
|
|
|
|
|
|
|
agent_position = F"xAgent={coordinates[0]}\t& yAgent={coordinates[1]}\t& viewAgent={view_direction}" |
|
|
|
query = f"[{carrying}& {''.join(agent_key_status)}!AgentDone\t& {''.join(door_status)} {agent_position}\t& {''.join(key_positions)}]" |
|
|
|
query = f"[{agent_carrying}& {''.join(agent_key_status)}!AgentDone\t{adv_status_text}{door_status_text}{agent_position}{adv_positions_text}{key_positions_text}]" |
|
|
|
|
|
|
|
return query |
|
|
|
|