|
|
@ -144,6 +144,161 @@ class TestSparseModelComponents: |
|
|
|
assert dtmc.choice_origins is components.choice_origins |
|
|
|
assert dtmc.choice_origins.get_number_of_identifiers() == 9 |
|
|
|
|
|
|
|
def test_build_mdp(self): |
|
|
|
nr_states = 13 |
|
|
|
nr_choices = 14 |
|
|
|
|
|
|
|
# Build transition matrix |
|
|
|
builder = stormpy.SparseMatrixBuilder(rows=0, columns=0, entries=0, force_dimensions=False, |
|
|
|
has_custom_row_grouping=True, row_groups=0) |
|
|
|
|
|
|
|
# Row group, state 0 |
|
|
|
builder.new_row_group(0) |
|
|
|
builder.add_next_value(0, 1, 0.5) |
|
|
|
builder.add_next_value(0, 2, 0.5) |
|
|
|
builder.add_next_value(1, 1, 0.2) |
|
|
|
builder.add_next_value(1, 2, 0.8) |
|
|
|
# Row group, state 1 |
|
|
|
builder.new_row_group(2) |
|
|
|
builder.add_next_value(2, 3, 0.5) |
|
|
|
builder.add_next_value(2, 4, 0.5) |
|
|
|
# Row group, state 2 |
|
|
|
builder.new_row_group(3) |
|
|
|
builder.add_next_value(3, 5, 0.5) |
|
|
|
builder.add_next_value(3, 6, 0.5) |
|
|
|
# Row group, state 3 |
|
|
|
builder.new_row_group(4) |
|
|
|
builder.add_next_value(4, 7, 0.5) |
|
|
|
builder.add_next_value(4, 1, 0.5) |
|
|
|
# Row group, state 4 |
|
|
|
builder.new_row_group(5) |
|
|
|
builder.add_next_value(5, 8, 0.5) |
|
|
|
builder.add_next_value(5, 9, 0.5) |
|
|
|
# Row group, state 5 |
|
|
|
builder.new_row_group(6) |
|
|
|
builder.add_next_value(6, 10, 0.5) |
|
|
|
builder.add_next_value(6, 11, 0.5) |
|
|
|
# Row group, state 6 |
|
|
|
builder.new_row_group(7) |
|
|
|
builder.add_next_value(7, 2, 0.5) |
|
|
|
builder.add_next_value(7, 12, 0.5) |
|
|
|
# final states |
|
|
|
for s in range(8, 14): |
|
|
|
builder.new_row_group(s) |
|
|
|
builder.add_next_value(s, s - 1, 1) |
|
|
|
|
|
|
|
transition_matrix = builder.build(nr_choices, nr_states) |
|
|
|
|
|
|
|
# state labeling |
|
|
|
state_labeling = stormpy.storage.StateLabeling(nr_states) |
|
|
|
labels = {'init', 'one', 'two', 'three', 'four', 'five', 'six', 'done', 'deadlock'} |
|
|
|
for label in labels: |
|
|
|
state_labeling.add_label(label) |
|
|
|
state_labeling.add_label_to_state('init', 0) |
|
|
|
state_labeling.add_label_to_state('one', 7) |
|
|
|
state_labeling.add_label_to_state('two', 8) |
|
|
|
state_labeling.add_label_to_state('three', 9) |
|
|
|
state_labeling.add_label_to_state('four', 10) |
|
|
|
state_labeling.add_label_to_state('five', 11) |
|
|
|
state_labeling.add_label_to_state('six', 12) |
|
|
|
|
|
|
|
state_labeling.set_states('done', stormpy.BitVector(nr_states, [7, 8, 9, 10, 11, 12])) |
|
|
|
|
|
|
|
# reward models |
|
|
|
reward_models = {} |
|
|
|
# Vector representing the state-action rewards |
|
|
|
action_reward = [0.0, 0.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0] |
|
|
|
reward_models['coin_flips'] = stormpy.SparseRewardModel(optional_state_action_reward_vector=action_reward) |
|
|
|
|
|
|
|
# choice labeling |
|
|
|
choice_labeling = stormpy.storage.ChoiceLabeling(nr_choices) |
|
|
|
choice_labels = {'a', 'b'} |
|
|
|
for label in choice_labels: |
|
|
|
choice_labeling.add_label(label) |
|
|
|
choice_labeling.add_label_to_choice('a', 0) |
|
|
|
choice_labeling.add_label_to_choice('b', 1) |
|
|
|
|
|
|
|
# state valuations |
|
|
|
manager = stormpy.ExpressionManager() |
|
|
|
var_s = manager.create_integer_variable(name='s') |
|
|
|
var_d = manager.create_integer_variable(name='d') |
|
|
|
v_builder = stormpy.StateValuationsBuilder() |
|
|
|
v_builder.add_variable(var_s) |
|
|
|
v_builder.add_variable(var_d) |
|
|
|
for s in range(7): |
|
|
|
# values: vector [value for s, value for d] |
|
|
|
v_builder.add_state(state=s, boolean_values=[], integer_values=[s, 0], rational_values=[]) |
|
|
|
for s in range(7, 13): |
|
|
|
v_builder.add_state(state=s, boolean_values=[], integer_values=[7, s - 6], rational_values=[]) |
|
|
|
state_valuations = v_builder.build(13) |
|
|
|
|
|
|
|
# choice origins |
|
|
|
prism_program = stormpy.parse_prism_program(get_example_path("mdp", "die_c1.nm")) |
|
|
|
index_to_identifier_mapping = [1, 2, 3, 4, 5, 6, 7, 8, 9, 9, 9, 9, 9, 9] |
|
|
|
|
|
|
|
id_to_command_set_mapping = [stormpy.FlatSet() for _ in range(10)] |
|
|
|
for i in range(1, 9): |
|
|
|
# 0: no origin |
|
|
|
id_to_command_set_mapping[i].insert(i - 1) |
|
|
|
id_to_command_set_mapping[9].insert(8) |
|
|
|
choice_origins = stormpy.PrismChoiceOrigins(prism_program, index_to_identifier_mapping, |
|
|
|
id_to_command_set_mapping) |
|
|
|
|
|
|
|
# Construct Components |
|
|
|
components = stormpy.SparseModelComponents(transition_matrix=transition_matrix, state_labeling=state_labeling, |
|
|
|
reward_models=reward_models, rate_transitions=False) |
|
|
|
components.state_valuations = state_valuations |
|
|
|
components.choice_labeling = choice_labeling |
|
|
|
components.choice_origins = choice_origins |
|
|
|
|
|
|
|
# Build MDP |
|
|
|
mdp = stormpy.storage.SparseMdp(components) |
|
|
|
|
|
|
|
assert type(mdp) is stormpy.SparseMdp |
|
|
|
assert not mdp.supports_parameters |
|
|
|
|
|
|
|
# Test transition matrix |
|
|
|
assert mdp.nr_choices == nr_choices |
|
|
|
assert mdp.nr_states == nr_states |
|
|
|
assert mdp.nr_transitions == 22 |
|
|
|
assert mdp.transition_matrix.nr_entries == mdp.nr_transitions |
|
|
|
for e in mdp.transition_matrix: |
|
|
|
assert e.value() == 0.5 or e.value() == 0 or e.value() == 0.2 or e.value() == 0.8 or ( |
|
|
|
e.value() == 1 and e.column > 6) |
|
|
|
for state in mdp.states: |
|
|
|
assert len(state.actions) <= 2 |
|
|
|
|
|
|
|
# Test state labeling |
|
|
|
assert mdp.labeling.get_labels() == {'init', 'deadlock', 'done', 'one', 'two', 'three', 'four', 'five', 'six'} |
|
|
|
|
|
|
|
# Test reward models |
|
|
|
assert len(mdp.reward_models) == 1 |
|
|
|
assert not mdp.reward_models["coin_flips"].has_state_rewards |
|
|
|
assert mdp.reward_models["coin_flips"].has_state_action_rewards |
|
|
|
for reward in mdp.reward_models["coin_flips"].state_action_rewards: |
|
|
|
assert reward == 1.0 or reward == 0.0 |
|
|
|
assert not mdp.reward_models["coin_flips"].has_transition_rewards |
|
|
|
|
|
|
|
# Test choice labeling |
|
|
|
assert mdp.has_choice_labeling() |
|
|
|
assert mdp.choice_labeling.get_labels() == {'a', 'b'} |
|
|
|
|
|
|
|
# Test state valuations |
|
|
|
assert mdp.has_state_valuations() |
|
|
|
assert mdp.state_valuations |
|
|
|
value_s = [None] * nr_states |
|
|
|
value_d = [None] * nr_states |
|
|
|
for s in range(0, mdp.nr_states): |
|
|
|
value_s[s] = mdp.state_valuations.get_integer_value(s, var_s) |
|
|
|
value_d[s] = mdp.state_valuations.get_integer_value(s, var_d) |
|
|
|
assert value_s == [0, 1, 2, 3, 4, 5, 6, 7, 7, 7, 7, 7, 7] |
|
|
|
assert value_d == [0, 0, 0, 0, 0, 0, 0, 1, 2, 3, 4, 5, 6] |
|
|
|
|
|
|
|
# Test choice origins |
|
|
|
assert mdp.has_choice_origins() |
|
|
|
assert mdp.choice_origins is components.choice_origins |
|
|
|
assert mdp.choice_origins.get_number_of_identifiers() == 10 |
|
|
|
|
|
|
|
@numpy_avail |
|
|
|
def test_build_ctmc(self): |
|
|
|
import numpy as np |
|
|
@ -452,161 +607,6 @@ class TestSparseModelComponents: |
|
|
|
# Test Markovian states |
|
|
|
assert ma.markovian_states == stormpy.BitVector(5, [0, 1, 2, 3, 4]) |
|
|
|
|
|
|
|
def test_build_mdp(self): |
|
|
|
nr_states = 13 |
|
|
|
nr_choices = 14 |
|
|
|
|
|
|
|
# Build transition matrix |
|
|
|
builder = stormpy.SparseMatrixBuilder(rows=0, columns=0, entries=0, force_dimensions=False, |
|
|
|
has_custom_row_grouping=True, row_groups=0) |
|
|
|
|
|
|
|
# Row group, state 0 |
|
|
|
builder.new_row_group(0) |
|
|
|
builder.add_next_value(0, 1, 0.5) |
|
|
|
builder.add_next_value(0, 2, 0.5) |
|
|
|
builder.add_next_value(1, 1, 0.2) |
|
|
|
builder.add_next_value(1, 2, 0.8) |
|
|
|
# Row group, state 1 |
|
|
|
builder.new_row_group(2) |
|
|
|
builder.add_next_value(2, 3, 0.5) |
|
|
|
builder.add_next_value(2, 4, 0.5) |
|
|
|
# Row group, state 2 |
|
|
|
builder.new_row_group(3) |
|
|
|
builder.add_next_value(3, 5, 0.5) |
|
|
|
builder.add_next_value(3, 6, 0.5) |
|
|
|
# Row group, state 3 |
|
|
|
builder.new_row_group(4) |
|
|
|
builder.add_next_value(4, 7, 0.5) |
|
|
|
builder.add_next_value(4, 1, 0.5) |
|
|
|
# Row group, state 4 |
|
|
|
builder.new_row_group(5) |
|
|
|
builder.add_next_value(5, 8, 0.5) |
|
|
|
builder.add_next_value(5, 9, 0.5) |
|
|
|
# Row group, state 5 |
|
|
|
builder.new_row_group(6) |
|
|
|
builder.add_next_value(6, 10, 0.5) |
|
|
|
builder.add_next_value(6, 11, 0.5) |
|
|
|
# Row group, state 6 |
|
|
|
builder.new_row_group(7) |
|
|
|
builder.add_next_value(7, 2, 0.5) |
|
|
|
builder.add_next_value(7, 12, 0.5) |
|
|
|
# final states |
|
|
|
for s in range(8, 14): |
|
|
|
builder.new_row_group(s) |
|
|
|
builder.add_next_value(s, s - 1, 1) |
|
|
|
|
|
|
|
transition_matrix = builder.build(nr_choices, nr_states) |
|
|
|
|
|
|
|
# state labeling |
|
|
|
state_labeling = stormpy.storage.StateLabeling(nr_states) |
|
|
|
labels = {'init', 'one', 'two', 'three', 'four', 'five', 'six', 'done', 'deadlock'} |
|
|
|
for label in labels: |
|
|
|
state_labeling.add_label(label) |
|
|
|
state_labeling.add_label_to_state('init', 0) |
|
|
|
state_labeling.add_label_to_state('one', 7) |
|
|
|
state_labeling.add_label_to_state('two', 8) |
|
|
|
state_labeling.add_label_to_state('three', 9) |
|
|
|
state_labeling.add_label_to_state('four', 10) |
|
|
|
state_labeling.add_label_to_state('five', 11) |
|
|
|
state_labeling.add_label_to_state('six', 12) |
|
|
|
|
|
|
|
state_labeling.set_states('done', stormpy.BitVector(nr_states, [7, 8, 9, 10, 11, 12])) |
|
|
|
|
|
|
|
# reward models |
|
|
|
reward_models = {} |
|
|
|
# Vector representing the state-action rewards |
|
|
|
action_reward = [0.0, 0.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0] |
|
|
|
reward_models['coin_flips'] = stormpy.SparseRewardModel(optional_state_action_reward_vector=action_reward) |
|
|
|
|
|
|
|
# choice labeling |
|
|
|
choice_labeling = stormpy.storage.ChoiceLabeling(nr_choices) |
|
|
|
choice_labels = {'a', 'b'} |
|
|
|
for label in choice_labels: |
|
|
|
choice_labeling.add_label(label) |
|
|
|
choice_labeling.add_label_to_choice('a', 0) |
|
|
|
choice_labeling.add_label_to_choice('b', 1) |
|
|
|
|
|
|
|
# state valuations |
|
|
|
manager = stormpy.ExpressionManager() |
|
|
|
var_s = manager.create_integer_variable(name='s') |
|
|
|
var_d = manager.create_integer_variable(name='d') |
|
|
|
v_builder = stormpy.StateValuationsBuilder() |
|
|
|
v_builder.add_variable(var_s) |
|
|
|
v_builder.add_variable(var_d) |
|
|
|
for s in range(7): |
|
|
|
# values: vector [value for s, value for d] |
|
|
|
v_builder.add_state(state=s, boolean_values=[], integer_values=[s, 0], rational_values=[]) |
|
|
|
for s in range(7, 13): |
|
|
|
v_builder.add_state(state=s, boolean_values=[], integer_values=[7, s - 6], rational_values=[]) |
|
|
|
state_valuations = v_builder.build(13) |
|
|
|
|
|
|
|
# choice origins |
|
|
|
prism_program = stormpy.parse_prism_program(get_example_path("mdp", "die_c1.nm")) |
|
|
|
index_to_identifier_mapping = [1, 2, 3, 4, 5, 6, 7, 8, 9, 9, 9, 9, 9, 9] |
|
|
|
|
|
|
|
id_to_command_set_mapping = [stormpy.FlatSet() for _ in range(10)] |
|
|
|
for i in range(1, 9): |
|
|
|
# 0: no origin |
|
|
|
id_to_command_set_mapping[i].insert(i - 1) |
|
|
|
id_to_command_set_mapping[9].insert(8) |
|
|
|
choice_origins = stormpy.PrismChoiceOrigins(prism_program, index_to_identifier_mapping, |
|
|
|
id_to_command_set_mapping) |
|
|
|
|
|
|
|
# Construct Components |
|
|
|
components = stormpy.SparseModelComponents(transition_matrix=transition_matrix, state_labeling=state_labeling, |
|
|
|
reward_models=reward_models, rate_transitions=False) |
|
|
|
components.state_valuations = state_valuations |
|
|
|
components.choice_labeling = choice_labeling |
|
|
|
components.choice_origins = choice_origins |
|
|
|
|
|
|
|
# Build MDP |
|
|
|
mdp = stormpy.storage.SparseMdp(components) |
|
|
|
|
|
|
|
assert type(mdp) is stormpy.SparseMdp |
|
|
|
assert not mdp.supports_parameters |
|
|
|
|
|
|
|
# Test transition matrix |
|
|
|
assert mdp.nr_choices == nr_choices |
|
|
|
assert mdp.nr_states == nr_states |
|
|
|
assert mdp.nr_transitions == 22 |
|
|
|
assert mdp.transition_matrix.nr_entries == mdp.nr_transitions |
|
|
|
for e in mdp.transition_matrix: |
|
|
|
assert e.value() == 0.5 or e.value() == 0 or e.value() == 0.2 or e.value() == 0.8 or ( |
|
|
|
e.value() == 1 and e.column > 6) |
|
|
|
for state in mdp.states: |
|
|
|
assert len(state.actions) <= 2 |
|
|
|
|
|
|
|
# Test state labeling |
|
|
|
assert mdp.labeling.get_labels() == {'init', 'deadlock', 'done', 'one', 'two', 'three', 'four', 'five', 'six'} |
|
|
|
|
|
|
|
# Test reward models |
|
|
|
assert len(mdp.reward_models) == 1 |
|
|
|
assert not mdp.reward_models["coin_flips"].has_state_rewards |
|
|
|
assert mdp.reward_models["coin_flips"].has_state_action_rewards |
|
|
|
for reward in mdp.reward_models["coin_flips"].state_action_rewards: |
|
|
|
assert reward == 1.0 or reward == 0.0 |
|
|
|
assert not mdp.reward_models["coin_flips"].has_transition_rewards |
|
|
|
|
|
|
|
# Test choice labeling |
|
|
|
assert mdp.has_choice_labeling() |
|
|
|
assert mdp.choice_labeling.get_labels() == {'a', 'b'} |
|
|
|
|
|
|
|
# Test state valuations |
|
|
|
assert mdp.has_state_valuations() |
|
|
|
assert mdp.state_valuations |
|
|
|
value_s = [None] * nr_states |
|
|
|
value_d = [None] * nr_states |
|
|
|
for s in range(0, mdp.nr_states): |
|
|
|
value_s[s] = mdp.state_valuations.get_integer_value(s, var_s) |
|
|
|
value_d[s] = mdp.state_valuations.get_integer_value(s, var_d) |
|
|
|
assert value_s == [0, 1, 2, 3, 4, 5, 6, 7, 7, 7, 7, 7, 7] |
|
|
|
assert value_d == [0, 0, 0, 0, 0, 0, 0, 1, 2, 3, 4, 5, 6] |
|
|
|
|
|
|
|
# Test choice origins |
|
|
|
assert mdp.has_choice_origins() |
|
|
|
assert mdp.choice_origins is components.choice_origins |
|
|
|
assert mdp.choice_origins.get_number_of_identifiers() == 10 |
|
|
|
|
|
|
|
@numpy_avail |
|
|
|
def test_build_pomdp(self): |
|
|
|
import numpy as np |
|
|
|