Browse Source

further work on evaluation script

main
sp 1 year ago
parent
commit
5156803080
  1. 232
      rom_evaluate.py

232
rom_evaluate.py

@ -1,4 +1,6 @@
import sys
import operator
from os import listdir, system
from random import randrange
from ale_py import ALEInterface, SDL_SUPPORT, Action
from colors import *
@ -7,6 +9,9 @@ from matplotlib import pyplot as plt
import cv2
import pickle
import queue
from dataclasses import dataclass, field
from enum import Enum
from copy import deepcopy
@ -19,6 +24,52 @@ from query_sample_factory_checkpoint import SampleFactoryNNQueryWrapper
import time
tempest_binary = "/home/spranger/projects/tempest-devel/ranking_release/bin/storm"
mdp_file = "simplified.prism"
rom_file = "/home/spranger/research/Skiing/env/lib/python3.8/site-packages/AutoROM/roms/skiing.bin"
class Verdict(Enum):
INCONCLUSIVE = 1
GOOD = 2
BAD = 3
def convert(tuples):
return dict(tuples)
@dataclass(frozen=True)
class State:
x: int
y: int
ski_position: int
def default_value():
return {'action' : None, 'choiceValue' : None}
@dataclass(frozen=True)
class StateValue:
ranking: float
choices: dict = field(default_factory=default_value)
def exec(command,verbose=True):
if verbose: print(f"Executing {command}")
system(f"echo {command} >> list_of_exec")
return system(command)
def model_to_actual(ski_position):
if ski_position == 1:
return 1
elif ski_position in [2,3]:
return 2
elif ski_position in [4,5]:
return 3
elif ski_position in [6,7]:
return 4
elif ski_position in [8,9]:
return 5
elif ski_position in [10,11]:
return 6
elif ski_position in [12,13]:
return 7
elif ski_position == 14:
return 8
def input_to_action(char):
if char == "0":
@ -36,10 +87,21 @@ def input_to_action(char):
if char in ["w", "a", "s", "d"]:
return char
ski_position_counter = {1: (Action.LEFT, 40), 2: (Action.LEFT, 35), 3: (Action.LEFT, 30), 4: (Action.LEFT, 10), 5: (Action.NOOP, 1), 6: (Action.RIGHT, 10), 7: (Action.RIGHT, 30), 8: (Action.RIGHT, 40) }
def drawImportantStates(important_states):
draw_commands = {1: list(), 2:list(), 3:list(), 4:list(), 5:list(), 6:list(), 7:list(), 8:list(), 9:list(), 10:list(), 11:list(), 12:list(), 13:list(), 14:list()}
for state in important_states:
x = state[0].x
y = state[0].y
markerSize = 2
ski_position = state[0].ski_position
draw_commands[ski_position].append(f"-fill 'rgba(255,204,0,{state[1].ranking})' -draw 'rectangle {x-markerSize},{y-markerSize} {x+markerSize},{y+markerSize} '")
for i in range(1,15):
command = f"convert images/1_full_scaled_down.png {' '.join(draw_commands[i])} first_try_{i:02}.png"
exec(command)
ski_position_counter = {1: (Action.LEFT, 40), 2: (Action.LEFT, 35), 3: (Action.LEFT, 30), 4: (Action.LEFT, 10), 5: (Action.NOOP, 1), 6: (Action.RIGHT, 10), 7: (Action.RIGHT, 30), 8: (Action.RIGHT, 40) }
def run_single_test(ale, nn_wrapper, x,y,ski_position, duration=200):
print(f"Running Test from x: {x:04}, y: {y:04}, ski_position: {ski_position}")
print(f"Running Test from x: {x:04}, y: {y:04}, ski_position: {ski_position}", end="")
for i, r in enumerate(ramDICT[y]):
ale.setRAM(i,r)
ski_position_setting = ski_position_counter[ski_position]
@ -50,17 +112,60 @@ def run_single_test(ale, nn_wrapper, x,y,ski_position, duration=200):
ale.setRAM(14,180)
all_obs = list()
speed_list = list()
first_action_set = False
first_action = 0
for i in range(0,duration):
resized_obs = cv2.resize(ale.getScreenGrayscale() , (84,84), interpolation=cv2.INTER_AREA)
all_obs.append(resized_obs)
if len(all_obs) >= 4:
stack_tensor = TensorDict({"obs": np.array(all_obs[-4:])})
action = nn_wrapper.query(stack_tensor)
if not first_action_set:
first_action_set = True
first_action = input_to_action(str(action))
ale.act(input_to_action(str(action)))
else:
ale.act(Action.NOOP)
speed_list.append(ale.getRAM()[14])
if len(speed_list) > 15 and sum(speed_list[-6:-1]) == 0:
return (Verdict.BAD, first_action)
time.sleep(0.005)
return (Verdict.INCONCLUSIVE, first_action)
def optimalAction(choices):
return max(choices.items(), key=operator.itemgetter(1))[0]
def computeStateRanking():
command = f"{tempest_binary} --prism {mdp_file} --buildchoicelab --buildstateval --prop 'Rmax=? [C <= 1000]'"
exec(command)
def fillStateRanking(file_name, match=""):
state_ranking = dict()
try:
with open(file_name, "r") as f:
file_content = f.readlines()
for line in file_content:
if not "move=0" in line: continue
stateMapping = convert(re.findall(r"([a-zA-Z_]*[a-zA-Z])=(\d+)?", line))
#print("stateMapping", stateMapping)
choices = convert(re.findall(r"[a-zA-Z_]*(left|right|noop)[a-zA-Z_]*:(-?\d+\.?\d*)", line))
choices = {key:float(value) for (key,value) in choices.items()}
#print("choices", choices)
ranking_value = float(re.search(r"Value:([+-]?(\d*\.\d+)|\d+)", line)[0].replace("Value:",""))
#print("ranking_value", ranking_value)
state = State(int(stateMapping["x"]), int(stateMapping["y"]), int(stateMapping["ski_position"]))
value = StateValue(ranking_value, choices)
state_ranking[state] = value
return state_ranking
except EnvironmentError:
print("TODO file not available. Exiting.")
sys.exit(1)
computeStateRanking()
ranking = fillStateRanking("action_ranking")
sorted_ranking = sorted(ranking.items(), key=lambda x: x[1].ranking)
ale = ALEInterface()
@ -69,121 +174,26 @@ if SDL_SUPPORT:
ale.setBool("display_screen", True)
# Load the ROM file
rom_file = "/home/spranger/research/Skiing/env/lib/python3.8/site-packages/AutoROM/roms/skiing.bin"
ale.loadROM(rom_file)
# Get the list of legal actions
with open('all_positions_v2.pickle', 'rb') as handle:
ramDICT = pickle.load(handle)
#ramDICT = dict()
#for i,r in enumerate(ramDICT[235]):
# ale.setRAM(i,r)
y_ram_setting = 60
x = 70
nn_wrapper = SampleFactoryNNQueryWrapper()
#run_single_test(ale, nn_wrapper, 70,61,5)
#input("")
run_single_test(ale, nn_wrapper, 30,61,5,duration=1000)
run_single_test(ale, nn_wrapper, 114,170,7)
run_single_test(ale, nn_wrapper, 124,170,5)
run_single_test(ale, nn_wrapper, 134,170,2)
run_single_test(ale, nn_wrapper, 120,185,1)
run_single_test(ale, nn_wrapper, 134,170,8)
run_single_test(ale, nn_wrapper, 85,195,8)
velocity_set = False
for episode in range(10):
total_reward = 0
j = 0
while not ale.game_over():
if not velocity_set: ale.setRAM(14,0)
j += 1
a = input_to_action(repr(readchar.readchar())[1])
#a = Action.NOOP
if a == "w":
y_ram_setting -= 1
if y_ram_setting <= 61:
y_ram_setting = 61
for i, r in enumerate(ramDICT[y_ram_setting]):
ale.setRAM(i,r)
ale.setRAM(25,x)
ale.act(Action.NOOP)
elif a == "s":
y_ram_setting += 1
if y_ram_setting >= 1950:
y_ram_setting = 1945
for i, r in enumerate(ramDICT[y_ram_setting]):
ale.setRAM(i,r)
ale.setRAM(25,x)
ale.act(Action.NOOP)
elif a == "a":
x -= 1
if x <= 0:
x = 0
ale.setRAM(25,x)
ale.act(Action.NOOP)
elif a == "d":
x += 1
if x >= 144:
x = 144
ale.setRAM(25,x)
ale.act(Action.NOOP)
elif a == "reset":
ram_pos = input("Ram Position:")
for i, r in enumerate(ramDICT[int(ram_pos)]):
ale.setRAM(i,r)
ale.act(Action.NOOP)
# Apply an action and get the resulting reward
elif a == "set_x":
x = int(input("X:"))
ale.setRAM(25, x)
ale.act(Action.NOOP)
elif a == "set_vel":
vel = input("Velocity:")
ale.setRAM(14, int(vel))
ale.act(Action.NOOP)
velocity_set = True
else:
reward = ale.act(a)
ram = ale.getRAM()
#if j % 2 == 0:
# y_pixel = int(j*1/2) + 55
# ramDICT[y_pixel] = ram
# print(f"saving to {y_pixel:04}")
# if y_pixel == 126 or y_pixel == 235:
# input("")
int_old_ram = list(map(int, oldram))
int_ram = list(map(int, ram))
difference = list()
for o, r in zip(int_old_ram, int_ram):
difference.append(r-o)
oldram = deepcopy(ram)
#print(f"player_x: {ram[25]},\tclock_m: {ram[104]},\tclock_s: {ram[105]},\tclock_ms: {ram[106]},\tscore: {ram[107]}")
print(f"player_x: {ram[25]},\tplayer_y: {y_ram_setting}")
#print(f"y_0: {ram[86]}, y_1: {ram[87]}, y_2: {ram[88]}, y_3: {ram[89]}, y_4: {ram[90]}, y_5: {ram[91]}, y_6: {ram[92]}, y_7: {ram[93]}, y_8: {ram[94]}")
#for i, r in enumerate(ram):
# print('{:03}:{:02x} '.format(i,r), end="")
# if i % 16 == 15: print("")
#print("")
#for i, r in enumerate(difference):
# string = '{:02}:{:03} '.format(i%100,r)
# if r != 0:
# print(color(string, fg='red'), end="")
# else:
# print(string, end="")
# if i % 16 == 15: print("")
print("Episode %d ended with score: %d" % (episode, total_reward))
input("")
with open('all_positions_v2.pickle', 'wb') as handle:
pickle.dump(ramDICT, handle, protocol=pickle.HIGHEST_PROTOCOL)
ale.reset_game()
exec("cp testing_1.png /dev/shm/testing.png")
for important_state in sorted_ranking[-100:-1]:
optimal_choice = optimalAction(important_state[1].choices)
#print(important_state[1].choices, f"\t\tOptimal: {optimal_choice}")
x = important_state[0].x
y = important_state[0].y
ski_pos = model_to_actual(important_state[0].ski_position)
action_taken = run_single_test(ale,nn_wrapper,x,y,ski_pos, duration=50)
print(f".... {action_taken}")
markerSize = 1
marker = f"-fill 'rgba(255,204,0,{important_state[1].ranking})' -draw 'point {x},{y} '"
command = f"convert /dev/shm/testing.png {marker} /dev/shm/testing.png"
exec(command, verbose=False)
Loading…
Cancel
Save