Browse Source

added velocity, unsure about it though

add_velocity_into_framework
sp 7 months ago
parent
commit
4d6733b6d8
  1. 36
      rom_evaluate.py

36
rom_evaluate.py

@ -13,7 +13,7 @@ import pickle
import queue
from dataclasses import dataclass, field
from sklearn.cluster import KMeans
from sklearn.cluster import KMeans, DBSCAN
from enum import Enum
@ -59,8 +59,10 @@ class State:
x: int
y: int
ski_position: int
#velocity: int
def default_value():
return {'action' : None, 'choiceValue' : None}
@dataclass(frozen=True)
class StateValue:
ranking: float
@ -91,18 +93,6 @@ def input_to_action(char):
if char in ["w", "a", "s", "d"]:
return char
def drawImportantStates(important_states):
draw_commands = {1: list(), 2:list(), 3:list(), 4:list(), 5:list(), 6:list(), 7:list(), 8:list(), 9:list(), 10:list(), 11:list(), 12:list(), 13:list(), 14:list()}
for state in important_states:
x = state[0].x
y = state[0].y
markerSize = 2
ski_position = state[0].ski_position
draw_commands[ski_position].append(f"-fill 'rgba(255,204,0,{state[1].ranking})' -draw 'rectangle {x-markerSize},{y-markerSize} {x+markerSize},{y+markerSize} '")
for i in range(1,15):
command = f"convert images/1_full_scaled_down.png {' '.join(draw_commands[i])} first_try_{i:02}.png"
exec(command)
def saveObservations(observations, verdict, testDir):
testDir = f"images/testing_{experiment_id}/{verdict.name}_{testDir}_{len(observations)}"
if len(observations) < 20:
@ -114,9 +104,10 @@ def saveObservations(observations, verdict, testDir):
img.save(f"{testDir}/{i:003}.png")
ski_position_counter = {1: (Action.LEFT, 40), 2: (Action.LEFT, 35), 3: (Action.LEFT, 30), 4: (Action.LEFT, 10), 5: (Action.NOOP, 1), 6: (Action.RIGHT, 10), 7: (Action.RIGHT, 30), 8: (Action.RIGHT, 40) }
def run_single_test(ale, nn_wrapper, x,y,ski_position, duration=200):
#def run_single_test(ale, nn_wrapper, x,y,ski_position, velocity, duration=50):
def run_single_test(ale, nn_wrapper, x,y,ski_position, duration=50):
#print(f"Running Test from x: {x:04}, y: {y:04}, ski_position: {ski_position}", end="")
testDir = f"{x}_{y}_{ski_position}"
testDir = f"{x}_{y}_{ski_position}"#_{velocity}"
for i, r in enumerate(ramDICT[y]):
ale.setRAM(i,r)
ski_position_setting = ski_position_counter[ski_position]
@ -124,7 +115,7 @@ def run_single_test(ale, nn_wrapper, x,y,ski_position, duration=200):
ale.act(ski_position_setting[0])
ale.setRAM(14,0)
ale.setRAM(25,x)
ale.setRAM(14,180)
ale.setRAM(14,180) # TODO
all_obs = list()
speed_list = list()
@ -132,8 +123,6 @@ def run_single_test(ale, nn_wrapper, x,y,ski_position, duration=200):
first_action = 0
for i in range(0,duration):
resized_obs = cv2.resize(ale.getScreenGrayscale(), (84,84), interpolation=cv2.INTER_AREA)
for i in range(0,4):
all_obs.append(resized_obs)
if len(all_obs) >= 4:
stack_tensor = TensorDict({"obs": np.array(all_obs[-4:])})
action = nn_wrapper.query(stack_tensor)
@ -178,7 +167,7 @@ def fillStateRanking(file_name, match=""):
choices = {key:float(value) for (key,value) in choices.items()}
#print("choices", choices)
#print("ranking_value", ranking_value)
state = State(int(stateMapping["x"]), int(stateMapping["y"]), int(stateMapping["ski_position"]))
state = State(int(stateMapping["x"]), int(stateMapping["y"]), int(stateMapping["ski_position"]))#, int(stateMapping["velocity"]))
value = StateValue(ranking_value, choices)
state_ranking[state] = value
logger.info(f"Parsing state ranking - DONE: took {toc()} seconds")
@ -196,12 +185,14 @@ def createDisjunction(formulas):
def clusterFormula(cluster):
formula = ""
#states = [(s[0].x,s[0].y, s[0].ski_position, s[0].velocity) for s in cluster]
states = [(s[0].x,s[0].y, s[0].ski_position) for s in cluster]
skiPositionGroup = defaultdict(list)
for item in states:
skiPositionGroup[item[2]].append(item)
first = True
#todo add velocity here
for skiPosition, group in skiPositionGroup.items():
formula += f"ski_position={skiPosition} & ("
yPosGroup = defaultdict(list)
@ -282,15 +273,11 @@ x = 70
nn_wrapper = SampleFactoryNNQueryWrapper()
iteration = 0
experiment_id = int(time.time())
init_mdp = "velocity_safety"
exec(f"mkdir -p images/testing_{experiment_id}", verbose=False)
#exec(f"cp 1_full_scaled_down.png images/testing_{experiment_id}/testing_0000.png")
#exec(f"cp {init_mdp}.prism {init_mdp}_000.prism")
markerSize = 1
#markerList = {1: list(), 2:list(), 3:list(), 4:list(), 5:list(), 6:list(), 7:list(), 8:list()}
imagesDir = f"images/testing_{experiment_id}"
@ -360,8 +347,10 @@ def _init_logger():
def clusterImportantStates(ranking, iteration, n_clusters=40):
logger.info(f"Starting to cluster {len(ranking)} states into {n_clusters} cluster")
tic()
#states = [[s[0].x,s[0].y, s[0].ski_position * 10, s[0].velocity * 10, s[1].ranking] for s in ranking]
states = [[s[0].x,s[0].y, s[0].ski_position * 10, s[1].ranking] for s in ranking]
kmeans = KMeans(n_clusters, random_state=0, n_init="auto").fit(states)
#dbscan = DBSCAN().fit(states)
logger.info(f"Starting to cluster {len(ranking)} states into {n_clusters} cluster - DONE: took {toc()} seconds")
clusterDict = {i : list() for i in range(0,n_clusters)}
for i, state in enumerate(ranking):
@ -399,6 +388,7 @@ if __name__ == '__main__':
x = state[0].x
y = state[0].y
ski_pos = state[0].ski_position
#velocity = state[0].velocity
result = run_single_test(ale,nn_wrapper,x,y,ski_pos, duration=50)
if result == Verdict.BAD:
if testAll:

Loading…
Cancel
Save