|
|
# NOTE: the original import order is kept on purpose. ``from colors import *``
# sits in the middle of the list, so the imports after it take precedence over
# any same-named symbol exported by ``colors``.
import sys
import operator
from os import listdir, system
from random import randrange
from ale_py import ALEInterface, SDL_SUPPORT, Action
from colors import *
from PIL import Image
from matplotlib import pyplot as plt
import cv2
import pickle
import queue
import re
from dataclasses import dataclass, field
from enum import Enum
from copy import deepcopy
import numpy as np
import readchar
from sample_factory.algo.utils.tensor_dict import TensorDict
from query_sample_factory_checkpoint import SampleFactoryNNQueryWrapper
import time
# ROM image that is loaded into the ALE emulator.
rom_file = "/home/spranger/research/Skiing/env/lib/python3.8/site-packages/AutoROM/roms/skiing.bin"

# Model-checker executable and the PRISM model it is run on
# (both are spliced into the command line built by computeStateRanking).
tempest_binary = "/home/spranger/projects/tempest-devel/ranking_release/bin/storm"
mdp_file = "simplified.prism"
class Verdict(Enum):
    """Outcome label attached to a single test run."""

    # The run ended without triggering any failure condition.
    INCONCLUSIVE = 1
    # Not produced by the code visible in this file.
    GOOD = 2
    # The run was flagged as a failure.
    BAD = 3
def convert(tuples):
    """Build a dictionary from an iterable of ``(key, value)`` pairs.

    When a key occurs more than once, the last pair wins.
    """
    mapping = {}
    mapping.update(tuples)
    return mapping
@dataclass(frozen=True)
class State:
    """Immutable (and therefore hashable) model state, usable as a dict key."""

    x: int
    y: int
    ski_position: int


def default_value():
    """Return a fresh placeholder ``choices`` mapping for a ``StateValue``."""
    return dict(action=None, choiceValue=None)


@dataclass(frozen=True)
class StateValue:
    """Ranking value of a state together with its per-choice values."""

    ranking: float
    # A factory is used so that every instance gets its own dict object.
    choices: dict = field(default_factory=default_value)
def exec(command, verbose=True):
    """Run ``command`` through the system shell and log it to ``list_of_exec``.

    NOTE: this function shadows the ``exec`` builtin for the whole module; the
    name is kept because the rest of the file calls it.

    Args:
        command: shell command line to execute.
        verbose: when true, echo the command to stdout before running it.

    Returns:
        The value returned by ``os.system(command)``.
    """
    if verbose:
        print(f"Executing {command}")
    # Append the command to the log from Python instead of via
    # ``echo {command} >> list_of_exec``: the shell would re-parse the command
    # (stripping quotes, and interpreting any redirect, ``;`` or ``$(...)`` it
    # contains), so the log would not match what is executed below.
    try:
        with open("list_of_exec", "a") as log_file:
            log_file.write(f"{command}\n")
    except OSError as error:
        # Logging is best-effort; a failing log must not prevent the command.
        print(f"Could not log command to list_of_exec: {error}", file=sys.stderr)
    return system(command)
def model_to_actual(ski_position):
    """Map a model ski position (1-14) onto the coarser 1-8 scale.

    Positions 1 and 14 map to 1 and 8; every pair in between (2-3, 4-5, ...,
    12-13) shares one value. Any other input yields ``None``.
    """
    # Entry k (counted from 1) lists the model positions that map to k.
    groups = ((1,), (2, 3), (4, 5), (6, 7), (8, 9), (10, 11), (12, 13), (14,))
    for actual, model_positions in enumerate(groups, start=1):
        if ski_position in model_positions:
            return actual
    return None
def input_to_action(char):
    """Translate a single input character into an action or a command.

    ``"0"``/``"1"``/``"2"`` give ``Action.NOOP``/``Action.RIGHT``/``Action.LEFT``,
    ``"3"``/``"4"``/``"5"`` give the command strings ``"reset"``/``"set_x"``/
    ``"set_vel"``, and ``w``/``a``/``s``/``d`` are returned unchanged. Anything
    else yields ``None``.
    """
    # Emulator actions are resolved lazily, only for the key actually pressed.
    for key, action_name in (("0", "NOOP"), ("1", "RIGHT"), ("2", "LEFT")):
        if char == key:
            return getattr(Action, action_name)
    for key, command in (("3", "reset"), ("4", "set_x"), ("5", "set_vel")):
        if char == key:
            return command
    if char in ["w", "a", "s", "d"]:
        return char
    return None
def drawImportantStates(important_states):
    """Render one marker image per ski position (1-14) via ``convert``.

    ``important_states`` is an iterable of pairs whose first element exposes
    ``x``, ``y`` and ``ski_position`` and whose second element exposes
    ``ranking``. Every state adds a small filled rectangle, with ``ranking``
    as the alpha value, to the image of its ski position; the images are
    written to ``first_try_01.png`` ... ``first_try_14.png``.
    """
    markerSize = 2
    # One list of draw arguments per ski position, in ascending order.
    draw_commands = {position: [] for position in range(1, 15)}
    for state in important_states:
        x, y = state[0].x, state[0].y
        draw_commands[state[0].ski_position].append(
            f"-fill 'rgba(255,204,0,{state[1].ranking})' -draw 'rectangle {x-markerSize},{y-markerSize} {x+markerSize},{y+markerSize} '"
        )
    for i, position_commands in draw_commands.items():
        exec(f"convert images/1_full_scaled_down.png {' '.join(position_commands)} first_try_{i:02}.png")
# For each ski position (1-8): the action to repeat and how many times to
# repeat it before a test run starts.
ski_position_counter = {
    1: (Action.LEFT, 40),
    2: (Action.LEFT, 35),
    3: (Action.LEFT, 30),
    4: (Action.LEFT, 10),
    5: (Action.NOOP, 1),
    6: (Action.RIGHT, 10),
    7: (Action.RIGHT, 30),
    8: (Action.RIGHT, 40),
}


def run_single_test(ale, nn_wrapper, x, y, ski_position, duration=200):
    """Let the network play from a prepared start state and judge the run.

    The emulator RAM is overwritten with the snapshot ``ramDICT[y]``, the
    skier is steered according to ``ski_position_counter[ski_position]``,
    and RAM cell 25 is set to ``x``. The network then chooses the action for
    up to ``duration`` frames; until four frames have been collected, NOOP
    is played instead.

    Returns:
        ``(Verdict.BAD, first_action)`` as soon as more than 15 samples of
        RAM cell 14 exist and the five samples before the latest one sum to
        zero; otherwise ``(Verdict.INCONCLUSIVE, first_action)`` once
        ``duration`` frames have been played. ``first_action`` is the first
        action derived from the network, or ``0`` if it was never queried.
    """
    print(f"Running Test from x: {x:04}, y: {y:04}, ski_position: {ski_position}", end="")
    for address, value in enumerate(ramDICT[y]):
        ale.setRAM(address, value)
    steering_action, repetitions = ski_position_counter[ski_position]
    for _ in range(repetitions):
        ale.act(steering_action)
    # presumably cell 14 holds the speed and cell 25 the x position -- TODO confirm
    ale.setRAM(14, 0)
    ale.setRAM(25, x)
    ale.setRAM(14, 180)

    all_obs = []
    speed_list = []
    first_action_set = False
    first_action = 0
    for _ in range(duration):
        frame = ale.getScreenGrayscale()
        all_obs.append(cv2.resize(frame, (84, 84), interpolation=cv2.INTER_AREA))
        if len(all_obs) < 4:
            # Not enough frames yet to form the four-frame network input.
            ale.act(Action.NOOP)
        else:
            stack_tensor = TensorDict({"obs": np.array(all_obs[-4:])})
            chosen_action = input_to_action(str(nn_wrapper.query(stack_tensor)))
            if not first_action_set:
                first_action_set = True
                first_action = chosen_action
            ale.act(chosen_action)
        speed_list.append(ale.getRAM()[14])
        # NOTE(review): the window [-6:-1] skips the newest sample -- confirm
        # that this is intended.
        if len(speed_list) > 15 and sum(speed_list[-6:-1]) == 0:
            return (Verdict.BAD, first_action)
        time.sleep(0.005)
    return (Verdict.INCONCLUSIVE, first_action)
def optimalAction(choices):
    """Return the key of ``choices`` with the largest value.

    On ties the key that comes first in the dict's iteration order wins. An
    empty mapping raises ``ValueError`` (from ``max``).
    """
    best_choice = max(choices, key=choices.get)
    return best_choice
def computeStateRanking():
    """Run the model checker (``tempest_binary``) on ``mdp_file``.

    The command line is assembled from the module-level settings and handed
    to this module's ``exec`` helper; nothing is returned.
    """
    build_options = "--buildchoicelab --buildstateval"
    reward_property = "'Rmax=? [C <= 1000]'"
    exec(f"{tempest_binary} --prism {mdp_file} {build_options} --prop {reward_property}")
def fillStateRanking(file_name, match=""):
    """Parse a state-ranking file into a ``State -> StateValue`` dictionary.

    Only lines containing ``move=0`` are used. From each such line the
    function reads the ``name=<digits>`` assignments (``x``, ``y`` and
    ``ski_position`` are required), the per-choice values of every label
    containing ``left``, ``right`` or ``noop``, and the ``Value:<number>``
    ranking.

    Args:
        file_name: path of the ranking file to read.
        match: unused; kept so existing callers keep working.

    Returns:
        A dict mapping each parsed ``State`` to its ``StateValue``.

    Raises:
        ValueError: if a ``move=0`` line carries no parsable ``Value:`` entry.
        KeyError: if a ``move=0`` line lacks ``x``, ``y`` or ``ski_position``.
        SystemExit: with code 1 if the file cannot be opened or read.
    """
    assignment_pattern = re.compile(r"([a-zA-Z_]*[a-zA-Z])=(\d+)?")
    choice_pattern = re.compile(r"[a-zA-Z_]*(left|right|noop)[a-zA-Z_]*:(-?\d+\.?\d*)")
    # The alternation is grouped so that the optional sign applies to integers
    # as well as decimals; the previous pattern found no match for a signed
    # integer such as "Value:-3". An optional exponent is accepted too, so
    # "Value:1.5e-05" is no longer read as 1.5.
    value_pattern = re.compile(r"Value:([+-]?(?:\d*\.\d+|\d+)(?:[eE][+-]?\d+)?)")

    state_ranking = dict()
    try:
        with open(file_name, "r") as f:
            for line in f:
                if "move=0" not in line:
                    continue
                stateMapping = dict(assignment_pattern.findall(line))
                choices = {key: float(value) for key, value in choice_pattern.findall(line)}
                value_match = value_pattern.search(line)
                if value_match is None:
                    raise ValueError(f"No ranking value found in line: {line.rstrip()!r}")
                ranking_value = float(value_match.group(1))
                state = State(int(stateMapping["x"]), int(stateMapping["y"]), int(stateMapping["ski_position"]))
                state_ranking[state] = StateValue(ranking_value, choices)
    except EnvironmentError:
        print("TODO file not available. Exiting.")
        sys.exit(1)
    return state_ranking
# Run the model checker, load its ranking and order the states from the
# lowest to the highest ranking value.
computeStateRanking()
ranking = fillStateRanking("action_ranking")
sorted_ranking = sorted(ranking.items(), key=lambda entry: entry[1].ranking)

# Set up the emulator; sound and the display window need SDL support.
ale = ALEInterface()
if SDL_SUPPORT:
    ale.setBool("sound", True)
    ale.setBool("display_screen", True)

# Load the ROM file
ale.loadROM(rom_file)

# RAM snapshots, indexed by y in run_single_test.
# NOTE: pickle.load runs arbitrary code from the file -- only load trusted data.
with open('all_positions_v2.pickle', 'rb') as handle:
    ramDICT = pickle.load(handle)
y_ram_setting = 60
x = 70

nn_wrapper = SampleFactoryNNQueryWrapper()

# Start from a fresh copy of the base image; one point is drawn onto it for
# every state that gets tested.
exec("cp testing_1.png /dev/shm/testing.png")
# NOTE(review): the slice [-100:-1] leaves out the last (highest-ranked)
# entry -- confirm that this is intended.
for state, state_value in sorted_ranking[-100:-1]:
    optimal_choice = optimalAction(state_value.choices)
    x, y = state.x, state.y
    ski_pos = model_to_actual(state.ski_position)
    action_taken = run_single_test(ale, nn_wrapper, x, y, ski_pos, duration=50)
    print(f".... {action_taken}")
    markerSize = 1
    marker = f"-fill 'rgba(255,204,0,{state_value.ranking})' -draw 'point {x},{y} '"
    exec(f"convert /dev/shm/testing.png {marker} /dev/shm/testing.png", verbose=False)
|