# Reconstructed module preamble + helpers from the patch hunks (formatting of
# the source was destroyed; this is the post-patch content of these units).
#
# NOTE(review): the real file also imports third-party/project modules whose
# lines sit in hidden diff context (ale_py, colors, matplotlib, cv2, readchar,
# TensorDict, SampleFactoryNNQueryWrapper, numpy) -- those import lines must
# remain at the top of the actual file.
import operator
import pickle
import queue
import re  # used by fillStateRanking below; not visible in either import hunk of the patch -- confirm it is not already imported in hidden context
import sys
import time
from copy import deepcopy
from dataclasses import dataclass, field
from enum import Enum
from os import listdir, system  # listdir is never used in the visible code; kept for compatibility
from random import randrange

# External model checker, MDP model and Atari ROM used by the pipeline.
tempest_binary = "/home/spranger/projects/tempest-devel/ranking_release/bin/storm"
mdp_file = "simplified.prism"
rom_file = "/home/spranger/research/Skiing/env/lib/python3.8/site-packages/AutoROM/roms/skiing.bin"


class Verdict(Enum):
    """Outcome of a single closed-loop test run of the trained policy."""
    INCONCLUSIVE = 1
    GOOD = 2
    BAD = 3


def convert(tuples):
    """Turn an iterable of (key, value) pairs into a dict."""
    return dict(tuples)


@dataclass(frozen=True)
class State:
    """A state of the MDP abstraction: player position and ski stance (1..14)."""
    x: int
    y: int
    ski_position: int


def default_value():
    """Fresh default choice record for StateValue.choices (one dict per instance)."""
    return {'action': None, 'choiceValue': None}


@dataclass(frozen=True)
class StateValue:
    """Ranking value of a state plus the per-action values computed for it."""
    ranking: float
    choices: dict = field(default_factory=default_value)


def exec(command, verbose=True):
    """Run a shell command, append it to the list_of_exec log, return its exit status.

    NOTE: intentionally shadows the builtin exec(); kept because callers in
    this file use this name.
    """
    if verbose:
        print(f"Executing {command}")
    system(f"echo {command} >> list_of_exec")
    return system(command)


def model_to_actual(ski_position):
    """Map the 14-valued model ski_position onto the game's 8 actual stances.

    1 -> 1, (2,3) -> 2, (4,5) -> 3, ..., (12,13) -> 7, 14 -> 8.

    Raises:
        ValueError: for positions outside 1..14 (the original silently
        returned None in that case).
    """
    if not 1 <= ski_position <= 14:
        raise ValueError(f"ski_position out of range: {ski_position}")
    if ski_position == 1:
        return 1
    if ski_position == 14:
        return 8
    # Interior positions collapse pairwise; equivalent to the original if-chain.
    return ski_position // 2 + 1
def drawImportantStates(important_states):
    """Render highly ranked states onto the scaled-down course image.

    Produces one output image per model ski_position (1..14); each state is
    stamped as a small rectangle whose opacity encodes its ranking value.
    Shells out to ImageMagick `convert` via exec().
    """
    draw_commands = {pos: list() for pos in range(1, 15)}
    marker_size = 2  # loop-invariant, hoisted
    for state, state_value in important_states:
        x = state.x
        y = state.y
        draw_commands[state.ski_position].append(
            f"-fill 'rgba(255,204,0,{state_value.ranking})' -draw 'rectangle {x-marker_size},{y-marker_size} {x+marker_size},{y+marker_size} '")
    for pos in range(1, 15):
        command = f"convert images/1_full_scaled_down.png {' '.join(draw_commands[pos])} first_try_{pos:02}.png"
        exec(command)


# Stance initialisation per actual ski position: (action to press, frames to hold it).
ski_position_counter = {1: (Action.LEFT, 40), 2: (Action.LEFT, 35), 3: (Action.LEFT, 30), 4: (Action.LEFT, 10),
                        5: (Action.NOOP, 1), 6: (Action.RIGHT, 10), 7: (Action.RIGHT, 30), 8: (Action.RIGHT, 40)}


def run_single_test(ale, nn_wrapper, x, y, ski_position, duration=200):
    """Run one closed-loop episode of the trained policy from a given start state.

    Restores the RAM snapshot for row y from ramDICT, steers into
    ski_position, places the player at column x, then lets nn_wrapper choose
    actions for up to `duration` frames.

    Returns:
        (Verdict, first_action): Verdict.BAD when the speed byte (RAM[14])
        reads zero over several consecutive frames (skier stuck, presumably
        crashed -- confirm semantics of RAM byte 14 against the full file),
        Verdict.INCONCLUSIVE otherwise.
    """
    print(f"Running Test from x: {x:04}, y: {y:04}, ski_position: {ski_position}", end="")
    for i, r in enumerate(ramDICT[y]):
        ale.setRAM(i, r)
    ski_position_setting = ski_position_counter[ski_position]
    # NOTE(review): the patch hides a few original source lines here; the
    # following is reconstructed from context (hold the stance action for the
    # configured frame count, then place the player) -- confirm against the
    # full source file.
    for _ in range(0, ski_position_setting[1]):
        ale.act(ski_position_setting[0])
    ale.setRAM(25, x)
    ale.setRAM(14, 180)

    all_obs = list()
    speed_list = list()
    first_action_set = False
    first_action = 0
    for _ in range(0, duration):
        resized_obs = cv2.resize(ale.getScreenGrayscale(), (84, 84), interpolation=cv2.INTER_AREA)
        all_obs.append(resized_obs)
        if len(all_obs) >= 4:
            # Policy consumes a 4-frame stack of 84x84 grayscale observations.
            stack_tensor = TensorDict({"obs": np.array(all_obs[-4:])})
            action = nn_wrapper.query(stack_tensor)
            if not first_action_set:
                first_action_set = True
                first_action = input_to_action(str(action))
            ale.act(input_to_action(str(action)))
        else:
            ale.act(Action.NOOP)
        speed_list.append(ale.getRAM()[14])
        # Stuck detection after a warm-up of 15 frames.
        # NOTE(review): the slice [-6:-1] ignores the newest speed sample --
        # confirm that is intended.
        if len(speed_list) > 15 and sum(speed_list[-6:-1]) == 0:
            return (Verdict.BAD, first_action)
        time.sleep(0.005)
    return (Verdict.INCONCLUSIVE, first_action)


def optimalAction(choices):
    """Return the action name with the highest value in a {action: value} dict."""
    return max(choices.items(), key=operator.itemgetter(1))[0]


def computeStateRanking():
    """Invoke the tempest/storm model checker to produce the ranking files.

    Returns the shell exit status of the invocation (previously discarded).
    """
    command = f"{tempest_binary} --prism {mdp_file} --buildchoicelab --buildstateval --prop 'Rmax=? [C <= 1000]'"
    return exec(command)


def fillStateRanking(file_name, match=""):
    """Parse tempest's action-ranking output into {State: StateValue}.

    Streams `file_name`, keeping only lines containing "move=0", and extracts
    the key=value state assignment, the per-action (left/right/noop) values,
    and the "Value:..." ranking from each line.

    `match` is accepted for interface compatibility; the visible code never
    uses it.  Exits the process when the file cannot be opened.
    """
    state_ranking = dict()
    try:
        with open(file_name, "r") as f:
            # Stream line by line instead of readlines(): same order, less memory.
            for line in f:
                if "move=0" not in line:
                    continue
                stateMapping = convert(re.findall(r"([a-zA-Z_]*[a-zA-Z])=(\d+)?", line))
                choices = convert(re.findall(r"[a-zA-Z_]*(left|right|noop)[a-zA-Z_]*:(-?\d+\.?\d*)", line))
                choices = {key: float(value) for (key, value) in choices.items()}
                value_match = re.search(r"Value:([+-]?(\d*\.\d+)|\d+)", line)
                if value_match is None:
                    # Malformed line: skip it; the original crashed on None[0].
                    continue
                ranking_value = float(value_match[0].replace("Value:", ""))
                state = State(int(stateMapping["x"]), int(stateMapping["y"]), int(stateMapping["ski_position"]))
                state_ranking[state] = StateValue(ranking_value, choices)
        return state_ranking
    except EnvironmentError:
        # Original message still contained a leftover "TODO".
        print(f"Ranking file {file_name} not available. Exiting.")
        sys.exit(1)


computeStateRanking()
ranking = fillStateRanking("action_ranking")
# Ascending by ranking value: the interesting states are at the END of the list.
sorted_ranking = sorted(ranking.items(), key=lambda entry: entry[1].ranking)

ale = ALEInterface()

# NOTE(review): a couple of unchanged lines between the patch hunks are not
# visible here (likely a sound setBool) -- confirm against the full file.
if SDL_SUPPORT:
    ale.setBool("display_screen", True)

# Load the ROM file
ale.loadROM(rom_file)

with open('all_positions_v2.pickle', 'rb') as handle:
    ramDICT = pickle.load(handle)  # presumably y-row -> RAM snapshot; verify against the file that wrote it

y_ram_setting = 60
x = 70

nn_wrapper = SampleFactoryNNQueryWrapper()
# Build an overview image: start from a fresh copy of the base screenshot,
# then run a short test from each highly ranked state and stamp one point per
# state, with opacity encoding its ranking value.
exec("cp testing_1.png /dev/shm/testing.png")

# NOTE(review): [-100:-1] takes the states ranked 2nd..100th from the top and
# skips the single highest-ranked state -- confirm that is intended (perhaps
# [-100:] was meant).
for important_state in sorted_ranking[-100:-1]:
    state, state_value = important_state
    optimal_choice = optimalAction(state_value.choices)
    #print(state_value.choices, f"\t\tOptimal: {optimal_choice}")
    x = state.x
    y = state.y
    ski_pos = model_to_actual(state.ski_position)
    # run_single_test returns (Verdict, first_action).
    action_taken = run_single_test(ale, nn_wrapper, x, y, ski_pos, duration=50)
    print(f".... {action_taken}")
    # Dead local `markerSize = 1` removed: the draw primitive is a point, so
    # the size was never used.
    marker = f"-fill 'rgba(255,204,0,{state_value.ranking})' -draw 'point {x},{y} '"
    command = f"convert /dev/shm/testing.png {marker} /dev/shm/testing.png"
    exec(command, verbose=False)