"""Step the ALE Skiing environment and dump every observed frame as a PNG.

The script also contains helpers for reading a scheduler file (a mapping from
``State`` to an action name) and for approximating how the skier's y
coordinate and ski position evolve.  The scheduler-driven policy is currently
disabled (see the commented-out lines in the main loop); the loop always sends
action ``0``.
"""

import csv
import os
import re
import sys
import time
from copy import deepcopy
from dataclasses import dataclass, field

import gym
import numpy as np
import readchar
from matplotlib import pyplot as plt
from PIL import Image


def string_to_action(action):
    """Translate an action name into the integer code passed to ``env.step``.

    ``"left"`` maps to 2, ``"right"`` to 1 and ``"noop"`` to 0.  Any other
    value also maps to 0.
    """
    if action == "left":
        return 2
    if action == "right":
        return 1
    # "noop" and every unrecognised value share the same code.
    return 0


scheduler_file = "x80_y128_pos8.sched"


def convert(tuples):
    """Build a dict from an iterable of ``(key, value)`` pairs."""
    return dict(tuples)


@dataclass(frozen=True)
class State:
    """Hashable key used to look up an action in the parsed scheduler."""

    x: int
    y: int
    ski_position: int


def parse_scheduler(scheduler_file):
    """Read a scheduler file and return a ``{State: action_name}`` dict.

    Only lines containing ``move=0`` are considered.  From each such line all
    ``name=<digits>`` pairs are extracted (``x``, ``y`` and ``ski_position``
    are required) together with the first ``{left}``, ``{right}`` or
    ``{noop}`` token.  Lines without such a token are skipped, so every value
    in the returned dict is an action name.

    If the file cannot be opened or read, a message is printed and the
    process exits with status 1.
    """
    # Keep the try body limited to the I/O that can actually raise OSError
    # (EnvironmentError is an alias of OSError).
    try:
        with open(scheduler_file, "r") as f:
            file_content = f.readlines()
    except OSError:
        print("TODO file not available. Exiting.")
        sys.exit(1)

    scheduler = dict()
    for line in file_content:
        if "move=0" not in line:
            continue
        stateMapping = convert(re.findall(r"([a-zA-Z_]*[a-zA-Z])=(\d+)?", line))
        choices = re.findall(r"{(left|right|noop)}", line)
        if not choices:
            # No recognisable action on this line: do not store an empty
            # list as if it were an action name.
            continue
        state = State(
            int(stateMapping["x"]),
            int(stateMapping["y"]),
            int(stateMapping["ski_position"]),
        )
        scheduler[state] = choices[0]
    return scheduler


env = gym.make("ALE/Skiing-v5")#, render_mode="human")
#env = gym.wrappers.ResizeObservation(env, (84, 84))
#env = gym.wrappers.GrayScaleObservation(env)

observation, info = env.reset()

y = 40
standstillcounter = 0


def update_y(y, ski_position):
    """Return the y increment associated with ``ski_position``.

    Ski positions 6-9 yield 16, positions 4, 5, 10, 11 yield 12 and positions
    2, 3, 12, 13 yield 8; each of these resets the module-level
    ``standstillcounter``.  Positions 1 and 14 yield 4 while the counter is
    below 5 and 0 afterwards, and every call with position 1 or 14 increments
    the counter.  Any other position yields 0 and leaves the counter alone.

    The ``y`` argument is accepted but not used.
    """
    global standstillcounter

    y_update = 0
    if ski_position in (6, 7, 8, 9):
        standstillcounter = 0
        y_update = 16
    elif ski_position in (4, 5, 10, 11):
        standstillcounter = 0
        y_update = 12
    elif ski_position in (2, 3, 12, 13):
        standstillcounter = 0
        y_update = 8
    elif ski_position in (1, 14):
        if standstillcounter >= 5:
            if standstillcounter >= 8:
                # NOTE(review): the message mentions "x updates" although this
                # function computes the y update - confirm the wording.
                print("!!!!!!!!!! \nno more x updates!!!!!!!!!!!")
            y_update = 0
        else:
            y_update = 4

    if ski_position in (1, 14):
        standstillcounter += 1
    return y_update


def update_ski_position(ski_position, action):
    """Return the ski position after applying ``action``.

    Action 0 keeps the position, action 1 increases it by one (capped at 14)
    and action 2 decreases it by one (floored at 1).  For any other action
    value the function falls through and returns ``None``.
    """
    if action == 0:
        return ski_position
    elif action == 1:
        return min(ski_position + 1, 14)
    elif action == 2:
        return max(ski_position - 1, 1)


approx_x_coordinate = 80
ski_position = 8

#scheduler = parse_scheduler(scheduler_file)

# Image.save does not create missing directories, so make sure the output
# folder exists before the first frame is written.
os.makedirs("images", exist_ok=True)

for j in range(1, 1000001):
    #action = env.action_space.sample()  # agent policy that uses the observation and info
    #action = int(repr(readchar.readchar())[1])
    #action = string_to_action(scheduler.get(State(approx_x_coordinate, y, ski_position), "noop"))
    action = 0
    #ski_position = update_ski_position(ski_position, action)
    #y_update = update_y(y, ski_position)
    #y += y_update if y_update else 0

    #old_x = deepcopy(approx_x_coordinate)
    #approx_x_coordinate = int(np.mean(np.where(observation[:,:,1] == 92)[1]))
    #print(f"Action: {action},\tski position: {ski_position},\ty_update: {y_update},\ty: {y},\tx: {approx_x_coordinate},\tx_update:{approx_x_coordinate - old_x}")
    observation, reward, terminated, truncated, info = env.step(action)
    if terminated or truncated:
        # The episode is over: reset the environment and stop dumping frames.
        observation, info = env.reset()
        break

    # Frame files are numbered by step, zero-padded to five digits.
    img = Image.fromarray(observation)
    img.save(f"images/{j:05}.png")
    #observation, reward, terminated, truncated, info = env.step(0)
    #observation, reward, terminated, truncated, info = env.step(0)
    #observation, reward, terminated, truncated, info = env.step(0)
    #observation, reward, terminated, truncated, info = env.step(0)

env.close()