6. Queue Manager
The Poplar library processes I/O operations asynchronously using data streams. The Session class wraps that mechanism using the setUserOutputHandler() and setUserInputHandler() functions. This is described in detail in Section 4.5, Retrieving information from Session. In summary, a user callback is called each time the Poplar Runtime performs an I/O transfer.
Although this approach gives the user maximum flexibility, it also imposes many responsibilities. The entire logic inside the callback must be written manually:
copying data from/to the source
ensuring that the correct data is copied when processing multiple requests with changing inputs
memory management for data I/O
notifying the application when an output tensor is ready
The QueueManager class handles some of these responsibilities and reduces the burden on the user.
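To make the list of responsibilities concrete, the following self-contained sketch shows the bookkeeping a hand-written input callback ends up doing: FIFO ordering of requests, copying, size checks, locking and user notification. It does not use Model Runtime. `ManualInputHandler`, its `submit()` method and the assumption that the runtime invokes the handler with a destination pointer and a byte count are all hypothetical; they are not the actual signature expected by `setUserInputHandler()`.

```cpp
#include <cassert>
#include <cstddef>
#include <cstring>
#include <deque>
#include <functional>
#include <mutex>
#include <utility>

// Hypothetical hand-written input handler. Everything below is the user's
// responsibility when no QueueManager is used.
class ManualInputHandler {
public:
  struct Request {
    const void *src;             // user-owned input buffer (must stay alive)
    std::size_t size;            // number of bytes to transfer
    std::function<void()> done;  // user notification, may be empty
  };

  // Called by application threads to register the next request.
  void submit(Request req) {
    std::lock_guard<std::mutex> lock(mutex_);
    pending_.push_back(std::move(req));
  }

  // Hypothetically called by the runtime each time it needs one tensor.
  // Returns false if no request was pending.
  bool operator()(void *dst, std::size_t size) {
    Request req;
    {
      std::lock_guard<std::mutex> lock(mutex_);
      if (pending_.empty()) return false;
      req = std::move(pending_.front());  // keep requests in FIFO order
      pending_.pop_front();
    }
    assert(req.size == size && "request size must match the tensor size");
    std::memcpy(dst, req.src, size);  // copy the data into the stream buffer
    if (req.done) req.done();         // tell the user the copy has finished
    return true;
  }

private:
  std::mutex mutex_;
  std::deque<Request> pending_;
};
```

Every one of these concerns has to be re-implemented for each anchor when callbacks are written by hand.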
QueueManager creates and registers callbacks for each Anchor for which the user-supplied AnchorCallbackPredicate, passed to the QueueManager constructor, returns BIND_USER_CB.
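The selection rule can be sketched with stand-in types. Only the name `BIND_USER_CB` comes from this section; `AnchorInfo`, `Decision::SKIP`, `Predicate` and `anchorsToBind()` are hypothetical and do not reproduce the real `AnchorCallbackPredicate` signature.

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <vector>

// Hypothetical stand-ins for the real Model Runtime types. Only the name
// BIND_USER_CB comes from the documentation; everything else is illustrative.
struct AnchorInfo {
  std::string name;
  std::string program;  // e.g. "main" or "load" (illustrative only)
};

enum class Decision { BIND_USER_CB, SKIP };

using Predicate = std::function<Decision(const AnchorInfo &)>;

// Returns the names of the anchors that would get a queue-backed callback:
// exactly those for which the predicate answers BIND_USER_CB.
std::vector<std::string> anchorsToBind(const std::vector<AnchorInfo> &anchors,
                                       const Predicate &pred) {
  std::vector<std::string> bound;
  for (const AnchorInfo &anchor : anchors) {
    if (pred(anchor) == Decision::BIND_USER_CB) bound.push_back(anchor.name);
  }
  return bound;
}
```

Anchors rejected by the predicate get no queue, so the user remains free to handle them some other way.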
Along with the callbacks, the QueueManager object creates and controls a set of tensor data queues that store pointers to dedicated user-memory slots. Each queue is connected to the corresponding tensor data stream callback and handles transferring data to, and receiving data from, Poplar.
The only way to create a QueueManager object is to use the Session method createQueueManager(). It forwards all parameters to the QueueManager constructor, stores the created object in memory owned by the Session and returns a non-owning QueueManager pointer to the user.
The lifetime of the QueueManager object is tied to the lifetime of the Session instance: the pointer returned by createQueueManager() must not be used after the Session has been destroyed.
// Assuming a Session object (session) has already been created
model_runtime::QueueManager *queue_manager = session.createQueueManager();
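The ownership model behind the snippet above can be sketched with hypothetical `FakeSession` and `FakeQueueManager` classes rather than the real API: the session constructs the manager from forwarded arguments, keeps it in a `std::unique_ptr`, and hands out a non-owning raw pointer that dangles once the session is destroyed. Keeping the managers in a vector is a choice of this sketch only.

```cpp
#include <cassert>
#include <memory>
#include <utility>
#include <vector>

// Hypothetical model of the ownership described above: the session owns the
// queue manager and hands out a non-owning pointer.
struct FakeQueueManager {
  explicit FakeQueueManager(int buffer_size) : buffer_size(buffer_size) {}
  int buffer_size;
};

class FakeSession {
public:
  // Forwards its arguments to the constructor and keeps ownership.
  template <typename... Args>
  FakeQueueManager *createQueueManager(Args &&...args) {
    managers_.push_back(
        std::make_unique<FakeQueueManager>(std::forward<Args>(args)...));
    return managers_.back().get();  // non-owning: never delete this pointer
  }

private:
  // Destroyed together with the session, invalidating returned pointers.
  std::vector<std::unique_ptr<FakeQueueManager>> managers_;
};
```

Because the session owns the object, callers should never `delete` the returned pointer or use it after the session goes out of scope.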
The QueueManager instance creates one queue per Anchor. There are two types of queues:
InputQueue - dedicated to processing input data
OutputQueue - dedicated to processing output data
Queues for individual anchors can be accessed via inputQueue() and outputQueue().
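The per-anchor layout can be pictured as two name-indexed collections, which matches how the full example in this section iterates over `queue_manager->inputs` and `queue_manager->outputs` as name/queue pairs. `ToyQueue` and `ToyQueueManager` are hypothetical. In this sketch, looking up an unknown anchor name throws `std::out_of_range` because it uses `std::map::at`; that is not a claim about the real accessors.

```cpp
#include <cassert>
#include <map>
#include <stdexcept>
#include <string>
#include <vector>

// Hypothetical sketch: one queue object per anchor, looked up by anchor name.
// ToyQueue only records the sizes of enqueued requests.
struct ToyQueue {
  std::vector<long> request_sizes;
};

class ToyQueueManager {
public:
  ToyQueueManager(const std::vector<std::string> &input_anchors,
                  const std::vector<std::string> &output_anchors) {
    for (const auto &name : input_anchors) inputs[name];    // one queue per input
    for (const auto &name : output_anchors) outputs[name];  // one queue per output
  }

  // std::map::at throws std::out_of_range for an unknown anchor name.
  ToyQueue &inputQueue(const std::string &name) { return inputs.at(name); }
  ToyQueue &outputQueue(const std::string &name) { return outputs.at(name); }

  std::map<std::string, ToyQueue> inputs;
  std::map<std::string, ToyQueue> outputs;
};
```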
Requests are added to a queue using model_runtime::InputQueue::enqueue() and model_runtime::OutputQueue::enqueue(). For these methods, the user can specify a ReadCompleteCallback or a WriteCompleteCallback, respectively; these notification callbacks are called when the IPU memory transfer has finished.
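The completion-callback contract on the output side can be sketched with a toy queue. `ToyOutputQueue` and its `deliver()` method, which stands in for the runtime finishing a device-to-host transfer, are hypothetical; the sketch only shows the ordering described above: the data lands in the user's buffer first, then the callback fires.

```cpp
#include <cassert>
#include <cstddef>
#include <cstring>
#include <functional>
#include <queue>
#include <utility>

// Hypothetical toy output queue. The user enqueues a destination buffer and a
// completion callback; when the "runtime" delivers a tensor, the queue copies
// it into the oldest pending buffer and invokes that request's callback.
class ToyOutputQueue {
public:
  using WriteComplete = std::function<void()>;

  void enqueue(void *dst, std::size_t size, WriteComplete on_complete) {
    pending_.push({dst, size, std::move(on_complete)});
  }

  // Simulates one finished device-to-host transfer. Returns false if the
  // user has not provided a destination buffer yet.
  bool deliver(const void *tensor, std::size_t size) {
    if (pending_.empty()) return false;
    Request req = std::move(pending_.front());
    pending_.pop();
    assert(req.size == size);
    std::memcpy(req.dst, tensor, size);      // 1. write into user memory
    if (req.on_complete) req.on_complete();  // 2. then notify the user
    return true;
  }

private:
  struct Request {
    void *dst;
    std::size_t size;
    WriteComplete on_complete;
  };
  std::queue<Request> pending_;
};
```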
Each queue is based on SpscRingBuffer, an implementation of a lock-free single-producer single-consumer circular FIFO queue. The buffer size can be set by the user in the QueueManager constructor. By default, the buffer size is double the batch size (the outer dimension of the tensor shape).
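For readers unfamiliar with the data structure, here is a textbook lock-free SPSC ring buffer. It is a generic sketch, not Model Runtime's `SpscRingBuffer`, whose interface is not documented here; `SpscRing` and `defaultBufferSize()` are hypothetical names, and the latter simply encodes the default sizing rule stated above.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <optional>
#include <vector>

// Minimal lock-free single-producer/single-consumer ring buffer. This is a
// textbook sketch, not Model Runtime's SpscRingBuffer. Exactly one thread may
// call push() and exactly one (possibly different) thread may call pop().
template <typename T>
class SpscRing {
public:
  // One slot is kept empty to tell "full" from "empty", so allocate capacity+1.
  explicit SpscRing(std::size_t capacity) : slots_(capacity + 1) {
    assert(capacity > 0);
  }

  bool push(const T &value) {  // producer thread only
    const std::size_t head = head_.load(std::memory_order_relaxed);
    const std::size_t next = (head + 1) % slots_.size();
    if (next == tail_.load(std::memory_order_acquire)) return false;  // full
    slots_[head] = value;
    head_.store(next, std::memory_order_release);  // publish the filled slot
    return true;
  }

  std::optional<T> pop() {  // consumer thread only
    const std::size_t tail = tail_.load(std::memory_order_relaxed);
    if (tail == head_.load(std::memory_order_acquire)) return std::nullopt;
    T value = slots_[tail];
    tail_.store((tail + 1) % slots_.size(), std::memory_order_release);
    return value;
  }

private:
  std::vector<T> slots_;
  std::atomic<std::size_t> head_{0};  // next slot to write (producer-owned)
  std::atomic<std::size_t> tail_{0};  // next slot to read (consumer-owned)
};

// Default capacity described in the text: twice the batch size, where the
// batch size is the outer dimension of the tensor shape.
inline std::size_t defaultBufferSize(std::size_t batch_size) {
  return 2 * batch_size;
}
```

Keeping one slot empty lets `push()` and `pop()` distinguish full from empty using only the two indices, and the acquire/release pairs ensure the consumer never reads a slot before the producer has finished writing it. The "single producer" restriction is also why concurrent enqueueing from several threads needs external locking.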
Queues are not thread-safe: when requests are added from several threads, the user must apply appropriate synchronization (for example, a std::mutex). Queues store raw pointers to the user data, so the user is responsible for allocating the memory of the input and output tensors and must ensure that the data exists and that the pointers passed to the queue remain valid until the result has been returned.
void enqueueInput(model_runtime::QueueManager *queue_manager,
const std::string &input_name,
void *input_data,
int64_t data_size) {
ReadStartCallback read_start_callback = [=]() {
std::cout << "Start reading " << input_name << " " << input_data
<< "\n";
};
ReadCompleteCallback read_complete_callback = [=]() {
std::cout << "Reading completed " << input_name << " " << input_data
<< "\n";
};
queue_manager->inputQueue(input_name).enqueue(input_data, data_size,
read_start_callback, read_complete_callback);
}
void enqueueOutput(model_runtime::QueueManager *queue_manager,
const std::string &output_name,
void *output_data,
int64_t data_size) {
WriteCompleteCallback write_complete_callback = [=]() {
std::cout << "Writing completed " << output_name << " "
<< output_data << "\n";
};
queue_manager->outputQueue(output_name).enqueue(output_data, data_size,
write_complete_callback, output_data);
}
// Assuming a QueueManager (queue_manager) has already been created and the
// input and output memory has been allocated.
// Queues are not thread-safe; in a multithreaded environment, appropriate
// synchronization mechanisms must be applied.
static std::mutex g_i_mutex;
const std::lock_guard<std::mutex> lock(g_i_mutex);
enqueueInput(queue_manager, "input_data", static_cast<void*>(input_data),
8);
enqueueOutput(queue_manager, "output_data", static_cast<void*>(output_data),
8);
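The snippet above locks a mutex around a single pair of calls. A reusable variant wraps the queue so that every `enqueue()` from any producer thread is serialized, and the single-producer queue underneath only ever sees one producer at a time. `UnsafeQueue` and `GuardedQueue` are hypothetical stand-ins, not Model Runtime types.

```cpp
#include <cassert>
#include <cstddef>
#include <mutex>
#include <utility>
#include <vector>

// Hypothetical stand-in for a queue that, like the Model Runtime queues, is
// not safe to call from several producer threads at once.
struct UnsafeQueue {
  std::vector<std::pair<void *, std::size_t>> requests;  // raw pointers only
  void enqueue(void *data, std::size_t size) {
    requests.emplace_back(data, size);
  }
};

// Serializes enqueue() calls from any number of producer threads.
class GuardedQueue {
public:
  explicit GuardedQueue(UnsafeQueue &queue) : queue_(queue) {}

  void enqueue(void *data, std::size_t size) {
    assert(data != nullptr);
    std::lock_guard<std::mutex> lock(mutex_);  // one producer at a time
    queue_.enqueue(data, size);
  }

private:
  UnsafeQueue &queue_;
  std::mutex mutex_;
};
```

Share one `GuardedQueue` per anchor queue among the worker threads. The buffers passed to it must still outlive the request, because the mutex protects the queue, not the user memory.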

Fig. 6.1 Processing model I/O with Model Runtime queues.
This C++ example sends several inference requests to the IPU using QueueManager.
// Copyright (c) 2022 Graphcore Ltd. All rights reserved.
#include <atomic>
#include <chrono>
#include <cstdint>
#include <cstdlib>
#include <memory>
#include <string>
#include <thread>
#include <vector>

#include <boost/program_options.hpp>

#include <model_runtime/DeviceManager.hpp>
#include <model_runtime/QueueManager.hpp>
#include <model_runtime/Session.hpp>

#include "utils.hpp"

int main(int argc, char *argv[]) {
  using namespace std::chrono_literals;
  static const char *example_desc = "Queue manager example.";
  const boost::program_options::variables_map vm =
      examples::parsePopefProgramOptions(example_desc, argc, argv);

  auto reader = std::make_shared<popef::Reader>();
  for (const auto &model_file : vm["popef"].as<std::vector<std::string>>()) {
    reader->parseFile(model_file);
    examples::print(fmt::format("Parsed {}", model_file));
  }

  popef::ModelBuilder builder(reader);
  std::shared_ptr<popef::Model> model = builder.createModel();
  examples::print("Model created from PopEF");

  model_runtime::SessionConfig config;
  config.policy = model_runtime::LaunchPolicy::Immediate;
  config.wait_config =
      model_runtime::DeviceWaitConfig{600s /*timeout*/, 1s /*sleep_time*/};

  model_runtime::Session session(model, config);

  examples::print("Create and configure Queue Manager");
  const model_runtime::AnchorCallbackPredicate pred_main_or_empty =
      examples::filterMainOrEmpty(model);
  model_runtime::QueueManager *queue_manager = session.createQueueManager(
      100, nullptr, 0ns, pred_main_or_empty, pred_main_or_empty);

  // Prepare zero-filled dummy input buffers. Queues store raw pointers, so
  // these buffers must stay alive until the requests have been processed.
  std::vector<std::vector<char>> inputs;
  inputs.reserve(queue_manager->inputs.size());
  for (const auto &in : queue_manager->inputs) {
    const popef::TensorInfo &info = in.second.tensorInfo();
    const auto size = info.sizeInBytes();
    inputs.emplace_back(size);
  }

  // If the model is configured with Prefetch and run with max_look_ahead > 0,
  // a number of inputs must be fetched before the first output is generated.
  auto input_queues_feeder = [&]() {
    int i = 0;
    for (auto &elem : queue_manager->inputs) {
      const std::string &name = elem.first;
      model_runtime::InputQueue &elem_queue = elem.second;
      const popef::TensorInfo &info = elem_queue.tensorInfo();
      const auto size = info.sizeInBytes();

      // Enqueue the long-lived dummy buffer for this input.
      elem_queue.enqueue(inputs[i++].data(), size, [name]() {
        examples::print(fmt::format("Enqueued data for input: {}", name));
      });
    }
  };

  // Prepare buffers for produced outputs
  std::atomic<int> cnt_outputs{0};
  const auto num_of_outputs = queue_manager->outputs.size();
  std::vector<std::vector<uint8_t>> outputs;
  outputs.reserve(num_of_outputs);
  for (const auto &out : queue_manager->outputs) {
    const popef::TensorInfo &info = out.second.tensorInfo();
    const auto size = info.sizeInBytes();
    outputs.emplace_back(size);
  }

  auto output_queues_feeder = [&]() {
    int i = 0;
    for (auto &out : queue_manager->outputs) {
      const std::string &name = out.first;
      model_runtime::OutputQueue &out_queue = out.second;
      const popef::TensorInfo &info = out_queue.tensorInfo();
      const int64_t size = info.sizeInBytes();

      out_queue.enqueue(outputs[i++].data(), size, [name, &cnt_outputs]() {
        examples::print(fmt::format("Received output: {}", name));
        // Release pairs with the acquire loads in the other threads.
        cnt_outputs.fetch_add(1, std::memory_order_release);
      });
    }
  };

  examples::print("Starting data feeders");
  auto feeders = std::thread([&]() {
    while (cnt_outputs.load(std::memory_order_acquire) == 0) {
      input_queues_feeder();
      output_queues_feeder();
    }
  });

  examples::print("Running load programs");
  session.runLoadPrograms();

  examples::print("Running main programs");
  auto mainPrograms = std::thread([&]() { session.runMainPrograms(); });

  examples::print("Waiting for 1st output");
  // Wait until every output tensor of the first inference has arrived.
  while (cnt_outputs.load(std::memory_order_acquire) <
         static_cast<int>(num_of_outputs)) {
    std::this_thread::yield();
  }

  examples::print("Stopping session after first output");
  session.stop();

  mainPrograms.join();
  feeders.join();

  examples::print("Success: exiting");
  return EXIT_SUCCESS;
}