Browse Source

fix some multi-threading issues

tempestpy_adaptions
dehnert 7 years ago
parent
commit
1d1b17a707
  1. 69
      src/storm/storage/dd/bisimulation/SignatureRefiner.cpp

69
src/storm/storage/dd/bisimulation/SignatureRefiner.cpp

@ -27,8 +27,11 @@
#include "sylvan_table.h"
#include "sylvan_int.h"
// FIXME: remove
#include "storm/storage/dd/DdManager.h"
#ifdef STORM_HAVE_INTELTBB
#include "tbb/tbb.h"
#else
#error "Need TBB at this point."
#endif
namespace storm {
namespace dd {
@ -471,6 +474,23 @@ namespace storm {
template<storm::dd::DdType DdType>
class InternalSignatureRefinerBase;
struct MutexWrapper {
MutexWrapper() : mutex() {
// Intentionally left empty.
}
MutexWrapper(MutexWrapper const& other) {
// Intentionally left empty to create new mutex when copying.
}
MutexWrapper& operator=(MutexWrapper const& other) {
// Intentionally left empty to create new mutex when copying.
return *this;
}
std::mutex mutex;
};
template<>
class InternalSignatureRefinerBase<storm::dd::DdType::Sylvan> {
public:
@ -513,10 +533,12 @@ namespace storm {
// The cache used to identify states with identical signature.
spp::sparse_hash_map<std::pair<MTBDD, MTBDD>, std::pair<BDD, BDD>, SylvanMTBDDPairHash> signatureCache;
spp::sparse_hash_map<std::pair<MTBDD, MTBDD>, BDD, SylvanMTBDDPairHash> signatureCacheSingle;
// spp::sparse_hash_map<std::pair<MTBDD, MTBDD>, std::pair<BDD, MutexWrapper>, SylvanMTBDDPairHash> signatureCacheParallel;
tbb::concurrent_unordered_map<std::pair<MTBDD, MTBDD>, std::pair<BDD, std::unique_ptr<std::mutex>>, SylvanMTBDDPairHash> signatureCacheParallel;
// The cache used to identify which old block numbers have already been reused.
spp::sparse_hash_map<MTBDD, ReuseWrapper> reuseBlocksCache;
tbb::concurrent_unordered_map<MTBDD, ReuseWrapper> reuseBlocksCacheParallel;
// A mutex that can be used to synchronize concurrent accesses to the members.
std::mutex mutex;
@ -538,8 +560,9 @@ namespace storm {
void clearCaches() {
signatureCache.clear();
signatureCacheSingle.clear();
signatureCacheParallel.clear();
reuseBlocksCache.clear();
reuseBlocksCacheParallel.clear();
}
std::pair<storm::dd::Bdd<storm::dd::DdType::Sylvan>, boost::optional<storm::dd::Bdd<storm::dd::DdType::Sylvan>>> refine(Partition<storm::dd::DdType::Sylvan, ValueType> const& oldPartition, storm::dd::Add<storm::dd::DdType::Sylvan, ValueType> const& signatureAdd) {
@ -602,7 +625,6 @@ namespace storm {
auto& reuseBlockEntry = reuseBlocksCache[partitionNode];
if (!reuseBlockEntry.isReused()) {
reuseBlockEntry.setReused();
reuseBlocksCache.emplace(partitionNode, true);
std::pair<BDD, BDD> result;
if (options.createChangedStates) {
result = std::make_pair(partitionNode, sylvan_false);
@ -750,7 +772,6 @@ namespace storm {
auto& reuseBlockEntry = reuseBlocksCache[partitionNode];
if (!reuseBlockEntry.isReused()) {
reuseBlockEntry.setReused();
reuseBlocksCache.emplace(partitionNode, true);
std::pair<BDD, BDD> result;
if (options.createChangedStates) {
result = std::make_pair(partitionNode, sylvan_false);
@ -888,34 +909,38 @@ namespace storm {
}
if (sylvan_isconst(nonBlockVariablesNode)) {
// Get the lock so we can modify the signature cache.
auto nodePair = std::make_pair(signatureNode, partitionNode);
lock.lock();
auto& signatureCacheEntry = refiner.signatureCacheParallel[nodePair];
// Check the signature cache whether we have seen this signature before.
auto nodePair = std::make_pair(signatureNode, partitionNode);
auto it = refiner.signatureCacheSingle.find(nodePair);
if (it != refiner.signatureCacheSingle.end()) {
return it->second;
// If the mutex already exists, it means that some other thread has created it and this thread is
// supposed to yield the right answer. We can therefore wait for this lock and then return the result.
if (signatureCacheEntry.second) {
std::unique_lock<std::mutex> blockLock(*signatureCacheEntry.second);
return signatureCacheEntry.first;
} else {
signatureCacheEntry.second = std::make_unique<std::mutex>();
}
std::unique_lock<std::mutex> blockLock(*signatureCacheEntry.second);
lock.unlock();
if (refiner.options.reuseBlockNumbers) {
// If this is the first time (in this traversal) that we encounter this signature, we check
// whether we can assign the old block number to it.
auto& reuseBlockEntry = refiner.reuseBlocksCache[partitionNode];
auto& reuseBlockEntry = refiner.reuseBlocksCacheParallel[partitionNode];
if (!reuseBlockEntry.isReused()) {
reuseBlockEntry.setReused();
refiner.reuseBlocksCache.emplace(partitionNode, true);
refiner.signatureCacheSingle[nodePair] = partitionNode;
signatureCacheEntry.first = partitionNode;
return partitionNode;
}
}
// Encode new block and give up lock before
result = refiner.encodeBlock(refiner.nextFreeBlockIndex++);
refiner.signatureCacheSingle[nodePair] = result;
lock.unlock();
signatureCacheEntry.first = result;
blockLock.unlock();
// Store the result in the cache.
cache_put(signatureNode|(256LL<<42), nonBlockVariablesNode, partitionNode|(refiner.numberOfRefinements<<40), result);

Loading…
Cancel
Save