안녕하세요 pulluper입니다!
이번 포스팅에서는 저번 포스팅에 이어서 pytorch 의 분산(distributed) pakage를 이용한
mutli-node의 multi-gpu 환경에서 학습을 하는 방법을 알아보겠습니다.
기본적인 용어등의 내용은 다음을 참고하면 좋을것 같습니다.
이번에 사용할 노드는(ubuntu server 2개) 다음과 같습니다.
node1에는 gpu가 4개 있고, node2에는 gpu가 2개 있는 2개의 머신입니다.
각 gpu는 같은 기종입니다.
1. ArgmentParser
torch.distributed.launch를 통해 multi-node 학습을 진행할 경우, 자동으로
os.environ['RANK']
os.environ['LOCAL_RANK']
os.environ['WORLD_SIZE'] 등이 생성됩니다.
특히 torch.distributed.launch 로 실행시에
실행 스크립트에 --local_rank라는 argment는 ArgmentParser에 필수로 지정되어야 합니다.
그렇지 않으면
다음과 같은 에러가 납니다.
error: unrecognized arguments: --local_rank=0
참고로 이번포스팅에 사용되는 argparser는 다음과 같습니다. ("--local_rank" 필수)
def get_args_parser():
parser = argparse.ArgumentParser(add_help=False)
parser.add_argument('--lr', type=float, default=0.01)
parser.add_argument('--epoch', type=int, default=90)
parser.add_argument('--batch_size', type=int, default=1200)
parser.add_argument('--global_rank', type=int, default=0)
parser.add_argument('--vis_step', type=int, default=10)
parser.add_argument('--num_workers', type=int, default=24)
parser.add_argument("--local_rank", type=int,
help="Local rank. Necessary for using the torch.distributed.launch utility.")
parser.add_argument('--world_size', type=int, default=0)
parser.add_argument('--port', type=int, default=2022)
parser.add_argument('--root', type=str, default='data')
parser.add_argument('--start_epoch', type=int, default=0)
parser.add_argument('--save_path', type=str, default='./save')
parser.add_argument('--save_file_name', type=str, default='vgg_cifar')
return parser
2. initialization
전의 포스팅과 달리
torch.distributed.launch 를 사용하기 때문에 world size, global rank, local rank등을
os.environ 을 통해 다 가져올 수 있습니다.
def init_for_distributed(opts):
# 1. setting for distributed training
opts.global_rank = int(os.environ['RANK'])
opts.local_rank = int(os.environ['LOCAL_RANK'])
opts.world_size = int(os.environ['WORLD_SIZE'])
torch.cuda.set_device(opts.local_rank)
if opts.global_rank is not None and opts.local_rank is not None:
print("Use GPU: [{}/{}] for training".format(opts.global_rank, opts.local_rank))
# 2. init_process_group
dist.init_process_group(backend="nccl")
# if put this function, the all processes block at all.
torch.distributed.barrier()
# convert print fn iif rank is zero
return
3. dataset, model - 전 포스팅 참조
4. 전체 코드 (스크립트 이름은 mm.py 입니다)
import os
import time
import torch
import argparse
import torch.distributed as dist
import torch.multiprocessing as mp
from torch.optim.lr_scheduler import StepLR
# for dataset
from torchvision.datasets.cifar import CIFAR10
import torchvision.transforms as tfs
from torch.utils.data import DataLoader
from torch.utils.data.distributed import DistributedSampler
# for model
from torchvision.models import vgg11
from torch.nn.parallel import DistributedDataParallel as DDP
import numpy as np
import random
def set_random_seeds(random_seed=0):
torch.manual_seed(random_seed)
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False
np.random.seed(random_seed)
random.seed(random_seed)
def get_args_parser():
parser = argparse.ArgumentParser(add_help=False)
parser.add_argument('--lr', type=float, default=0.01)
parser.add_argument('--epoch', type=int, default=90)
parser.add_argument('--batch_size', type=int, default=1200)
parser.add_argument('--global_rank', type=int, default=0)
parser.add_argument('--vis_step', type=int, default=10)
parser.add_argument('--num_workers', type=int, default=24)
parser.add_argument("--local_rank", type=int,
help="Local rank. Necessary for using the torch.distributed.launch utility.")
parser.add_argument('--world_size', type=int, default=0)
parser.add_argument('--port', type=int, default=2022)
parser.add_argument('--root', type=str, default='data')
parser.add_argument('--start_epoch', type=int, default=0)
parser.add_argument('--save_path', type=str, default='./save')
parser.add_argument('--save_file_name', type=str, default='vgg_cifar')
return parser
def main(opts):
# 1. set random seeds
set_random_seeds(random_seed=0)
# 2. initialization
init_for_distributed(opts)
# 3. visdom
vis = None
# 4. data set
transform_train = tfs.Compose([
tfs.Resize(256),
tfs.RandomCrop(224),
tfs.RandomHorizontalFlip(),
tfs.ToTensor(),
tfs.Normalize(mean=(0.4914, 0.4822, 0.4465),
std=(0.2023, 0.1994, 0.2010)),
])
transform_test = tfs.Compose([
tfs.Resize(256),
tfs.CenterCrop(224),
tfs.ToTensor(),
tfs.Normalize(mean=(0.4914, 0.4822, 0.4465),
std=(0.2023, 0.1994, 0.2010)),
])
train_set = CIFAR10(root=opts.root,
train=True,
transform=transform_train,
download=True)
test_set = CIFAR10(root=opts.root,
train=False,
transform=transform_test,
download=True)
train_sampler = DistributedSampler(dataset=train_set, shuffle=True)
test_sampler = DistributedSampler(dataset=test_set, shuffle=False)
train_loader = DataLoader(dataset=train_set,
batch_size=int(opts.batch_size / opts.world_size),
shuffle=False,
num_workers=int(opts.num_workers / opts.world_size),
sampler=train_sampler,
pin_memory=True)
test_loader = DataLoader(dataset=test_set,
batch_size=int(opts.batch_size / opts.world_size),
shuffle=False,
num_workers=int(opts.num_workers / opts.world_size),
sampler=test_sampler,
pin_memory=True)
# 5. model
model = vgg11(pretrained=False)
model = model.cuda(opts.local_rank)
model = DDP(module=model,
device_ids=[opts.local_rank])
# 6. criterion
criterion = torch.nn.CrossEntropyLoss().to(opts.local_rank)
# 7. optimizer
optimizer = torch.optim.SGD(params=model.parameters(),
lr=0.01,
weight_decay=0.0005,
momentum=0.9)
# 8. scheduler
scheduler = StepLR(optimizer=optimizer,
step_size=30,
gamma=0.1)
if opts.start_epoch != 0:
checkpoint = torch.load(os.path.join(opts.save_path, opts.save_file_name) + '.{}.pth.tar'
.format(opts.start_epoch - 1),
map_location=torch.device('cuda:{}'.format(opts.local_rank)))
model.load_state_dict(checkpoint['model_state_dict']) # load model state dict
optimizer.load_state_dict(checkpoint['optimizer_state_dict']) # load optim state dict
scheduler.load_state_dict(checkpoint['scheduler_state_dict']) # load sched state dict
if opts.global_rank == 0:
print('\nLoaded checkpoint from epoch %d.\n' % (int(opts.start_epoch) - 1))
for epoch in range(opts.start_epoch, opts.epoch):
# 9. train
tic = time.time()
model.train()
train_sampler.set_epoch(epoch)
for i, (images, labels) in enumerate(train_loader):
images = images.to(opts.local_rank)
labels = labels.to(opts.local_rank)
outputs = model(images)
# ----------- update -----------
optimizer.zero_grad()
loss = criterion(outputs, labels)
loss.backward()
optimizer.step()
# get lr
for param_group in optimizer.param_groups:
lr = param_group['lr']
# time
toc = time.time()
# visualization
if (i % opts.vis_step == 0 or i == len(train_loader) - 1):
print('GPU[{0}] Epoch [{1}/{2}], Iter [{3}/{4}], Loss: {5:.4f}, LR: {6:.5f}, Time: {7:.2f}'.format(opts.global_rank,
epoch,
opts.epoch,
i,
len(train_loader),
loss.item(),
lr,
toc - tic))
if vis is not None and opts.local_rank == 0:
vis.line(X=torch.ones((1, 1)) * i + epoch * len(train_loader),
Y=torch.Tensor([loss]).unsqueeze(0),
update='append',
win='loss',
opts=dict(x_label='step',
y_label='loss',
title='loss',
legend=['total_loss']))
# save pth file
if opts.local_rank == 0:
if not os.path.exists(opts.save_path):
os.mkdir(opts.save_path)
checkpoint = {'epoch': epoch,
'model_state_dict': model.state_dict(),
'optimizer_state_dict': optimizer.state_dict(),
'scheduler_state_dict': scheduler.state_dict()}
torch.save(checkpoint, os.path.join(opts.save_path, opts.save_file_name + '.{}.pth.tar'.format(epoch)))
print("save pth.tar {} epoch!".format(epoch))
# 10. test
model.eval()
val_avg_loss = 0
correct_top1 = 0
correct_top5 = 0
total = 0
with torch.no_grad():
for i, (images, labels) in enumerate(test_loader):
images = images.to(opts.local_rank) # [100, 3, 224, 224]
labels = labels.to(opts.local_rank) # [100]
outputs = model(images)
loss = criterion(outputs, labels)
val_avg_loss += loss.item()
# ------------------------------------------------------------------------------
# rank 1
_, pred = torch.max(outputs, 1)
total += labels.size(0)
correct_top1 += (pred == labels).sum().item()
# ------------------------------------------------------------------------------
# rank 5
_, rank5 = outputs.topk(5, 1, True, True)
rank5 = rank5.t()
correct5 = rank5.eq(labels.view(1, -1).expand_as(rank5))
# ------------------------------------------------------------------------------
for k in range(5): # 0, 1, 2, 3, 4, 5
correct_k = correct5[:k+1].reshape(-1).float().sum(0, keepdim=True)
correct_top5 += correct_k.item()
accuracy_top1 = correct_top1 / total
accuracy_top5 = correct_top5 / total
val_avg_loss = val_avg_loss / len(test_loader) # make mean loss
if vis is not None:
vis.line(X=torch.ones((1, 3)) * epoch,
Y=torch.Tensor([accuracy_top1, accuracy_top5, val_avg_loss]).unsqueeze(0),
update='append',
win='test_loss_acc',
opts=dict(x_label='epoch',
y_label='test_loss and acc',
title='test_loss and accuracy',
legend=['accuracy_top1', 'accuracy_top5', 'avg_loss']))
print("top-1 percentage : {0:0.3f}%".format(correct_top1 / total * 100))
print("top-5 percentage : {0:0.3f}%".format(correct_top5 / total * 100))
scheduler.step()
return 0
def init_for_distributed(opts):
# 1. setting for distributed training
opts.global_rank = int(os.environ['RANK'])
opts.local_rank = int(os.environ['LOCAL_RANK'])
opts.world_size = int(os.environ['WORLD_SIZE'])
torch.cuda.set_device(opts.local_rank)
if opts.global_rank is not None and opts.local_rank is not None:
print("Use GPU: [{}/{}] for training".format(opts.global_rank, opts.local_rank))
# 2. init_process_group
dist.init_process_group(backend="nccl")
# if put this function, the all processes block at all.
torch.distributed.barrier()
return
if __name__ == '__main__':
parser = argparse.ArgumentParser('vgg11 cifar training', parents=[get_args_parser()])
opts = parser.parse_args()
main(opts)
5. torch.distributed.launch 로 2개의 노드에서 학습하기
본 포스팅에서는 4개의 GPU가 있는 노드를 마스터 노드로 정하였습니다.
5-1) 두개의 노드에 각각 mm.py의 스크립트를 다 넣어줍니다.
5-2) master 주소 설정 : 먼저 TCP통신을 위해서 마스터노드(실행의 주체가 되는 노드)의 주소를 가져옵니다.
서버에 가서 다음 명령어를 친 후에
ifconfig
아래 사진의 두번째 줄인 inet 뒤에 나오는 주소를 기억해 둡니다.
예를들어 123.456.789.123 이라 하겠습니다.
5-2) master port 설정
pytorch tutorial 에서 설정한 것처럼 23456 으로 설정하겠습니다.
5-3) 각 노드에서 python script 실행
이후 콘솔환경에서 먼저 마스터 노드에 다음과 같은 명령어로 실행을 해 줍니다.
python -m torch.distributed.launch --nnodes=2 --nproc_per_node=4 --node_rank=0 --master_addr=123.456.789.123 --master_port=23456 mm.py
를 실행하면 다음과 같은 출력과 함께 첫번째 노드는 대기를 하게 됩니다.
두번째 노드를 가서 다음의 명령어를 실행 해 줍니다.
(여기서 두번재 노드이므로 node_rank=1 이고, gpu가 2개 이므로 nproc_per_node=2 임에 주의 - 자신의 환경에 따라서 설정)
python -m torch.distributed.launch --nnodes=2 --nproc_per_node=2 --node_rank=1 --master_addr=123.456.789.123 --master_port=23456 mm.py
노드 2개가 다 통신이 됨을 확인하면 이후에 학습을 진행합니다.
다시 첫번째 노드를 가 보면 학습이 진행 됨을 볼 수 있습니다.
이렇게 2개의 노드의 GPU를 모두 효율적으로 사용할 수 있게 되었습니다.
node1
node2
6. trouble shooting
위의 방법을 수행하였는데, 학습이 안되는 경우가 있엇습니다.
이는 pytorch version 이 달라서 안 되는 문제였습니다. (pytorch1.9 vs pytorch1.10)
따라서 두 노드 모두 같은 version의 환경(python 3.8 + pytorch1.10 + cuda11.3/cuda11.4) 으로 만들어 수행했습니다.
이 방법은 각 노드에 가서 스크립트를 각각 실행 해 주어야 하는데 번거로울 수 있습니다.
이를 한번에 실행 시켜주는 스케쥴러인 slurm 혹은 submitit등을 다음에 포스팅 하겠습니다.
pytorch 최신버전을 이용하시면 torchrun 을 이용하여도 multi-node multi-gpu 학습이 가능합니다.
오늘의 포스팅은 여기까지입니다. 감사합니다.
Reference
https://www.youtube.com/watch?v=KaAJtI1T2x4&t=266s
https://leimao.github.io/blog/PyTorch-Distributed-Training/
https://pytorch.org/docs/stable/distributed.html
'Pytorch' 카테고리의 다른 글
[Pytorch] 구글 드라이브에서 pretrained 모델(pth) torchvision 다운폴더로 받아서 실행하기. (0) | 2023.05.23 |
---|---|
[Pytorch] Remote server에서의 visdom설정 (0) | 2023.05.10 |
[Pytorch] torch.roll 설명 및 예제 (0) | 2023.04.10 |
[Pytorch] RTX3060 window에서 최신 anaconda, 그래픽 드라이버, cuda11.7, cudnn, pytorch2.0 설치 (0) | 2023.03.29 |
[Pytorch] torch.nn.Unfold (0) | 2023.03.16 |
댓글