Browse Source

added velocity to rom_evaluate

add_velocity_into_framework
sp 6 months ago
parent
commit
69d06c48d0
  1. 157
      rom_evaluate.py

157
rom_evaluate.py

@ -60,7 +60,7 @@ class State:
x: int
y: int
ski_position: int
#velocity: int
velocity: int
def default_value():
return {'action' : None, 'choiceValue' : None}
@ -77,6 +77,7 @@ def exec(command,verbose=True):
num_tests_per_cluster = 50
factor_tests_per_cluster = 0.2
num_ski_positions = 8
num_velocities = 8
def input_to_action(char):
if char == "0":
@ -106,11 +107,11 @@ def saveObservations(observations, verdict, testDir):
ski_position_counter = {1: (Action.LEFT, 40), 2: (Action.LEFT, 35), 3: (Action.LEFT, 30), 4: (Action.LEFT, 10), 5: (Action.NOOP, 1), 6: (Action.RIGHT, 10), 7: (Action.RIGHT, 30), 8: (Action.RIGHT, 40) }
#def run_single_test(ale, nn_wrapper, x,y,ski_position, velocity, duration=50):
def run_single_test(ale, nn_wrapper, x,y,ski_position, duration=50):
def run_single_test(ale, nn_wrapper, x,y,ski_position, velocity, duration=50):
#def run_single_test(ale, nn_wrapper, x,y,ski_position, duration=50):
#print(f"Running Test from x: {x:04}, y: {y:04}, ski_position: {ski_position}", end="")
#testDir = f"{x}_{y}_{ski_position}_{velocity}"
testDir = f"{x}_{y}_{ski_position}"
testDir = f"{x}_{y}_{ski_position}_{velocity}"
#testDir = f"{x}_{y}_{ski_position}"
for i, r in enumerate(ramDICT[y]):
ale.setRAM(i,r)
ski_position_setting = ski_position_counter[ski_position]
@ -172,17 +173,16 @@ def fillStateRanking(file_name, match=""):
choices = {key:float(value) for (key,value) in choices.items()}
#print("choices", choices)
#print("ranking_value", ranking_value)
#state = State(int(stateMapping["x"]), int(stateMapping["y"]), int(stateMapping["ski_position"]), int(stateMapping["velocity"]))
state = State(int(stateMapping["x"]), int(stateMapping["y"]), int(stateMapping["ski_position"]))
state = State(int(stateMapping["x"]), int(stateMapping["y"]), int(stateMapping["ski_position"]), int(stateMapping["velocity"])//2)
#state = State(int(stateMapping["x"]), int(stateMapping["y"]), int(stateMapping["ski_position"]))
value = StateValue(ranking_value, choices)
state_ranking[state] = value
logger.info(f"Parsing state ranking - DONE: took {toc()} seconds")
return state_ranking
except EnvironmentError:
print("Ranking file not available. Exiting.")
toc()
sys.exit(1)
sys.exit(-1)
except:
toc()
@ -190,43 +190,69 @@ def createDisjunction(formulas):
return " | ".join(formulas)
def clusterFormula(cluster):
if len(cluster) == 0: return
formulas = list()
for state in cluster:
formulas.append(f"(x={state[0].x} & y={state[0].y} & velocity={state[0].velocity} & ski_position={state[0].ski_position})")
while len(formulas) > 1:
formulas_tmp = [f"({formulas[i]} | {formulas[i+1]})" for i in range(0,len(formulas)//2)]
if len(formulas) % 2 == 1:
formulas_tmp.append(formulas[-1])
formulas = formulas_tmp
return "(" + formulas[0] + ")"
def clusterFormulaTrimmed(cluster):
formula = ""
#states = [(s[0].x,s[0].y, s[0].ski_position, s[0].velocity) for s in cluster]
states = [(s[0].x,s[0].y, s[0].ski_position) for s in cluster]
states = [(s[0].x,s[0].y, s[0].ski_position, s[0].velocity) for s in cluster]
#states = [(s[0].x,s[0].y, s[0].ski_position) for s in cluster]
skiPositionGroup = defaultdict(list)
for item in states:
skiPositionGroup[item[2]].append(item)
first = True
#todo add velocity here
for skiPosition, group in skiPositionGroup.items():
formula += f"ski_position={skiPosition} & ("
yPosGroup = defaultdict(list)
for item in group:
yPosGroup[item[1]].append(item)
for y, y_group in yPosGroup.items():
if first:
first = False
firstVelocity = True
for skiPosition, skiPos_group in skiPositionGroup.items():
formula += f"ski_position={skiPosition} & "
velocityGroup = defaultdict(list)
for item in skiPos_group:
velocityGroup[item[3]].append(item)
for velocity, velocity_group in velocityGroup.items():
if firstVelocity:
firstVelocity = False
else:
formula += " | "
sorted_y_group = sorted(y_group, key=lambda s: s[0])
formula += f"( y={y} & ("
current_x_min = sorted_y_group[0][0]
current_x = sorted_y_group[0][0]
x_ranges = list()
for state in sorted_y_group[1:-1]:
if state[0] - current_x == 1:
current_x = state[0]
formula += f" (velocity={velocity} & "
firstY = True
yPosGroup = defaultdict(list)
for item in velocity_group:
yPosGroup[item[1]].append(item)
for y, y_group in yPosGroup.items():
if firstY:
firstY = False
else:
x_ranges.append(f" ({current_x_min}<= x & x<={current_x})")
current_x_min = state[0]
current_x = state[0]
x_ranges.append(f" ({current_x_min}<= x & x<={sorted_y_group[-1][0]})")
formula += " | ".join(x_ranges)
formula += ") )"
formula += ")"
formula += " | "
sorted_y_group = sorted(y_group, key=lambda s: s[0])
formula += f"( y={y} & ("
current_x_min = sorted_y_group[0][0]
current_x = sorted_y_group[0][0]
x_ranges = list()
for state in sorted_y_group[1:-1]:
if state[0] - current_x == 1:
current_x = state[0]
else:
x_ranges.append(f" ({current_x_min}<= x & x<={current_x})")
current_x_min = state[0]
current_x = state[0]
x_ranges.append(f" ({current_x_min}<= x & x<={sorted_y_group[-1][0]})")
formula += " | ".join(x_ranges)
formula += ") )"
formula += ")"
return formula
def createBalancedDisjunction(indices, name):
#logger.info(f"Creating balanced disjunction for {len(indices)} ({indices}) formulas")
if len(indices) == 0:
@ -245,9 +271,9 @@ def createUnsafeFormula(clusters):
formulas = ""
indices = list()
for i, cluster in enumerate(clusters):
formulas += f"formula Unsafe_{i} = {clusterFormula(cluster)};\n"
formulas += f"formula Unsafe_{i} = {clusterFormulaTrimmed(cluster)};\n"
indices.append(f"Unsafe_{i}")
return formulas + "\n" + createBalancedDisjunction(indices, "Unsafe") + label
return formulas + "\n" + createBalancedDisjunction(indices, "Unsafe")# + label
def createSafeFormula(clusters):
label = "label \"Safe\" = Safe;\n"
@ -257,7 +283,7 @@ def createSafeFormula(clusters):
formulas += f"formula Safe_{i} = {clusterFormula(cluster)};\n"
indices.append(f"Safe_{i}")
return formulas + "\n" + createBalancedDisjunction(indices, "Safe") + label
return formulas + "\n" + createBalancedDisjunction(indices, "Safe")# + label
def updatePrismFile(newFile, iteration, safeStates, unsafeStates):
logger.info("Creating next prism file")
@ -299,25 +325,32 @@ markerSize = 1
imagesDir = f"images/testing_{experiment_id}"
def drawOntoSkiPosImage(states, color, target_prefix="cluster_", alpha_factor=1.0):
markerList = {ski_position:list() for ski_position in range(1,num_ski_positions + 1)}
#markerList = {ski_position:list() for ski_position in range(1,num_ski_positions + 1)}
markerList = {(ski_position, velocity):list() for velocity in range(0, num_velocities + 1) for ski_position in range(1,num_ski_positions + 1)}
for state in states:
s = state[0]
#marker = f"-fill 'rgba({color}, {alpha_factor * state[1].ranking})' -draw 'rectangle {s.x-markerSize},{s.y-markerSize} {s.x+markerSize},{s.y+markerSize} '"
marker = f"-fill 'rgba({color}, {alpha_factor * state[1].ranking})' -draw 'point {s.x},{s.y} '"
markerList[s.ski_position].append(marker)
for pos, marker in markerList.items():
command = f"convert {imagesDir}/{target_prefix}_{pos:02}_individual.png {' '.join(marker)} {imagesDir}/{target_prefix}_{pos:02}_individual.png"
markerList[(s.ski_position, s.velocity)].append(marker)
for (pos, vel), marker in markerList.items():
command = f"convert {imagesDir}/{target_prefix}_{pos:02}_{vel:02}_individual.png {' '.join(marker)} {imagesDir}/{target_prefix}_{pos:02}_{vel:02}_individual.png"
exec(command, verbose=False)
def concatImages(prefix, iteration):
exec(f"montage {imagesDir}/{prefix}_*_individual.png -geometry +0+0 -tile x1 {imagesDir}/{prefix}_{iteration}.png", verbose=False)
exec(f"sxiv {imagesDir}/{prefix}_{iteration}.png&", verbose=False)
images = [f"{imagesDir}/{prefix}_{pos:02}_{vel:02}_individual.png" for vel in range(0,num_velocities+1) for pos in range(1,num_ski_positions+1) ]
for vel in range(0, num_velocities + 1):
for pos in range(1, num_ski_positions + 1):
command = f"convert {imagesDir}/{prefix}_{pos:02}_{vel:02}_individual.png "
command += f"-pointsize 10 -gravity NorthEast -annotate +8+0 'p{pos:02}v{vel:02}' "
command += f"{imagesDir}/{prefix}_{pos:02}_{vel:02}_individual.png"
exec(command, verbose=False)
exec(f"montage {' '.join(images)} -geometry +0+0 -tile 8x9 {imagesDir}/{prefix}_{iteration}.png", verbose=False)
#exec(f"sxiv {imagesDir}/{prefix}_{iteration}.png&", verbose=False)
def drawStatesOntoTiledImage(states, color, target, source="images/1_full_scaled_down.png", alpha_factor=1.0):
"""
Useful to draw a set of states, e.g. a single cluster
"""
markerList = {1: list(), 2:list(), 3:list(), 4:list(), 5:list(), 6:list(), 7:list(), 8:list()}
logger.info(f"Drawing {len(states)} states onto {target}")
tic()
@ -330,20 +363,27 @@ def drawStatesOntoTiledImage(states, color, target, source="images/1_full_scaled
exec(command, verbose=False)
exec(f"montage {imagesDir}/{target}_*_individual.png -geometry +0+0 -tile x1 {imagesDir}/{target}.png", verbose=False)
logger.info(f"Drawing {len(states)} states onto {target} - Done: took {toc()} seconds")
"""
def drawClusters(clusterDict, target, iteration, alpha_factor=1.0):
for ski_position in range(1, num_ski_positions + 1):
source = "images/1_full_scaled_down.png"
exec(f"cp {source} {imagesDir}/{target}_{ski_position:02}_individual.png", verbose=False)
logger.info(f"Drawing clusters")
tic()
for velocity in range(0, num_velocities + 1):
for ski_position in range(1, num_ski_positions + 1):
source = "images/1_full_scaled_down.png"
exec(f"cp {source} {imagesDir}/{target}_{ski_position:02}_{velocity:02}_individual.png", verbose=False)
for _, clusterStates in clusterDict.items():
color = f"{np.random.choice(range(256))}, {np.random.choice(range(256))}, {np.random.choice(range(256))}"
drawOntoSkiPosImage(clusterStates, color, target, alpha_factor=alpha_factor)
concatImages(target, iteration)
logger.info(f"Drawing clusters - DONE: took {toc()} seconds")
def drawResult(clusterDict, target, iteration):
for ski_position in range(1, num_ski_positions + 1):
source = "images/1_full_scaled_down.png"
exec(f"cp {source} {imagesDir}/{target}_{ski_position:02}_individual.png")
logger.info(f"Drawing clusters")
for velocity in range(0,num_velocities+1):
for ski_position in range(1, num_ski_positions + 1):
source = "images/1_full_scaled_down.png"
exec(f"cp {source} {imagesDir}/{target}_{ski_position:02}_{velocity:02}_individual.png", verbose=False)
for _, (clusterStates, result) in clusterDict.items():
color = "100,100,100"
if result == Verdict.GOOD:
@ -352,6 +392,7 @@ def drawResult(clusterDict, target, iteration):
color = "200,0,0"
drawOntoSkiPosImage(clusterStates, color, target, alpha_factor=0.7)
concatImages(target, iteration)
logger.info(f"Drawing clusters - DONE: took {toc()} seconds")
def _init_logger():
logger = logging.getLogger('main')
@ -364,8 +405,8 @@ def _init_logger():
def clusterImportantStates(ranking, iteration):
logger.info(f"Starting to cluster {len(ranking)} states into clusters")
tic()
#states = [[s[0].x,s[0].y, s[0].ski_position * 10, s[0].velocity * 10, s[1].ranking] for s in ranking]
states = [[s[0].x,s[0].y, s[0].ski_position * 30, s[1].ranking] for s in ranking]
states = [[s[0].x,s[0].y, s[0].ski_position * 20, s[0].velocity * 20, s[1].ranking] for s in ranking]
#states = [[s[0].x,s[0].y, s[0].ski_position * 30, s[1].ranking] for s in ranking]
#kmeans = KMeans(n_clusters, random_state=0, n_init="auto").fit(states)
dbscan = DBSCAN(eps=15).fit(states)
labels = dbscan.labels_
@ -400,7 +441,7 @@ if __name__ == '__main__':
for id, cluster in clusters.items():
num_tests = int(factor_tests_per_cluster * len(cluster))
num_tests = 1
logger.info(f"Testing {num_tests} states (from {len(cluster)} states) from cluster {id}")
#logger.info(f"Testing {num_tests} states (from {len(cluster)} states) from cluster {id}")
randomStates = np.random.choice(len(cluster), num_tests, replace=False)
randomStates = [cluster[i] for i in randomStates]
@ -409,9 +450,9 @@ if __name__ == '__main__':
x = state[0].x
y = state[0].y
ski_pos = state[0].ski_position
#velocity = state[0].velocity
result = run_single_test(ale,nn_wrapper,x,y,ski_pos, duration=50)
#result = run_single_test(ale,nn_wrapper,x,y,ski_pos, velocity, duration=50)
velocity = state[0].velocity
#result = run_single_test(ale,nn_wrapper,x,y,ski_pos, duration=50)
result = run_single_test(ale,nn_wrapper,x,y,ski_pos, velocity, duration=50)
result = Verdict.BAD # TODO REMOVE ME!!!!!!!!!!!!!!
if result == Verdict.BAD:
if testAll:
@ -427,7 +468,7 @@ if __name__ == '__main__':
logger.info(f"Iteration: {iteration:03} -\tSafe Results : {sum([len(c) for c in safeStates])} -\tUnsafe Results:{sum([len(c) for c in unsafeStates])}")
if testAll: drawClusters(failingPerCluster, f"failing", iteration)
drawResult(clusterResult, "result", iteration)
#drawResult(clusterResult, "result", iteration)
iteration += 1
Loading…
Cancel
Save