Copyright (c) 2022 Graphcore Ltd. All rights reserved.

# Pipelining

Up to now, we've run the entire model on a single IPU. This requires
the variables (and other tensors) of all layers to fit within the IPU's memory. However, large models may exceed the memory capacity on a single IPU.

To train such models while maintaining high IPU utilisation we can use  [**Pipelining**](https://docs.graphcore.ai/projects/ipu-programmers-guide/en/3.3.0/algorithmic_techniques.html?highlight=micro%20batch#model-parallelism-and-pipelining): we first split the model into **independent parts** ( pipeline **stages** ) and then setup the program so that they can be **executed by different IPUs in parallel** (thus, it is a form of **model parallelism**).

## Efficient parallel execution with pipelining

Suppose we have a model made of three layers and we define five pipeline stages:
- layer1 forward on IPU0
- layer2 forward on IPU1
- layer3 forward, loss computation, layer3 backward on IPU3
- layer2 backward on IPU1
- layer1 backward on IPU0

If only a batch is processed at each time, the pipeline is very inefficient, because we have a data dependency between the stages and they can't execute in parallel: only one ipu will be active.

- layer1 forward on IPU0
- layer2 forward on IPU1 **data dependency: needs x = layer1(x)**
- layer3 forward, loss computation, layer3 backward on IPU3 **data dependency: needs x = layer2(x)**
- layer2 backward on IPU1 **data dependency: needs dx = dlayer3(dx,activations3)**
- layer1 backward on IPU0 **data dependency: needs dx = dlayer2(dx,activations2)**

![Figure 1: Inefficient pipelining](images/inefficient_pipeline.png)
<figcaption> <b>Fig 1: </b> Inefficient pipelining: a single batch B<sub>i</sub> is processed. There is a data dependency hence only one IPU can be active to compute activations A<sub>i</sub> and gradients G<sub>i</sub>.
 </figcaption>

To implement pipelining efficiently, we need to **break the data dependency** by loading multiple micro batches and having each pipeline stage running in parallel on a different micro batch.
Gradient accumulation is perfect here, since we also need to keep the weights of the layers frozen during the whole pipeline execution and update them only at the end. 

![Figure 2: Efficient pipelining](images/pipelining_parallel.png)
 <figcaption> <b>Fig 2: </b> Efficient pipelining: multiple micro batches are loaded using gradient accumulation. Pipeline stages can execute in parallel <b>on different micro batches</b> and all IPUs are used in the main phase when the pipeline is full ( while in the ramp up and ramp down phase only a few are active)
 </figcaption>

### Activation Stash and Recomputation
Note that the gradient computation for a layer requires the layers activations calculated with respect to the same batch. In a pipeline, this computation is separated in time: when the IPU is running the backward for the layer, it has already run `backward_stage - forward_stage` forward steps. Hence we need to save all the forward activations in a FIFO queue of length `backward_stage - forward_stage`, the **activation stash**. This can be inconvenient from a memory perspective. We can instead choose to recompute the activations (all or some) to save memory at the expense of extra computation.
See also the [memory stashes and recomputation](https://docs.graphcore.ai/projects/ipu-programmers-guide/en/3.3.0/algorithmic_techniques.html?highlight=micro%20batch#memory-stashes-and-recomputation) paragraph in the ipu programmer guide.

### Efficient pipelining best practices

- With pipelining, the **`gradient_accumulation`** factor needs to be greater than **2 * number of pipeline stages** (Fig 2 shows it clearly). The greater, the better the IPUs utilisation since in that case ramp up and ramp down phases, when not all IPUs are active, only take a small fraction of the execution.
- To get the best utilisation, the layers across the IPUs should be balanced such that they each take roughly the same amount of time to compute. 
- Forward and backward stages for the same layers should be defined on the same IPU, since they require the same parameters. Moreover, backward requires the layer activations.


## Pipelining in addons

Consider the loop:

```python
for i in range(N):
  x = ops.host_load(h2d_stream)
  x = layer0.call(x) # we want this to be our stage0 on ipu0
  x = x.copy_to_ipu(1)
  x = layer1.call(x) # we want this to be our stage1 on ipu1
  ops.host_store(d2h_stream, x)
```
There is a data dependency between `layer0` -> `layer1`. As explained in the previous section, if we rewrite the program so that `layer0` and `layer1` run on different data, we break the dependency and we are able to execute them in parallel.

```python
# ----------- RAMP UP PHASE - only ipu0 active -----------
#load first batch, batch0
x0 = ops.host_load(h2d_stream) 
# run layer0 on batch0
x0_ = layer0(x0) 
# copy the output of layer0 to ipu1, where layer1 lives. 
x1 = x0_.copy_to_ipu(1)

# ------------- MAIN PHASE: both ipus active --------------
for i in range(N-1):
    # load next batch: x0 = batch1
    x0 = ops.host_load(h2d_stream) 
    # layer0 can run on batch1
    x0_ = layer0(x0) 
    # while layer1 runs on batch0
    x1_ = layer1(x1)
    ops.host_store(d2h_stream, x1_)
    # copy the output of layer0 to ipu1 for next iteration
    x1 = x0_.copy_to_ipu(1)
    
# ----------- RAMP DOWN PHASE - only ipu1 active -----------
x1_ = layer1(x1)
ops.host_store(d2h_stream, x1_)
```

The `addons.pipeline_transform` will transform the first loop into the second one. It adds ramp-up and ramp-down phases and reorders and merges synchronisation, eliminating data dependencies so that stages can be executed in parallel.


### The pipeline transform 
Before implementing a pipeline you should outline the model and create the graphs corresponding to each layer on the ipu you want them to be executed on. You can use `popxl.ipu(ipu_index)` context for that.
Every operation creation must take place in a `ipu` context.

Once you have your layers, you can define a pipeline.
To define a pipeline in addons you need to write your code in the `with addons.pipelined_execution(steps=N) as pipeline` context.
This defines the loop `for i in range(N)` to be transformed. 

Within this context you specify the pipeline stages writing operations inside a `pipeline.stage(stage_index)` context and you decide the ipu where they are executed using `popxl.ipu(ipu_index)` context.
The code snippet below shows a basic pipeline for a two-layers forward.

When the pipeline context closes, the transformation is run and the current graph ends up with a single `ops.call` that executes the pipeline:

There are no constraints on how many or which ipus a stage can run on. However, any communication between ipus will cause a syncronisation that can stall the pipeline. In general, we want the data dependencies between stages to only be represented as `ops.ipu_copy` or `Tensor.copy_to_ipu`.

Being a loop, if you have `host_load` in the pipeline you need to remember to set the `ir.num_host_transfers` property equal to the pipeline steps.

Example below shows a basic pipeline for a 2-layer forward.

In [None]:
import popxl
import popxl_addons as addons
import numpy as np
from functools import partial


class Add(addons.Module):
    def build(self, x: popxl.Tensor):
        w = self.add_variable_input(
            "weight", partial(np.random.normal, 0, 0.02, x.shape), x.dtype
        )
        x = popxl.ops.add(w, x)
        return x


ir = popxl.Ir(replication=1)
N = 4

with ir.main_graph:
    # inputs
    input_stream = popxl.h2d_stream((32,), dtype=popxl.float32)

    # create graphs in the appropriate ipu context
    with popxl.ipu(0):
        facts0, layer0 = Add().create_graph(input_stream.spec)

    with popxl.ipu(1):
        facts1, layer1 = Add().create_graph(input_stream.spec)

    # transform the graph if needed

    # bind the graphs when you prefer
    bound_layer0 = layer0.bind(facts0.init())
    bound_layer1 = layer1.bind(facts1.init())

    with popxl.in_sequence(True):
        # pipeline context
        with addons.pipelined_execution(steps=N) as p:
            # define stages on the appropriate ipu
            with p.stage(0), popxl.ipu(0):
                x = popxl.ops.host_load(input_stream)
                (x,) = bound_layer0.call(x)
                x = x.copy_to_ipu(1)

            with p.stage(1), popxl.ipu(1):
                x = bound_layer1.call(x)

graph = addons.GraphWithNamedArgs(ir.main_graph)
print("The current graph only has a call to the pipeline subgraph \n")
print(graph.print_schedule())
ir.num_host_transfers = N
inputs = np.ones(
    (
        N,
        32,
    ),
    dtype=np.float32,
)
with popxl.Session(ir, "ipu_hw") as session:
    session.run({input_stream: inputs})

If we include gradients in the example our pipelined program becomes:
```python
...
# create graphs in the appropriate ipu context
...
with popxl.in_sequence(True):
    with addons.pipelined_execution(steps=N) as p:
        with p.stage(0), popxl.ipu(0):
            # layer0 forward
            x = ops.host_load(input_stream)
            # we need info for activations
            layer0_info = bound_layer0.call_with_info(x) 
            x = layer0_info.outputs[0]
            x = x.copy_to_ipu(1)

        with p.stage(1), popxl.ipu(1):
            target = ops.host_load(target_stream)
            # layer1 forward
            layer1_info = bound_layer1.call_with_info(x)
            x = layer1_info.outputs[0]
            #loss
            loss, dx = loss_op_with_grad(x, target)
            ops.host_store(output_stream, loss)
            # layer1 backward
            layer1_activations = layer1_bwd.grad_graph_info.inputs_dict(layer1_info)
            dx, = bound_layer1_bwd.call(dx, args=layer1_activations) 
            dx = dx.copy_to_ipu(0)

        with p.stage(2), popxl.ipu(0):
            # layer0 backward
            layer0_activations = pipeline.stash_and_restore_activations(
                layer0_info,
                layer0_bwd.grad_graph_info
            )
            dx, = bound_layer0_bwd.call(dx, args=layer0_activations)
    
    # outside the pipeline context
    with popxl.ipu(0):
        # optimizer step for layer 0
    with popxl.ipu(1):
        # optimizer step for layer 1
```

## Memory stashes and recomputation
`stash_and_restore_activations` implements the FIFO activation stash: on forward pipeline stage it stashes the values into a buffer tensor, while on backwards pipeline stage it retrieves the values from the stash.
If you would like more control over when the `Stash` graph is executed, you can use `stash_and_restore_tensor` instead.

Stashing all the activations can use a lot of memory. As explained in the introduction, we can instead choose to recompute the activations.
To add recomputation, the `addons.transforms.recompute_graph` transform can be applied to the backward graph. In this case the backward program consists in a call to the forward followed by a call to the backward. Activations are recomputed and provided to the backward.
When you use recomputation on a graph, you have two forward calls, one in the forward stage and one in the backward stage. In the forward stage, intermediate tensors are not used and are optimized away by popart and poplar.

![ Figure 3: Recomputation ](images/recomputation.png)
<figcaption> <b>Fig 3: </b> Recomputation transform. The backward graph is transformed to include a call to the forward graph. Activations recomputed in this forward are used to run the backward graph. <code>I</code> are intermediate tensors and apostrophes denote derivatives.
</figcaption>

## Mnist with pipelining
We are now ready to implement mnist training with pipelining. We'll implement pipelining only for training, the testing program is unchanged.

The network we've used so far consists of 4 layers.
We are going to place the first two layers on the first ipu, and the last two on the second ipu.

Our pipeline has 3 stages in total:
- stage0 on ipu0 : fc1 forward, fc2 forward 
- stage1 on ipu1 : fc3 forward, fc4 forward, fc4 backward , fc3 backward
- stage2 on ipu0 : fc2 backward, fc1 backward
Hence, our `gradient_accumulation` factor needs to be `>= 3*2 = 6`.

Note that since each layer is treated separately, we can't create the full graph using the `Net()` module, we need to build each layer with the `Linear()` module. To simplify the code, we have thus incorporated the `gelu` activation function in the linear module. For the same reason, we introduce a `ModuleGraphs` class that gathers together the forward and backward graphs for a given layer, keeps track of the variables and performs the weights update for the layer. 

Last thing to note is that in the [previous tutorial](../3_data_parallelism) we used 
```
autodiff_with_accumulation(graph, tensors_to_accumulate_grads=graph.args.tensors)
```
Specifying `tensors_to_accumulate_grads=graph.args.tensors` means that all the weights gradient are accumulated, and the backward graph has no outputs. 

However, we will now use 
```
autodiff_with_accumulation( graph,
                            tensors_to_accumulate_grads=graph.args.tensors,
                            grads_required=(graph.graph.inputs[0], ))
```
for all layers but the first one.
The extra `grads_required=(graph.graph.inputs[0], )` means that the backward graph will return the gradient of the layer input, `dx`, each time it is called. This is needed since `dx` must be provided to previous layers for the backward to continue. In the previous tutorial it was not needed since we were differentiating the entire network.

Options for training includes a `recomputation` flag. If true, the `addons.transforms.recompute_graph()` transform is applied to the backward graph of each layer.

Note that `stash_and_restore_activations` is still there: even if intermediate activations are recomputed, some tensors need to be stashed and retrieved from the forward stage ( fc2 backward, for example, needs fc1 output from the previous stage to run.)

In [None]:
import sys
import argparse
from functools import partial
from typing import Mapping, Optional
import torch
import numpy as np
from time import time
from dataclasses import dataclass, field
import popxl
import popxl_addons as addons
import popxl.ops as ops
from typing import Union, Dict
from popxl_addons.graph import GraphWithNamedArgs
from popxl_addons.named_tensors import NamedTensors
from popxl_addons.variable_factory import NamedVariableFactories
import torch
import torchvision
from tqdm import tqdm

In [None]:
np.random.seed(42)

In [None]:
def get_mnist_data(test_batch_size: int, batch_size: int):
    training_data = torch.utils.data.DataLoader(
        torchvision.datasets.MNIST(
            "~/.torch/datasets",
            train=True,
            download=True,
            transform=torchvision.transforms.Compose(
                [
                    torchvision.transforms.ToTensor(),
                    torchvision.transforms.Normalize(
                        (0.1307,), (0.3081,)
                    ),  # mean and std computed on the training set.
                ]
            ),
        ),
        batch_size=batch_size,
        shuffle=True,
        drop_last=True,
    )

    validation_data = torch.utils.data.DataLoader(
        torchvision.datasets.MNIST(
            "~/.torch/datasets",
            train=False,
            download=True,
            transform=torchvision.transforms.Compose(
                [
                    torchvision.transforms.ToTensor(),
                    torchvision.transforms.Normalize((0.1307,), (0.3081,)),
                ]
            ),
        ),
        batch_size=test_batch_size,
        shuffle=True,
        drop_last=True,
    )
    return training_data, validation_data


def accuracy(predictions: np.ndarray, labels: np.ndarray):
    ind = np.argmax(predictions, axis=-1).flatten()
    labels = labels.detach().numpy().flatten()
    return np.mean(ind == labels) * 100.0


# include gelu
class Linear(addons.Module):
    def __init__(self, out_features: int, bias: bool = True, gelu: bool = True):
        super().__init__()
        self.out_features = out_features
        self.bias = bias
        self.gelu = gelu

    def build(self, x: popxl.Tensor) -> popxl.Tensor:
        # add a state variable to the module
        w = self.add_variable_input(
            "weight",
            partial(np.random.normal, 0, 0.02, (x.shape[-1], self.out_features)),
            x.dtype,
        )
        y = x @ w
        if self.bias:
            # add a state variable to the module
            b = self.add_variable_input("bias", partial(np.zeros, y.shape[-1]), x.dtype)
            y = y + b
        if self.gelu:
            y = ops.gelu(y)
        return y


# gelu now included in the layers
class Net(addons.Module):
    def __init__(self, cache: Optional[addons.GraphCache] = None):
        super().__init__(cache=cache)
        self.fc1 = Linear(512)
        self.fc2 = Linear(512)
        self.fc3 = Linear(512)
        self.fc4 = Linear(10, gelu=False)

    def build(self, x: popxl.Tensor):
        x = x.reshape((-1, 28 * 28))
        x = self.fc1(x)
        x = self.fc2(x)
        x = self.fc3(x)
        x = self.fc4(x)
        return x


def evaluate_throughput(session, samples_per_step, epochs: int = 5):
    inputs = {
        stream: np.ones(
            session._full_input_shape(stream.shape), stream.dtype.as_numpy()
        )
        for stream in session.expected_inputs()
    }

    durations = []
    with session:
        for i in range(epochs):
            start = time()
            session.run(inputs)
            dur = time() - start
            durations.append(dur)

    duration = np.mean(durations)

    result_str = (
        f"Mean duration: {duration} s "
        f"Throughput: {samples_per_step/duration:6.1f} samples/s "
    )
    print(result_str)

### Configs

In [None]:
@dataclass
class Train:
    micro_batch_size: int = 8
    lr: Union[float, popxl.Tensor] = 1e-3
    epochs: int = 1
    gradient_accumulation: int = 1
    data_parallel: int = 1
    device = "ipu_hw"
    recomputation = False


@dataclass
class Test:
    micro_batch_size: int = 80
    device = "ipu_hw"


class Options:
    train = Train()
    test = Test()


opts = Options()
opts.train.micro_batch_size = 5
opts.train.lr = 1e-3
opts.train.epochs = 1
opts.train.gradient_accumulation = (
    6  # we will use three stages, hence we need gradient_accumulation >= 6
)
opts.train.data_parallel = 1
opts.train.recomputation = False
opts.test.micro_batch_size = 80

### Train

In [None]:
"""
Adam optimizer.
Defines adam update step for a single variable
"""


class Adam(addons.Module):
    # we need to specify in_sequence because a lot of operations are in place and their order
    # shouldn't be rearranged
    @popxl.in_sequence()
    def build(
        self,
        var: popxl.TensorByRef,
        grad: popxl.Tensor,
        *,
        lr: Union[float, popxl.Tensor],
        beta1: Union[float, popxl.Tensor] = 0.9,
        beta2: Union[float, popxl.Tensor] = 0.999,
        eps: Union[float, popxl.Tensor] = 1e-5,
        weight_decay: Union[float, popxl.Tensor] = 1e-2,
        first_order_dtype: popxl.dtype = popxl.float16,
        bias_correction: bool = True
    ):

        # gradient estimators for the variable var - same shape as the variable
        first_order = self.add_variable_input(
            "first_order", partial(np.zeros, var.shape), first_order_dtype, by_ref=True
        )
        ops.var_updates.accumulate_moving_average_(first_order, grad, f=beta1)

        # variance estimators for the variable var - same shape as the variable
        second_order = self.add_variable_input(
            "second_order", partial(np.zeros, var.shape), popxl.float32, by_ref=True
        )
        ops.var_updates.accumulate_moving_average_square_(second_order, grad, f=beta2)

        # adam is a biased estimator: provide the step to correct bias
        step = None
        if bias_correction:
            step = self.add_variable_input(
                "step", partial(np.zeros, ()), popxl.float32, by_ref=True
            )

        # calculate the weight increment with adam heuristic
        updater = ops.var_updates.adam_updater(
            first_order,
            second_order,
            weight=var,
            weight_decay=weight_decay,
            time_step=step,
            beta1=beta1,
            beta2=beta2,
            epsilon=eps,
        )

        # in place weight update: w += (-lr)*dw
        ops.scaled_add_(var, updater, b=-lr)

In [None]:
"""
Groups together the forward and backward graphs of a layer for easy access and handling.
"""


class Graphs:
    def __init__(
        self,
        layer_name: str,
        fwd: GraphWithNamedArgs,
        bwd: GraphWithNamedArgs,
        facts: NamedVariableFactories,
        optimizer: addons.Module,
    ):
        self.layer_name = layer_name
        self.fwd = fwd
        self.bwd = bwd
        self.facts = facts
        self.optimizer = optimizer
        self.vars = NamedTensors()

    def init_and_bind_fwd(self):
        self.vars.insert("fwd", self.facts.fwd.init(self.layer_name))
        return self.fwd.bind(self.vars.fwd)

    def init_and_bind_bwd(self):
        self.vars.insert("bwd", self.facts.bwd.init(self.layer_name))
        return self.bwd.bind(self.vars.bwd)

    def recompute_graph(self):
        self.bwd = addons.transforms.recompute_graph(self.bwd)

    def replicated_all_reduce(self):
        for g in self.vars.bwd.tensors[:-1]:
            g = ops.collectives.replicated_all_reduce_(g, op="mean")

    def optimizer_step(self, lr: Union[float, popxl.Tensor]):
        var_dict = self.vars.fwd.named_tensors
        grad_dict = self.vars.bwd.accum.to_dict()
        for name, var in var_dict.items():
            opt_facts, opt = self.optimizer.create_graph(
                var, var.spec, lr=lr, weight_decay=0.0, bias_correction=True
            )
            state = opt_facts.init()
            opt.bind(state).call(var, grad_dict[name])

        ops.var_updates.accumulator_scale_(self.vars.bwd.mean_accum_counter, 0.0)

    def reset_vars(self):
        self.vars._clear()


def create_graphs(
    layer_name: str,
    layer: addons.Module,
    optimizer: addons.module,
    opts,
    require_input0: bool,
    *args,
    **kwargs
):
    facts, graph = layer.create_graph(*args)
    # tensors_to_accumulate_grads = graph.args.tensors : accumulate gradients of the weights
    # grads_required = (graph.graph.inputs[0],): we need to return the gradient of the first input of the layer, since it
    # will be starting value for backpropagation in the other layers
    req_grads = (graph.graph.inputs[0],) if require_input0 else ()
    bwd_facts, bwd_graph = addons.transforms.autodiff_with_accumulation(
        graph, tensors_to_accumulate_grads=graph.args.tensors, grads_required=req_grads
    )
    factories = NamedVariableFactories()
    factories.insert("fwd", facts)
    factories.insert("bwd", bwd_facts)

    return Graphs(layer_name, graph, bwd_graph, factories, optimizer)

In [None]:
def train_program(opts):
    ir = popxl.Ir()
    ir.replication_factor = opts.train.data_parallel

    with ir.main_graph:
        # -----  Define input and output streams -----
        img_spec = popxl.TensorSpec(
            (opts.train.micro_batch_size, 28 * 28), popxl.float32
        )
        inner_spec = popxl.TensorSpec((opts.train.micro_batch_size, 512), popxl.float32)

        img_stream = popxl.h2d_stream(img_spec.shape, popxl.float32, "image")
        label_stream = popxl.h2d_stream(
            (opts.train.micro_batch_size,), popxl.int32, "labels"
        )
        loss_stream = popxl.d2h_stream((), popxl.float32, "loss")

        optimizer = Adam(cache=True)
        steps = opts.train.gradient_accumulation

        # ----- Create graphs -----

        # create graphs in the appropriate ipu context
        with popxl.ipu(0):
            fc1 = create_graphs("fc1", Linear(512), optimizer, opts, False, img_spec)
            fc2 = create_graphs("fc2", Linear(512), optimizer, opts, True, inner_spec)
        with popxl.ipu(1):
            fc3 = create_graphs("fc3", Linear(512), optimizer, opts, True, inner_spec)
            fc4 = create_graphs(
                "fc4", Linear(10, gelu=False), optimizer, opts, True, inner_spec
            )

        # ----- Transform graphs -----
        # for example, add recomputation
        if opts.train.recomputation:
            fc1.recompute_graph()
            fc2.recompute_graph()
            fc3.recompute_graph()
            fc4.recompute_graph()

        # ----- Construct Execution Scheme -----
        #
        #  Pipeline
        #   stage0, ipu0: fc1 forward, fc2 forward
        #   stage1, ipu1: fc3 forward, fc4 forward, loss, fc4 backward, fc3 backward
        #   stage2, ipu0: fc2 backward, fc1 backward
        #
        #  Replica Reduction
        #  Optimizer
        #
        # --------------------------------------

        # ----- Pipeline -----
        with popxl.in_sequence(True):
            # context for pipeline: when the context closes the pipeline transformation is applied  to the graph
            with addons.pipelined_execution(steps) as pipeline:

                with pipeline.stage(0), popxl.ipu(0):
                    # fc1 fc2 forward
                    img_t = ops.host_load(img_stream)
                    x: popxl.Tensor
                    fc1_info = fc1.init_and_bind_fwd().call_with_info(img_t)
                    x = fc1_info.outputs[0]
                    fc2_info = fc2.init_and_bind_fwd().call_with_info(x)
                    x = fc2_info.outputs[0]
                    x = x.copy_to_ipu(1)

                with pipeline.stage(1), popxl.ipu(1):
                    # fc3 fc4 forward
                    labels = ops.host_load(label_stream, "labels")
                    fc3_info = fc3.init_and_bind_fwd().call_with_info(x)
                    x = fc3_info.outputs[0]
                    fc4_info = fc4.init_and_bind_fwd().call_with_info(x)
                    x = fc4_info.outputs[0]

                    # loss
                    loss, dx = addons.ops.cross_entropy_with_grad(x, labels)
                    ops.host_store(loss_stream, loss)

                    # grads
                    fc3_activations = fc3.bwd.grad_graph_info.inputs_dict(fc3_info)
                    fc4_activations = fc4.bwd.grad_graph_info.inputs_dict(fc4_info)
                    (dx,) = fc4.init_and_bind_bwd().call(
                        dx, args=fc4_activations
                    )  # provide fc4 activations
                    (dx,) = fc3.init_and_bind_bwd().call(
                        dx, args=fc3_activations
                    )  # provide fc3 activations

                    dx = dx.copy_to_ipu(0)

                with pipeline.stage(2), popxl.ipu(0):
                    # using stash_and_restore_activations ensure that when the pipeline graph is created
                    # activations are stashed during forward and retrieved from the FIFO stash during backward.
                    # Needs to be called inside a stage

                    # grads
                    fc2_activ = pipeline.stash_and_restore_activations(
                        fc2_info, fc2.bwd.grad_graph_info
                    )
                    (dx,) = fc2.init_and_bind_bwd().call(dx, args=fc2_activ)

                    fc1_activ = pipeline.stash_and_restore_activations(
                        fc1_info, fc1.bwd.grad_graph_info
                    )
                    fc1.init_and_bind_bwd().call(dx, args=fc1_activ)

            # -----/ Pipeline -----

            # ----- Replica Reduction -----
            if opts.train.data_parallel > 1:
                with popxl.ipu(0):
                    fc1.replicated_all_reduce()
                    fc2.replicated_all_reduce()

                with popxl.ipu(1):
                    fc3.replicated_all_reduce()
                    fc4.replicated_all_reduce()

            # ----- Optimizer -----
            with popxl.ipu(0):
                fc1.optimizer_step(opts.train.lr)
                fc2.optimizer_step(opts.train.lr)

            with popxl.ipu(1):
                fc3.optimizer_step(opts.train.lr)
                fc4.optimizer_step(opts.train.lr)

    # we have a for loop, the number of host loads is equal to gradient_accumulation
    ir.num_host_transfers = opts.train.gradient_accumulation

    # group all the variables to be able to copy weights to the test session.
    # Names need to match with the Net() names used for inference
    vars = NamedTensors()
    vars.insert("fc1", fc1.vars.fwd)
    vars.insert("fc2", fc2.vars.fwd)
    vars.insert("fc3", fc3.vars.fwd)
    vars.insert("fc4", fc4.vars.fwd)

    return popxl.Session(ir, "ipu_hw"), [img_stream, label_stream], vars, loss_stream

In [None]:
global_batch_size = (
    opts.train.micro_batch_size
    * opts.train.gradient_accumulation
    * opts.train.data_parallel
)
training_data, test_data = get_mnist_data(opts.test.micro_batch_size, global_batch_size)
train_session, train_input_streams, train_variables, loss_stream = train_program(opts)

In [None]:
with train_session:
    nr_batches = len(training_data)
    for epoch in range(1, opts.train.epochs + 1):
        nr_batches = len(training_data)
        for epoch in range(1, opts.train.epochs + 1):
            print("Epoch {0}/{1}".format(opts.train.epochs, opts.train.epochs))
            bar = tqdm(training_data, total=nr_batches)
            for data, labels in bar:
                # reshape data accounting for replication and num hosts transfers
                data = data.reshape(
                    train_session.ir.num_host_transfers,
                    train_session.ir.replication_factor,
                    opts.train.micro_batch_size,
                    28 * 28,
                ).squeeze()
                labels = labels.reshape(
                    train_session.ir.num_host_transfers,
                    train_session.ir.replication_factor,
                    opts.train.micro_batch_size,
                ).squeeze()

                inputs: Mapping[popxl.HostToDeviceStream, np.ndarray] = dict(
                    zip(train_input_streams, [data.squeeze().float(), labels.int()])
                )
                loss = train_session.run(inputs)
                losses_np = loss[
                    loss_stream
                ]  # shape(ir.num_host_transfers, ir.replication_factor, )
                avg_loss = np.mean(losses_np)
                bar.set_description("Loss:{:0.4f}".format(avg_loss))

In [None]:
# get weights data : dictionary { train_session variables : tensor data (numpy) }
train_vars_to_data = train_session.get_tensors_data(train_variables.tensors)

### Throughput and Testing

In [None]:
def test_program(test_batch_size, device):
    ir = popxl.Ir(replication=1)
    with ir.main_graph:
        # Inputs
        in_stream = popxl.h2d_stream((test_batch_size, 28, 28), popxl.float32, "image")
        in_t = ops.host_load(in_stream)

        # Create graphs
        facts, graph = Net().create_graph(in_t)

        # Initialise variables
        variables = facts.init()

        # Forward
        (outputs,) = graph.bind(variables).call(in_t)
        out_stream = popxl.d2h_stream(outputs.shape, outputs.dtype, "outputs")
        ops.host_store(out_stream, outputs)

    ir.num_host_transfers = 1
    return popxl.Session(ir, device), [in_stream], variables, out_stream

In [None]:
# Create test program and test session
test_session, test_input_streams, test_variables, out_stream = test_program(
    opts.test.micro_batch_size, opts.test.device
)

# dictionary { train_session variables : test_session variables }
train_vars_to_test_vars = train_variables.to_mapping(test_variables)

# Create a dictionary { test_session variables : tensor data (numpy) }
# We want to copy the values before evaluating throughput on synthetic data, otherwise weights are changed
test_vars_to_data = {
    test_var: train_vars_to_data[train_var].copy()
    for train_var, test_var in train_vars_to_test_vars.items()
}

# Copy trained weights to the program, with a single host to device transfer at the end
test_session.write_variables_data(test_vars_to_data)

# evaluate the ratio samples per step / time for train session
print("train session")
evaluate_throughput(train_session, global_batch_size)

In [None]:
nr_batches = len(test_data)
sum_acc = 0.0
with test_session:
    for data, labels in tqdm(test_data, total=nr_batches):
        inputs: Mapping[popxl.HostToDeviceStream, np.ndarray] = dict(
            zip(test_input_streams, [data.squeeze().float(), labels.int()])
        )
        output = test_session.run(inputs)
        sum_acc += accuracy(output[out_stream], labels)
print("Accuracy on test set: {:0.2f}%".format(sum_acc / len(test_data)))

In [None]:
samples_per_step = (
    opts.test.micro_batch_size
)  # no data parallelism or gradient accumulation for inference in this program
print("test session")
evaluate_throughput(test_session, samples_per_step)