8. Example using IPUPipelineEstimator
This example shows how to use the IPUPipelineEstimator
to train a simple CNN on the CIFAR-10 dataset. It can be
compared to the example using the IPUEstimator
(Section 7, Example using IPUEstimator) to see the
changes required to add pipelined execution to a model.
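Compared with the IPUEstimator version, the pipelining-specific pieces are the two stage functions, the `optimizer_function`, the `gradient_accumulation_count`, and an IPU configuration that reserves one IPU per stage. As a quick orientation, the returned spec in the listing below looks like this (the names are taken directly from that listing):

    # Excerpt of the full listing below: the pipelining-specific spec.
    return ipu.ipu_pipeline_estimator.IPUPipelineEstimatorSpec(
        mode,
        computational_stages=[stage1, stage2],
        optimizer_function=optimizer_function,
        eval_metrics_fn=eval_metrics_fn,
        gradient_accumulation_count=params["gradient_accumulation_count"])
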
import argparse
import time
import tensorflow.compat.v1 as tf
from tensorflow.keras.datasets import cifar10
from tensorflow.keras.layers import Conv2D, MaxPooling2D
from tensorflow.keras.layers import Dense, Dropout, Activation, Flatten
from tensorflow.python import ipu
NUM_CLASSES = 10


def model_fn(mode, params):
  """A simple CNN based on https://keras.io/examples/cifar10_cnn/ split
  into two pipeline stages placed on different IPUs."""

  # Tell the dropout layers whether we are training to avoid a placeholder.
  is_training = mode == tf.estimator.ModeKeys.TRAIN
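
  # Each function below runs as one pipeline stage on its own IPU. The values
  # returned from stage1 are passed as the inputs to stage2.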
  def stage1(features, labels):
    partial = Conv2D(16, (3, 3), padding="same")(features)
    partial = Activation("relu")(partial)
    partial = Conv2D(16, (3, 3))(partial)
    partial = Activation("relu")(partial)
    partial = MaxPooling2D(pool_size=(2, 2))(partial)
    partial = Dropout(0.25)(partial, training=is_training)
    return partial, labels

  def stage2(partial, labels):
    partial = Conv2D(32, (3, 3), padding="same")(partial)
    partial = Activation("relu")(partial)
    partial = Conv2D(32, (3, 3))(partial)
    partial = Activation("relu")(partial)
    partial = MaxPooling2D(pool_size=(2, 2))(partial)
    partial = Dropout(0.25)(partial, training=is_training)

    partial = Flatten()(partial)
    partial = Dense(256)(partial)
    partial = Activation("relu")(partial)
    partial = Dropout(0.5)(partial, training=is_training)
    logits = Dense(NUM_CLASSES)(partial)

    loss = tf.losses.sparse_softmax_cross_entropy(labels=labels, logits=logits)

    if mode == tf.estimator.ModeKeys.TRAIN:
      # This return value is passed to the `optimizer_function`.
      return loss

    if mode == tf.estimator.ModeKeys.EVAL:
      predictions = tf.argmax(input=logits, axis=1, output_type=tf.int32)
      # These return values are passed to the `eval_metrics_fn`.
      return loss, predictions, labels

    raise NotImplementedError(mode)
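
  # In TRAIN mode, the loss returned by the final stage is passed to this
  # function, which pairs it with the optimizer used for pipelined training.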
  def optimizer_function(loss):
    optimizer = tf.train.GradientDescentOptimizer(params["learning_rate"])
    return ipu.pipelining_ops.OptimizerFunctionOutput(optimizer, loss)

  def eval_metrics_fn(loss, predictions, labels):
    # This is executed on the host.
    return {
        "loss": loss,
        "accuracy": tf.metrics.accuracy(predictions=predictions,
                                        labels=labels),
    }

  return ipu.ipu_pipeline_estimator.IPUPipelineEstimatorSpec(
      mode,
      computational_stages=[stage1, stage2],
      optimizer_function=optimizer_function,
      eval_metrics_fn=eval_metrics_fn,
      gradient_accumulation_count=params["gradient_accumulation_count"])


def parse_args():
  parser = argparse.ArgumentParser()

  parser.add_argument(
      "--test-only",
      action="store_true",
      help="Skip training and test using latest checkpoint from model_dir.")

  parser.add_argument("--batch-size",
                      type=int,
                      default=16,
                      help="The batch size.")

  parser.add_argument(
      "--gradient-accumulation-count",
      type=int,
      default=4,
      help="The number of batches that will be pipelined together.")

  parser.add_argument(
      "--iterations-per-loop",
      type=int,
      default=100,
      help="The number of iterations (batches consumed) per loop on IPU.")

  parser.add_argument("--log-interval",
                      type=int,
                      default=10,
                      help="Interval at which to log progress.")

  parser.add_argument("--summary-interval",
                      type=int,
                      default=1,
                      help="Interval at which to write summaries.")

  parser.add_argument("--training-steps",
                      type=int,
                      default=100000,
                      help="Total number of training steps.")

  parser.add_argument(
      "--learning-rate",
      type=float,
      default=0.01,
      help="The learning rate used with stochastic gradient descent.")

  parser.add_argument(
      "--model-dir",
      help="Directory where checkpoints and summaries are stored.")

  return parser.parse_args()


def create_ipu_estimator(args):
  num_ipus_in_pipeline = 2

  ipu_options = ipu.config.IPUConfig()
  ipu_options.auto_select_ipus = num_ipus_in_pipeline
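
  # One IPU is needed per pipeline stage, so the same count is passed to the
  # run configuration as `num_shards` below.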
  ipu_run_config = ipu.ipu_run_config.IPURunConfig(
      num_shards=num_ipus_in_pipeline,
      iterations_per_loop=args.iterations_per_loop,
      ipu_options=ipu_options,
  )

  config = ipu.ipu_run_config.RunConfig(
      ipu_run_config=ipu_run_config,
      log_step_count_steps=args.log_interval,
      save_summary_steps=args.summary_interval,
      model_dir=args.model_dir,
  )

  return ipu.ipu_pipeline_estimator.IPUPipelineEstimator(
      config=config,
      model_fn=model_fn,
      params={
          "learning_rate": args.learning_rate,
          "gradient_accumulation_count": args.gradient_accumulation_count,
      },
  )


def train(ipu_estimator, args, x_train, y_train):
  """Train a model on IPU and save checkpoints to the given `args.model_dir`."""
  def input_fn():
    # If using Dataset.from_tensor_slices(), the data will be embedded
    # into the graph as constants, which makes the training graph very
    # large and impractical. So use Dataset.from_generator() here instead,
    # but add prefetching and caching to improve performance.

    def generator():
      return zip(x_train, y_train)

    types = (x_train.dtype, y_train.dtype)
    shapes = (x_train.shape[1:], y_train.shape[1:])

    dataset = tf.data.Dataset.from_generator(generator, types, shapes)
    dataset = dataset.prefetch(len(x_train)).cache()
    dataset = dataset.repeat()
    dataset = dataset.shuffle(len(x_train))
    dataset = dataset.batch(args.batch_size, drop_remainder=True)

    return dataset

  # Training progress is logged as INFO, so enable that logging level.
  tf.logging.set_verbosity(tf.logging.INFO)

  t0 = time.time()
  ipu_estimator.train(input_fn=input_fn, steps=args.training_steps)
  t1 = time.time()

  duration_seconds = t1 - t0
  images_per_second = args.training_steps * args.batch_size / duration_seconds
  print("Took {:.2f} minutes, i.e. {:.0f} images per second".format(
      duration_seconds / 60, images_per_second))


def calc_batch_size(num_examples, batches_per_loop, batch_size):
  """Reduce the batch size if needed to cover all examples without a remainder."""
  assert batch_size > 0
  assert num_examples % batches_per_loop == 0
  while num_examples % (batch_size * batches_per_loop) != 0:
    batch_size -= 1
  return batch_size


def test(ipu_estimator, args, x_test, y_test):
  """Test the model on IPU by loading weights from the final checkpoint in the
  given `args.model_dir`."""
  num_test_examples = len(x_test)
  test_batch_size = calc_batch_size(num_test_examples,
                                    args.iterations_per_loop, args.batch_size)
  if test_batch_size != args.batch_size:
    print("Test batch size changed to {}.".format(test_batch_size))

  def input_fn():
    dataset = tf.data.Dataset.from_tensor_slices((x_test, y_test))
    dataset = dataset.batch(test_batch_size, drop_remainder=True)
    return dataset

  num_steps = num_test_examples // test_batch_size
  metrics = ipu_estimator.evaluate(input_fn=input_fn, steps=num_steps)
  test_loss = metrics["loss"]
  test_accuracy = metrics["accuracy"]

  print("Test loss: {:g}".format(test_loss))
  print("Test accuracy: {:.2f}%".format(100 * test_accuracy))


def main():
  args = parse_args()
  train_data, test_data = cifar10.load_data()

  num_test_examples = len(test_data[0])
  if num_test_examples % args.iterations_per_loop != 0:
    raise ValueError(
        ("iterations_per_loop ({}) must evenly divide the number of test "
         "examples ({})").format(args.iterations_per_loop, num_test_examples))

  ipu_estimator = create_ipu_estimator(args)

  def normalise(x, y):
    return x.astype("float32") / 255.0, y.astype("int32")

  if not args.test_only:
    print("Training...")
    x_train, y_train = normalise(*train_data)
    train(ipu_estimator, args, x_train, y_train)

  print("Testing...")
  x_test, y_test = normalise(*test_data)
  test(ipu_estimator, args, x_test, y_test)


if __name__ == "__main__":
  main()
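
As a rough usage sketch, assuming the listing is saved as `pipeline_estimator_example.py` (the file name is just a placeholder), the flags defined in `parse_args()` can be passed on the command line:

    # Train, writing checkpoints to --model-dir, then evaluate the latest checkpoint.
    python pipeline_estimator_example.py --model-dir /tmp/cifar10-pipeline --training-steps 2000
    python pipeline_estimator_example.py --model-dir /tmp/cifar10-pipeline --test-only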