6. Queue manager
The Poplar library processes I/O operations asynchronously using data streams. The Model Runtime Session class wraps the Poplar mechanism using the setUserOutputHandler() and setUserInputHandler() functions. This is described in detail in Section 4.5, Retrieving information from a session. In summary, a user callback is called each time the Poplar runtime performs an I/O transfer.
Although such an approach gives you maximum flexibility, it also means you have more responsibility for the implementation. You must write the entire logic inside the callback to perform the following operations:
Copy data from or to the source
Ensure that the correct data is copied when processing multiple requests with changing inputs
Perform memory management for data I/O
Run the application when the output tensor is ready
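To make these responsibilities concrete, the following is a minimal sketch of the bookkeeping a hand-written input callback needs. It does not use the Model Runtime API: the InputCallback alias (a function that receives a destination pointer) and the ManualInputFeeder class are hypothetical names introduced only for illustration, and the sketch covers the input side only.

```cpp
#include <cstddef>
#include <cstring>
#include <deque>
#include <functional>
#include <mutex>
#include <utility>
#include <vector>

// Hypothetical stand-in for a per-tensor input callback: the runtime passes
// a destination pointer and expects the callback to fill it.
using InputCallback = std::function<void(void *dst)>;

// Hand-rolled request bookkeeping for one input tensor.
class ManualInputFeeder {
public:
  explicit ManualInputFeeder(std::size_t tensor_bytes)
      : tensor_bytes_(tensor_bytes) {}

  // Application side: hand over one request. The feeder takes ownership of
  // the data, which covers the memory-management responsibility.
  void submit(std::vector<char> data) {
    data.resize(tensor_bytes_);
    const std::lock_guard<std::mutex> lock(mutex_);
    pending_.push_back(std::move(data));
  }

  // Runtime side: the returned callback copies the oldest pending request
  // into dst, so requests are consumed in submission order.
  InputCallback callback() {
    return [this](void *dst) {
      std::vector<char> next(tensor_bytes_, 0); // zeros if nothing is pending
      {
        const std::lock_guard<std::mutex> lock(mutex_);
        if (!pending_.empty()) {
          next = std::move(pending_.front());
          pending_.pop_front();
        }
      }
      std::memcpy(dst, next.data(), tensor_bytes_);
    };
  }

private:
  std::size_t tensor_bytes_;
  std::mutex mutex_;
  std::deque<std::vector<char>> pending_;
};
```

Even this reduced version has to deal with ordering, ownership and locking, and it still lacks output handling and completion notification.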
The QueueManager class handles some of these responsibilities and reduces the work you have to do. QueueManager creates and registers callbacks for each Anchor object for which an instance of your AnchorCallbackPredicate predicate, passed to the QueueManager constructor, returns BIND_USER_CB.
Along with the callbacks, QueueManager creates and controls a set of tensor data queues that store pointers to dedicated user-memory slots. Each queue is connected to the corresponding tensor data stream callback and handles sending data to, and receiving data from, Poplar.
The only way to create a QueueManager object is to use the Session method createQueueManager(). It forwards all parameters to the QueueManager constructor, stores the created object in memory owned by Session and returns a pointer through which you can access the QueueManager. The lifetime of the QueueManager object is tied to the lifetime of the Session instance, so do not use the pointer after the Session has been destroyed.
// Assuming there is a session (Session) already created
model_runtime::QueueManager *queue_manager = session.createQueueManager();
The QueueManager instance creates one queue per Anchor. There are two types of queues:
InputQueue processes input data
OutputQueue processes output data
Queues for the individual anchors can be accessed using inputQueue() and outputQueue().
Add requests to the queue using model_runtime::InputQueue::enqueue() and model_runtime::OutputQueue::enqueue(). For these methods, you can specify a ReadCompleteCallback or WriteCompleteCallback callback that will be called when the IPU memory transfer is complete.
Each queue is based on SpscRingBuffer, which is an implementation of a lock-free single-producer single-consumer circular FIFO queue. You can set the buffer size in the QueueManager constructor. By default, the buffer size is double the batch size (the outer tensor shape dimension).
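For readers unfamiliar with the technique, the following is a minimal sketch of a lock-free single-producer single-consumer circular FIFO. It is not the library's SpscRingBuffer; the SpscRing class and its tryPush() and tryPop() methods are hypothetical names used only to show how two atomic indices let one producer thread and one consumer thread share a fixed-size buffer without a lock.

```cpp
#include <atomic>
#include <cstddef>
#include <vector>

// Illustrative single-producer single-consumer ring buffer.
template <typename T> class SpscRing {
public:
  // One slot is kept unused to distinguish "full" from "empty".
  explicit SpscRing(std::size_t capacity) : slots_(capacity + 1) {}

  // Call from the producer thread only. Returns false if the ring is full.
  bool tryPush(const T &value) {
    const std::size_t tail = tail_.load(std::memory_order_relaxed);
    const std::size_t next = (tail + 1) % slots_.size();
    if (next == head_.load(std::memory_order_acquire))
      return false;
    slots_[tail] = value;
    // Publish the slot only after it has been written.
    tail_.store(next, std::memory_order_release);
    return true;
  }

  // Call from the consumer thread only. Returns false if the ring is empty.
  bool tryPop(T &out) {
    const std::size_t head = head_.load(std::memory_order_relaxed);
    if (head == tail_.load(std::memory_order_acquire))
      return false;
    out = slots_[head];
    // Release the slot only after it has been read.
    head_.store((head + 1) % slots_.size(), std::memory_order_release);
    return true;
  }

private:
  std::vector<T> slots_;
  std::atomic<std::size_t> head_{0}; // next slot to read
  std::atomic<std::size_t> tail_{0}; // next slot to write
};
```

The design only works with exactly one thread pushing and one thread popping, which is why additional synchronization is needed as soon as several threads add requests to the same queue.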
Queues are not thread-safe, so you must ensure that an appropriate synchronization mechanism (for example std::mutex) is used between threads when adding new requests.
Queues store raw pointers to the user data. You are responsible for the memory allocation of the input and output tensors. You have to ensure that the data exists and pointers passed to the queue remain valid until a result is returned.
void enqueueInput(model_runtime::QueueManager *queue_manager,
                  const std::string &input_name,
                  void *input_data,
                  int64_t data_size) {
  // Invoked when reading of the input buffer starts.
  ReadStartCallback read_start_callback = [=]() {
    std::cout << "Start reading " << input_name << " " << input_data
              << "\n";
  };
  // Invoked when reading of the input buffer has completed.
  ReadCompleteCallback read_complete_callback = [=]() {
    std::cout << "Reading completed " << input_name << " " << input_data
              << "\n";
  };
  queue_manager->inputQueue(input_name).enqueue(
      input_data, data_size, read_start_callback, read_complete_callback);
}

void enqueueOutput(model_runtime::QueueManager *queue_manager,
                   const std::string &output_name,
                   void *output_data,
                   int64_t data_size) {
  // Invoked when the output buffer has been written.
  WriteCompleteCallback write_complete_callback = [=]() {
    std::cout << "Writing completed " << output_name << " "
              << output_data << "\n";
  };
  queue_manager->outputQueue(output_name).enqueue(output_data, data_size,
      write_complete_callback, output_data);
}

// Assuming a QueueManager (queue_manager) has already been created and the
// input_data and output_data buffers are allocated.
// Queues are not thread-safe: in a multithreaded environment, every thread
// must take the same lock before adding requests.
static std::mutex g_i_mutex;
const std::lock_guard<std::mutex> lock(g_i_mutex);
enqueueInput(queue_manager, "input_data", static_cast<void*>(input_data),
             8);
enqueueOutput(queue_manager, "output_data", static_cast<void*>(output_data),
              8);
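One possible way to satisfy the pointer-lifetime requirement is to let the completion callback own the buffer. The following is a minimal sketch of that idea and does not use the Model Runtime API: PendingTransfer is a hypothetical stand-in for what a queue keeps per request (a raw pointer and a completion callback), and BufferKeeper is a hypothetical helper name.

```cpp
#include <atomic>
#include <cstddef>
#include <functional>
#include <memory>
#include <vector>

// Hypothetical record of what a queue keeps per request.
struct PendingTransfer {
  void *data;
  std::function<void()> on_complete;
};

// Allocates request buffers and ties each buffer's lifetime to its
// completion callback. The keeper must outlive all callbacks it creates.
class BufferKeeper {
public:
  PendingTransfer makeTransfer(std::size_t bytes) {
    auto buffer = std::make_shared<std::vector<char>>(bytes, 0);
    in_flight_.fetch_add(1);
    // The callback captures the shared_ptr, so the buffer stays allocated
    // for as long as the callback object itself is kept alive.
    return PendingTransfer{buffer->data(),
                           [this, buffer]() { in_flight_.fetch_sub(1); }};
  }

  // Number of transfers whose completion callback has not run yet.
  int inFlight() const { return in_flight_.load(); }

private:
  std::atomic<int> in_flight_{0};
};
```

With this arrangement the application cannot free a buffer too early by accident, because the only owner is the callback stored alongside the raw pointer. For output tensors, the same callback is also the natural place to read the result.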
This C++ example sends several inference requests to the IPU using QueueManager.
// Copyright (c) 2022 Graphcore Ltd. All rights reserved.
#include <atomic>
#include <chrono>
#include <cstdint>
#include <cstdlib>
#include <memory>
#include <string>
#include <thread>
#include <vector>

#include <boost/program_options.hpp>

#include <model_runtime/DeviceManager.hpp>
#include <model_runtime/QueueManager.hpp>
#include <model_runtime/Session.hpp>

#include "utils.hpp"

int main(int argc, char *argv[]) {
  using namespace std::chrono_literals;
  static const char *example_desc = "Queue manager example.";
  const boost::program_options::variables_map vm =
      examples::parsePopefProgramOptions(example_desc, argc, argv);

  auto reader = std::make_shared<popef::Reader>();
  for (const auto &model_file : vm["popef"].as<std::vector<std::string>>()) {
    reader->parseFile(model_file);
    examples::print(fmt::format("Parsed {}", model_file));
  }

  popef::ModelBuilder builder(reader);
  std::shared_ptr<popef::Model> model = builder.createModel();
  examples::print("Model created from PopEF");

  model_runtime::SessionConfig config;
  config.policy = model_runtime::LaunchPolicy::Immediate;
  config.wait_config =
      model_runtime::DeviceWaitConfig{600s /*timeout*/, 1s /*sleep_time*/};

  model_runtime::Session session(model, config);

  examples::print("Create and configure Queue Manager");
  const model_runtime::AnchorCallbackPredicate pred_main_or_empty =
      examples::filterMainOrEmpty(model);
  model_runtime::QueueManager *queue_manager = session.createQueueManager(
      100, nullptr, 0ns, pred_main_or_empty, pred_main_or_empty);

  // Prepare dummy input buffers. The queues store raw pointers, so the
  // buffers must stay alive for as long as requests refer to them.
  std::vector<std::vector<char>> inputs;
  inputs.reserve(queue_manager->inputs.size());
  for (auto &elem : queue_manager->inputs) {
    const popef::TensorInfo &info = elem.second.tensorInfo();
    const auto size = info.sizeInBytes();
    inputs.emplace_back(size);
  }

  // If the model is configured with prefetch and run with max_look_ahead > 0,
  // several inputs must be fetched before the first output is generated.
  auto input_queues_feeder = [&]() {
    int i = 0;
    for (auto &elem : queue_manager->inputs) {
      const std::string &name = elem.first;
      model_runtime::InputQueue &elem_queue = elem.second;
      const popef::TensorInfo &info = elem_queue.tensorInfo();
      const auto size = info.sizeInBytes();

      // Enqueue the dummy input
      elem_queue.enqueue(inputs[i++].data(), size, [name]() {
        examples::print(fmt::format("Enqueued data for input: {}", name));
      });
    }
  };

  // Prepare buffers for produced outputs
  std::atomic<int> cnt_outputs = 0;
  const auto num_of_outputs = queue_manager->outputs.size();
  std::vector<std::vector<uint8_t>> outputs;
  outputs.reserve(num_of_outputs);
  for (const auto &out : queue_manager->outputs) {
    const popef::TensorInfo &info = out.second.tensorInfo();
    const auto size = info.sizeInBytes();
    outputs.emplace_back(size);
  }

  auto output_queues_feeder = [&]() {
    int i = 0;
    for (auto &out : queue_manager->outputs) {
      const std::string &name = out.first;
      model_runtime::OutputQueue &out_queue = out.second;
      const popef::TensorInfo &info = out_queue.tensorInfo();
      const int64_t size = info.sizeInBytes();

      out_queue.enqueue(outputs[i++].data(), size, [name, &cnt_outputs]() {
        examples::print(fmt::format("Received output: {}", name));
        cnt_outputs.fetch_add(1, std::memory_order_relaxed);
      });
    }
  };

  examples::print("Starting data feeders");
  auto feeders = std::thread([&]() {
    // Keep feeding requests until the first output arrives.
    while (cnt_outputs == 0) {
      input_queues_feeder();
      output_queues_feeder();
    }
  });

  examples::print("Running load programs");
  session.runLoadPrograms();

  examples::print("Running main programs");
  auto mainPrograms = std::thread([&]() { session.runMainPrograms(); });

  examples::print("Waiting for 1st output");
  // Busy-wait until every output queue has delivered one result.
  while (cnt_outputs.load(std::memory_order_acquire) <
         static_cast<int>(queue_manager->outputs.size()))
    ;

  examples::print("Stopping session after first output");
  session.stop();

  mainPrograms.join();
  feeders.join();

  examples::print("Success: exiting");
  return EXIT_SUCCESS;
}