You cannot select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							210 lines
						
					
					
						
							8.7 KiB
						
					
					
				
			
		
		
		
			
			
			
				
					
				
				
					
				
			
		
		
	
	
							210 lines
						
					
					
						
							8.7 KiB
						
					
					
				| /* | |
|     Copyright 2005-2013 Intel Corporation.  All Rights Reserved. | |
|  | |
|     This file is part of Threading Building Blocks. | |
|  | |
|     Threading Building Blocks is free software; you can redistribute it | |
|     and/or modify it under the terms of the GNU General Public License | |
|     version 2 as published by the Free Software Foundation. | |
|  | |
|     Threading Building Blocks is distributed in the hope that it will be | |
|     useful, but WITHOUT ANY WARRANTY; without even the implied warranty | |
|     of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the | |
|     GNU General Public License for more details. | |
|  | |
|     You should have received a copy of the GNU General Public License | |
|     along with Threading Building Blocks; if not, write to the Free Software | |
|     Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA | |
|  | |
|     As a special exception, you may use this file as part of a free software | |
|     library without restriction.  Specifically, if other files instantiate | |
|     templates or use macros or inline functions from this file, or you compile | |
|     this file and link it with other files to produce an executable, this | |
|     file does not by itself cause the resulting executable to be covered by | |
|     the GNU General Public License.  This exception does not however | |
|     invalidate any other reasons why the executable file might be covered by | |
|     the GNU General Public License. | |
| */ | |
| 
 | |
| #ifndef __TBB__aggregator_H | |
| #define __TBB__aggregator_H | |
|  | |
| #if !TBB_PREVIEW_AGGREGATOR | |
| #error Set TBB_PREVIEW_AGGREGATOR before including aggregator.h | |
| #endif | |
|  | |
| #include "atomic.h" | |
| #include "tbb_profiling.h" | |
|  | |
| namespace tbb { | |
| namespace interface6 { | |
| 
 | |
| using namespace tbb::internal; | |
| 
 | |
| class aggregator_operation { | |
|     template<typename handler_type> friend class aggregator_ext; | |
|     uintptr_t status; | |
|     aggregator_operation* my_next; | |
| public: | |
|     enum aggregator_operation_status { agg_waiting=0, agg_finished }; | |
|     aggregator_operation() : status(agg_waiting), my_next(NULL) {} | |
|     /// Call start before handling this operation | |
|     void start() { call_itt_notify(acquired, &status); } | |
|     /// Call finish when done handling this operation | |
|     /** The operation will be released to its originating thread, and possibly deleted. */ | |
|     void finish() { itt_store_word_with_release(status, uintptr_t(agg_finished)); } | |
|     aggregator_operation* next() { return itt_hide_load_word(my_next);} | |
|     void set_next(aggregator_operation* n) { itt_hide_store_word(my_next, n); } | |
| }; | |
| 
 | |
| namespace internal { | |
| 
 | |
| class basic_operation_base : public aggregator_operation { | |
|     friend class basic_handler; | |
|     virtual void apply_body() = 0; | |
| public: | |
|     basic_operation_base() : aggregator_operation() {} | |
|     virtual ~basic_operation_base() {} | |
| }; | |
| 
 | |
| template<typename Body> | |
| class basic_operation : public basic_operation_base, no_assign { | |
|     const Body& my_body; | |
|     /*override*/ void apply_body() { my_body(); } | |
| public: | |
|     basic_operation(const Body& b) : basic_operation_base(), my_body(b) {} | |
| }; | |
| 
 | |
| class basic_handler { | |
| public: | |
|     basic_handler() {} | |
|     void operator()(aggregator_operation* op_list) const {  | |
|         while (op_list) { | |
|             // ITT note: &(op_list->status) tag is used to cover accesses to the operation data. | |
|             // The executing thread "acquires" the tag (see start()) and then performs | |
|             // the associated operation w/o triggering a race condition diagnostics. | |
|             // A thread that created the operation is waiting for its status (see execute_impl()), | |
|             // so when this thread is done with the operation, it will "release" the tag  | |
|             // and update the status (see finish()) to give control back to the waiting thread. | |
|             basic_operation_base& request = static_cast<basic_operation_base&>(*op_list); | |
|             // IMPORTANT: need to advance op_list to op_list->next() before calling request.finish() | |
|             op_list = op_list->next(); | |
|             request.start(); | |
|             request.apply_body(); | |
|             request.finish(); | |
|         } | |
|     } | |
| }; | |
| 
 | |
| } // namespace internal | |
|  | |
| //! Aggregator base class and expert interface | |
| /** An aggregator for collecting operations coming from multiple sources and executing | |
|     them serially on a single thread. */ | |
| template <typename handler_type> | |
| class aggregator_ext : tbb::internal::no_copy { | |
| public: | |
|     aggregator_ext(const handler_type& h) : handler_busy(0), handle_operations(h) { mailbox = NULL; } | |
| 
 | |
|     //! EXPERT INTERFACE: Enter a user-made operation into the aggregator's mailbox. | |
|     /** Details of user-made operations must be handled by user-provided handler */ | |
|     void process(aggregator_operation *op) { execute_impl(*op); } | |
| 
 | |
|  protected: | |
|     /** Place operation in mailbox, then either handle mailbox or wait for the operation  | |
|         to be completed by a different thread. */ | |
|     void execute_impl(aggregator_operation& op) { | |
|         aggregator_operation* res; | |
| 
 | |
|         // ITT note: &(op.status) tag is used to cover accesses to this operation. This | |
|         // thread has created the operation, and now releases it so that the handler | |
|         // thread may handle the associated operation w/o triggering a race condition; | |
|         // thus this tag will be acquired just before the operation is handled in the | |
|         // handle_operations functor. | |
|         call_itt_notify(releasing, &(op.status)); | |
|         // insert the operation in the queue | |
|         do { | |
|             // ITT may flag the following line as a race; it is a false positive: | |
|             // This is an atomic read; we don't provide itt_hide_load_word for atomics | |
|             op.my_next = res = mailbox; // NOT A RACE  | |
|         } while (mailbox.compare_and_swap(&op, res) != res); | |
|         if (!res) { // first in the list; handle the operations | |
|             // ITT note: &mailbox tag covers access to the handler_busy flag, which this | |
|             // waiting handler thread will try to set before entering handle_operations. | |
|             call_itt_notify(acquired, &mailbox); | |
|             start_handle_operations(); | |
|             __TBB_ASSERT(op.status, NULL); | |
|         } | |
|         else { // not first; wait for op to be ready | |
|             call_itt_notify(prepare, &(op.status)); | |
|             spin_wait_while_eq(op.status, uintptr_t(aggregator_operation::agg_waiting)); | |
|             itt_load_word_with_acquire(op.status); | |
|         } | |
|     } | |
| 
 | |
| 
 | |
|  private: | |
|     //! An atomically updated list (aka mailbox) of aggregator_operations | |
|     atomic<aggregator_operation *> mailbox; | |
| 
 | |
|     //! Controls thread access to handle_operations | |
|     /** Behaves as boolean flag where 0=false, 1=true */ | |
|     uintptr_t handler_busy; | |
| 
 | |
|     handler_type handle_operations; | |
| 
 | |
|     //! Trigger the handling of operations when the handler is free | |
|     void start_handle_operations() { | |
|         aggregator_operation *pending_operations; | |
| 
 | |
|         // ITT note: &handler_busy tag covers access to mailbox as it is passed | |
|         // between active and waiting handlers.  Below, the waiting handler waits until | |
|         // the active handler releases, and the waiting handler acquires &handler_busy as | |
|         // it becomes the active_handler. The release point is at the end of this | |
|         // function, when all operations in mailbox have been handled by the | |
|         // owner of this aggregator. | |
|         call_itt_notify(prepare, &handler_busy); | |
|         // get handler_busy: only one thread can possibly spin here at a time | |
|         spin_wait_until_eq(handler_busy, uintptr_t(0)); | |
|         call_itt_notify(acquired, &handler_busy); | |
|         // acquire fence not necessary here due to causality rule and surrounding atomics | |
|         __TBB_store_with_release(handler_busy, uintptr_t(1)); | |
| 
 | |
|         // ITT note: &mailbox tag covers access to the handler_busy flag itself.  | |
|         // Capturing the state of the mailbox signifies that handler_busy has been  | |
|         // set and a new active handler will now process that list's operations. | |
|         call_itt_notify(releasing, &mailbox); | |
|         // grab pending_operations | |
|         pending_operations = mailbox.fetch_and_store(NULL); | |
| 
 | |
|         // handle all the operations | |
|         handle_operations(pending_operations); | |
| 
 | |
|         // release the handler | |
|         itt_store_word_with_release(handler_busy, uintptr_t(0)); | |
|     } | |
| }; | |
| 
 | |
| //! Basic aggregator interface | |
| class aggregator : private aggregator_ext<internal::basic_handler> { | |
| public: | |
|     aggregator() : aggregator_ext<internal::basic_handler>(internal::basic_handler()) {} | |
|     //! BASIC INTERFACE: Enter a function for exclusvie execution by the aggregator. | |
|     /** The calling thread stores the function object in a basic_operation and | |
|         places the operation in the aggregator's mailbox */ | |
|     template<typename Body> | |
|     void execute(const Body& b) { | |
|         internal::basic_operation<Body> op(b); | |
|         this->execute_impl(op); | |
|     } | |
| }; | |
| 
 | |
| } // namespace interface6 | |
|  | |
| using interface6::aggregator; | |
| using interface6::aggregator_ext; | |
| using interface6::aggregator_operation; | |
| 
 | |
| } // namespace tbb | |
|  | |
| #endif  // __TBB__aggregator_H
 |