Browse Source

added simulator scripts

main
sp 11 months ago
parent
commit
967584157a
  1. 87
      car_pedestrian_simulator.py
  2. 71
      task_scheduling_simulator.py

87
car_pedestrian_simulator.py

@ -0,0 +1,87 @@
import stormpy
import stormpy.simulator
newline = "\n"
def printFrame(streetLength, lanes, carPos, pedPos):
sidewalk = "#" * (streetLength+1)
frame = list(sidewalk + (lanes-2)*("_"*(streetLength+1)) + sidewalk)
frame[(int(carPos[0])-1)*(streetLength+1)+int(carPos[1])+1] = "C"
frame[(int(pedPos[0])-1)*(streetLength+1)+int(pedPos[1])+1] = "P"
for i in range(lanes):
frame[i*(streetLength+1)] = newline
return "".join(frame)
def dtmc():
prism_program = stormpy.parse_prism_program("/media/car_pedestrian_dtmc.prism")
streetLength = int(str(prism_program.get_constant("streetLength").definition))
lanes = int(str(prism_program.get_constant("lanes").definition))
model = stormpy.build_model(prism_program)
simulator = stormpy.simulator.create_simulator(model, seed=42)
options = stormpy.BuilderOptions([])
options.set_build_state_valuations()
options.set_build_all_reward_models()
model = stormpy.build_sparse_model_with_options(prism_program, options)
print(model)
simulator = stormpy.simulator.create_simulator(model, seed=42)
simulator.set_observation_mode(stormpy.simulator.SimulatorObservationMode.PROGRAM_LEVEL)
while True:
if simulator.is_done():
print("Pedestrian arrived on the other side!")
break
observation, reward, labels = simulator.step()
print(observation, labels)
print(printFrame(streetLength, lanes, (observation["car_lane_pos"], observation["car_street_pos"]), (observation["ped_lane_pos"], observation["ped_street_pos"])))
input("")
def mdp():
options = stormpy.BuilderOptions([])
options.set_build_state_valuations(True)
options.set_build_choice_labels(True)
options.set_build_all_labels()
prism_program = stormpy.parse_prism_program("/media/car_pedestrian_mdp.prism")
streetLength = int(str(prism_program.get_constant("streetLength").definition))
lanes = int(str(prism_program.get_constant("lanes").definition))
formula_str = "Pmax=? [G !\"crashed\" ];"
print(stormpy.build_model(prism_program))
formulas = stormpy.parse_properties_for_prism_program(formula_str, prism_program)
model = stormpy.build_sparse_model_with_options(prism_program, options)
initial_state = model.initial_states[0]
assert initial_state == 0
result = stormpy.model_checking(model, formulas[0], extract_scheduler=True)
assert result.has_scheduler
scheduler = result.scheduler
assert scheduler.memoryless
assert scheduler.deterministic
dtmc = model.apply_scheduler(scheduler)
print(dtmc)
simulator = stormpy.simulator.create_simulator(dtmc, seed=42)
simulator.set_observation_mode(stormpy.simulator.SimulatorObservationMode.PROGRAM_LEVEL)
i=0
while True:
if simulator.is_done():
print("Pedestrian arrived on the other side!")
break
observation, reward, labels = simulator.step()
if "crashed" in labels:
print("Pedestrian did not arrive on the other side!")
break
print(observation, labels)
frame = printFrame(int(streetLength), int(lanes), (observation["car_lane_pos"], observation["car_street_pos"]), (observation["ped_lane_pos"], observation["ped_street_pos"]))
print(frame)
input("")
with open(f"/media/frame{i:03}.txt", "w") as text_file:
text_file.write(frame)
i+=1
if __name__ == '__main__':
mdp()

71
task_scheduling_simulator.py

@ -0,0 +1,71 @@
import stormpy
import stormpy.simulator
def mdp():
options = stormpy.BuilderOptions([])
options.set_build_state_valuations(True)
options.set_build_choice_labels(True)
options.set_build_all_labels()
options.set_build_all_reward_models()
program = stormpy.parse_prism_program("/media/task_graph_scheduling.prism")
formula_str = "R{\"time\"}min=? [ F \"tasks_complete\" ];"
formulas = stormpy.parse_properties_for_prism_program(formula_str, program)
model = stormpy.build_sparse_model_with_options(program, options)
print(model)
initial_state = model.initial_states[0]
assert initial_state == 0
result = stormpy.model_checking(model, formulas[0], extract_scheduler=True)
assert result.has_scheduler
scheduler = result.scheduler
assert scheduler.memoryless
assert scheduler.deterministic
dtmc = model.apply_scheduler(scheduler)
print(dtmc)
simulator = stormpy.simulator.create_simulator(dtmc, seed=42)
simulator.set_observation_mode(stormpy.simulator.SimulatorObservationMode.PROGRAM_LEVEL)
step_counter = 0
busy1_counter = 0
busy2_counter = 0
print("""
t1 t3 t5
A+B-->Cx(A+B)-->DxCx(A+B)--. t6
\ \___\DxCx(A+B)+((A+B)+(CxD))
t2 \ t4 / /
CxD-->(A+B)+(CxD)------------°
""")
print(" " * 32 + "[+,x,x,+,x,+]")
while True:
if simulator.is_done():
print(f"Tasks complete! Ratio: (approx.): {busy1_counter/(busy1_counter+busy2_counter):1.4f}/{busy2_counter/(busy1_counter+busy2_counter):1.4f}, Steps needed (approx.): {step_counter}")
break
observation, reward, labels = simulator.step()
#input("")
busy1 = str(observation["p1_idle"]) != ("true")
busy2 = str(observation["p2_idle"]) != ("true")
if busy1: busy1_counter += 1
if busy2: busy2_counter += 1
tasks = [str(observation[f"task{i}"]) for i in range(1,7)]
print(f"Step {step_counter:3}: P1/2:{'busy' if busy1 else 'idle'}/{'busy' if busy2 else 'idle'} tasks: [{','.join(tasks)}], {str(observation['add_tick_1'])},{str(observation['mult_tick_1'])},{str(observation['add_tick_2'])},{str(observation['mult_tick_2'])},labels: {labels if len(labels) != 0 else ''}, ")
step_counter += 1
try:
with open("/media/induced_scheduling_dtmc.dot", "w") as text_file:
text_file.write(dtmc.to_dot())
except Exception as e:
print(e)
print("Could not write to file.")
input("Hit enter to evaluate: 'P=? [ F<={i} \"tasks_complete\" ]' on the DTMC:")
print(dtmc)
for i in range(30,61):
formula_str = f"P=? [ F<={i} \"tasks_complete\" ]"
formulas = stormpy.parse_properties_for_prism_program(formula_str, program)
result = stormpy.model_checking(dtmc, formulas[0])
print(f"Probability to finish within {i} timesteps : {result.at(dtmc.initial_states[0]):>1.12f}")
if __name__ == '__main__':
mdp()
Loading…
Cancel
Save