"""Synthesise and inspect an optimal schedule for a task-graph PRISM model.

The script builds a sparse model from a PRISM program, extracts a scheduler
that minimises the expected "time" reward until "tasks_complete" is reached,
simulates the model induced by that scheduler, exports it as a DOT graph and
finally evaluates step-bounded reachability probabilities on it.
"""
import stormpy
import stormpy.simulator

PRISM_MODEL_PATH = "/media/task_graph_scheduling.prism"
DOT_OUTPUT_PATH = "/media/induced_scheduling_dtmc.dot"
# Step bounds i for which 'P=? [ F<=i "tasks_complete" ]' is evaluated.
STEP_BOUNDS = range(30, 61)

# Raw string: the diagram contains literal backslashes that would otherwise
# be parsed as (invalid) escape sequences.
TASK_GRAPH_DIAGRAM = r"""
    ┌t1─┐   ┌t3─────┐   ┌t5───────┐
    │A+B│-->│Cx(A+B)│-->│DxCx(A+B)│--.     ┌t6─────────────────────┐
    └───┘\  └───────┘   └─────────┘   \___\│DxCx(A+B)+((A+B)+(CxD))│
    ┌t2─┐ \ ┌t4─────────┐             /   /└───────────────────────┘
    │CxD│-->│(A+B)+(CxD)│------------°
    └───┘   └───────────┘
"""


def _busy_shares(busy1_steps: int, busy2_steps: int) -> tuple:
    """Return each processor's share of all busy steps, or zeros if none."""
    total = busy1_steps + busy2_steps
    if total == 0:
        # No busy step was observed; avoid dividing by zero.
        return 0.0, 0.0
    return busy1_steps / total, busy2_steps / total


def _build_options():
    """Create builder options that keep valuations, labels and rewards."""
    options = stormpy.BuilderOptions([])
    options.set_build_state_valuations(True)
    options.set_build_choice_labels(True)
    options.set_build_all_labels()
    options.set_build_all_reward_models()
    return options


def _simulate(simulator) -> None:
    """Step the simulator until it is done, printing one line per step."""
    step_counter = 0
    busy1_counter = 0
    busy2_counter = 0

    while not simulator.is_done():
        observation, _reward, labels = simulator.step()

        # A processor counts as busy whenever its idle flag is not "true".
        busy1 = str(observation["p1_idle"]) != "true"
        busy2 = str(observation["p2_idle"]) != "true"
        busy1_counter += busy1
        busy2_counter += busy2

        tasks = [str(observation[f"task{i}"]) for i in range(1, 7)]
        ticks = ",".join(
            str(observation[name])
            for name in ("add_tick_1", "mult_tick_1", "add_tick_2", "mult_tick_2")
        )
        print(
            f"Step {step_counter:3}: "
            f"P1/2:{'busy' if busy1 else 'idle'}/{'busy' if busy2 else 'idle'} "
            f"tasks: [{','.join(tasks)}], "
            f"{ticks},"
            f"labels: {labels if len(labels) != 0 else ''}, "
        )
        step_counter += 1

    share1, share2 = _busy_shares(busy1_counter, busy2_counter)
    print(
        f"Tasks complete! \nRatio: (approx.): {share1:1.4f}/{share2:1.4f}, "
        f"Steps needed (approx.): {step_counter}"
    )


def _export_dot(dtmc, dot_path: str) -> None:
    """Write the DOT rendering of the induced model; report failures only."""
    try:
        with open(dot_path, "w", encoding="utf-8") as dot_file:
            dot_file.write(dtmc.to_dot())
    except Exception as exc:  # deliberate best-effort: the export is optional
        print(exc)
        print("Could not write to file.")


def _print_bounded_reachability(dtmc, program, bounds) -> None:
    """Print the probability of reaching "tasks_complete" within each bound."""
    for i in bounds:
        bounded_formula = f"P=? [ F<={i} \"tasks_complete\" ]"
        properties = stormpy.parse_properties_for_prism_program(bounded_formula, program)
        bounded_result = stormpy.model_checking(dtmc, properties[0])
        print(
            f"Probability to finish within {i} timesteps : "
            f"{bounded_result.at(dtmc.initial_states[0]):>1.12f}"
        )


def mdp(
    program_path: str = PRISM_MODEL_PATH,
    dot_path: str = DOT_OUTPUT_PATH,
    seed: int = 42,
) -> None:
    """Run the scheduling demo end to end.

    Args:
        program_path: PRISM program to parse and build.
        dot_path: Destination of the DOT export of the induced model.
        seed: Seed handed to the stormpy simulator.

    Raises:
        AssertionError: If the first initial state is not state 0, or if model
            checking yields no memoryless deterministic scheduler.
    """
    program = stormpy.parse_prism_program(program_path)
    formula_str = "R{\"time\"}min=? [ F \"tasks_complete\" ];"
    formulas = stormpy.parse_properties_for_prism_program(formula_str, program)

    model = stormpy.build_sparse_model_with_options(program, _build_options())
    print(model)

    initial_state = model.initial_states[0]
    assert initial_state == 0

    result = stormpy.model_checking(model, formulas[0], extract_scheduler=True)
    assert result.has_scheduler
    scheduler = result.scheduler
    assert scheduler.memoryless
    assert scheduler.deterministic

    # Fixing the choices of the scheduler yields the induced model.
    dtmc = model.apply_scheduler(scheduler)
    print(dtmc)

    simulator = stormpy.simulator.create_simulator(dtmc, seed=seed)
    simulator.set_observation_mode(stormpy.simulator.SimulatorObservationMode.PROGRAM_LEVEL)

    print(TASK_GRAPH_DIAGRAM)
    # 32 spaces line the operator legend up with the "[" of the task list
    # printed for every simulation step.
    print(" " * 32 + "[+,x,x,+,x,+]")
    _simulate(simulator)

    _export_dot(dtmc, dot_path)

    # "{i}" is shown literally on purpose: it is the template evaluated below.
    input("Hit enter to evaluate: 'P=? [ F<={i} \"tasks_complete\" ]' on the DTMC:")
    print(dtmc)
    _print_bounded_reachability(dtmc, program, STEP_BOUNDS)


if __name__ == '__main__':
    mdp()