You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							210 lines
						
					
					
						
							8.7 KiB
						
					
					
				
			
		
		
		
			
			
			
				
					
				
				
					
				
			
		
		
	
	
							210 lines
						
					
					
						
							8.7 KiB
						
					
					
				
								/*
							 | 
						|
								    Copyright 2005-2013 Intel Corporation.  All Rights Reserved.
							 | 
						|
								
							 | 
						|
								    This file is part of Threading Building Blocks.
							 | 
						|
								
							 | 
						|
								    Threading Building Blocks is free software; you can redistribute it
							 | 
						|
								    and/or modify it under the terms of the GNU General Public License
							 | 
						|
								    version 2 as published by the Free Software Foundation.
							 | 
						|
								
							 | 
						|
								    Threading Building Blocks is distributed in the hope that it will be
							 | 
						|
								    useful, but WITHOUT ANY WARRANTY; without even the implied warranty
							 | 
						|
								    of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
							 | 
						|
								    GNU General Public License for more details.
							 | 
						|
								
							 | 
						|
								    You should have received a copy of the GNU General Public License
							 | 
						|
								    along with Threading Building Blocks; if not, write to the Free Software
							 | 
						|
								    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
							 | 
						|
								
							 | 
						|
								    As a special exception, you may use this file as part of a free software
							 | 
						|
								    library without restriction.  Specifically, if other files instantiate
							 | 
						|
								    templates or use macros or inline functions from this file, or you compile
							 | 
						|
								    this file and link it with other files to produce an executable, this
							 | 
						|
								    file does not by itself cause the resulting executable to be covered by
							 | 
						|
								    the GNU General Public License.  This exception does not however
							 | 
						|
								    invalidate any other reasons why the executable file might be covered by
							 | 
						|
								    the GNU General Public License.
							 | 
						|
								*/
							 | 
						|
								
							 | 
						|
								#ifndef __TBB__aggregator_H
							 | 
						|
								#define __TBB__aggregator_H
							 | 
						|
								
							 | 
						|
								#if !TBB_PREVIEW_AGGREGATOR
							 | 
						|
								#error Set TBB_PREVIEW_AGGREGATOR before including aggregator.h
							 | 
						|
								#endif
							 | 
						|
								
							 | 
						|
								#include "atomic.h"
							 | 
						|
								#include "tbb_profiling.h"
							 | 
						|
								
							 | 
						|
								namespace tbb {
							 | 
						|
								namespace interface6 {
							 | 
						|
								
							 | 
						|
								using namespace tbb::internal;
							 | 
						|
								
							 | 
						|
								class aggregator_operation {
							 | 
						|
								    template<typename handler_type> friend class aggregator_ext;
							 | 
						|
								    uintptr_t status;
							 | 
						|
								    aggregator_operation* my_next;
							 | 
						|
								public:
							 | 
						|
								    enum aggregator_operation_status { agg_waiting=0, agg_finished };
							 | 
						|
								    aggregator_operation() : status(agg_waiting), my_next(NULL) {}
							 | 
						|
								    /// Call start before handling this operation
							 | 
						|
								    void start() { call_itt_notify(acquired, &status); }
							 | 
						|
								    /// Call finish when done handling this operation
							 | 
						|
								    /** The operation will be released to its originating thread, and possibly deleted. */
							 | 
						|
								    void finish() { itt_store_word_with_release(status, uintptr_t(agg_finished)); }
							 | 
						|
								    aggregator_operation* next() { return itt_hide_load_word(my_next);}
							 | 
						|
								    void set_next(aggregator_operation* n) { itt_hide_store_word(my_next, n); }
							 | 
						|
								};
							 | 
						|
								
							 | 
						|
								namespace internal {
							 | 
						|
								
							 | 
						|
								class basic_operation_base : public aggregator_operation {
							 | 
						|
								    friend class basic_handler;
							 | 
						|
								    virtual void apply_body() = 0;
							 | 
						|
								public:
							 | 
						|
								    basic_operation_base() : aggregator_operation() {}
							 | 
						|
								    virtual ~basic_operation_base() {}
							 | 
						|
								};
							 | 
						|
								
							 | 
						|
								template<typename Body>
							 | 
						|
								class basic_operation : public basic_operation_base, no_assign {
							 | 
						|
								    const Body& my_body;
							 | 
						|
								    /*override*/ void apply_body() { my_body(); }
							 | 
						|
								public:
							 | 
						|
								    basic_operation(const Body& b) : basic_operation_base(), my_body(b) {}
							 | 
						|
								};
							 | 
						|
								
							 | 
						|
								class basic_handler {
							 | 
						|
								public:
							 | 
						|
								    basic_handler() {}
							 | 
						|
								    void operator()(aggregator_operation* op_list) const { 
							 | 
						|
								        while (op_list) {
							 | 
						|
								            // ITT note: &(op_list->status) tag is used to cover accesses to the operation data.
							 | 
						|
								            // The executing thread "acquires" the tag (see start()) and then performs
							 | 
						|
								            // the associated operation w/o triggering a race condition diagnostics.
							 | 
						|
								            // A thread that created the operation is waiting for its status (see execute_impl()),
							 | 
						|
								            // so when this thread is done with the operation, it will "release" the tag 
							 | 
						|
								            // and update the status (see finish()) to give control back to the waiting thread.
							 | 
						|
								            basic_operation_base& request = static_cast<basic_operation_base&>(*op_list);
							 | 
						|
								            // IMPORTANT: need to advance op_list to op_list->next() before calling request.finish()
							 | 
						|
								            op_list = op_list->next();
							 | 
						|
								            request.start();
							 | 
						|
								            request.apply_body();
							 | 
						|
								            request.finish();
							 | 
						|
								        }
							 | 
						|
								    }
							 | 
						|
								};
							 | 
						|
								
							 | 
						|
								} // namespace internal
							 | 
						|
								
							 | 
						|
								//! Aggregator base class and expert interface
							 | 
						|
								/** An aggregator for collecting operations coming from multiple sources and executing
							 | 
						|
								    them serially on a single thread. */
							 | 
						|
								template <typename handler_type>
							 | 
						|
								class aggregator_ext : tbb::internal::no_copy {
							 | 
						|
								public:
							 | 
						|
								    aggregator_ext(const handler_type& h) : handler_busy(0), handle_operations(h) { mailbox = NULL; }
							 | 
						|
								
							 | 
						|
								    //! EXPERT INTERFACE: Enter a user-made operation into the aggregator's mailbox.
							 | 
						|
								    /** Details of user-made operations must be handled by user-provided handler */
							 | 
						|
								    void process(aggregator_operation *op) { execute_impl(*op); }
							 | 
						|
								
							 | 
						|
								 protected:
							 | 
						|
								    /** Place operation in mailbox, then either handle mailbox or wait for the operation 
							 | 
						|
								        to be completed by a different thread. */
							 | 
						|
								    void execute_impl(aggregator_operation& op) {
							 | 
						|
								        aggregator_operation* res;
							 | 
						|
								
							 | 
						|
								        // ITT note: &(op.status) tag is used to cover accesses to this operation. This
							 | 
						|
								        // thread has created the operation, and now releases it so that the handler
							 | 
						|
								        // thread may handle the associated operation w/o triggering a race condition;
							 | 
						|
								        // thus this tag will be acquired just before the operation is handled in the
							 | 
						|
								        // handle_operations functor.
							 | 
						|
								        call_itt_notify(releasing, &(op.status));
							 | 
						|
								        // insert the operation in the queue
							 | 
						|
								        do {
							 | 
						|
								            // ITT may flag the following line as a race; it is a false positive:
							 | 
						|
								            // This is an atomic read; we don't provide itt_hide_load_word for atomics
							 | 
						|
								            op.my_next = res = mailbox; // NOT A RACE 
							 | 
						|
								        } while (mailbox.compare_and_swap(&op, res) != res);
							 | 
						|
								        if (!res) { // first in the list; handle the operations
							 | 
						|
								            // ITT note: &mailbox tag covers access to the handler_busy flag, which this
							 | 
						|
								            // waiting handler thread will try to set before entering handle_operations.
							 | 
						|
								            call_itt_notify(acquired, &mailbox);
							 | 
						|
								            start_handle_operations();
							 | 
						|
								            __TBB_ASSERT(op.status, NULL);
							 | 
						|
								        }
							 | 
						|
								        else { // not first; wait for op to be ready
							 | 
						|
								            call_itt_notify(prepare, &(op.status));
							 | 
						|
								            spin_wait_while_eq(op.status, uintptr_t(aggregator_operation::agg_waiting));
							 | 
						|
								            itt_load_word_with_acquire(op.status);
							 | 
						|
								        }
							 | 
						|
								    }
							 | 
						|
								
							 | 
						|
								
							 | 
						|
								 private:
							 | 
						|
								    //! An atomically updated list (aka mailbox) of aggregator_operations
							 | 
						|
								    atomic<aggregator_operation *> mailbox;
							 | 
						|
								
							 | 
						|
								    //! Controls thread access to handle_operations
							 | 
						|
								    /** Behaves as boolean flag where 0=false, 1=true */
							 | 
						|
								    uintptr_t handler_busy;
							 | 
						|
								
							 | 
						|
								    handler_type handle_operations;
							 | 
						|
								
							 | 
						|
								    //! Trigger the handling of operations when the handler is free
							 | 
						|
								    void start_handle_operations() {
							 | 
						|
								        aggregator_operation *pending_operations;
							 | 
						|
								
							 | 
						|
								        // ITT note: &handler_busy tag covers access to mailbox as it is passed
							 | 
						|
								        // between active and waiting handlers.  Below, the waiting handler waits until
							 | 
						|
								        // the active handler releases, and the waiting handler acquires &handler_busy as
							 | 
						|
								        // it becomes the active_handler. The release point is at the end of this
							 | 
						|
								        // function, when all operations in mailbox have been handled by the
							 | 
						|
								        // owner of this aggregator.
							 | 
						|
								        call_itt_notify(prepare, &handler_busy);
							 | 
						|
								        // get handler_busy: only one thread can possibly spin here at a time
							 | 
						|
								        spin_wait_until_eq(handler_busy, uintptr_t(0));
							 | 
						|
								        call_itt_notify(acquired, &handler_busy);
							 | 
						|
								        // acquire fence not necessary here due to causality rule and surrounding atomics
							 | 
						|
								        __TBB_store_with_release(handler_busy, uintptr_t(1));
							 | 
						|
								
							 | 
						|
								        // ITT note: &mailbox tag covers access to the handler_busy flag itself. 
							 | 
						|
								        // Capturing the state of the mailbox signifies that handler_busy has been 
							 | 
						|
								        // set and a new active handler will now process that list's operations.
							 | 
						|
								        call_itt_notify(releasing, &mailbox);
							 | 
						|
								        // grab pending_operations
							 | 
						|
								        pending_operations = mailbox.fetch_and_store(NULL);
							 | 
						|
								
							 | 
						|
								        // handle all the operations
							 | 
						|
								        handle_operations(pending_operations);
							 | 
						|
								
							 | 
						|
								        // release the handler
							 | 
						|
								        itt_store_word_with_release(handler_busy, uintptr_t(0));
							 | 
						|
								    }
							 | 
						|
								};
							 | 
						|
								
							 | 
						|
								//! Basic aggregator interface
							 | 
						|
								class aggregator : private aggregator_ext<internal::basic_handler> {
							 | 
						|
								public:
							 | 
						|
								    aggregator() : aggregator_ext<internal::basic_handler>(internal::basic_handler()) {}
							 | 
						|
								    //! BASIC INTERFACE: Enter a function for exclusvie execution by the aggregator.
							 | 
						|
								    /** The calling thread stores the function object in a basic_operation and
							 | 
						|
								        places the operation in the aggregator's mailbox */
							 | 
						|
								    template<typename Body>
							 | 
						|
								    void execute(const Body& b) {
							 | 
						|
								        internal::basic_operation<Body> op(b);
							 | 
						|
								        this->execute_impl(op);
							 | 
						|
								    }
							 | 
						|
								};
							 | 
						|
								
							 | 
						|
								} // namespace interface6
							 | 
						|
								
							 | 
						|
								using interface6::aggregator;
							 | 
						|
								using interface6::aggregator_ext;
							 | 
						|
								using interface6::aggregator_operation;
							 | 
						|
								
							 | 
						|
								} // namespace tbb
							 | 
						|
								
							 | 
						|
								#endif  // __TBB__aggregator_H
							 |