PyTorch使用DDP进行分布式训练

Author Avatar
patrickcty 11月 18, 2020

原理

DDP 是 DistributedDataParallel 的简写,用来进行分布式训练,可以是单主机多 GPU 也可以是多主机多 GPU,以下均从单主机多 GPU 来介绍。其原理是把模型复制到其他的 GPU 上,然后在训练的过程中汇总梯度,进行迭代,从感知上就像是增大了 N 倍的显存。

启动

具体的操作是产生多个进程,每个进程在一个 GPU 上训练,然后结果自动地在主进程中进行汇总。因此启动方式需要通过 torch.distributed.launch 来启动,如下:

python -m torch.distributed.launch --nproc_per_node=2 main.py

其中 nproc_per_node 是要使用的总的 GPU 数,如果一台主机上有多个 GPU,但是只想用其中的部分来进行训练,则可以用以下命令来启动:

CUDA_VISIBLE_DEVICES=1,3,5 python -m torch.distributed.launch --nproc_per_node=3 main.py

如果想调试分布式的代码,那么用以下方式来启动:

NCCL_DEBUG=INFO python -m torch.distributed.launch --nproc_per_node=2 main.py

准备阶段

为了让程序能很好地与 GPU 交互,torch.distributed.launch 在启动进程的时候会传入 local_rank 参数,用来标识 GPU,因此我们要在训练脚本中加入相应的参数。值得注意的是,local_rank 永远是从零开始。具体代码如下:

parser.add_argument('--local_rank', type=int, default=0)

初始化分布式环境

首先要初始化进程组,对于单主机来说就用下面简单的语句即可

import torch.distributed as dist

opt = parser.parse_args()  # 解析命令行参数
torch.cuda.set_device(opt.local_rank)
dist.init_process_group('nccl')
device = torch.device(f'cuda:{opt.local_rank}')

构建分布式模型

分布式环境下默认 BN 是在主 GPU 上进行计算,然后同步到其他 GPU,因此使用普通 BN 的时候不能充分发挥分布式训练中大 batch size 的优势。如果使用 Sync BN 则会解决这个问题,这个转换也可以使用一个函数来完成。

之后把模型转换为 DDP 模型即可,注意的是每一个进程都会初始化一个 DDP 模型,device_id 指的是当前进程要用到的 GPU 标号列表。因为我们通常一个进程一个 GPU,因此这里使用 [opt.local_rank] 即可,输出的话是会输出到单个设备上,因此就不用转换为 list。需要注意的是,Sync BN 不支持单进程多 GPU。

from torch.nn.parallel import DistributedDataParallel as DDP

model = ResNet()
model = torch.nn.SyncBatchNorm.convert_sync_batchnorm(model).to(device)
model = DDP(model, device_ids=[opt.local_rank], output_device=opt.local_rank)

获得分布式 data loader

分布式训练的过程中我们要保证每个 GPU 取到的是不同的数据,因此不能直接使用普通的 Dataloader,要传入一个 sampler 参数,具体也很简单,如下所示:

import torch.utils.data as data

dataset = SomeDataset(image_root, gt_root, trainsize)
# 要从原来 dataset 得到一个分布式 sampler
sampler = data.distributed.DistributedSampler(dataset)
shuffle = False  # sampler 与 shuffle 不兼容

data_loader = data.DataLoader(dataset=dataset,
                              batch_size=batchsize,
                              shuffle=shuffle,
                              num_workers=num_workers,
                              pin_memory=pin_memory,
                              sampler=sampler)
return data_loader

dataloader 中要传入 sampler 作为参数,其他的注意事项在注释中

获得总的 loss

总的准备工作都完成了,接下来就是像平常一样训练了。但是每个进程中的 loss 都是通过自己的输入得到的,如果要得到总的 loss 则需要手动同步一下,具体操作如下:

def reduce_tensor(tensor: torch.Tensor) -> torch.Tensor:
    rt = tensor.clone()
    dist.all_reduce(rt, op=dist.ReduceOp.SUM)
    rt /= dist.get_world_size()  # 这是进程的数量
    return rt
    
    
reduced_loss1 = reduce_tensor(loss1.data).item()

all_reduce 会自动获取各个进程中同名 tensor,然后通过指定的 op 来进行计算,最后再同步到各个进程当中,也就是说这是一个原地的操作。为了避免可能产生的影响,这里不是直接对原来的 tensor 进行 reduce,而是先取了副本。

保存检查点

这里有一个大坑!虽然参数会在各个进程中汇总,但是实际保存的模型的 state_dict 和非分布式的还是有区别的,如果直接载入很可能会出错,解决方法下面会提到。

测试阶段

如果你没有掉进上一节的坑里面,那么测试阶段的代码可以和非分布式测试的完全相同。

打印 log

因为各个进程代码完全一样,因此打印结果也是打印 N 份,一个简单的解决方法就是判断当前的 local_rank,只有当其为特定值的时候才打印。

if opt.local_rank == 0:
    print('some log')

遇到的坑

DDP 训练的模型通过非 DDP 进行加载之后结果非常差。

原因

保存的模型 state_dict 前缀多了一个 module.,这样在 strict=False 下载入参数的时候就相当于载入了个寂寞,因此结果很差

解决方法

  1. 加载 DDP 模型的时候重新构造 state_dict,将二者名称统一[1]:
def dist_load(state_dict):

    new_state_dict = OrderedDict()
    for k, v in state_dict.items():
        name = k[7:]  # remove 'module.' of DataParallel/DistributedDataParallel
        new_state_dict[name] = v

    return new_state_dict
  1. (==推荐==)保存 DDP 模型的时候直接保存不包含 module. 前缀[2]
torch.save(model.module.state_dict(), path_to_file)

参考资料

如果你觉得还是讲的不清楚,那就看这篇文章吧,我就是跟着这篇文章来写的。更深入的理解分析就看这个系列

总的代码

import os
import torch
import numpy as np
import pandas as pd
import torch.distributed as dist
from datetime import datetime
from torchsummary import summary
from torch.nn import SyncBatchNorm
from torch.distributed import ReduceOp
from torch.nn.parallel import DistributedDataParallel

from data import get_loader
from model.CPD_ResNet_models import CPD_ResNet
from utils import clip_gradient, adjust_lr, save_single_plot, get_train_parser, init_workspace


parser = get_train_parser(loader_type='rgb')
parser.add_argument('--local_rank', type=int, default=0)
opt = parser.parse_args()
main_proc = True if opt.local_rank == 0 else False
basedir = init_workspace(opt, main_proc)

# init distribute environment
torch.cuda.set_device(opt.local_rank)
dist.init_process_group('nccl')
device = torch.device(f'cuda:{opt.local_rank}')

if main_proc:
    print('Learning Rate: {} Model type: {}'.format(opt.lr, opt.model))

# build models
model = CPD_ResNet()
model = SyncBatchNorm.convert_sync_batchnorm(model).to(device)
model = DistributedDataParallel(model, device_ids=[opt.local_rank], output_device=opt.local_rank)
if opt.print_model and main_proc:
    print(model)
    summary(model, (4, opt.trainsize, opt.trainsize))

optimizer = torch.optim.Adam(model.parameters(), opt.lr)

# build distribute data loader
train_loader = get_loader(opt.train_img_dir, opt.train_gt_dir, loader_type='rgb',
                          batchsize=opt.batchsize, trainsize=opt.trainsize, dist=True)
total_step = len(train_loader)

# build loss
CE = torch.nn.BCEWithLogitsLoss().to(device)

# save train results in df
df_step = pd.DataFrame(columns=('loss1', 'loss2'))
df_epoch = pd.DataFrame(columns=('loss1', 'loss2'))


def reduce_tensor(tensor: torch.Tensor) -> torch.Tensor:
    rt = tensor.clone()
    dist.all_reduce(rt, op=dist.ReduceOp.SUM)
    rt /= dist.get_world_size()
    return rt


def train(train_loader, model, optimizer, epoch):
    model.train()
    epoch_loss1 = []
    epoch_loss2 = []
    for i, pack in enumerate(train_loader, start=1):
        # if main_proc:
        #     print('running')
        optimizer.zero_grad()
        images, gts = pack

        images = images.to(device)
        gts = gts.to(device)

        # if main_proc:
        #     print('data done')

        atts, dets = model(images)

        loss1 = CE(atts, gts)
        loss2 = CE(dets, gts)

        # if main_proc:
        #     print('CE done')

        loss = loss1 + loss2
        loss.backward()

        # if main_proc:
        #     print('loss done')

        # save loss results
        reduced_loss1 = reduce_tensor(loss1.data).item()
        # if main_proc:
        #     print('reduce loss1 done')
        reduced_loss2 = reduce_tensor(loss2.data).item()
        # if main_proc:
        #     print('reduce loss2 done')
        if main_proc:
            epoch_loss1.append(reduced_loss1)
            epoch_loss2.append(reduced_loss2)
            df_step.loc[df_step.shape[0]] = (reduced_loss1, reduced_loss2)

        clip_gradient(optimizer, opt.clip)
        optimizer.step()

        if (i % 400 == 0 or i == total_step) and main_proc:
            print('{} Epoch [{:03d}/{:03d}], Step [{:04d}/{:04d}], Loss1: {:.4f} Loss2: {:0.4f}'.
                  format(datetime.now(), epoch, opt.epoch, i, total_step,
                         np.mean(epoch_loss1), np.mean(epoch_loss2)))

    if main_proc:
        df_epoch.loc[df_epoch.shape[0]] = (np.mean(epoch_loss1), np.mean(epoch_loss2))
        save_path = os.path.join(basedir, 'checkpoints')
        os.makedirs(save_path, exist_ok=True)

        if (epoch + 1) % 5 == 0:
            # 注意,这里保存的是 model.module.state_dict(),这样测试的时候就不用做额外的处理
            torch.save(model.module.state_dict(), os.path.join(save_path, 'CPD_{}.pth'.format(epoch + 1)))


print("GPU {}: Let's go!".format(opt.local_rank))
for epoch in range(1, opt.epoch + 1):
    adjust_lr(optimizer, opt.lr, epoch, opt.decay_rate, opt.decay_epoch)
    train(train_loader, model, optimizer, epoch)

if main_proc:
    df_epoch.to_csv(os.path.join(basedir, 'epoch_loss.csv'), index=False)
    df_step.to_csv(os.path.join(basedir, 'step_loss.csv'), index=False)
    save_single_plot(df_epoch, ('loss1', 'loss2'), 'epoch', 'loss', basedir, 'epoch')
    save_single_plot(df_step, ('loss1', 'loss2'), 'step', 'loss', basedir, 'step')