(x_train, y_train), (x_test, y_test) = \
    keras.datasets.mnist.load_data('MNIST-data-%d' % bps.rank())
# The shape of the downloaded data is (-1, 28, 28), so we need to reshape it
# into (-1, 784) to feed it into our network. We also need to normalize the
# features to the range [0, 1].
x_train = np.reshape(x_train, (-1, 784)) / 255.0
x_test = np.reshape(x_test, (-1, 784)) / 255.0
# Build model...
with tf.name_scope('input'):
    image = tf.placeholder(tf.float32, [None, 784], name='image')
    label = tf.placeholder(tf.float32, [None], name='label')
predict, loss = conv_model(image, label, tf.estimator.ModeKeys.TRAIN)
# BytePS: adjust learning rate based on number of GPUs.
opt = tf.train.RMSPropOptimizer(0.001 * bps.size())
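# Scaling the learning rate linearly with bps.size() follows the common linear
# scaling rule for synchronous data-parallel training, since the effective
# batch size grows with the number of workers.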
# BytePS: add BytePS Distributed Optimizer.
opt = bps.DistributedOptimizer(opt)
global_step = tf.train.get_or_create_global_step()
train_op = opt.minimize(loss, global_step=global_step)
hooks = [
    # BytePS: BroadcastGlobalVariablesHook broadcasts initial variable states
    # from rank 0 to all other processes. This is necessary to ensure consistent
    # initialization of all workers when training is started with random weights
    # or restored from a checkpoint.
    bps.BroadcastGlobalVariablesHook(0),

    # BytePS: adjust number of steps based on number of GPUs.
    tf.train.StopAtStepHook(last_step=200000 // bps.size()),
    tf.train.LoggingTensorHook(tensors={'step': global_step, 'loss': loss},
                               every_n_iter=10),
]
# BytePS: pin GPU to be used to process local rank (one GPU per process)
config = tf.ConfigProto()
config.gpu_options.allow_growth = True
config.gpu_options.visible_device_list = str(bps.local_rank())
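# With visible_device_list set to the local rank, each process sees exactly one
# GPU and only allocates memory on the device assigned to it.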
# BytePS: save checkpoints only on worker 0 to prevent other workers from
# corrupting them.
checkpoint_dir = './checkpoints' if bps.rank() == 0 else None
training_batch_generator = train_input_generator(x_train,
                                                 y_train, batch_size=100)
# The MonitoredTrainingSession takes care of session initialization,
# restoring from a checkpoint, saving to a checkpoint, and closing when done
# or an error occurs.
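# A minimal sketch (assumption, not part of the original snippet) of the training
# loop that typically follows, using the hooks, config, checkpoint_dir, and
# training_batch_generator defined above.
with tf.train.MonitoredTrainingSession(checkpoint_dir=checkpoint_dir,
                                       hooks=hooks,
                                       config=config) as mon_sess:
    while not mon_sess.should_stop():
        # Run a training step synchronously on the next generated batch.
        image_, label_ = next(training_batch_generator)
        mon_sess.run(train_op, feed_dict={image: image_, label: label_})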
# Benchmark
log('Running benchmark...')
img_secs = []
for x in range(args.num_iters):
    time = timeit.timeit(benchmark_step, number=args.num_batches_per_iter)
    img_sec = args.batch_size * args.num_batches_per_iter / time
    log('Iter #%d: %.1f img/sec per %s' % (x, img_sec, device))
    img_secs.append(img_sec)
# Results
img_sec_mean = np.mean(img_secs)
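# 1.96 is the z-value for a 95% confidence band under a normal assumption on
# the per-iteration throughput samples.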
img_sec_conf = 1.96 * np.std(img_secs)
log('Img/sec per %s: %.1f +-%.1f' % (device, img_sec_mean, img_sec_conf))
log('Total img/sec on %d %s(s): %.1f +-%.1f' %
    (bps.size(), device, bps.size() * img_sec_mean, bps.size() * img_sec_conf))
def multiplier(epoch):
    # Adjust epoch to produce round numbers at the end of each epoch, so that
    # TensorBoard learning rate graphs look better.
    epoch += 1. / self.steps_per_epoch
    return 1. / bps.size() * (epoch * (bps.size() - 1) / warmup_epochs + 1)
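# Worked example (illustrative assumption): with bps.size() == 4 and
# warmup_epochs == 5, the multiplier ramps roughly linearly from 0.25 at the
# start of training to 1.0 after 5 epochs, i.e. the effective learning rate
# warms up from the single-GPU rate to the fully scaled rate.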
super(LearningRateWarmupCallbackImpl, self).__init__(
def on_train_begin(self, logs=None):
    if bps.size() <= 1:
        return
    with tf.device(self.device):
        bcast_op = bps.broadcast_global_variables(self.root_rank)
        self.backend.get_session().run(bcast_op)
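# Hedged usage sketch: the callback that defines this on_train_begin is
# typically passed to Keras model.fit() so that rank 0's initial weights reach
# every worker before the first batch, e.g. (names assumed for illustration):
#
#   model.fit(x_train, y_train, callbacks=[broadcast_callback], epochs=epochs)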
def get_gradients(self, loss, params):
    """
    Compute gradients of all trainable variables.
    See Optimizer.get_gradients() for more info.
    In DistributedOptimizer, get_gradients() is overridden to also
    push_pull the gradients before returning them.
    """
    gradients = super(self.__class__, self).get_gradients(loss, params)
    if bps.size() > 1:
        averaged_gradients = []
        with tf.name_scope(self._name + "_Push_Pull") as scope:
            for grad in gradients:
                if grad is not None:
                    if self._sparse_as_dense and \
                            isinstance(grad, tf.IndexedSlices):
                        grad = tf.convert_to_tensor(grad)
                    avg_grad = bps.push_pull(grad, scope,
                                             device_dense=self._device_dense,
                                             device_sparse=self._device_sparse,
                                             compression=self._compression)
                    averaged_gradients.append(avg_grad)
                else:
                    averaged_gradients.append(None)
        return averaged_gradients
    else:
        # Single process: nothing to push_pull, so return the gradients unchanged.
        return gradients
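# Hedged usage sketch (assumed Horovod-style Keras API, mirroring the TF
# example above): the wrapped optimizer is passed to model.compile() so that
# every gradient goes through push_pull before the weights are updated, e.g.
#
#   opt = bps.DistributedOptimizer(keras.optimizers.SGD(0.01 * bps.size()))
#   model.compile(loss='sparse_categorical_crossentropy', optimizer=opt)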