9. Example using IPUPipelineEstimator
This example shows how to use the IPUPipelineEstimator
to train a simple CNN on the CIFAR-10 dataset. It can be
compared to the example using the IPUEstimator
(Section 8, Example using IPUEstimator) to see the
changes required to add pipelined execution to a model.
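The most visible change is that the model function is split into two pipeline stages and takes an extra gradient_accumulation_count parameter, which sets how many batches are accumulated before each weight update. As a rough standalone sketch of that bookkeeping (illustrative only, using the default flag values from the listing below):

# Sketch only, not part of the example: examples contributing to one weight
# update when pipelining with gradient accumulation, using the defaults below.
batch_size = 16                  # --batch-size default
gradient_accumulation_count = 4  # --gradient-accumulation-count default
examples_per_weight_update = batch_size * gradient_accumulation_count
print(examples_per_weight_update)  # 64: gradients from 4 batches of 16 images
                                   # are accumulated before one optimizer step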
import argparse
import time

import tensorflow.compat.v1 as tf

from tensorflow.keras.datasets import cifar10
from tensorflow.keras.layers import Conv2D, MaxPooling2D
from tensorflow.keras.layers import Dense, Dropout, Activation, Flatten
from tensorflow.python import ipu

NUM_CLASSES = 10


def model_fn(mode, params):
  """A simple CNN based on https://keras.io/examples/cifar10_cnn/ split
  into two pipeline stages placed on different IPUs."""

  # Tell the dropout layers whether we are training to avoid a placeholder.
  is_training = mode == tf.estimator.ModeKeys.TRAIN

  def stage1(features, labels):
    partial = Conv2D(16, (3, 3), padding="same")(features)
    partial = Activation("relu")(partial)
    partial = Conv2D(16, (3, 3))(partial)
    partial = Activation("relu")(partial)
    partial = MaxPooling2D(pool_size=(2, 2))(partial)
    partial = Dropout(0.25)(partial, training=is_training)

    return partial, labels

  def stage2(partial, labels):
    partial = Conv2D(32, (3, 3), padding="same")(partial)
    partial = Activation("relu")(partial)
    partial = Conv2D(32, (3, 3))(partial)
    partial = Activation("relu")(partial)
    partial = MaxPooling2D(pool_size=(2, 2))(partial)
    partial = Dropout(0.25)(partial, training=is_training)

    partial = Flatten()(partial)
    partial = Dense(256)(partial)
    partial = Activation("relu")(partial)
    partial = Dropout(0.5)(partial, training=is_training)
    logits = Dense(NUM_CLASSES)(partial)

    loss = tf.losses.sparse_softmax_cross_entropy(labels=labels, logits=logits)

    if mode == tf.estimator.ModeKeys.TRAIN:
      # This return value is passed to the `optimizer_function`.
      return loss

    if mode == tf.estimator.ModeKeys.EVAL:
      predictions = tf.argmax(input=logits, axis=1, output_type=tf.int32)
      # These return values are passed to the `eval_metrics_fn`.
      return loss, predictions, labels

    raise NotImplementedError(mode)

  def optimizer_function(loss):
    optimizer = tf.train.GradientDescentOptimizer(params["learning_rate"])
    return ipu.pipelining_ops.OptimizerFunctionOutput(optimizer, loss)

  def eval_metrics_fn(loss, predictions, labels):
    # This is executed on the host.
    return {
        "loss": loss,
        "accuracy": tf.metrics.accuracy(predictions=predictions,
                                        labels=labels),
    }

  return ipu.ipu_pipeline_estimator.IPUPipelineEstimatorSpec(
      mode,
      computational_stages=[stage1, stage2],
      optimizer_function=optimizer_function,
      eval_metrics_fn=eval_metrics_fn,
      gradient_accumulation_count=params["gradient_accumulation_count"])


def parse_args():
  parser = argparse.ArgumentParser()

  parser.add_argument(
      "--test-only",
      action="store_true",
      help="Skip training and test using latest checkpoint from model_dir.")

  parser.add_argument("--batch-size",
                      type=int,
                      default=16,
                      help="The batch size.")

  parser.add_argument(
      "--gradient-accumulation-count",
      type=int,
      default=4,
      help="The number of batches that will be pipelined together.")

  parser.add_argument(
      "--iterations-per-loop",
      type=int,
      default=100,
      help="The number of iterations (batches consumed) per loop on IPU.")

  parser.add_argument("--log-interval",
                      type=int,
                      default=10,
                      help="Interval at which to log progress.")

  parser.add_argument("--summary-interval",
                      type=int,
                      default=1,
                      help="Interval at which to write summaries.")

  parser.add_argument("--training-steps",
                      type=int,
                      default=100000,
                      help="Total number of training steps.")

  parser.add_argument(
      "--learning-rate",
      type=float,
      default=0.01,
      help="The learning rate used with stochastic gradient descent.")

  parser.add_argument(
      "--model-dir",
      help="Directory where checkpoints and summaries are stored.")

  return parser.parse_args()


def create_ipu_estimator(args):
  num_ipus_in_pipeline = 2

  ipu_options = ipu.config.IPUConfig()
  ipu_options.auto_select_ipus = num_ipus_in_pipeline

  ipu_run_config = ipu.ipu_run_config.IPURunConfig(
      num_shards=num_ipus_in_pipeline,
      iterations_per_loop=args.iterations_per_loop,
      ipu_options=ipu_options,
  )

  config = ipu.ipu_run_config.RunConfig(
      ipu_run_config=ipu_run_config,
      log_step_count_steps=args.log_interval,
      save_summary_steps=args.summary_interval,
      model_dir=args.model_dir,
  )

  return ipu.ipu_pipeline_estimator.IPUPipelineEstimator(
      config=config,
      model_fn=model_fn,
      params={
          "learning_rate": args.learning_rate,
          "gradient_accumulation_count": args.gradient_accumulation_count,
      },
  )


def train(ipu_estimator, args, x_train, y_train):
  """Train a model on IPU and save checkpoints to the given `args.model_dir`."""
  def input_fn():
    # If using Dataset.from_tensor_slices(), the data will be embedded
    # into the graph as constants, which makes the training graph very
    # large and impractical. So use Dataset.from_generator() here instead,
    # but add prefetching and caching to improve performance.

    def generator():
      return zip(x_train, y_train)

    types = (x_train.dtype, y_train.dtype)
    shapes = (x_train.shape[1:], y_train.shape[1:])

    dataset = tf.data.Dataset.from_generator(generator, types, shapes)
    dataset = dataset.prefetch(len(x_train)).cache()
    dataset = dataset.repeat()
    dataset = dataset.shuffle(len(x_train))
    dataset = dataset.batch(args.batch_size, drop_remainder=True)

    return dataset

  # Training progress is logged as INFO, so enable that logging level
  tf.logging.set_verbosity(tf.logging.INFO)

  t0 = time.time()
  ipu_estimator.train(input_fn=input_fn, steps=args.training_steps)
  t1 = time.time()

  duration_seconds = t1 - t0
  images_per_second = args.training_steps * args.batch_size / duration_seconds
  print("Took {:.2f} minutes, i.e. {:.0f} images per second".format(
      duration_seconds / 60, images_per_second))


def calc_batch_size(num_examples, batches_per_loop, batch_size):
  """Reduce the batch size if needed to cover all examples without a remainder."""
  assert batch_size > 0
  assert num_examples % batches_per_loop == 0
  while num_examples % (batch_size * batches_per_loop) != 0:
    batch_size -= 1
  return batch_size


def test(ipu_estimator, args, x_test, y_test):
  """Test the model on IPU by loading weights from the final checkpoint in the
  given `args.model_dir`."""

  num_test_examples = len(x_test)

  test_batch_size = calc_batch_size(num_test_examples,
                                    args.iterations_per_loop, args.batch_size)

  if test_batch_size != args.batch_size:
    print("Test batch size changed to {}.".format(test_batch_size))

  def input_fn():
    dataset = tf.data.Dataset.from_tensor_slices((x_test, y_test))
    dataset = dataset.batch(test_batch_size, drop_remainder=True)
    return dataset

  num_steps = num_test_examples // test_batch_size
  metrics = ipu_estimator.evaluate(input_fn=input_fn, steps=num_steps)
  test_loss = metrics["loss"]
  test_accuracy = metrics["accuracy"]

  print("Test loss: {:g}".format(test_loss))
  print("Test accuracy: {:.2f}%".format(100 * test_accuracy))


def main():
  args = parse_args()
  train_data, test_data = cifar10.load_data()

  num_test_examples = len(test_data[0])
  if num_test_examples % args.iterations_per_loop != 0:
    raise ValueError(
        ("iterations_per_loop ({}) must evenly divide the number of test "
         "examples ({})").format(args.iterations_per_loop, num_test_examples))

  ipu_estimator = create_ipu_estimator(args)

  def normalise(x, y):
    return x.astype("float32") / 255.0, y.astype("int32")

  if not args.test_only:
    print("Training...")
    x_train, y_train = normalise(*train_data)
    train(ipu_estimator, args, x_train, y_train)

  print("Testing...")
  x_test, y_test = normalise(*test_data)
  test(ipu_estimator, args, x_test, y_test)


if __name__ == "__main__":
  main()
Download ipu_pipeline_estimator_example.py
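As a worked example of the calc_batch_size() helper in the listing: the CIFAR-10 test set has 10,000 examples, so with the default --batch-size=16 and --iterations-per-loop=100 the test batch size is reduced to 10 and evaluation runs for 1,000 steps. The sketch below repeats the helper so it runs on its own, without TensorFlow or an IPU:

def calc_batch_size(num_examples, batches_per_loop, batch_size):
  """Reduce the batch size if needed to cover all examples without a remainder."""
  assert batch_size > 0
  assert num_examples % batches_per_loop == 0
  while num_examples % (batch_size * batches_per_loop) != 0:
    batch_size -= 1
  return batch_size

# 10,000 CIFAR-10 test examples: 10,000 % (16 * 100) != 0, so the batch size
# is walked down until the whole test set divides evenly.
print(calc_batch_size(10000, 100, 16))  # 10
print(10000 // 10)                      # 1,000 evaluation steps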