9. Example using IPUPipelineEstimator
This example shows how to use the IPUPipelineEstimator
to train a simple CNN on the CIFAR-10 dataset. It can be
compared with the IPUEstimator example (see the section
"Example using IPUEstimator") to see the
changes required to add pipelined execution to a model.
import argparse
import time
import tensorflow.compat.v1 as tf
from tensorflow.keras.datasets import cifar10
from tensorflow.keras.layers import Conv2D, MaxPooling2D
from tensorflow.keras.layers import Dense, Dropout, Activation, Flatten
from tensorflow.python import ipu
# Number of label classes; used as the width of the final Dense (logits) layer.
NUM_CLASSES = 10
def model_fn(mode, params):
  """Describe the pipelined CNN for the requested estimator mode.

  The network is a simple CNN based on https://keras.io/examples/cifar10_cnn/
  that is split into two pipeline stages placed on different IPUs.

  Args:
    mode: One of the `tf.estimator.ModeKeys` values. Only TRAIN and EVAL are
      handled; the second stage raises `NotImplementedError` for anything else.
    params: Dict providing "learning_rate" and "gradient_accumulation_count".

  Returns:
    An `IPUPipelineEstimatorSpec` built from the two stages, the optimizer
    function and the evaluation metrics function.
  """
  # The dropout layers are told statically whether we are training, which
  # avoids the need for a placeholder.
  is_training = mode == tf.estimator.ModeKeys.TRAIN

  def conv_relu(inputs, filters, **conv_kwargs):
    # A 3x3 convolution followed by a ReLU activation.
    convolved = Conv2D(filters, (3, 3), **conv_kwargs)(inputs)
    return Activation("relu")(convolved)

  def stage1(features, labels):
    net = conv_relu(features, 32, padding="same")
    net = conv_relu(net, 32)
    net = MaxPooling2D(pool_size=(2, 2))(net)
    net = Dropout(0.25)(net, training=is_training)
    # The labels are forwarded untouched so that the next stage can use them.
    return net, labels

  def stage2(partial, labels):
    net = conv_relu(partial, 64, padding="same")
    net = conv_relu(net, 64)
    net = MaxPooling2D(pool_size=(2, 2))(net)
    net = Dropout(0.25)(net, training=is_training)
    net = Flatten()(net)
    net = Dense(512)(net)
    net = Activation("relu")(net)
    net = Dropout(0.5)(net, training=is_training)
    logits = Dense(NUM_CLASSES)(net)
    loss = tf.losses.sparse_softmax_cross_entropy(labels=labels, logits=logits)

    if is_training:
      # This return value is passed to the `optimizer_function`.
      return loss

    if mode != tf.estimator.ModeKeys.EVAL:
      raise NotImplementedError(mode)

    predictions = tf.argmax(input=logits, axis=1, output_type=tf.int32)
    # These return values are passed to the `eval_metrics_fn`.
    return loss, predictions, labels

  def optimizer_function(loss):
    sgd = tf.train.GradientDescentOptimizer(params["learning_rate"])
    return ipu.pipelining_ops.OptimizerFunctionOutput(sgd, loss)

  def eval_metrics_fn(loss, predictions, labels):
    # This is executed on the host.
    accuracy = tf.metrics.accuracy(labels=labels, predictions=predictions)
    return {"loss": loss, "accuracy": accuracy}

  spec_kwargs = dict(
      computational_stages=[stage1, stage2],
      optimizer_function=optimizer_function,
      eval_metrics_fn=eval_metrics_fn,
      gradient_accumulation_count=params["gradient_accumulation_count"],
      count_gradient_accumulation_as_iterations=True,
  )
  return ipu.ipu_pipeline_estimator.IPUPipelineEstimatorSpec(
      mode, **spec_kwargs)
def parse_args(argv=None):
  """Parse the command-line options of this example.

  Args:
    argv: Optional list of argument strings to parse. When `None` (the
      default) the arguments are taken from `sys.argv[1:]`, which is the
      behaviour callers passing no argument get.

  Returns:
    An `argparse.Namespace` with the attributes `test_only`, `batch_size`,
    `gradient_accumulation_count`, `iterations_per_loop`, `log_interval`,
    `summary_interval`, `training_steps`, `learning_rate` and `model_dir`
    (`None` unless `--model-dir` is given).
  """
  parser = argparse.ArgumentParser()

  parser.add_argument(
      "--test-only",
      action="store_true",
      help="Skip training and test using latest checkpoint from model_dir.")

  parser.add_argument("--batch-size",
                      type=int,
                      default=16,
                      help="The batch size.")

  parser.add_argument(
      "--gradient-accumulation-count",
      type=int,
      default=4,
      help="The number of batches that will be pipelined together.")

  parser.add_argument(
      "--iterations-per-loop",
      type=int,
      default=100,
      help="The number of iterations (batches consumed) per loop on IPU.")

  parser.add_argument("--log-interval",
                      type=int,
                      default=10,
                      help="Interval at which to log progress.")

  parser.add_argument("--summary-interval",
                      type=int,
                      default=1,
                      help="Interval at which to write summaries.")

  parser.add_argument("--training-steps",
                      type=int,
                      default=100000,
                      help="Total number of training steps.")

  parser.add_argument(
      "--learning-rate",
      type=float,
      default=0.01,
      help="The learning rate used with stochastic gradient descent.")

  parser.add_argument(
      "--model-dir",
      help="Directory where checkpoints and summaries are stored.")

  # With argv=None argparse falls back to sys.argv[1:].
  return parser.parse_args(argv)
def create_ipu_estimator(args):
  """Construct the IPUPipelineEstimator used for training and testing.

  Args:
    args: Parsed command-line arguments. The attributes read here are
      `iterations_per_loop`, `log_interval`, `summary_interval`, `model_dir`,
      `learning_rate` and `gradient_accumulation_count`.

  Returns:
    An `IPUPipelineEstimator` that uses `model_fn` and runs on two IPUs.
  """
  # Hyperparameters made available to `model_fn` through its `params`.
  model_params = {
      "learning_rate": args.learning_rate,
      "gradient_accumulation_count": args.gradient_accumulation_count,
  }

  # The number of IPUs requested also serves as the number of shards.
  pipeline_ipu_count = 2
  options = ipu.utils.create_ipu_config()
  ipu.utils.auto_select_ipus(options, pipeline_ipu_count)

  ipu_specific_config = ipu.ipu_run_config.IPURunConfig(
      num_shards=pipeline_ipu_count,
      iterations_per_loop=args.iterations_per_loop,
      ipu_options=options,
  )
  run_config = ipu.ipu_run_config.RunConfig(
      ipu_run_config=ipu_specific_config,
      log_step_count_steps=args.log_interval,
      save_summary_steps=args.summary_interval,
      model_dir=args.model_dir,
  )

  return ipu.ipu_pipeline_estimator.IPUPipelineEstimator(
      config=run_config,
      model_fn=model_fn,
      params=model_params,
  )
def train(ipu_estimator, args, x_train, y_train):
  """Train a model on IPU and save checkpoints to the given `args.model_dir`.

  Runs `args.training_steps` steps with batches of `args.batch_size` examples
  and prints the elapsed time and throughput afterwards.
  """
  num_examples = len(x_train)

  def input_fn():
    # Dataset.from_tensor_slices() would embed the data into the graph as
    # constants, making the training graph very large and impractical.
    # Dataset.from_generator() is used instead, with prefetching and caching
    # added to improve performance.
    def generator():
      return zip(x_train, y_train)

    examples = tf.data.Dataset.from_generator(
        generator,
        (x_train.dtype, y_train.dtype),
        (x_train.shape[1:], y_train.shape[1:]))
    return (examples.prefetch(num_examples)
            .cache()
            .repeat()
            .shuffle(num_examples)
            .batch(args.batch_size, drop_remainder=True))

  # Training progress is logged as INFO, so enable that logging level
  tf.logging.set_verbosity(tf.logging.INFO)

  started = time.time()
  ipu_estimator.train(input_fn=input_fn, steps=args.training_steps)
  duration_seconds = time.time() - started

  images_seen = args.training_steps * args.batch_size
  print("Took {:.2f} minutes, i.e. {:.0f} images per second".format(
      duration_seconds / 60, images_seen / duration_seconds))
def calc_batch_size(num_examples, batches_per_loop, batch_size):
  """Reduce the batch size if needed to cover all examples without a remainder.

  Args:
    num_examples: Total number of examples; must be a multiple of
      `batches_per_loop`.
    batches_per_loop: Number of batches consumed per loop; must be positive.
    batch_size: The preferred (maximum) batch size; must be positive.

  Returns:
    The largest value not exceeding `batch_size` for which
    `batch_size * batches_per_loop` evenly divides `num_examples`.

  Raises:
    ValueError: If `batch_size` or `batches_per_loop` is not positive, or if
      `batches_per_loop` does not evenly divide `num_examples`.
  """
  # Explicit checks rather than `assert`: assertions are removed when Python
  # runs with -O, after which invalid input would no longer be caught here.
  if batch_size <= 0:
    raise ValueError("batch_size ({}) must be positive".format(batch_size))
  if batches_per_loop <= 0:
    raise ValueError(
        "batches_per_loop ({}) must be positive".format(batches_per_loop))
  if num_examples % batches_per_loop != 0:
    raise ValueError(
        ("batches_per_loop ({}) must evenly divide the number of "
         "examples ({})").format(batches_per_loop, num_examples))

  # Terminates at the latest with batch_size == 1, because batches_per_loop
  # itself divides num_examples.
  while num_examples % (batch_size * batches_per_loop) != 0:
    batch_size -= 1
  return batch_size
def test(ipu_estimator, args, x_test, y_test):
  """Evaluate the model on IPU and print the test loss and accuracy.

  The weights are loaded from the final checkpoint in the given
  `args.model_dir`. The batch size may be reduced from `args.batch_size` so
  that the test examples are covered without a remainder.
  """
  example_count = len(x_test)
  eval_batch_size = calc_batch_size(example_count, args.iterations_per_loop,
                                    args.batch_size)

  if eval_batch_size != args.batch_size:
    print("Test batch size changed to {}.".format(eval_batch_size))

  def input_fn():
    examples = tf.data.Dataset.from_tensor_slices((x_test, y_test))
    return examples.batch(eval_batch_size, drop_remainder=True)

  metrics = ipu_estimator.evaluate(
      input_fn=input_fn, steps=example_count // eval_batch_size)

  # Both metrics are looked up before anything is printed.
  loss_value, accuracy_value = metrics["loss"], metrics["accuracy"]
  print("Test loss: {:g}".format(loss_value))
  print("Test accuracy: {:.2f}%".format(100 * accuracy_value))
def main():
  """Run the example: train (unless `--test-only` is given), then test.

  Raises:
    ValueError: If `--iterations-per-loop` does not evenly divide the number
      of test examples.
  """
  args = parse_args()
  train_data, test_data = cifar10.load_data()

  test_example_count = len(test_data[0])
  if test_example_count % args.iterations_per_loop:
    message = ("iterations_per_loop ({}) must evenly divide the number of test "
               "examples ({})")
    raise ValueError(
        message.format(args.iterations_per_loop, test_example_count))

  ipu_estimator = create_ipu_estimator(args)

  def prepare(images, labels):
    # Scale the images by 1/255 as float32 and convert the labels to int32.
    return images.astype("float32") / 255.0, labels.astype("int32")

  if not args.test_only:
    print("Training...")
    train_images, train_labels = prepare(*train_data)
    train(ipu_estimator, args, train_images, train_labels)

  print("Testing...")
  test_images, test_labels = prepare(*test_data)
  test(ipu_estimator, args, test_images, test_labels)
# Run the example only when this file is executed as a script.
if __name__ == "__main__":
  main()