Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# NOTE(review): this is the middle of a test method; its `def` line and the
# definitions of checkpoint, fname, model, optimizer, model_param_values,
# opt_param_values, opt_params and opt_class are outside this excerpt.
# Load the optimizer state from the checkpoint dict, then delete the file `fname`.
optimizer.load_state_dict(checkpoint['optimizer'])
os.remove(fname)
# Broadcast the model's state dict from rank 0, then re-read the parameter values
# so they can be compared pairwise with `model_param_values`.
hvd.broadcast_parameters(model.state_dict(), root_rank=0)
model_param_value_after = get_model_param_values(model)
for before, after in zip(model_param_values,
model_param_value_after):
name, model_param_value = before
# Rebinding `model_param_value_after` here does not disturb the loop: zip()
# already holds a reference to the original sequence.
name_after, model_param_value_after = after
# Names, Python types and every element must be equal before and after.
self.assertEqual(name, name_after)
self.assertEqual(type(model_param_value),
type(model_param_value_after))
self.assertTrue(
(model_param_value == model_param_value_after).all())
# Same check for the optimizer: broadcast its state from rank 0 first.
hvd.broadcast_optimizer_state(optimizer, root_rank=0)
# NOTE(review): 4 is presumably the number of per-parameter state entries for
# the model used by this test -- confirm against the model definition.
expected_tensors = 4
if 'momentum' not in opt_params and opt_class == torch.optim.SGD:
# SGD only maintains state when momentum is specified, otherwise
# it does not populate the state dict, so it will contain no tensors.
expected_tensors = 0
self.assertEqual(len(optimizer.state_dict()['state'].values()), expected_tensors)
opt_param_values_after = get_optimizer_param_values(optimizer)
for before, after in zip(opt_param_values, opt_param_values_after):
name, opt_param_value = before
name_after, opt_param_value_after = after
self.assertEqual(name, name_after)
self.assertEqual(type(opt_param_value),
type(opt_param_value_after))
# NOTE(review): the body of this branch is cut off in this excerpt.
if torch.is_tensor(opt_param_value):
def cast_and_place(self, tensor, dtype):
    """Return ``tensor`` converted to the tensor type ``dtype``.

    When ``dtype.is_cuda`` is true, the tensor is first moved to the CUDA
    device whose index is ``hvd.local_rank()`` and then converted; otherwise
    it is converted where it is.

    :param tensor: tensor to convert
    :param dtype: tensor type object exposing ``is_cuda`` and accepted by
        ``Tensor.type`` (e.g. ``torch.FloatTensor``)
    :return: the converted tensor
    """
    placed = tensor.cuda(hvd.local_rank()) if dtype.is_cuda else tensor
    return placed.type(dtype)
import torch.nn as nn
import torch.nn.functional as f
import torch.optim as optim
import mlflow
import horovod.torch as hvd
from utils_nlp.model.gensen.multi_task_model import MultitaskModel
from utils_nlp.model.gensen.utils import (
BufferedDataIterator,
NLIIterator,
compute_validation_loss,
)
# Module-level setup. `cudnn` is not imported in the visible part of this
# excerpt -- presumably torch.backends.cudnn; confirm against the full file.
cudnn.benchmark = True
# Horovod must be initialised before hvd.local_rank() is queried below.
hvd.init()
if torch.cuda.is_available():
# Horovod: pin GPU to local rank.
torch.cuda.set_device(hvd.local_rank())
def metric_average(value, name):
    """
    Sync a validation metric across Horovod workers.

    The value is wrapped in a tensor, passed through ``hvd.allreduce`` with
    its default reduction (the naming here suggests an average -- confirm
    against the installed Horovod version), and unwrapped again.

    :param value: scalar metric computed on this worker
    :param name: name forwarded to ``hvd.allreduce`` for this reduction
    :return: the reduced value as a plain Python number (``Tensor.item()``)
    """
    reduced = hvd.allreduce(torch.tensor(value), name=name)
    return reduced.item()
# Command-line options for the training script. `parser` is created outside
# this excerpt.
parser.add_argument('--lr', type=float, default=0.01, metavar='LR',
help='learning rate (default: 0.01)')
parser.add_argument('--momentum', type=float, default=0.5, metavar='M',
help='SGD momentum (default: 0.5)')
parser.add_argument('--no-cuda', action='store_true', default=False,
help='disables CUDA training')
parser.add_argument('--seed', type=int, default=42, metavar='S',
help='random seed (default: 42)')
parser.add_argument('--log-interval', type=int, default=10, metavar='N',
help='how many batches to wait before logging training status')
parser.add_argument('--fp16-allreduce', action='store_true', default=False,
help='use fp16 compression during allreduce')
args = parser.parse_args()
# CUDA is used only when it was not disabled on the command line AND a device
# is actually available.
args.cuda = not args.no_cuda and torch.cuda.is_available()
# Horovod must be initialised before hvd.local_rank() / hvd.rank() are used below.
hvd.init()
# Seed the CPU RNG (and the CUDA RNG below) with the same --seed value.
torch.manual_seed(args.seed)
if args.cuda:
# Horovod: pin GPU to local rank.
torch.cuda.set_device(hvd.local_rank())
torch.cuda.manual_seed(args.seed)
# NOTE(review): `kwargs` is not used within this excerpt; presumably extra
# DataLoader keyword arguments -- confirm against the full file.
kwargs = {}
# The dataset directory name includes hvd.rank(), so each rank downloads into
# its own 'data-<rank>' directory.
# NOTE(review): (0.1307,) / (0.3081,) are presumably the MNIST mean and std --
# confirm.
train_dataset = \
datasets.MNIST('data-%d' % hvd.rank(), train=True, download=True,
transform=transforms.Compose([
transforms.ToTensor(),
transforms.Normalize((0.1307,), (0.3081,))
]))
# NOTE(review): the call below is cut off in this excerpt (its arguments and
# closing parenthesis are missing).
train_sampler = torch.utils.data.distributed.DistributedSampler(
def _reduce(self, data):
    """Return the result of ``hvd.allreduce`` on ``data`` using ``op=hvd.Sum``.

    :param data: tensor handed to Horovod for the reduction
    :return: the tensor returned by ``hvd.allreduce``
    """
    return hvd.allreduce(data, op=hvd.Sum)
def metric_average(value, name):
    """
    Sync a validation metric across Horovod workers.

    The value is wrapped in a tensor, passed through ``hvd.allreduce`` with
    its default reduction (the naming here suggests an average -- confirm
    against the installed Horovod version), and unwrapped again.

    :param value: scalar metric computed on this worker
    :param name: name forwarded to ``hvd.allreduce`` for this reduction
    :return: the reduced value as a plain Python number (``Tensor.item()``)
    """
    reduced = hvd.allreduce(torch.tensor(value), name=name)
    return reduced.item()
def get_global_rank() -> int:
    """Return the global rank of this process, or -1 when not distributed.

    When the module-level distributed flag is set and Horovod is available,
    the rank comes from ``hvd.rank()``. Otherwise it is read from the
    ``RANK`` environment entry (default -1), which means this also works
    before ``dist.init_process_group`` has been called.
    """
    horovod_active = _DISTRIBUTED_FLAG and is_horovod_available()
    if not horovod_active:
        return int(get_environ('RANK', -1))

    # Imported lazily so Horovod is only required when it is actually in use.
    import horovod.torch as hvd
    return hvd.rank()
"""
# NOTE(review): this is the tail of a method; its `def` line, docstring and the
# origin of lr, warmup_proportion, num_train_optimization_steps and
# fp16_allreduce are outside this excerpt.
# In distributed mode the learning rate is multiplied by the Horovod world size.
if self.use_distributed:
lr = lr * hvd.size()
# Without a warmup proportion, BertAdam is built with only the learning rate;
# otherwise the total step count and warmup proportion are passed as well.
if warmup_proportion is None:
optimizer = BertAdam(self.optimizer_params, lr=lr)
else:
optimizer = BertAdam(
self.optimizer_params,
lr=lr,
t_total=num_train_optimization_steps,
warmup=warmup_proportion,
)
# In distributed mode, wrap the optimizer in hvd.DistributedOptimizer, using
# fp16 compression only when fp16_allreduce is truthy.
if self.use_distributed:
compression = hvd.Compression.fp16 if fp16_allreduce else hvd.Compression.none
optimizer = hvd.DistributedOptimizer(
optimizer, named_parameters=self.model.named_parameters(), compression=compression
)
return optimizer
# Compute the learning-rate multiplier `lr_adj` for a given epoch / batch index.
# Reads the module-level names args, train_loader and hvd.
# NOTE(review): this excerpt ends inside the "cosine" branch; the code that
# turns T_cur / T_total into lr_adj and applies it to the optimizer is not
# visible here.
# NOTE(review): the parameter name `type` shadows the builtin; it is left
# unchanged because callers may pass it by keyword.
def adjust_learning_rate(epoch, batch_idx, type="cosine"):
if epoch < args.warmup_epochs:
# Warmup: use a fractional epoch so the multiplier changes every batch. The
# multiplier rises from about 1/hvd.size() at the start to 1 once `epoch`
# reaches args.warmup_epochs.
epoch += float(batch_idx + 1) / len(train_loader)
lr_adj = 1. / hvd.size() * (epoch * (hvd.size() - 1) / args.warmup_epochs + 1)
elif type == "linear":
# NOTE(review): despite the name, this branch is a step schedule: the
# multiplier drops tenfold at epochs 30, 60 and 90.
if epoch < 30:
lr_adj = 1.
elif epoch < 60:
lr_adj = 1e-1
elif epoch < 90:
lr_adj = 1e-2
else:
lr_adj = 1e-3
elif type == "cosine":
# self.init_lr * 0.5 * (1 + math.cos(math.pi * T_cur / T_total))
# T_cur: batch position counted from the end of warmup;
# T_total: total number of batches after warmup.
run_epochs = epoch - args.warmup_epochs
total_epochs = args.epochs - args.warmup_epochs
T_cur = float(run_epochs * len(train_loader)) + batch_idx
T_total = float(total_epochs * len(train_loader))
# Training-setup fragment. train_dataset, train_sampler, kwargs, _BATCHSIZE,
# _DISTRIBUTED, _LR, _FAKE, validation_dataset, _get_sampler and logger are
# defined outside this excerpt.
train_loader = torch.utils.data.DataLoader(
train_dataset, batch_size=_BATCHSIZE, sampler=train_sampler, **kwargs
)
# Autotune
cudnn.benchmark = True
logger.info("Loading model")
# Load symbol
# Build resnet50 without pretrained weights and move it to the GPU.
model = models.__dict__["resnet50"](pretrained=False)
model.cuda()
if _DISTRIBUTED:
# Horovod: broadcast parameters.
hvd.broadcast_parameters(model.state_dict(), root_rank=0)
# hvd.size() is only queried in distributed mode; otherwise one GPU is assumed.
num_gpus = hvd.size() if _DISTRIBUTED else 1
# Horovod: scale learning rate by the number of GPUs.
optimizer = optim.SGD(model.parameters(), lr=_LR * num_gpus, momentum=0.9)
if _DISTRIBUTED:
# Horovod: wrap optimizer with DistributedOptimizer.
optimizer = hvd.DistributedOptimizer(
optimizer, named_parameters=model.named_parameters()
)
criterion = F.cross_entropy
# A validation loader is only built when _FAKE is falsy.
# NOTE(review): the DataLoader call below is cut off in this excerpt (its
# closing parenthesis is missing).
if not _FAKE:
val_sampler = _get_sampler(validation_dataset)
val_loader = torch.utils.data.DataLoader(
validation_dataset, batch_size=_BATCHSIZE, sampler=val_sampler, **kwargs