You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							68 lines
						
					
					
						
							2.3 KiB
						
					
					
				
			
		
		
		
			
			
			
				
					
				
				
					
				
			
		
		
	
	
							68 lines
						
					
					
						
							2.3 KiB
						
					
					
				| import sys | |
| from enum import Flag, auto | |
| 
 | |
| import numpy as np | |
| 
 | |
| 
 | |
| class Verdict(Flag): | |
|     INCONCLUSIVE = auto() | |
|     PASS = auto() | |
|     FAIL = auto() | |
| 
 | |
| 
 | |
| 
 | |
| class Simulator(): | |
|     def __init__(self, allStateActionPairs, strategy, deadlockStates, reachedStates, bound=3, numSimulations=1): | |
|         self.allStateActionPairs = { ( pair.state_id, pair.action_id ) : pair.next_state_probabilities for pair in allStateActionPairs } | |
|         self.strategy = strategy | |
|         self.deadlockStates = deadlockStates | |
|         self.reachedStates = reachedStates | |
| 
 | |
|         #print(f"Deadlock: {self.deadlockStates}") | |
|         #print(f"GoalStates: {self.reachedStates}") | |
| 
 | |
| 
 | |
|         self.bound = bound | |
|         self.numSimulations = numSimulations | |
| 
 | |
|         allStates = set([state.state_id for state in allStateActionPairs]) | |
|         allStates = allStates.difference(set(deadlockStates)) | |
|         allStates = allStates.difference(set(reachedStates)) | |
|         self.allStates = np.array(list(allStates)) | |
| 
 | |
|     def _pickRandomTestCase(self): | |
|         testCase = np.random.choice(self.allStates, 1)[0] | |
|         #self.allStates = np.delete(self.allStates, testCase) | |
|         return testCase | |
| 
 | |
|     def _simulate(self, initialStateId): | |
|         i = 0 | |
| 
 | |
|         actionId = self.strategy[initialStateId] | |
|         nextStatePair = (initialStateId, actionId) | |
| 
 | |
|         while i < self.bound: | |
|             i += 1 | |
|             nextStateProbabilities = self.allStateActionPairs[nextStatePair] | |
|             weights = list() | |
|             nextStateIds = list() | |
|             for nextStateId, probability in nextStateProbabilities.items(): | |
|                 weights.append(probability) | |
|                 nextStateIds.append(nextStateId) | |
|             nextStateId = np.random.choice(nextStateIds, 1, p=weights)[0] | |
|             if nextStateId in self.deadlockStates: | |
|                 return Verdict.FAIL, i | |
|             if nextStateId in self.reachedStates: | |
|                 return Verdict.PASS, i | |
|             nextStatePair = (nextStateId, self.strategy[nextStateId]) | |
|         return Verdict.INCONCLUSIVE, i | |
| 
 | |
|     def runTest(self): | |
|         testCase = self._pickRandomTestCase() | |
| 
 | |
|         histogram = [0,0,0] | |
|         for i in range(self.numSimulations): | |
|             result, numQueries = self._simulate(testCase) | |
|             if result == Verdict.FAIL: | |
|                 return testCase, Verdict.FAIL, numQueries | |
|             return testCase, Verdict.INCONCLUSIVE, numQueries
 |