import sys
import re
import operator
from os import listdir, system
from random import randrange
from ale_py import ALEInterface, SDL_SUPPORT, Action
from colors import *
from PIL import Image
from matplotlib import pyplot as plt
import cv2
import pickle
import queue
from dataclasses import dataclass, field
from enum import Enum
from copy import deepcopy
import numpy as np
import readchar
from sample_factory.algo.utils.tensor_dict import TensorDict
from query_sample_factory_checkpoint import SampleFactoryNNQueryWrapper
import time
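# Paths to the Tempest model checker binary and the Atari Skiing ROM; adjust these for your environment.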
tempest_binary = "/home/spranger/projects/tempest-devel/ranking_release/bin/storm"
rom_file = "/home/spranger/research/Skiing/env/lib/python3.8/site-packages/AutoROM/roms/skiing.bin"
class Verdict(Enum):
    INCONCLUSIVE = 1
    GOOD = 2
    BAD = 3
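# RGB components used when drawing each verdict onto the track image.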
verdict_to_color_map = {Verdict.BAD: "200,0,0", Verdict.INCONCLUSIVE: "40,40,200", Verdict.GOOD: "00,200,100"}
def convert(tuples):
    return dict(tuples)
@dataclass(frozen=True)
class State:
    x: int
    y: int
    ski_position: int

def default_value():
    return {'action': None, 'choiceValue': None}

@dataclass(frozen=True)
class StateValue:
    ranking: float
    choices: dict = field(default_factory=default_value)
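# Thin wrapper around os.system that also logs every executed command to the file list_of_exec.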
def exec(command, verbose=True):
    if verbose: print(f"Executing {command}")
    system(f"echo {command} >> list_of_exec")
    return system(command)
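# Collapse the 14 ski-position values of the MDP onto the 8 positions used by ski_position_counter.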
def model_to_actual(ski_position):
    if ski_position == 1:
        return 1
    elif ski_position in [2, 3]:
        return 2
    elif ski_position in [4, 5]:
        return 3
    elif ski_position in [6, 7]:
        return 4
    elif ski_position in [8, 9]:
        return 5
    elif ski_position in [10, 11]:
        return 6
    elif ski_position in [12, 13]:
        return 7
    elif ski_position == 14:
        return 8
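# Translate a one-character command (or the stringified network output) into an ALE action or a meta command.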
def input_to_action(char):
    if char == "0":
        return Action.NOOP
    if char == "1":
        return Action.RIGHT
    if char == "2":
        return Action.LEFT
    if char == "3":
        return "reset"
    if char == "4":
        return "set_x"
    if char == "5":
        return "set_vel"
    if char in ["w", "a", "s", "d"]:
        return char
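# Draw the ranked states as semi-transparent rectangles onto the downscaled track image, one output image per ski position.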
def drawImportantStates(important_states):
    draw_commands = {i: list() for i in range(1, 15)}
    for state in important_states:
        x = state[0].x
        y = state[0].y
        markerSize = 2
        ski_position = state[0].ski_position
        draw_commands[ski_position].append(f"-fill 'rgba(255,204,0,{state[1].ranking})' -draw 'rectangle {x-markerSize},{y-markerSize} {x+markerSize},{y+markerSize} '")
    for i in range(1, 15):
        command = f"convert images/1_full_scaled_down.png {' '.join(draw_commands[i])} first_try_{i:02}.png"
        exec(command)
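# Action and number of repetitions used to steer the skier into each of the eight ski positions.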
ski_position_counter = {1: (Action.LEFT, 40), 2: (Action.LEFT, 35), 3: (Action.LEFT, 30), 4: (Action.LEFT, 10), 5: (Action.NOOP, 1), 6: (Action.RIGHT, 10), 7: (Action.RIGHT, 30), 8: (Action.RIGHT, 40) }
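# Restore the RAM snapshot for row y, steer into the requested ski position, place the skier at x,
# then let the NN policy act for `duration` frames. Returns a verdict and the first action the policy chose.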
def run_single_test(ale, nn_wrapper, x, y, ski_position, duration=200):
    #print(f"Running Test from x: {x:04}, y: {y:04}, ski_position: {ski_position}", end="")
    for i, r in enumerate(ramDICT[y]):
        ale.setRAM(i, r)
    ski_position_setting = ski_position_counter[ski_position]
    for i in range(0, ski_position_setting[1]):
        ale.act(ski_position_setting[0])
    ale.setRAM(14, 0)
    ale.setRAM(25, x)
    ale.setRAM(14, 180)
    all_obs = list()
    speed_list = list()
    first_action_set = False
    first_action = 0
    for i in range(0, duration):
        resized_obs = cv2.resize(ale.getScreenGrayscale(), (84, 84), interpolation=cv2.INTER_AREA)
        all_obs.append(resized_obs)
        if len(all_obs) >= 4:
            stack_tensor = TensorDict({"obs": np.array(all_obs[-4:])})
            action = nn_wrapper.query(stack_tensor)
            if not first_action_set:
                first_action_set = True
                first_action = input_to_action(str(action))
            ale.act(input_to_action(str(action)))
        else:
            ale.act(Action.NOOP)
        speed_list.append(ale.getRAM()[14])
        if len(speed_list) > 15 and sum(speed_list[-6:-1]) == 0:
            return (Verdict.BAD, first_action)
        #time.sleep(0.005)
    return (Verdict.INCONCLUSIVE, first_action)
def optimalAction(choices):
    return max(choices.items(), key=operator.itemgetter(1))[0]
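# Run Tempest on the PRISM model; the resulting state ranking is read back from the action_ranking file.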
def computeStateRanking(mdp_file):
    command = f"{tempest_binary} --prism {mdp_file} --buildchoicelab --buildstateval --prop 'Rmax=? [C <= 1000]'"
    exec(command)
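# Parse the ranking file into a State -> StateValue dictionary; only lines containing move=0 are considered.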
def fillStateRanking(file_name, match=""):
    state_ranking = dict()
    try:
        with open(file_name, "r") as f:
            file_content = f.readlines()
            for line in file_content:
                if "move=0" not in line: continue
                stateMapping = convert(re.findall(r"([a-zA-Z_]*[a-zA-Z])=(\d+)?", line))
                #print("stateMapping", stateMapping)
                choices = convert(re.findall(r"[a-zA-Z_]*(left|right|noop)[a-zA-Z_]*:(-?\d+\.?\d*)", line))
                choices = {key: float(value) for (key, value) in choices.items()}
                #print("choices", choices)
                ranking_value = float(re.search(r"Value:([+-]?(\d*\.\d+)|\d+)", line)[0].replace("Value:", ""))
                #print("ranking_value", ranking_value)
                state = State(int(stateMapping["x"]), int(stateMapping["y"]), int(stateMapping["ski_position"]))
                value = StateValue(ranking_value, choices)
                state_ranking[state] = value
            return state_ranking
    except EnvironmentError:
        print("Ranking file not available. Exiting.")
        sys.exit(1)
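# States whose action will be fixed (to the action the NN actually took) in the refined PRISM model.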
fixed_left_states = list()
fixed_right_states = list()
fixed_noop_states = list()
def populate_fixed_actions(state, action):
    if action == Action.LEFT:
        fixed_left_states.append(state)
    if action == Action.RIGHT:
        fixed_right_states.append(state)
    if action == Action.NOOP:
        fixed_noop_states.append(state)
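# Rewrite the Fixed_Left/Fixed_Right/Fixed_Noop formulas in the PRISM file so that the tested states keep their observed action.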
def update_prism_file(old_prism_file, new_prism_file):
    fixed_left_formula = "formula Fixed_Left = false "
    fixed_right_formula = "formula Fixed_Right = false "
    fixed_noop_formula = "formula Fixed_Noop = false "
    for state in fixed_left_states:
        fixed_left_formula += f" | (x={state.x}&y={state.y}&ski_position={state.ski_position}) "
    for state in fixed_right_states:
        fixed_right_formula += f" | (x={state.x}&y={state.y}&ski_position={state.ski_position}) "
    for state in fixed_noop_states:
        fixed_noop_formula += f" | (x={state.x}&y={state.y}&ski_position={state.ski_position}) "
    fixed_left_formula += ";\n"
    fixed_right_formula += ";\n"
    fixed_noop_formula += ";\n"
    with open(old_prism_file, 'r') as file:
        filedata = file.read()
    if len(fixed_left_states) > 0: filedata = re.sub(r"^formula Fixed_Left =.*$", fixed_left_formula, filedata, flags=re.MULTILINE)
    if len(fixed_right_states) > 0: filedata = re.sub(r"^formula Fixed_Right =.*$", fixed_right_formula, filedata, flags=re.MULTILINE)
    if len(fixed_noop_states) > 0: filedata = re.sub(r"^formula Fixed_Noop =.*$", fixed_noop_formula, filedata, flags=re.MULTILINE)
    with open(new_prism_file, 'w') as file:
        file.write(filedata)
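# Main script: set up ALE, load the Skiing ROM and the pre-recorded RAM snapshots for each y position.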
ale = ALEInterface()
#if SDL_SUPPORT:
# ale.setBool("sound", True)
# ale.setBool("display_screen", True)
# Load the ROM file
ale.loadROM(rom_file)
with open('all_positions_v2.pickle', 'rb') as handle:
    ramDICT = pickle.load(handle)
y_ram_setting = 60
x = 70
nn_wrapper = SampleFactoryNNQueryWrapper()
iteration = 0
id = int(time.time())
init_mdp = "velocity"
exec(f"mkdir -p images/testing_{id}")
exec(f"cp 1_full_scaled_down.png images/testing_{id}/testing_0000.png")
exec(f"cp {init_mdp}.prism {init_mdp}_000.prism")
markerSize = 1
markerList = {i: list() for i in range(1, 9)}
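# Refinement loop: rank all states, test the highest-ranked ones in the emulator,
# draw the verdicts onto the track image, and fix the observed actions in the next PRISM model.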
while True:
    computeStateRanking(f"{init_mdp}_{iteration:03}.prism")
    ranking = fillStateRanking("action_ranking")
    sorted_ranking = sorted(ranking.items(), key=lambda x: x[1].ranking)
    for important_state in sorted_ranking[-100:-1]:
        optimal_choice = optimalAction(important_state[1].choices)
        #print(important_state[1].choices, f"\t\tOptimal: {optimal_choice}")
        x = important_state[0].x
        y = important_state[0].y
        ski_pos = model_to_actual(important_state[0].ski_position)
        result = run_single_test(ale, nn_wrapper, x, y, ski_pos, duration=50)
        #print(f".... {result}")
        marker = f"-fill 'rgba({verdict_to_color_map[result[0]]},0.7)' -draw 'rectangle {x-markerSize},{y-markerSize} {x+markerSize},{y+markerSize} '"
        markerList[ski_pos].append(marker)
        populate_fixed_actions(important_state[0], result[1])
    for pos, markers in markerList.items():
        command = f"convert images/testing_{id}/testing_0000.png {' '.join(markers)} images/testing_{id}/testing_{iteration+1:03}_{pos:02}.png"
        exec(command, verbose=False)
    exec(f"montage images/testing_{id}/testing_{iteration+1:03}_*png -geometry +0+0 -tile x1 images/testing_{id}/{iteration+1:03}.png", verbose=False)
    iteration += 1
    update_prism_file(f"{init_mdp}_{iteration-1:03}.prism", f"{init_mdp}_{iteration:03}.prism")