You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
673 lines
24 KiB
673 lines
24 KiB
/*
|
|
Copyright 2005-2014 Intel Corporation. All Rights Reserved.
|
|
|
|
This file is part of Threading Building Blocks.
|
|
|
|
Threading Building Blocks is free software; you can redistribute it
|
|
and/or modify it under the terms of the GNU General Public License
|
|
version 2 as published by the Free Software Foundation.
|
|
|
|
Threading Building Blocks is distributed in the hope that it will be
|
|
useful, but WITHOUT ANY WARRANTY; without even the implied warranty
|
|
of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
GNU General Public License for more details.
|
|
|
|
You should have received a copy of the GNU General Public License
|
|
along with Threading Building Blocks; if not, write to the Free Software
|
|
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
|
|
|
|
As a special exception, you may use this file as part of a free software
|
|
library without restriction. Specifically, if other files instantiate
|
|
templates or use macros or inline functions from this file, or you compile
|
|
this file and link it with other files to produce an executable, this
|
|
file does not by itself cause the resulting executable to be covered by
|
|
the GNU General Public License. This exception does not however
|
|
invalidate any other reasons why the executable file might be covered by
|
|
the GNU General Public License.
|
|
*/
|
|
|
|
#ifndef __TBB_pipeline_H
|
|
#define __TBB_pipeline_H
|
|
|
|
#include "atomic.h"
|
|
#include "task.h"
|
|
#include "tbb_allocator.h"
|
|
#include <cstddef>
|
|
|
|
//TODO: consider a more accurate method to check whether we need to implement <type_traits> ourselves
|
|
#if !TBB_IMPLEMENT_CPP0X
|
|
#include <type_traits>
|
|
#endif
|
|
|
|
namespace tbb {
|
|
|
|
class pipeline;
|
|
class filter;
|
|
|
|
//! @cond INTERNAL
|
|
namespace internal {
|
|
|
|
// The argument for PIPELINE_VERSION should be an integer between 2 and 9.
// The result is (x-2) shifted left by one, i.e. placed just above bit 0.
// The argument is parenthesized so that an expression (not only a literal) expands correctly.
#define __TBB_PIPELINE_VERSION(x) ((unsigned char)((x)-2)<<1)
|
|
|
|
typedef unsigned long Token;
|
|
typedef long tokendiff_t;
|
|
class stage_task;
|
|
class input_buffer;
|
|
class pipeline_root_task;
|
|
class pipeline_cleaner;
|
|
|
|
} // namespace internal
|
|
|
|
namespace interface6 {
|
|
template<typename T, typename U> class filter_t;
|
|
|
|
namespace internal {
|
|
class pipeline_proxy;
|
|
}
|
|
}
|
|
|
|
//! @endcond
|
|
|
|
//! A stage in a pipeline.
|
|
/** @ingroup algorithms */
|
|
class filter: internal::no_copy {
|
|
private:
|
|
//! Value used to mark "not in pipeline"
|
|
static filter* not_in_pipeline() {return reinterpret_cast<filter*>(intptr_t(-1));}
|
|
protected:
|
|
//! The lowest bit 0 is for parallel vs. serial
|
|
static const unsigned char filter_is_serial = 0x1;
|
|
|
|
//! 4th bit distinguishes ordered vs unordered filters.
|
|
/** The bit was not set for parallel filters in TBB 2.1 and earlier,
|
|
but is_ordered() function always treats parallel filters as out of order. */
|
|
static const unsigned char filter_is_out_of_order = 0x1<<4;
|
|
|
|
//! 5th bit distinguishes thread-bound and regular filters.
|
|
static const unsigned char filter_is_bound = 0x1<<5;
|
|
|
|
//! 6th bit marks input filters emitting small objects
|
|
static const unsigned char filter_may_emit_null = 0x1<<6;
|
|
|
|
//! 7th bit defines exception propagation mode expected by the application.
|
|
static const unsigned char exact_exception_propagation =
|
|
#if TBB_USE_CAPTURED_EXCEPTION
|
|
0x0;
|
|
#else
|
|
0x1<<7;
|
|
#endif /* TBB_USE_CAPTURED_EXCEPTION */
|
|
|
|
static const unsigned char current_version = __TBB_PIPELINE_VERSION(5);
|
|
static const unsigned char version_mask = 0x7<<1; // bits 1-3 are for version
|
|
public:
|
|
enum mode {
|
|
//! processes multiple items in parallel and in no particular order
|
|
parallel = current_version | filter_is_out_of_order,
|
|
//! processes items one at a time; all such filters process items in the same order
|
|
serial_in_order = current_version | filter_is_serial,
|
|
//! processes items one at a time and in no particular order
|
|
serial_out_of_order = current_version | filter_is_serial | filter_is_out_of_order,
|
|
//! @deprecated use serial_in_order instead
|
|
serial = serial_in_order
|
|
};
|
|
protected:
|
|
filter( bool is_serial_ ) :
|
|
next_filter_in_pipeline(not_in_pipeline()),
|
|
my_input_buffer(NULL),
|
|
my_filter_mode(static_cast<unsigned char>((is_serial_ ? serial : parallel) | exact_exception_propagation)),
|
|
prev_filter_in_pipeline(not_in_pipeline()),
|
|
my_pipeline(NULL),
|
|
next_segment(NULL)
|
|
{}
|
|
|
|
filter( mode filter_mode ) :
|
|
next_filter_in_pipeline(not_in_pipeline()),
|
|
my_input_buffer(NULL),
|
|
my_filter_mode(static_cast<unsigned char>(filter_mode | exact_exception_propagation)),
|
|
prev_filter_in_pipeline(not_in_pipeline()),
|
|
my_pipeline(NULL),
|
|
next_segment(NULL)
|
|
{}
|
|
|
|
// signal end-of-input for concrete_filters
|
|
void __TBB_EXPORTED_METHOD set_end_of_input();
|
|
|
|
public:
|
|
//! True if filter is serial.
|
|
bool is_serial() const {
|
|
return bool( my_filter_mode & filter_is_serial );
|
|
}
|
|
|
|
//! True if filter must receive stream in order.
|
|
bool is_ordered() const {
|
|
return (my_filter_mode & (filter_is_out_of_order|filter_is_serial))==filter_is_serial;
|
|
}
|
|
|
|
//! True if filter is thread-bound.
|
|
bool is_bound() const {
|
|
return ( my_filter_mode & filter_is_bound )==filter_is_bound;
|
|
}
|
|
|
|
//! true if an input filter can emit null
|
|
bool object_may_be_null() {
|
|
return ( my_filter_mode & filter_may_emit_null ) == filter_may_emit_null;
|
|
}
|
|
|
|
//! Operate on an item from the input stream, and return item for output stream.
|
|
/** Returns NULL if filter is a sink. */
|
|
virtual void* operator()( void* item ) = 0;
|
|
|
|
//! Destroy filter.
|
|
/** If the filter was added to a pipeline, the pipeline must be destroyed first. */
|
|
virtual __TBB_EXPORTED_METHOD ~filter();
|
|
|
|
#if __TBB_TASK_GROUP_CONTEXT
|
|
//! Destroys item if pipeline was cancelled.
|
|
/** Required to prevent memory leaks.
|
|
Note it can be called concurrently even for serial filters.*/
|
|
virtual void finalize( void* /*item*/ ) {};
|
|
#endif
|
|
|
|
private:
|
|
//! Pointer to next filter in the pipeline.
|
|
filter* next_filter_in_pipeline;
|
|
|
|
//! has the filter not yet processed all the tokens it will ever see?
|
|
// (pipeline has not yet reached end_of_input or this filter has not yet
|
|
// seen the last token produced by input_filter)
|
|
bool has_more_work();
|
|
|
|
//! Buffer for incoming tokens, or NULL if not required.
|
|
/** The buffer is required if the filter is serial or follows a thread-bound one. */
|
|
internal::input_buffer* my_input_buffer;
|
|
|
|
friend class internal::stage_task;
|
|
friend class internal::pipeline_root_task;
|
|
friend class pipeline;
|
|
friend class thread_bound_filter;
|
|
|
|
//! Storage for filter mode and dynamically checked implementation version.
|
|
const unsigned char my_filter_mode;
|
|
|
|
//! Pointer to previous filter in the pipeline.
|
|
filter* prev_filter_in_pipeline;
|
|
|
|
//! Pointer to the pipeline.
|
|
pipeline* my_pipeline;
|
|
|
|
//! Pointer to the next "segment" of filters, or NULL if not required.
|
|
/** In each segment, the first filter is not thread-bound but follows a thread-bound one. */
|
|
filter* next_segment;
|
|
};
|
|
|
|
//! A stage in a pipeline served by a user thread.
|
|
/** @ingroup algorithms */
|
|
class thread_bound_filter: public filter {
|
|
public:
|
|
enum result_type {
|
|
// item was processed
|
|
success,
|
|
// item is currently not available
|
|
item_not_available,
|
|
// there are no more items to process
|
|
end_of_stream
|
|
};
|
|
protected:
|
|
thread_bound_filter(mode filter_mode):
|
|
filter(static_cast<mode>(filter_mode | filter::filter_is_bound))
|
|
{}
|
|
public:
|
|
//! If a data item is available, invoke operator() on that item.
|
|
/** This interface is non-blocking.
|
|
Returns 'success' if an item was processed.
|
|
Returns 'item_not_available' if no item can be processed now
|
|
but more may arrive in the future, or if token limit is reached.
|
|
Returns 'end_of_stream' if there are no more items to process. */
|
|
result_type __TBB_EXPORTED_METHOD try_process_item();
|
|
|
|
//! Wait until a data item becomes available, and invoke operator() on that item.
|
|
/** This interface is blocking.
|
|
Returns 'success' if an item was processed.
|
|
Returns 'end_of_stream' if there are no more items to process.
|
|
Never returns 'item_not_available', as it blocks until another return condition applies. */
|
|
result_type __TBB_EXPORTED_METHOD process_item();
|
|
|
|
private:
|
|
//! Internal routine for item processing
|
|
result_type internal_process_item(bool is_blocking);
|
|
};
|
|
|
|
//! A processing pipeline that applies filters to items.
|
|
/** @ingroup algorithms */
|
|
class pipeline {
|
|
public:
|
|
//! Construct empty pipeline.
|
|
__TBB_EXPORTED_METHOD pipeline();
|
|
|
|
/** Though the current implementation declares the destructor virtual, do not rely on this
|
|
detail. The virtualness is deprecated and may disappear in future versions of TBB. */
|
|
virtual __TBB_EXPORTED_METHOD ~pipeline();
|
|
|
|
//! Add filter to end of pipeline.
|
|
void __TBB_EXPORTED_METHOD add_filter( filter& filter_ );
|
|
|
|
//! Run the pipeline to completion.
|
|
void __TBB_EXPORTED_METHOD run( size_t max_number_of_live_tokens );
|
|
|
|
#if __TBB_TASK_GROUP_CONTEXT
|
|
//! Run the pipeline to completion with user-supplied context.
|
|
void __TBB_EXPORTED_METHOD run( size_t max_number_of_live_tokens, tbb::task_group_context& context );
|
|
#endif
|
|
|
|
//! Remove all filters from the pipeline.
|
|
void __TBB_EXPORTED_METHOD clear();
|
|
|
|
private:
|
|
friend class internal::stage_task;
|
|
friend class internal::pipeline_root_task;
|
|
friend class filter;
|
|
friend class thread_bound_filter;
|
|
friend class internal::pipeline_cleaner;
|
|
friend class tbb::interface6::internal::pipeline_proxy;
|
|
|
|
//! Pointer to first filter in the pipeline.
|
|
filter* filter_list;
|
|
|
|
//! Pointer to location where address of next filter to be added should be stored.
|
|
filter* filter_end;
|
|
|
|
//! task who's reference count is used to determine when all stages are done.
|
|
task* end_counter;
|
|
|
|
//! Number of idle tokens waiting for input stage.
|
|
atomic<internal::Token> input_tokens;
|
|
|
|
//! Global counter of tokens
|
|
atomic<internal::Token> token_counter;
|
|
|
|
//! False until fetch_input returns NULL.
|
|
bool end_of_input;
|
|
|
|
//! True if the pipeline contains a thread-bound filter; false otherwise.
|
|
bool has_thread_bound_filters;
|
|
|
|
//! Remove filter from pipeline.
|
|
void remove_filter( filter& filter_ );
|
|
|
|
//! Not used, but retained to satisfy old export files.
|
|
void __TBB_EXPORTED_METHOD inject_token( task& self );
|
|
|
|
#if __TBB_TASK_GROUP_CONTEXT
|
|
//! Does clean up if pipeline is cancelled or exception occurred
|
|
void clear_filters();
|
|
#endif
|
|
};
|
|
|
|
//------------------------------------------------------------------------
|
|
// Support for lambda-friendly parallel_pipeline interface
|
|
//------------------------------------------------------------------------
|
|
|
|
namespace interface6 {
|
|
|
|
namespace internal {
|
|
template<typename T, typename U, typename Body> class concrete_filter;
|
|
}
|
|
|
|
//! input_filter control to signal end-of-input for parallel_pipeline
|
|
class flow_control {
|
|
bool is_pipeline_stopped;
|
|
flow_control() { is_pipeline_stopped = false; }
|
|
template<typename T, typename U, typename Body> friend class internal::concrete_filter;
|
|
public:
|
|
void stop() { is_pipeline_stopped = true; }
|
|
};
|
|
|
|
//! @cond INTERNAL
|
|
namespace internal {
|
|
|
|
//! value is nonzero when an object of type T does not fit into the storage of a void*.
template<typename T>
struct tbb_large_object {
    enum { value = ( sizeof(void *) < sizeof(T) ) };
};
|
|
|
|
// tbb_trivially_copyable<T>::value tells whether a T may be placed as-is in the
// pipeline input_buffers.  Three implementations are selected by the preprocessor.
#if TBB_IMPLEMENT_CPP0X
// cannot use SFINAE in current compilers.  Explicitly list the types we wish to be
// placed as-is in the pipeline input_buffers.
// Anything not listed below is treated as not trivially copyable.
template<typename T> struct tbb_trivially_copyable                  { enum { value = false }; };
// Any pointer type.
template<typename T> struct tbb_trivially_copyable <T*>             { enum { value = true }; };
// Short integers are always accepted.
template<>           struct tbb_trivially_copyable <short>          { enum { value = true }; };
template<>           struct tbb_trivially_copyable <unsigned short> { enum { value = true }; };
// The remaining arithmetic types are accepted only if they are not larger than a void*.
template<>           struct tbb_trivially_copyable <int>            { enum { value = !tbb_large_object<int>::value }; };
template<>           struct tbb_trivially_copyable <unsigned int>   { enum { value = !tbb_large_object<unsigned int>::value }; };
template<>           struct tbb_trivially_copyable <long>           { enum { value = !tbb_large_object<long>::value }; };
template<>           struct tbb_trivially_copyable <unsigned long>  { enum { value = !tbb_large_object<unsigned long>::value }; };
template<>           struct tbb_trivially_copyable <float>          { enum { value = !tbb_large_object<float>::value }; };
template<>           struct tbb_trivially_copyable <double>         { enum { value = !tbb_large_object<double>::value }; };
#elif __GNUC__==4 && __GNUC_MINOR__>=4 && __GXX_EXPERIMENTAL_CXX0X__
// GCC 4.4+ in experimental C++0x mode: use the has_trivial_copy_constructor trait.
template<typename T> struct tbb_trivially_copyable { enum { value = std::has_trivial_copy_constructor<T>::value }; };
#else
// Otherwise defer to the standard trait.
template<typename T> struct tbb_trivially_copyable { enum { value = std::is_trivially_copyable<T>::value }; };
#endif // TBB_IMPLEMENT_CPP0X
|
|
|
|
//! value is nonzero unless T is both small (per tbb_large_object) and trivially copyable.
/** concrete_filter uses this value to pick the token_helper specialization for a type. */
template<typename T>
struct is_large_object {
    enum { value = !( !tbb_large_object<T>::value && tbb_trivially_copyable<T>::value ) };
};
|
|
|
|
template<typename T, bool> class token_helper;
|
|
|
|
// large object helper (uses tbb_allocator)
|
|
template<typename T>
|
|
class token_helper<T, true> {
|
|
public:
|
|
typedef typename tbb::tbb_allocator<T> allocator;
|
|
typedef T* pointer;
|
|
typedef T value_type;
|
|
static pointer create_token(const value_type & source) {
|
|
pointer output_t = allocator().allocate(1);
|
|
return new (output_t) T(source);
|
|
}
|
|
static value_type & token(pointer & t) { return *t;}
|
|
static void * cast_to_void_ptr(pointer ref) { return (void *) ref; }
|
|
static pointer cast_from_void_ptr(void * ref) { return (pointer)ref; }
|
|
static void destroy_token(pointer token) {
|
|
allocator().destroy(token);
|
|
allocator().deallocate(token,1);
|
|
}
|
|
};
|
|
|
|
// pointer specialization
|
|
template<typename T>
|
|
class token_helper<T*, false > {
|
|
public:
|
|
typedef T* pointer;
|
|
typedef T* value_type;
|
|
static pointer create_token(const value_type & source) { return source; }
|
|
static value_type & token(pointer & t) { return t;}
|
|
static void * cast_to_void_ptr(pointer ref) { return (void *)ref; }
|
|
static pointer cast_from_void_ptr(void * ref) { return (pointer)ref; }
|
|
static void destroy_token( pointer /*token*/) {}
|
|
};
|
|
|
|
// small object specialization (converts void* to the correct type, passes objects directly.)
|
|
template<typename T>
|
|
class token_helper<T, false> {
|
|
typedef union {
|
|
T actual_value;
|
|
void * void_overlay;
|
|
} type_to_void_ptr_map;
|
|
public:
|
|
typedef T pointer; // not really a pointer in this case.
|
|
typedef T value_type;
|
|
static pointer create_token(const value_type & source) {
|
|
return source; }
|
|
static value_type & token(pointer & t) { return t;}
|
|
static void * cast_to_void_ptr(pointer ref) {
|
|
type_to_void_ptr_map mymap;
|
|
mymap.void_overlay = NULL;
|
|
mymap.actual_value = ref;
|
|
return mymap.void_overlay;
|
|
}
|
|
static pointer cast_from_void_ptr(void * ref) {
|
|
type_to_void_ptr_map mymap;
|
|
mymap.void_overlay = ref;
|
|
return mymap.actual_value;
|
|
}
|
|
static void destroy_token( pointer /*token*/) {}
|
|
};
|
|
|
|
template<typename T, typename U, typename Body>
|
|
class concrete_filter: public tbb::filter {
|
|
const Body& my_body;
|
|
typedef token_helper<T,is_large_object<T>::value > t_helper;
|
|
typedef typename t_helper::pointer t_pointer;
|
|
typedef token_helper<U,is_large_object<U>::value > u_helper;
|
|
typedef typename u_helper::pointer u_pointer;
|
|
|
|
/*override*/ void* operator()(void* input) {
|
|
t_pointer temp_input = t_helper::cast_from_void_ptr(input);
|
|
u_pointer output_u = u_helper::create_token(my_body(t_helper::token(temp_input)));
|
|
t_helper::destroy_token(temp_input);
|
|
return u_helper::cast_to_void_ptr(output_u);
|
|
}
|
|
|
|
/*override*/ void finalize(void * input) {
|
|
t_pointer temp_input = t_helper::cast_from_void_ptr(input);
|
|
t_helper::destroy_token(temp_input);
|
|
}
|
|
|
|
public:
|
|
concrete_filter(tbb::filter::mode filter_mode, const Body& body) : filter(filter_mode), my_body(body) {}
|
|
};
|
|
|
|
// input
|
|
template<typename U, typename Body>
|
|
class concrete_filter<void,U,Body>: public filter {
|
|
const Body& my_body;
|
|
typedef token_helper<U, is_large_object<U>::value > u_helper;
|
|
typedef typename u_helper::pointer u_pointer;
|
|
|
|
/*override*/void* operator()(void*) {
|
|
flow_control control;
|
|
u_pointer output_u = u_helper::create_token(my_body(control));
|
|
if(control.is_pipeline_stopped) {
|
|
u_helper::destroy_token(output_u);
|
|
set_end_of_input();
|
|
return NULL;
|
|
}
|
|
return u_helper::cast_to_void_ptr(output_u);
|
|
}
|
|
|
|
public:
|
|
concrete_filter(tbb::filter::mode filter_mode, const Body& body) :
|
|
filter(static_cast<tbb::filter::mode>(filter_mode | filter_may_emit_null)),
|
|
my_body(body)
|
|
{}
|
|
};
|
|
|
|
template<typename T, typename Body>
|
|
class concrete_filter<T,void,Body>: public filter {
|
|
const Body& my_body;
|
|
typedef token_helper<T, is_large_object<T>::value > t_helper;
|
|
typedef typename t_helper::pointer t_pointer;
|
|
|
|
/*override*/ void* operator()(void* input) {
|
|
t_pointer temp_input = t_helper::cast_from_void_ptr(input);
|
|
my_body(t_helper::token(temp_input));
|
|
t_helper::destroy_token(temp_input);
|
|
return NULL;
|
|
}
|
|
/*override*/ void finalize(void* input) {
|
|
t_pointer temp_input = t_helper::cast_from_void_ptr(input);
|
|
t_helper::destroy_token(temp_input);
|
|
}
|
|
|
|
public:
|
|
concrete_filter(tbb::filter::mode filter_mode, const Body& body) : filter(filter_mode), my_body(body) {}
|
|
};
|
|
|
|
template<typename Body>
|
|
class concrete_filter<void,void,Body>: public filter {
|
|
const Body& my_body;
|
|
|
|
/** Override privately because it is always called virtually */
|
|
/*override*/ void* operator()(void*) {
|
|
flow_control control;
|
|
my_body(control);
|
|
void* output = control.is_pipeline_stopped ? NULL : (void*)(intptr_t)-1;
|
|
return output;
|
|
}
|
|
public:
|
|
concrete_filter(filter::mode filter_mode, const Body& body) : filter(filter_mode), my_body(body) {}
|
|
};
|
|
|
|
//! The class that represents an object of the pipeline for parallel_pipeline().
|
|
/** It primarily serves as RAII class that deletes heap-allocated filter instances. */
|
|
class pipeline_proxy {
|
|
tbb::pipeline my_pipe;
|
|
public:
|
|
pipeline_proxy( const filter_t<void,void>& filter_chain );
|
|
~pipeline_proxy() {
|
|
while( filter* f = my_pipe.filter_list )
|
|
delete f; // filter destructor removes it from the pipeline
|
|
}
|
|
tbb::pipeline* operator->() { return &my_pipe; }
|
|
};
|
|
|
|
//! Abstract base class that represents a node in a parse tree underlying a filter_t.
|
|
/** These nodes are always heap-allocated and can be shared by filter_t objects. */
|
|
class filter_node: tbb::internal::no_copy {
|
|
/** Count must be atomic because it is hidden state for user, but might be shared by threads. */
|
|
tbb::atomic<intptr_t> ref_count;
|
|
protected:
|
|
filter_node() {
|
|
ref_count = 0;
|
|
#ifdef __TBB_TEST_FILTER_NODE_COUNT
|
|
++(__TBB_TEST_FILTER_NODE_COUNT);
|
|
#endif
|
|
}
|
|
public:
|
|
//! Add concrete_filter to pipeline
|
|
virtual void add_to( pipeline& ) = 0;
|
|
//! Increment reference count
|
|
void add_ref() {++ref_count;}
|
|
//! Decrement reference count and delete if it becomes zero.
|
|
void remove_ref() {
|
|
__TBB_ASSERT(ref_count>0,"ref_count underflow");
|
|
if( --ref_count==0 )
|
|
delete this;
|
|
}
|
|
virtual ~filter_node() {
|
|
#ifdef __TBB_TEST_FILTER_NODE_COUNT
|
|
--(__TBB_TEST_FILTER_NODE_COUNT);
|
|
#endif
|
|
}
|
|
};
|
|
|
|
//! Node in parse tree representing result of make_filter.
|
|
template<typename T, typename U, typename Body>
|
|
class filter_node_leaf: public filter_node {
|
|
const tbb::filter::mode mode;
|
|
const Body body;
|
|
/*override*/void add_to( pipeline& p ) {
|
|
concrete_filter<T,U,Body>* f = new concrete_filter<T,U,Body>(mode,body);
|
|
p.add_filter( *f );
|
|
}
|
|
public:
|
|
filter_node_leaf( tbb::filter::mode m, const Body& b ) : mode(m), body(b) {}
|
|
};
|
|
|
|
//! Node in parse tree representing join of two filters.
|
|
class filter_node_join: public filter_node {
|
|
friend class filter_node; // to suppress GCC 3.2 warnings
|
|
filter_node& left;
|
|
filter_node& right;
|
|
/*override*/~filter_node_join() {
|
|
left.remove_ref();
|
|
right.remove_ref();
|
|
}
|
|
/*override*/void add_to( pipeline& p ) {
|
|
left.add_to(p);
|
|
right.add_to(p);
|
|
}
|
|
public:
|
|
filter_node_join( filter_node& x, filter_node& y ) : left(x), right(y) {
|
|
left.add_ref();
|
|
right.add_ref();
|
|
}
|
|
};
|
|
|
|
} // namespace internal
|
|
//! @endcond
|
|
|
|
//! Create a filter to participate in parallel_pipeline
|
|
template<typename T, typename U, typename Body>
|
|
filter_t<T,U> make_filter(tbb::filter::mode mode, const Body& body) {
|
|
return new internal::filter_node_leaf<T,U,Body>(mode, body);
|
|
}
|
|
|
|
template<typename T, typename V, typename U>
|
|
filter_t<T,U> operator& (const filter_t<T,V>& left, const filter_t<V,U>& right) {
|
|
__TBB_ASSERT(left.root,"cannot use default-constructed filter_t as left argument of '&'");
|
|
__TBB_ASSERT(right.root,"cannot use default-constructed filter_t as right argument of '&'");
|
|
return new internal::filter_node_join(*left.root,*right.root);
|
|
}
|
|
|
|
//! Class representing a chain of type-safe pipeline filters
|
|
template<typename T, typename U>
|
|
class filter_t {
|
|
typedef internal::filter_node filter_node;
|
|
filter_node* root;
|
|
filter_t( filter_node* root_ ) : root(root_) {
|
|
root->add_ref();
|
|
}
|
|
friend class internal::pipeline_proxy;
|
|
template<typename T_, typename U_, typename Body>
|
|
friend filter_t<T_,U_> make_filter(tbb::filter::mode, const Body& );
|
|
template<typename T_, typename V_, typename U_>
|
|
friend filter_t<T_,U_> operator& (const filter_t<T_,V_>& , const filter_t<V_,U_>& );
|
|
public:
|
|
filter_t() : root(NULL) {}
|
|
filter_t( const filter_t<T,U>& rhs ) : root(rhs.root) {
|
|
if( root ) root->add_ref();
|
|
}
|
|
template<typename Body>
|
|
filter_t( tbb::filter::mode mode, const Body& body ) :
|
|
root( new internal::filter_node_leaf<T,U,Body>(mode, body) ) {
|
|
root->add_ref();
|
|
}
|
|
|
|
void operator=( const filter_t<T,U>& rhs ) {
|
|
// Order of operations below carefully chosen so that reference counts remain correct
|
|
// in unlikely event that remove_ref throws exception.
|
|
filter_node* old = root;
|
|
root = rhs.root;
|
|
if( root ) root->add_ref();
|
|
if( old ) old->remove_ref();
|
|
}
|
|
~filter_t() {
|
|
if( root ) root->remove_ref();
|
|
}
|
|
void clear() {
|
|
// Like operator= with filter_t() on right side.
|
|
if( root ) {
|
|
filter_node* old = root;
|
|
root = NULL;
|
|
old->remove_ref();
|
|
}
|
|
}
|
|
};
|
|
|
|
//! Populate the wrapped pipeline with the filters described by filter_chain.
/** filter_chain must not be a default-constructed filter_t. */
inline internal::pipeline_proxy::pipeline_proxy( const filter_t<void,void>& filter_chain ) :
    my_pipe()
{
    __TBB_ASSERT( filter_chain.root, "cannot apply parallel_pipeline to default-constructed filter_t" );
    filter_chain.root->add_to( my_pipe );
}
|
|
|
|
//! Build a pipeline from filter_chain and run it to completion.
/** When task group contexts are enabled the caller supplies the context to run in;
    otherwise the function takes only the token limit and the filter chain. */
#if __TBB_TASK_GROUP_CONTEXT
inline void parallel_pipeline(size_t max_number_of_live_tokens, const filter_t<void,void>& filter_chain,
                              tbb::task_group_context& context) {
    internal::pipeline_proxy pipe(filter_chain);
    // tbb::pipeline::run() is called via the proxy
    pipe->run(max_number_of_live_tokens, context);
}
#else
inline void parallel_pipeline(size_t max_number_of_live_tokens, const filter_t<void,void>& filter_chain) {
    internal::pipeline_proxy pipe(filter_chain);
    // tbb::pipeline::run() is called via the proxy
    pipe->run(max_number_of_live_tokens);
}
#endif
|
|
|
|
#if __TBB_TASK_GROUP_CONTEXT
//! Build a pipeline from filter_chain and run it to completion in a default-constructed context.
inline void parallel_pipeline(size_t max_number_of_live_tokens, const filter_t<void,void>& filter_chain) {
    tbb::task_group_context default_context;
    parallel_pipeline(max_number_of_live_tokens, filter_chain, default_context);
}
#endif // __TBB_TASK_GROUP_CONTEXT
|
|
|
|
} // interface6
|
|
|
|
using interface6::flow_control;
|
|
using interface6::filter_t;
|
|
using interface6::make_filter;
|
|
using interface6::parallel_pipeline;
|
|
|
|
} // tbb
|
|
|
|
#endif /* __TBB_pipeline_H */
|