You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							673 lines
						
					
					
						
							24 KiB
						
					
					
				
			
		
		
		
			
			
			
				
					
				
				
					
				
			
		
		
	
	
							673 lines
						
					
					
						
							24 KiB
						
					
					
				
								/*
							 | 
						|
								    Copyright 2005-2014 Intel Corporation.  All Rights Reserved.
							 | 
						|
								
							 | 
						|
								    This file is part of Threading Building Blocks.
							 | 
						|
								
							 | 
						|
								    Threading Building Blocks is free software; you can redistribute it
							 | 
						|
								    and/or modify it under the terms of the GNU General Public License
							 | 
						|
								    version 2 as published by the Free Software Foundation.
							 | 
						|
								
							 | 
						|
								    Threading Building Blocks is distributed in the hope that it will be
							 | 
						|
								    useful, but WITHOUT ANY WARRANTY; without even the implied warranty
							 | 
						|
								    of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
							 | 
						|
								    GNU General Public License for more details.
							 | 
						|
								
							 | 
						|
								    You should have received a copy of the GNU General Public License
							 | 
						|
								    along with Threading Building Blocks; if not, write to the Free Software
							 | 
						|
								    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
							 | 
						|
								
							 | 
						|
								    As a special exception, you may use this file as part of a free software
							 | 
						|
								    library without restriction.  Specifically, if other files instantiate
							 | 
						|
								    templates or use macros or inline functions from this file, or you compile
							 | 
						|
								    this file and link it with other files to produce an executable, this
							 | 
						|
								    file does not by itself cause the resulting executable to be covered by
							 | 
						|
								    the GNU General Public License.  This exception does not however
							 | 
						|
								    invalidate any other reasons why the executable file might be covered by
							 | 
						|
								    the GNU General Public License.
							 | 
						|
								*/
							 | 
						|
								
							 | 
						|
								#ifndef __TBB_pipeline_H 
							 | 
						|
								#define __TBB_pipeline_H 
							 | 
						|
								
							 | 
						|
								#include "atomic.h"
							 | 
						|
								#include "task.h"
							 | 
						|
								#include "tbb_allocator.h"
							 | 
						|
								#include <cstddef>
							 | 
						|
								
							 | 
						|
								//TODO: consider more accurate method to check if need to implement <type_trais> ourself
							 | 
						|
								#if !TBB_IMPLEMENT_CPP0X
							 | 
						|
								#include <type_traits>
							 | 
						|
								#endif
							 | 
						|
								
							 | 
						|
								namespace tbb {
							 | 
						|
								
							 | 
						|
								class pipeline;
							 | 
						|
								class filter;
							 | 
						|
								
							 | 
						|
								//! @cond INTERNAL
							 | 
						|
								namespace internal {
							 | 
						|
								
							 | 
						|
								// The argument for PIPELINE_VERSION should be an integer between 2 and 9
							 | 
						|
								#define __TBB_PIPELINE_VERSION(x) ((unsigned char)(x-2)<<1)
							 | 
						|
								
							 | 
						|
								typedef unsigned long Token;
							 | 
						|
								typedef long tokendiff_t;
							 | 
						|
								class stage_task;
							 | 
						|
								class input_buffer;
							 | 
						|
								class pipeline_root_task;
							 | 
						|
								class pipeline_cleaner;
							 | 
						|
								
							 | 
						|
								} // namespace internal
							 | 
						|
								
							 | 
						|
								namespace interface6 {
							 | 
						|
								    template<typename T, typename U> class filter_t;
							 | 
						|
								
							 | 
						|
								    namespace internal {
							 | 
						|
								        class pipeline_proxy;
							 | 
						|
								    }
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								//! @endcond
							 | 
						|
								
							 | 
						|
								//! A stage in a pipeline.
							 | 
						|
								/** @ingroup algorithms */
							 | 
						|
								class filter: internal::no_copy {
							 | 
						|
								private:
							 | 
						|
								    //! Value used to mark "not in pipeline"
							 | 
						|
								    static filter* not_in_pipeline() {return reinterpret_cast<filter*>(intptr_t(-1));}
							 | 
						|
								protected:    
							 | 
						|
								    //! The lowest bit 0 is for parallel vs. serial
							 | 
						|
								    static const unsigned char filter_is_serial = 0x1; 
							 | 
						|
								
							 | 
						|
								    //! 4th bit distinguishes ordered vs unordered filters.
							 | 
						|
								    /** The bit was not set for parallel filters in TBB 2.1 and earlier,
							 | 
						|
								        but is_ordered() function always treats parallel filters as out of order. */
							 | 
						|
								    static const unsigned char filter_is_out_of_order = 0x1<<4;  
							 | 
						|
								
							 | 
						|
								    //! 5th bit distinguishes thread-bound and regular filters.
							 | 
						|
								    static const unsigned char filter_is_bound = 0x1<<5;  
							 | 
						|
								
							 | 
						|
								    //! 6th bit marks input filters emitting small objects
							 | 
						|
								    static const unsigned char filter_may_emit_null = 0x1<<6;
							 | 
						|
								
							 | 
						|
								    //! 7th bit defines exception propagation mode expected by the application.
							 | 
						|
								    static const unsigned char exact_exception_propagation =
							 | 
						|
								#if TBB_USE_CAPTURED_EXCEPTION
							 | 
						|
								            0x0;
							 | 
						|
								#else
							 | 
						|
								            0x1<<7;
							 | 
						|
								#endif /* TBB_USE_CAPTURED_EXCEPTION */
							 | 
						|
								
							 | 
						|
								    static const unsigned char current_version = __TBB_PIPELINE_VERSION(5);
							 | 
						|
								    static const unsigned char version_mask = 0x7<<1; // bits 1-3 are for version
							 | 
						|
								public:
							 | 
						|
								    enum mode {
							 | 
						|
								        //! processes multiple items in parallel and in no particular order
							 | 
						|
								        parallel = current_version | filter_is_out_of_order, 
							 | 
						|
								        //! processes items one at a time; all such filters process items in the same order
							 | 
						|
								        serial_in_order = current_version | filter_is_serial,
							 | 
						|
								        //! processes items one at a time and in no particular order
							 | 
						|
								        serial_out_of_order = current_version | filter_is_serial | filter_is_out_of_order,
							 | 
						|
								        //! @deprecated use serial_in_order instead
							 | 
						|
								        serial = serial_in_order
							 | 
						|
								    };
							 | 
						|
								protected:
							 | 
						|
								    filter( bool is_serial_ ) : 
							 | 
						|
								        next_filter_in_pipeline(not_in_pipeline()),
							 | 
						|
								        my_input_buffer(NULL),
							 | 
						|
								        my_filter_mode(static_cast<unsigned char>((is_serial_ ? serial : parallel) | exact_exception_propagation)),
							 | 
						|
								        prev_filter_in_pipeline(not_in_pipeline()),
							 | 
						|
								        my_pipeline(NULL),
							 | 
						|
								        next_segment(NULL)
							 | 
						|
								    {}
							 | 
						|
								    
							 | 
						|
								    filter( mode filter_mode ) :
							 | 
						|
								        next_filter_in_pipeline(not_in_pipeline()),
							 | 
						|
								        my_input_buffer(NULL),
							 | 
						|
								        my_filter_mode(static_cast<unsigned char>(filter_mode | exact_exception_propagation)),
							 | 
						|
								        prev_filter_in_pipeline(not_in_pipeline()),
							 | 
						|
								        my_pipeline(NULL),
							 | 
						|
								        next_segment(NULL)
							 | 
						|
								    {}
							 | 
						|
								
							 | 
						|
								    // signal end-of-input for concrete_filters
							 | 
						|
								    void __TBB_EXPORTED_METHOD set_end_of_input();
							 | 
						|
								
							 | 
						|
								public:
							 | 
						|
								    //! True if filter is serial.
							 | 
						|
								    bool is_serial() const {
							 | 
						|
								        return bool( my_filter_mode & filter_is_serial );
							 | 
						|
								    }  
							 | 
						|
								    
							 | 
						|
								    //! True if filter must receive stream in order.
							 | 
						|
								    bool is_ordered() const {
							 | 
						|
								        return (my_filter_mode & (filter_is_out_of_order|filter_is_serial))==filter_is_serial;
							 | 
						|
								    }
							 | 
						|
								
							 | 
						|
								    //! True if filter is thread-bound.
							 | 
						|
								    bool is_bound() const {
							 | 
						|
								        return ( my_filter_mode & filter_is_bound )==filter_is_bound;
							 | 
						|
								    }
							 | 
						|
								
							 | 
						|
								    //! true if an input filter can emit null
							 | 
						|
								    bool object_may_be_null() { 
							 | 
						|
								        return ( my_filter_mode & filter_may_emit_null ) == filter_may_emit_null;
							 | 
						|
								    }
							 | 
						|
								
							 | 
						|
								    //! Operate on an item from the input stream, and return item for output stream.
							 | 
						|
								    /** Returns NULL if filter is a sink. */
							 | 
						|
								    virtual void* operator()( void* item ) = 0;
							 | 
						|
								
							 | 
						|
								    //! Destroy filter.  
							 | 
						|
								    /** If the filter was added to a pipeline, the pipeline must be destroyed first. */
							 | 
						|
								    virtual __TBB_EXPORTED_METHOD ~filter();
							 | 
						|
								
							 | 
						|
								#if __TBB_TASK_GROUP_CONTEXT
							 | 
						|
								    //! Destroys item if pipeline was cancelled.
							 | 
						|
								    /** Required to prevent memory leaks.
							 | 
						|
								        Note it can be called concurrently even for serial filters.*/
							 | 
						|
								    virtual void finalize( void* /*item*/ ) {};
							 | 
						|
								#endif
							 | 
						|
								
							 | 
						|
								private:
							 | 
						|
								    //! Pointer to next filter in the pipeline.
							 | 
						|
								    filter* next_filter_in_pipeline;
							 | 
						|
								
							 | 
						|
								    //! has the filter not yet processed all the tokens it will ever see?  
							 | 
						|
								    //  (pipeline has not yet reached end_of_input or this filter has not yet
							 | 
						|
								    //  seen the last token produced by input_filter)
							 | 
						|
								    bool has_more_work();
							 | 
						|
								
							 | 
						|
								    //! Buffer for incoming tokens, or NULL if not required.
							 | 
						|
								    /** The buffer is required if the filter is serial or follows a thread-bound one. */
							 | 
						|
								    internal::input_buffer* my_input_buffer;
							 | 
						|
								
							 | 
						|
								    friend class internal::stage_task;
							 | 
						|
								    friend class internal::pipeline_root_task;
							 | 
						|
								    friend class pipeline;
							 | 
						|
								    friend class thread_bound_filter;
							 | 
						|
								
							 | 
						|
								    //! Storage for filter mode and dynamically checked implementation version.
							 | 
						|
								    const unsigned char my_filter_mode;
							 | 
						|
								
							 | 
						|
								    //! Pointer to previous filter in the pipeline.
							 | 
						|
								    filter* prev_filter_in_pipeline;
							 | 
						|
								
							 | 
						|
								    //! Pointer to the pipeline.
							 | 
						|
								    pipeline* my_pipeline;
							 | 
						|
								
							 | 
						|
								    //! Pointer to the next "segment" of filters, or NULL if not required.
							 | 
						|
								    /** In each segment, the first filter is not thread-bound but follows a thread-bound one. */
							 | 
						|
								    filter* next_segment;
							 | 
						|
								};
							 | 
						|
								
							 | 
						|
								//! A stage in a pipeline served by a user thread.
							 | 
						|
								/** @ingroup algorithms */
							 | 
						|
								class thread_bound_filter: public filter {
							 | 
						|
								public:
							 | 
						|
								    enum result_type {
							 | 
						|
								        // item was processed
							 | 
						|
								        success,
							 | 
						|
								        // item is currently not available
							 | 
						|
								        item_not_available,
							 | 
						|
								        // there are no more items to process
							 | 
						|
								        end_of_stream
							 | 
						|
								    };
							 | 
						|
								protected:
							 | 
						|
								    thread_bound_filter(mode filter_mode): 
							 | 
						|
								         filter(static_cast<mode>(filter_mode | filter::filter_is_bound))
							 | 
						|
								    {}
							 | 
						|
								public:
							 | 
						|
								    //! If a data item is available, invoke operator() on that item.  
							 | 
						|
								    /** This interface is non-blocking.
							 | 
						|
								        Returns 'success' if an item was processed.
							 | 
						|
								        Returns 'item_not_available' if no item can be processed now 
							 | 
						|
								        but more may arrive in the future, or if token limit is reached. 
							 | 
						|
								        Returns 'end_of_stream' if there are no more items to process. */
							 | 
						|
								    result_type __TBB_EXPORTED_METHOD try_process_item(); 
							 | 
						|
								
							 | 
						|
								    //! Wait until a data item becomes available, and invoke operator() on that item.
							 | 
						|
								    /** This interface is blocking.
							 | 
						|
								        Returns 'success' if an item was processed.
							 | 
						|
								        Returns 'end_of_stream' if there are no more items to process.
							 | 
						|
								        Never returns 'item_not_available', as it blocks until another return condition applies. */
							 | 
						|
								    result_type __TBB_EXPORTED_METHOD process_item();
							 | 
						|
								
							 | 
						|
								private:
							 | 
						|
								    //! Internal routine for item processing
							 | 
						|
								    result_type internal_process_item(bool is_blocking);
							 | 
						|
								};
							 | 
						|
								
							 | 
						|
								//! A processing pipeline that applies filters to items.
							 | 
						|
								/** @ingroup algorithms */
							 | 
						|
								class pipeline {
							 | 
						|
								public:
							 | 
						|
								    //! Construct empty pipeline.
							 | 
						|
								    __TBB_EXPORTED_METHOD pipeline();
							 | 
						|
								
							 | 
						|
								    /** Though the current implementation declares the destructor virtual, do not rely on this 
							 | 
						|
								        detail.  The virtualness is deprecated and may disappear in future versions of TBB. */
							 | 
						|
								    virtual __TBB_EXPORTED_METHOD ~pipeline();
							 | 
						|
								
							 | 
						|
								    //! Add filter to end of pipeline.
							 | 
						|
								    void __TBB_EXPORTED_METHOD add_filter( filter& filter_ );
							 | 
						|
								
							 | 
						|
								    //! Run the pipeline to completion.
							 | 
						|
								    void __TBB_EXPORTED_METHOD run( size_t max_number_of_live_tokens );
							 | 
						|
								
							 | 
						|
								#if __TBB_TASK_GROUP_CONTEXT
							 | 
						|
								    //! Run the pipeline to completion with user-supplied context.
							 | 
						|
								    void __TBB_EXPORTED_METHOD run( size_t max_number_of_live_tokens, tbb::task_group_context& context );
							 | 
						|
								#endif
							 | 
						|
								
							 | 
						|
								    //! Remove all filters from the pipeline.
							 | 
						|
								    void __TBB_EXPORTED_METHOD clear();
							 | 
						|
								
							 | 
						|
								private:
							 | 
						|
								    friend class internal::stage_task;
							 | 
						|
								    friend class internal::pipeline_root_task;
							 | 
						|
								    friend class filter;
							 | 
						|
								    friend class thread_bound_filter;
							 | 
						|
								    friend class internal::pipeline_cleaner;
							 | 
						|
								    friend class tbb::interface6::internal::pipeline_proxy;
							 | 
						|
								
							 | 
						|
								    //! Pointer to first filter in the pipeline.
							 | 
						|
								    filter* filter_list;
							 | 
						|
								
							 | 
						|
								    //! Pointer to location where address of next filter to be added should be stored.
							 | 
						|
								    filter* filter_end;
							 | 
						|
								
							 | 
						|
								    //! task who's reference count is used to determine when all stages are done.
							 | 
						|
								    task* end_counter;
							 | 
						|
								
							 | 
						|
								    //! Number of idle tokens waiting for input stage.
							 | 
						|
								    atomic<internal::Token> input_tokens;
							 | 
						|
								
							 | 
						|
								    //! Global counter of tokens 
							 | 
						|
								    atomic<internal::Token> token_counter;
							 | 
						|
								
							 | 
						|
								    //! False until fetch_input returns NULL.
							 | 
						|
								    bool end_of_input;
							 | 
						|
								
							 | 
						|
								    //! True if the pipeline contains a thread-bound filter; false otherwise.
							 | 
						|
								    bool has_thread_bound_filters;
							 | 
						|
								
							 | 
						|
								    //! Remove filter from pipeline.
							 | 
						|
								    void remove_filter( filter& filter_ );
							 | 
						|
								
							 | 
						|
								    //! Not used, but retained to satisfy old export files.
							 | 
						|
								    void __TBB_EXPORTED_METHOD inject_token( task& self );
							 | 
						|
								
							 | 
						|
								#if __TBB_TASK_GROUP_CONTEXT
							 | 
						|
								    //! Does clean up if pipeline is cancelled or exception occurred
							 | 
						|
								    void clear_filters();
							 | 
						|
								#endif
							 | 
						|
								};
							 | 
						|
								
							 | 
						|
								//------------------------------------------------------------------------
							 | 
						|
								// Support for lambda-friendly parallel_pipeline interface
							 | 
						|
								//------------------------------------------------------------------------
							 | 
						|
								
							 | 
						|
								namespace interface6 {
							 | 
						|
								
							 | 
						|
								namespace internal {
							 | 
						|
								    template<typename T, typename U, typename Body> class concrete_filter;
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								//! input_filter control to signal end-of-input for parallel_pipeline
							 | 
						|
								class flow_control {
							 | 
						|
								    bool is_pipeline_stopped;
							 | 
						|
								    flow_control() { is_pipeline_stopped = false; }
							 | 
						|
								    template<typename T, typename U, typename Body> friend class internal::concrete_filter;
							 | 
						|
								public:
							 | 
						|
								    void stop() { is_pipeline_stopped = true; }
							 | 
						|
								};
							 | 
						|
								
							 | 
						|
								//! @cond INTERNAL
							 | 
						|
								namespace internal {
							 | 
						|
								
							 | 
						|
								template<typename T> struct tbb_large_object {enum { value = sizeof(T) > sizeof(void *) }; };
							 | 
						|
								
							 | 
						|
								#if TBB_IMPLEMENT_CPP0X
							 | 
						|
								// cannot use SFINAE in current compilers.  Explicitly list the types we wish to be
							 | 
						|
								// placed as-is in the pipeline input_buffers.
							 | 
						|
								template<typename T> struct tbb_trivially_copyable { enum { value = false }; };
							 | 
						|
								template<typename T> struct tbb_trivially_copyable <T*> { enum { value = true }; };
							 | 
						|
								template<> struct tbb_trivially_copyable <short> { enum { value = true }; };
							 | 
						|
								template<> struct tbb_trivially_copyable <unsigned short> { enum { value = true }; };
							 | 
						|
								template<> struct tbb_trivially_copyable <int> { enum { value = !tbb_large_object<int>::value }; };
							 | 
						|
								template<> struct tbb_trivially_copyable <unsigned int> { enum { value = !tbb_large_object<int>::value }; };
							 | 
						|
								template<> struct tbb_trivially_copyable <long> { enum { value = !tbb_large_object<long>::value }; };
							 | 
						|
								template<> struct tbb_trivially_copyable <unsigned long> { enum { value = !tbb_large_object<long>::value }; };
							 | 
						|
								template<> struct tbb_trivially_copyable <float> { enum { value = !tbb_large_object<float>::value }; };
							 | 
						|
								template<> struct tbb_trivially_copyable <double> { enum { value = !tbb_large_object<double>::value }; };
							 | 
						|
								#else
							 | 
						|
								#if __GNUC__==4 && __GNUC_MINOR__>=4 && __GXX_EXPERIMENTAL_CXX0X__
							 | 
						|
								template<typename T> struct tbb_trivially_copyable { enum { value = std::has_trivial_copy_constructor<T>::value }; };
							 | 
						|
								#else
							 | 
						|
								template<typename T> struct tbb_trivially_copyable { enum { value = std::is_trivially_copyable<T>::value }; };
							 | 
						|
								#endif //
							 | 
						|
								#endif // TBB_IMPLEMENT_CPP0X
							 | 
						|
								
							 | 
						|
								template<typename T> struct is_large_object {enum { value = tbb_large_object<T>::value || !tbb_trivially_copyable<T>::value }; };
							 | 
						|
								
							 | 
						|
								template<typename T, bool> class token_helper;
							 | 
						|
								
							 | 
						|
								// large object helper (uses tbb_allocator)
							 | 
						|
								template<typename T>
							 | 
						|
								class token_helper<T, true> {
							 | 
						|
								    public:
							 | 
						|
								    typedef typename tbb::tbb_allocator<T> allocator;
							 | 
						|
								    typedef T* pointer;
							 | 
						|
								    typedef T value_type;
							 | 
						|
								    static pointer create_token(const value_type & source) {
							 | 
						|
								        pointer output_t = allocator().allocate(1);
							 | 
						|
								        return new (output_t) T(source);
							 | 
						|
								    }
							 | 
						|
								    static value_type & token(pointer & t) { return *t;}
							 | 
						|
								    static void * cast_to_void_ptr(pointer ref) { return (void *) ref; }
							 | 
						|
								    static pointer cast_from_void_ptr(void * ref) { return (pointer)ref; }
							 | 
						|
								    static void destroy_token(pointer token) {
							 | 
						|
								        allocator().destroy(token);
							 | 
						|
								        allocator().deallocate(token,1);
							 | 
						|
								    }
							 | 
						|
								};
							 | 
						|
								
							 | 
						|
								// pointer specialization
							 | 
						|
								template<typename T>
							 | 
						|
								class token_helper<T*, false > {
							 | 
						|
								    public:
							 | 
						|
								    typedef T* pointer;
							 | 
						|
								    typedef T* value_type;
							 | 
						|
								    static pointer create_token(const value_type & source) { return source; }
							 | 
						|
								    static value_type & token(pointer & t) { return t;}
							 | 
						|
								    static void * cast_to_void_ptr(pointer ref) { return (void *)ref; }
							 | 
						|
								    static pointer cast_from_void_ptr(void * ref) { return (pointer)ref; }
							 | 
						|
								    static void destroy_token( pointer /*token*/) {}
							 | 
						|
								};
							 | 
						|
								
							 | 
						|
								// small object specialization (converts void* to the correct type, passes objects directly.)
							 | 
						|
								template<typename T>
							 | 
						|
								class token_helper<T, false> {
							 | 
						|
								    typedef union {
							 | 
						|
								        T actual_value;
							 | 
						|
								        void * void_overlay;
							 | 
						|
								    } type_to_void_ptr_map;
							 | 
						|
								    public:
							 | 
						|
								    typedef T pointer;  // not really a pointer in this case.
							 | 
						|
								    typedef T value_type;
							 | 
						|
								    static pointer create_token(const value_type & source) {
							 | 
						|
								        return source; }
							 | 
						|
								    static value_type & token(pointer & t) { return t;}
							 | 
						|
								    static void * cast_to_void_ptr(pointer ref) { 
							 | 
						|
								        type_to_void_ptr_map mymap; 
							 | 
						|
								        mymap.void_overlay = NULL;
							 | 
						|
								        mymap.actual_value = ref; 
							 | 
						|
								        return mymap.void_overlay; 
							 | 
						|
								    }
							 | 
						|
								    static pointer cast_from_void_ptr(void * ref) { 
							 | 
						|
								        type_to_void_ptr_map mymap;
							 | 
						|
								        mymap.void_overlay = ref;
							 | 
						|
								        return mymap.actual_value;
							 | 
						|
								    }
							 | 
						|
								    static void destroy_token( pointer /*token*/) {}
							 | 
						|
								};
							 | 
						|
								
							 | 
						|
								template<typename T, typename U, typename Body>
							 | 
						|
								class concrete_filter: public tbb::filter {
							 | 
						|
								    const Body& my_body;
							 | 
						|
								    typedef token_helper<T,is_large_object<T>::value > t_helper;
							 | 
						|
								    typedef typename t_helper::pointer t_pointer;
							 | 
						|
								    typedef token_helper<U,is_large_object<U>::value > u_helper;
							 | 
						|
								    typedef typename u_helper::pointer u_pointer;
							 | 
						|
								
							 | 
						|
								    /*override*/ void* operator()(void* input) {
							 | 
						|
								        t_pointer temp_input = t_helper::cast_from_void_ptr(input);
							 | 
						|
								        u_pointer output_u = u_helper::create_token(my_body(t_helper::token(temp_input)));
							 | 
						|
								        t_helper::destroy_token(temp_input);
							 | 
						|
								        return u_helper::cast_to_void_ptr(output_u);
							 | 
						|
								    }
							 | 
						|
								
							 | 
						|
								    /*override*/ void finalize(void * input) {
							 | 
						|
								        t_pointer temp_input = t_helper::cast_from_void_ptr(input);
							 | 
						|
								        t_helper::destroy_token(temp_input);
							 | 
						|
								    }
							 | 
						|
								
							 | 
						|
								public:
							 | 
						|
								    concrete_filter(tbb::filter::mode filter_mode, const Body& body) : filter(filter_mode), my_body(body) {}
							 | 
						|
								};
							 | 
						|
								
							 | 
						|
								// input 
							 | 
						|
								template<typename U, typename Body>
							 | 
						|
								class concrete_filter<void,U,Body>: public filter {
							 | 
						|
								    const Body& my_body;
							 | 
						|
								    typedef token_helper<U, is_large_object<U>::value > u_helper;
							 | 
						|
								    typedef typename u_helper::pointer u_pointer;
							 | 
						|
								
							 | 
						|
								    /*override*/void* operator()(void*) {
							 | 
						|
								        flow_control control;
							 | 
						|
								        u_pointer output_u = u_helper::create_token(my_body(control));
							 | 
						|
								        if(control.is_pipeline_stopped) {
							 | 
						|
								            u_helper::destroy_token(output_u);
							 | 
						|
								            set_end_of_input();
							 | 
						|
								            return NULL;
							 | 
						|
								        }
							 | 
						|
								        return u_helper::cast_to_void_ptr(output_u);
							 | 
						|
								    }
							 | 
						|
								
							 | 
						|
								public:
							 | 
						|
								    concrete_filter(tbb::filter::mode filter_mode, const Body& body) : 
							 | 
						|
								        filter(static_cast<tbb::filter::mode>(filter_mode | filter_may_emit_null)),
							 | 
						|
								        my_body(body)
							 | 
						|
								    {}
							 | 
						|
								};
							 | 
						|
								
							 | 
						|
								template<typename T, typename Body>
							 | 
						|
								class concrete_filter<T,void,Body>: public filter {
							 | 
						|
								    const Body& my_body;
							 | 
						|
								    typedef token_helper<T, is_large_object<T>::value > t_helper;
							 | 
						|
								    typedef typename t_helper::pointer t_pointer;
							 | 
						|
								   
							 | 
						|
								    /*override*/ void* operator()(void* input) {
							 | 
						|
								        t_pointer temp_input = t_helper::cast_from_void_ptr(input);
							 | 
						|
								        my_body(t_helper::token(temp_input));
							 | 
						|
								        t_helper::destroy_token(temp_input);
							 | 
						|
								        return NULL;
							 | 
						|
								    }
							 | 
						|
								    /*override*/ void finalize(void* input) {
							 | 
						|
								        t_pointer temp_input = t_helper::cast_from_void_ptr(input);
							 | 
						|
								        t_helper::destroy_token(temp_input);
							 | 
						|
								    }
							 | 
						|
								
							 | 
						|
								public:
							 | 
						|
								    concrete_filter(tbb::filter::mode filter_mode, const Body& body) : filter(filter_mode), my_body(body) {}
							 | 
						|
								};
							 | 
						|
								
							 | 
						|
								template<typename Body>
							 | 
						|
								class concrete_filter<void,void,Body>: public filter {
							 | 
						|
								    const Body& my_body;
							 | 
						|
								    
							 | 
						|
								    /** Override privately because it is always called virtually */
							 | 
						|
								    /*override*/ void* operator()(void*) {
							 | 
						|
								        flow_control control;
							 | 
						|
								        my_body(control);
							 | 
						|
								        void* output = control.is_pipeline_stopped ? NULL : (void*)(intptr_t)-1; 
							 | 
						|
								        return output;
							 | 
						|
								    }
							 | 
						|
								public:
							 | 
						|
								    concrete_filter(filter::mode filter_mode, const Body& body) : filter(filter_mode), my_body(body) {}
							 | 
						|
								};
							 | 
						|
								
							 | 
						|
								//! The class that represents an object of the pipeline for parallel_pipeline().
							 | 
						|
								/** It primarily serves as RAII class that deletes heap-allocated filter instances. */
							 | 
						|
								class pipeline_proxy {
							 | 
						|
								    tbb::pipeline my_pipe;
							 | 
						|
								public:
							 | 
						|
								    pipeline_proxy( const filter_t<void,void>& filter_chain );
							 | 
						|
								    ~pipeline_proxy() {
							 | 
						|
								        while( filter* f = my_pipe.filter_list ) 
							 | 
						|
								            delete f; // filter destructor removes it from the pipeline
							 | 
						|
								    }
							 | 
						|
								    tbb::pipeline* operator->() { return &my_pipe; }
							 | 
						|
								};
							 | 
						|
								
							 | 
						|
								//! Abstract base class that represents a node in a parse tree underlying a filter_t.
							 | 
						|
								/** These nodes are always heap-allocated and can be shared by filter_t objects. */
							 | 
						|
								class filter_node: tbb::internal::no_copy {
							 | 
						|
								    /** Count must be atomic because it is hidden state for user, but might be shared by threads. */
							 | 
						|
								    tbb::atomic<intptr_t> ref_count;
							 | 
						|
								protected:
							 | 
						|
								    filter_node() {
							 | 
						|
								        ref_count = 0;
							 | 
						|
								#ifdef __TBB_TEST_FILTER_NODE_COUNT
							 | 
						|
								        ++(__TBB_TEST_FILTER_NODE_COUNT);
							 | 
						|
								#endif
							 | 
						|
								    }
							 | 
						|
								public:
							 | 
						|
								    //! Add concrete_filter to pipeline 
							 | 
						|
								    virtual void add_to( pipeline& ) = 0;
							 | 
						|
								    //! Increment reference count
							 | 
						|
								    void add_ref() {++ref_count;}
							 | 
						|
								    //! Decrement reference count and delete if it becomes zero.
							 | 
						|
								    void remove_ref() {
							 | 
						|
								        __TBB_ASSERT(ref_count>0,"ref_count underflow");
							 | 
						|
								        if( --ref_count==0 ) 
							 | 
						|
								            delete this;
							 | 
						|
								    }
							 | 
						|
								    virtual ~filter_node() {
							 | 
						|
								#ifdef __TBB_TEST_FILTER_NODE_COUNT
							 | 
						|
								        --(__TBB_TEST_FILTER_NODE_COUNT);
							 | 
						|
								#endif
							 | 
						|
								    }
							 | 
						|
								};
							 | 
						|
								
							 | 
						|
								//! Node in parse tree representing result of make_filter.
							 | 
						|
								template<typename T, typename U, typename Body>
							 | 
						|
								class filter_node_leaf: public filter_node  {
							 | 
						|
								    const tbb::filter::mode mode;
							 | 
						|
								    const Body body;
							 | 
						|
								    /*override*/void add_to( pipeline& p ) {
							 | 
						|
								        concrete_filter<T,U,Body>* f = new concrete_filter<T,U,Body>(mode,body);
							 | 
						|
								        p.add_filter( *f );
							 | 
						|
								    }
							 | 
						|
								public:
							 | 
						|
								    filter_node_leaf( tbb::filter::mode m, const Body& b ) : mode(m), body(b) {}
							 | 
						|
								};
							 | 
						|
								
							 | 
						|
								//! Node in parse tree representing join of two filters.
							 | 
						|
								class filter_node_join: public filter_node {
							 | 
						|
								    friend class filter_node; // to suppress GCC 3.2 warnings
							 | 
						|
								    filter_node& left;
							 | 
						|
								    filter_node& right;
							 | 
						|
								    /*override*/~filter_node_join() {
							 | 
						|
								       left.remove_ref();
							 | 
						|
								       right.remove_ref();
							 | 
						|
								    }
							 | 
						|
								    /*override*/void add_to( pipeline& p ) {
							 | 
						|
								        left.add_to(p);
							 | 
						|
								        right.add_to(p);
							 | 
						|
								    }
							 | 
						|
								public:
							 | 
						|
								    filter_node_join( filter_node& x, filter_node& y ) : left(x), right(y) {
							 | 
						|
								       left.add_ref();
							 | 
						|
								       right.add_ref();
							 | 
						|
								    }
							 | 
						|
								};
							 | 
						|
								
							 | 
						|
								} // namespace internal
							 | 
						|
								//! @endcond
							 | 
						|
								
							 | 
						|
								//! Create a filter to participate in parallel_pipeline
							 | 
						|
								template<typename T, typename U, typename Body>
							 | 
						|
								filter_t<T,U> make_filter(tbb::filter::mode mode, const Body& body) {
							 | 
						|
								    return new internal::filter_node_leaf<T,U,Body>(mode, body);
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								template<typename T, typename V, typename U>
							 | 
						|
								filter_t<T,U> operator& (const filter_t<T,V>& left, const filter_t<V,U>& right) {
							 | 
						|
								    __TBB_ASSERT(left.root,"cannot use default-constructed filter_t as left argument of '&'");
							 | 
						|
								    __TBB_ASSERT(right.root,"cannot use default-constructed filter_t as right argument of '&'");
							 | 
						|
								    return new internal::filter_node_join(*left.root,*right.root);
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								//! Class representing a chain of type-safe pipeline filters
							 | 
						|
								template<typename T, typename U>
							 | 
						|
								class filter_t {
							 | 
						|
								    typedef internal::filter_node filter_node;
							 | 
						|
								    filter_node* root;
							 | 
						|
								    filter_t( filter_node* root_ ) : root(root_) {
							 | 
						|
								        root->add_ref();
							 | 
						|
								    }
							 | 
						|
								    friend class internal::pipeline_proxy;
							 | 
						|
								    template<typename T_, typename U_, typename Body>
							 | 
						|
								    friend filter_t<T_,U_> make_filter(tbb::filter::mode, const Body& );
							 | 
						|
								    template<typename T_, typename V_, typename U_>
							 | 
						|
								    friend filter_t<T_,U_> operator& (const filter_t<T_,V_>& , const filter_t<V_,U_>& );
							 | 
						|
								public:
							 | 
						|
								    filter_t() : root(NULL) {}
							 | 
						|
								    filter_t( const filter_t<T,U>& rhs ) : root(rhs.root) {
							 | 
						|
								        if( root ) root->add_ref();
							 | 
						|
								    }
							 | 
						|
								    template<typename Body>
							 | 
						|
								    filter_t( tbb::filter::mode mode, const Body& body ) :
							 | 
						|
								        root( new internal::filter_node_leaf<T,U,Body>(mode, body) ) {
							 | 
						|
								        root->add_ref();
							 | 
						|
								    }
							 | 
						|
								
							 | 
						|
								    void operator=( const filter_t<T,U>& rhs ) {
							 | 
						|
								        // Order of operations below carefully chosen so that reference counts remain correct
							 | 
						|
								        // in unlikely event that remove_ref throws exception.
							 | 
						|
								        filter_node* old = root;
							 | 
						|
								        root = rhs.root; 
							 | 
						|
								        if( root ) root->add_ref();
							 | 
						|
								        if( old ) old->remove_ref();
							 | 
						|
								    }
							 | 
						|
								    ~filter_t() {
							 | 
						|
								        if( root ) root->remove_ref();
							 | 
						|
								    }
							 | 
						|
								    void clear() {
							 | 
						|
								        // Like operator= with filter_t() on right side.
							 | 
						|
								        if( root ) {
							 | 
						|
								            filter_node* old = root;
							 | 
						|
								            root = NULL;
							 | 
						|
								            old->remove_ref();
							 | 
						|
								        }
							 | 
						|
								    }
							 | 
						|
								};
							 | 
						|
								
							 | 
						|
								inline internal::pipeline_proxy::pipeline_proxy( const filter_t<void,void>& filter_chain ) : my_pipe() {
							 | 
						|
								    __TBB_ASSERT( filter_chain.root, "cannot apply parallel_pipeline to default-constructed filter_t"  );
							 | 
						|
								    filter_chain.root->add_to(my_pipe);
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								inline void parallel_pipeline(size_t max_number_of_live_tokens, const filter_t<void,void>& filter_chain
							 | 
						|
								#if __TBB_TASK_GROUP_CONTEXT
							 | 
						|
								    , tbb::task_group_context& context
							 | 
						|
								#endif
							 | 
						|
								    ) {
							 | 
						|
								    internal::pipeline_proxy pipe(filter_chain);
							 | 
						|
								    // tbb::pipeline::run() is called via the proxy
							 | 
						|
								    pipe->run(max_number_of_live_tokens
							 | 
						|
								#if __TBB_TASK_GROUP_CONTEXT
							 | 
						|
								              , context
							 | 
						|
								#endif
							 | 
						|
								    );
							 | 
						|
								}
							 | 
						|
								
							 | 
						|
								#if __TBB_TASK_GROUP_CONTEXT
							 | 
						|
								inline void parallel_pipeline(size_t max_number_of_live_tokens, const filter_t<void,void>& filter_chain) {
							 | 
						|
								    tbb::task_group_context context;
							 | 
						|
								    parallel_pipeline(max_number_of_live_tokens, filter_chain, context);
							 | 
						|
								}
							 | 
						|
								#endif // __TBB_TASK_GROUP_CONTEXT
							 | 
						|
								
							 | 
						|
								} // interface6
							 | 
						|
								
							 | 
						|
								using interface6::flow_control;
							 | 
						|
								using interface6::filter_t;
							 | 
						|
								using interface6::make_filter;
							 | 
						|
								using interface6::parallel_pipeline;
							 | 
						|
								
							 | 
						|
								} // tbb
							 | 
						|
								
							 | 
						|
								#endif /* __TBB_pipeline_H */
							 |