I wanted to design a multi-threaded DX12 rendering pipeline, and saw that Intel’s TBB Flow Graph might make things a lot easier for me. I felt that the documentation for the different flow graph nodes was poorly organized, which made it difficult to understand what all the different flow graph nodes do at a high level. For this reason, I wrote this blog post as a summary that puts it all in one place.
This blog post isn’t a full tutorial on TBB Flow Graph. You should read the Intel-provided documentation first. This blog post is mostly a summary in my own words of the official documentation.
If there are mistakes in my interpretation of the official documentation, or if there are important details missing, please tell me about it so I can correct it. I’ve left a few “unanswered question” bullet points in this article for information that seems lacking from the official documentation. If you have a good answer to these questions, please let me know.
Without further ado, let’s begin the summary.
The functional nodes are source_node, function_node, continue_node, and multifunction_node. These nodes perform computations on their inputs (if they have any) and pass outputs to their successors.
The source_node has no predecessors. It repeatedly evaluates its body and broadcasts each result to its successors. The source_node is always serial, so its body will never be called concurrently. If no successor accepts a message, the message is put in a buffer, and it will be re-sent when a successor is added or a successor calls try_get().
The source_node keeps outputting items until its body returns false.
- Unanswered question: can it be reactivated once the body returns false?
The function_node receives an input, applies the body’s function to it, and broadcasts its output to its successors.
The function_node’s maximum concurrency can be specified in its constructor. Setting the concurrency to “tbb::flow::serial” means the body will never be executed concurrently, while setting it to “tbb::flow::unlimited” puts no limit on the body’s concurrency. The concurrency can also be set to a specific number.
If the function_node’s concurrency is already maximized and it receives yet another input, the behavior depends on its graph_buffer_policy template argument. If the buffer policy is “queueing”, then extra inputs will be put on an internal unbounded input buffer until they can be processed. Alternatively, if the policy is “rejecting”, then the function_node will reject the extra inputs, and will instead pull from its predecessors when it becomes ready to handle more inputs. The internal buffer’s allocator can be customized.
The continue_node waits for its predecessors to finish before starting its successors. It counts how many times try_put() was called on it, and when the counter reaches a target value its body is executed and its result is broadcast to its successors. By default, the target value is the number of predecessors.
Despite having N predecessors, the continue_node receives no actual input values from them. Emphasis: the continue_node is not a join_node; it only counts completion signals from its predecessors. However, the continue_node can still broadcast its own output to successors.
- Note: The countdown is illustrated by the 3 horizontal bars on the left side of the diagram above.
continue_msg is an empty struct you can use as a placeholder type, such as when you just want to kick off a successor. Its use is not limited to continue_node, though the name might make you think otherwise, which is why I put a note of it here.
The multifunction_node is similar to the function_node, with the main difference that it has multiple output ports. The multifunction_node’s body can try_put() to many of its output ports in one invocation. The outputs are represented as a tuple of output ports, which allows you to hook up output ports to successors, and allows you to decide which ports to send outputs to from within the body.
Note: “multifunction_node” used to be called “multioutput_function_node”
Buffering nodes allow you to pass data from one node to another using a specific buffering scheme.
The buffer_node contains internal storage that stores the messages it receives. The allocator for the buffer can be customized.
Buffered messages are forwarded to the successors in an arbitrary order. Each message is sent to only a single successor, and this successor is chosen by trying to send the message to each successor in the order they were registered, until a successor accepts the message.
Successors can reserve a message from the buffer, which gives the successor ownership of that message without removing it from the buffer. The successor can then either release ownership (leaving the message in the buffer for another successor to grab), or consume the message (removing it from the buffer.) While a message is reserved, other messages can still be added or removed from the buffer.
The priority_queue_node is similar to the buffer_node. The difference is that the messages are forwarded in order of largest priority first, as defined by a Compare template argument (defaulting to std::less).
Unlike the buffer_node, if a successor reserves a message from the priority queue, no other reservations can be made (since this would violate the priority queue’s order.) However, messages can still be pushed to the priority queue while it is reserved.
- Unanswered question: What happens if you reserve a priority queue, then push a message that makes its way to the front of the queue?
The queue_node is similar to the priority_queue_node, except that messages are forwarded in first-in first-out (FIFO) order, that is, the order of their insertion.
The sequencer_node is similar to the priority_queue_node, except that messages are sorted in sequence order. Sequence numbers are of type size_t, and the sequence number of a given message is determined by a user-specified function.
The overwrite_node contains a buffer big enough for only a single message. When the value is set, the new value is broadcast to all successors, but (unlike other buffer nodes) the value doesn’t get removed from the buffer when it gets sent to successors. Successors can still call try_get() and receive the same value. The node’s buffer is considered invalid until it gets set for the first time.
The write_once_node is similar to the overwrite_node. However, the single value in the buffer cannot be overwritten. If you try to put while a value is already in the buffer, the put is simply ignored. If you want to change the value in the buffer, you have to explicitly clear() it first, then you can put a new value.
Split/join nodes have the sole purpose of combining or separating streams. They either convert separate messages into tuples of messages, or the reverse.
The join_node receives multiple inputs, packs them into a tuple, then forwards that tuple to its successors. There are a few different policies for the join’s buffering, as shown above and explained below. The buffering policy is set with a template argument.
With a “queueing” join policy, the join node is given an unbounded FIFO queue for each input port. When each input port’s FIFO has at least one item in it, the inputs are removed from the queue, packed into a tuple, and broadcasted to its successors.
With a “reserving” join policy, the need for a FIFO is removed by using a “pull” approach rather than a “push” approach (as done in the queueing join.) When a predecessor puts to an input, the reserving join node marks that input as “potentially available”. When all inputs have been marked potentially available, the reserving join will try to reserve each input to see if they are still actually available. If not all inputs could be successfully reserved, all inputs are released.
If all inputs were successfully reserved, then they are packed into a tuple and broadcasted to successors. If at least one successor accepts the tuple, the input reservations are consumed. Otherwise, the inputs get released.
Since reservation is based on a pull approach, something weird happens: By design, the first push to a reserving join input port will always fail. Instead, this causes the port to switch to pull mode, which will then try_reserve the input from the predecessor. If all inputs can be reserved, they are try_consumed. For these reasons, you typically want the inputs to a reserving join to be buffer nodes (which support pulling).
For more details on reservation: https://www.threadingbuildingblocks.org/docs/help/tbb_userguide/Flow_Graph_Reservation.html
Key Matching (and Tag Matching) Join
With a “key_matching” join policy, a user-provided hash function is applied to the messages to get their associated key, and the key/value pair is stored in a hash table. When a given key exists in a key/value pair in all input ports simultaneously, the key/value pairs are removed from the hash table, and their values are packed into a tuple which is broadcasted to the successors. If no successors accept the tuple, it is saved and will be forwarded on a subsequent try_get().
- Unanswered question: What buffering scheme is used for the stored tuples? A buffer_node, or a queue_node, or what?
The “tag_matching” policy is a specialization of key_matching. The definition is simple:
typedef key_matching<tag_value> tag_matching;
The “tag_value” type is a simple typedef for uint64_t, so this just means you have to use a hash function that returns a uint64_t. That’s all.
The “split_node” gets a tuple as input, and hooks up each element of the tuple to a different output port. You can then connect each output port to a different successor.
The “indexer_node” receives inputs from multiple input ports, and broadcasts them to all successors. Unlike join nodes, the indexer node does not wait for all input ports to have a value. Instead, a message is broadcasted whenever any input port receives an input. The successors receive a tagged union (aka variant) that says which of the indexer's input ports the value came from, and lets you retrieve the value of the message.
Note: “indexer_node” was previously called “or_node”.
The TBB designers put some miscellaneous node types into this category.
The “broadcast_node” simply broadcasts its inputs to all its successors. There is no buffering in this node, so messages are forwarded immediately to all successors.
The “limiter_node” is similar to a broadcast_node, but it can only output a limited number of messages. Outputted messages only count toward this threshold if the message is accepted by at least one successor. Once the maximum number of messages has been outputted, the limiter no longer accepts inputs.
The counter can be decremented by calling try_put() on the limiter_node’s “decrement” member object. The decrementer is an object which receives continue_msg as an input. The decrementer then makes its parent limiter reduce its output count by 1, and attempts to get a message from a predecessor then forward it to its successors. If the forwarding is successful, its output count will be increased by 1, bringing it back to its original value. If the forwarding is not successful, then the output count will have been reduced by 1.
An example use of this node is building a pipeline. See: https://software.intel.com/en-us/blogs/2011/09/14/how-to-make-a-pipeline-with-an-intel-threading-building-blocks-flow-graph
The “async_node” is designed to communicate with systems outside TBB. The body of the async node receives a “gateway_type” as input, which you can pass to an external system. The gateway interface lets your external system try_put() to the async node’s successors. Also, the gateway interface lets you reserve and release. Reserving/releasing tells the flow graph that async work has started and ended (respectively). This lets the flow graph know when a wait_for_all() on this node should finish.
The maximum concurrency and the buffering policy of this node can be configured.
The “composite_node” allows you to wrap multiple nodes into a single node with defined inputs and outputs. The input and output types are specified as template arguments, and calling set_external_ports() binds internal nodes to the composite node's external input and output ports.
The “streaming_node” is pretty special. It’s designed for streaming the submission of kernels and data to devices through queues, used to integrate heterogeneous systems with streaming hardware (like GPUs) into TBB flow graphs. Using it requires some configuration in order to properly pass data and kernels to the device. Intel provides such a prepackaged configuration in “opencl_node”, see this overview.