본문 바로가기
Pytorch

[Pytorch] Distributed package으로 Multi-Node Multi-GPU 학습 알아보기

by pulluper 2023. 4. 26.
반응형

안녕하세요 pulluper입니다!

 

이번 포스팅에서는 저번 포스팅에 이어서 pytorch 의 분산(distributed) pakage를 이용한

mutli-nodemulti-gpu 환경에서 학습을 하는 방법을 알아보겠습니다.

 

기본적인 용어등의 내용은 다음을 참고하면 좋을것 같습니다.

https://csm-kr.tistory.com/47

 

[pytorch] Distributed package 를 이용한 분산학습으로 Multi-GPU 효율적으로 사용하기

안녕하세요 pulluper 입니다 😁😁 이번 포스팅에서는 pytorch 의 분산(distributed) pakage를 이용해서 multi-gpu 를 모두 효율적으로 사용하는 방법을 알아보겠습니다. 이번 포스팅의 목차는 다음과 같습

csm-kr.tistory.com

 

이번에 사용할 노드는(ubuntu server 2개) 다음과 같습니다. 

node1에는 gpu가 4개 있고, node2에는 gpu가 2개 있는 2개의 머신입니다. 

각 gpu는 같은 기종입니다. 

 


1. ArgmentParser 

 

torch.distributed.launch를 통해 multi-node 학습을 진행할 경우, 자동으로 

os.environ['RANK']

os.environ['LOCAL_RANK']

os.environ['WORLD_SIZE'] 등이 생성됩니다. 

특히 torch.distributed.launch 로 실행시에

실행 스크립트에 --local_rank라는 argment는 ArgmentParser에 필수로 지정되어야 합니다. 

 

그렇지 않으면

다음과 같은 에러가 납니다. 

error: unrecognized arguments: --local_rank=0

 

참고로 이번포스팅에 사용되는 argparser는 다음과 같습니다. ("--local_rank" 필수)

def get_args_parser():
    parser = argparse.ArgumentParser(add_help=False)
    parser.add_argument('--lr', type=float, default=0.01)
    parser.add_argument('--epoch', type=int, default=90)
    parser.add_argument('--batch_size', type=int, default=1200)
    parser.add_argument('--global_rank', type=int, default=0)
    parser.add_argument('--vis_step', type=int, default=10)
    parser.add_argument('--num_workers', type=int, default=24)
    parser.add_argument("--local_rank", type=int,
                        help="Local rank. Necessary for using the torch.distributed.launch utility.")
    parser.add_argument('--world_size', type=int, default=0)
    parser.add_argument('--port', type=int, default=2022)
    parser.add_argument('--root', type=str, default='data')
    parser.add_argument('--start_epoch', type=int, default=0)
    parser.add_argument('--save_path', type=str, default='./save')
    parser.add_argument('--save_file_name', type=str, default='vgg_cifar')
    return parser

 

2. initialization

 

전의 포스팅과 달리 

torch.distributed.launch 를 사용하기 때문에 world size, global rank, local rank등을

os.environ 을 통해 다 가져올 수 있습니다. 

 

def init_for_distributed(opts):

    # 1. setting for distributed training
    opts.global_rank = int(os.environ['RANK'])
    opts.local_rank = int(os.environ['LOCAL_RANK'])
    opts.world_size = int(os.environ['WORLD_SIZE'])
    torch.cuda.set_device(opts.local_rank)
    if opts.global_rank is not None and opts.local_rank is not None:
        print("Use GPU: [{}/{}] for training".format(opts.global_rank, opts.local_rank))

    # 2. init_process_group
    dist.init_process_group(backend="nccl")
    # if put this function, the all processes block at all.
    torch.distributed.barrier()
    # convert print fn iif rank is zero
    return

 

3. dataset, model - 전 포스팅 참조

 

4. 전체 코드 (스크립트 이름은 mm.py 입니다)

 

import os
import time
import torch
import argparse
import torch.distributed as dist
import torch.multiprocessing as mp
from torch.optim.lr_scheduler import StepLR

# for dataset
from torchvision.datasets.cifar import CIFAR10
import torchvision.transforms as tfs
from torch.utils.data import DataLoader
from torch.utils.data.distributed import DistributedSampler

# for model
from torchvision.models import vgg11
from torch.nn.parallel import DistributedDataParallel as DDP

import numpy as np
import random


def set_random_seeds(random_seed=0):
    torch.manual_seed(random_seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False
    np.random.seed(random_seed)
    random.seed(random_seed)


def get_args_parser():
    parser = argparse.ArgumentParser(add_help=False)
    parser.add_argument('--lr', type=float, default=0.01)
    parser.add_argument('--epoch', type=int, default=90)
    parser.add_argument('--batch_size', type=int, default=1200)
    parser.add_argument('--global_rank', type=int, default=0)
    parser.add_argument('--vis_step', type=int, default=10)
    parser.add_argument('--num_workers', type=int, default=24)
    parser.add_argument("--local_rank", type=int,
                        help="Local rank. Necessary for using the torch.distributed.launch utility.")
    parser.add_argument('--world_size', type=int, default=0)
    parser.add_argument('--port', type=int, default=2022)
    parser.add_argument('--root', type=str, default='data')
    parser.add_argument('--start_epoch', type=int, default=0)
    parser.add_argument('--save_path', type=str, default='./save')
    parser.add_argument('--save_file_name', type=str, default='vgg_cifar')
    return parser


def main(opts):
	# 1. set random seeds
    set_random_seeds(random_seed=0)
	
    # 2. initialization
    init_for_distributed(opts)

    # 3. visdom
    vis = None

    # 4. data set
    transform_train = tfs.Compose([
        tfs.Resize(256),
        tfs.RandomCrop(224),
        tfs.RandomHorizontalFlip(),
        tfs.ToTensor(),
        tfs.Normalize(mean=(0.4914, 0.4822, 0.4465),
                      std=(0.2023, 0.1994, 0.2010)),
    ])

    transform_test = tfs.Compose([
                                  tfs.Resize(256),
                                  tfs.CenterCrop(224),
                                  tfs.ToTensor(),
                                  tfs.Normalize(mean=(0.4914, 0.4822, 0.4465),
                                                std=(0.2023, 0.1994, 0.2010)),
                                        ])

    train_set = CIFAR10(root=opts.root,
                        train=True,
                        transform=transform_train,
                        download=True)

    test_set = CIFAR10(root=opts.root,
                       train=False,
                       transform=transform_test,
                       download=True)

    train_sampler = DistributedSampler(dataset=train_set, shuffle=True)
    test_sampler = DistributedSampler(dataset=test_set, shuffle=False)

    train_loader = DataLoader(dataset=train_set,
                              batch_size=int(opts.batch_size / opts.world_size),
                              shuffle=False,
                              num_workers=int(opts.num_workers / opts.world_size),
                              sampler=train_sampler,
                              pin_memory=True)

    test_loader = DataLoader(dataset=test_set,
                             batch_size=int(opts.batch_size / opts.world_size),
                             shuffle=False,
                             num_workers=int(opts.num_workers / opts.world_size),
                             sampler=test_sampler,
                             pin_memory=True)

    # 5. model
    model = vgg11(pretrained=False)
    model = model.cuda(opts.local_rank)
    model = DDP(module=model,
                device_ids=[opts.local_rank])

    # 6. criterion
    criterion = torch.nn.CrossEntropyLoss().to(opts.local_rank)

    # 7. optimizer
    optimizer = torch.optim.SGD(params=model.parameters(),
                                lr=0.01,
                                weight_decay=0.0005,
                                momentum=0.9)

    # 8. scheduler
    scheduler = StepLR(optimizer=optimizer,
                       step_size=30,
                       gamma=0.1)

    if opts.start_epoch != 0:

        checkpoint = torch.load(os.path.join(opts.save_path, opts.save_file_name) + '.{}.pth.tar'
                                .format(opts.start_epoch - 1),
                                map_location=torch.device('cuda:{}'.format(opts.local_rank)))
        model.load_state_dict(checkpoint['model_state_dict'])  # load model state dict
        optimizer.load_state_dict(checkpoint['optimizer_state_dict'])  # load optim state dict
        scheduler.load_state_dict(checkpoint['scheduler_state_dict'])  # load sched state dict
        if opts.global_rank == 0:
            print('\nLoaded checkpoint from epoch %d.\n' % (int(opts.start_epoch) - 1))

    for epoch in range(opts.start_epoch, opts.epoch):

        # 9. train
        tic = time.time()
        model.train()
        train_sampler.set_epoch(epoch)

        for i, (images, labels) in enumerate(train_loader):
            images = images.to(opts.local_rank)
            labels = labels.to(opts.local_rank)
            outputs = model(images)

            # ----------- update -----------
            optimizer.zero_grad()
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            # get lr
            for param_group in optimizer.param_groups:
                lr = param_group['lr']

            # time
            toc = time.time()

            # visualization
            if (i % opts.vis_step == 0 or i == len(train_loader) - 1):
                print('GPU[{0}] Epoch [{1}/{2}], Iter [{3}/{4}], Loss: {5:.4f}, LR: {6:.5f}, Time: {7:.2f}'.format(opts.global_rank,
                                                                                                                   epoch,
                                                                                                                   opts.epoch,
                                                                                                                   i,
                                                                                                                   len(train_loader),
                                                                                                                   loss.item(),
                                                                                                                   lr,
                                                                                                                   toc - tic))
                if vis is not None and opts.local_rank == 0:
                    vis.line(X=torch.ones((1, 1)) * i + epoch * len(train_loader),
                             Y=torch.Tensor([loss]).unsqueeze(0),
                             update='append',
                             win='loss',
                             opts=dict(x_label='step',
                                       y_label='loss',
                                       title='loss',
                                       legend=['total_loss']))

        # save pth file
        if opts.local_rank == 0:
            if not os.path.exists(opts.save_path):
                os.mkdir(opts.save_path)

            checkpoint = {'epoch': epoch,
                          'model_state_dict': model.state_dict(),
                          'optimizer_state_dict': optimizer.state_dict(),
                          'scheduler_state_dict': scheduler.state_dict()}

            torch.save(checkpoint, os.path.join(opts.save_path, opts.save_file_name + '.{}.pth.tar'.format(epoch)))
            print("save pth.tar {} epoch!".format(epoch))

            # 10. test
            model.eval()

            val_avg_loss = 0
            correct_top1 = 0
            correct_top5 = 0
            total = 0

            with torch.no_grad():
                for i, (images, labels) in enumerate(test_loader):
                    images = images.to(opts.local_rank)  # [100, 3, 224, 224]
                    labels = labels.to(opts.local_rank)  # [100]
                    outputs = model(images)
                    loss = criterion(outputs, labels)
                    val_avg_loss += loss.item()
                    # ------------------------------------------------------------------------------
                    # rank 1
                    _, pred = torch.max(outputs, 1)
                    total += labels.size(0)
                    correct_top1 += (pred == labels).sum().item()

                    # ------------------------------------------------------------------------------
                    # rank 5
                    _, rank5 = outputs.topk(5, 1, True, True)
                    rank5 = rank5.t()
                    correct5 = rank5.eq(labels.view(1, -1).expand_as(rank5))

                    # ------------------------------------------------------------------------------
                    for k in range(5):  # 0, 1, 2, 3, 4, 5
                        correct_k = correct5[:k+1].reshape(-1).float().sum(0, keepdim=True)
                    correct_top5 += correct_k.item()

            accuracy_top1 = correct_top1 / total
            accuracy_top5 = correct_top5 / total

            val_avg_loss = val_avg_loss / len(test_loader)  # make mean loss
            if vis is not None:
                vis.line(X=torch.ones((1, 3)) * epoch,
                         Y=torch.Tensor([accuracy_top1, accuracy_top5, val_avg_loss]).unsqueeze(0),
                         update='append',
                         win='test_loss_acc',
                         opts=dict(x_label='epoch',
                                   y_label='test_loss and acc',
                                   title='test_loss and accuracy',
                                   legend=['accuracy_top1', 'accuracy_top5', 'avg_loss']))

            print("top-1 percentage :  {0:0.3f}%".format(correct_top1 / total * 100))
            print("top-5 percentage :  {0:0.3f}%".format(correct_top5 / total * 100))
            scheduler.step()

    return 0


def init_for_distributed(opts):

    # 1. setting for distributed training
    opts.global_rank = int(os.environ['RANK'])
    opts.local_rank = int(os.environ['LOCAL_RANK'])
    opts.world_size = int(os.environ['WORLD_SIZE'])
    torch.cuda.set_device(opts.local_rank)
    if opts.global_rank is not None and opts.local_rank is not None:
        print("Use GPU: [{}/{}] for training".format(opts.global_rank, opts.local_rank))

    # 2. init_process_group
    dist.init_process_group(backend="nccl")
    # if put this function, the all processes block at all.
    torch.distributed.barrier()

    return


if __name__ == '__main__':

    parser = argparse.ArgumentParser('vgg11 cifar training', parents=[get_args_parser()])
    opts = parser.parse_args()
    main(opts)

 


5. torch.distributed.launch 로 2개의 노드에서 학습하기

 

본 포스팅에서는 4개의 GPU가 있는 노드를 마스터 노드로 정하였습니다. 

 

5-1) 두개의 노드에 각각 mm.py의 스크립트를 다 넣어줍니다. 

 

5-2) master 주소 설정 : 먼저 TCP통신을 위해서 마스터노드(실행의 주체가 되는 노드)의 주소를 가져옵니다. 

 

서버에 가서 다음 명령어를 친 후에 

ifconfig

 

아래 사진의 두번째 줄인 inet 뒤에 나오는 주소를 기억해 둡니다. 

예를들어 123.456.789.123 이라 하겠습니다. 

 

5-2) master port 설정

 

pytorch tutorial 에서 설정한 것처럼 23456 으로 설정하겠습니다. 

 

5-3) 각 노드에서 python script 실행

 

이후 콘솔환경에서 먼저 마스터 노드에 다음과 같은 명령어로 실행을 해 줍니다. 

 

python -m torch.distributed.launch --nnodes=2 --nproc_per_node=4 --node_rank=0 --master_addr=123.456.789.123 --master_port=23456 mm.py

 

를 실행하면 다음과 같은 출력과 함께 첫번째 노드는 대기를 하게 됩니다. 

 

 

두번째 노드를 가서 다음의 명령어를 실행 해 줍니다.

(여기서 두번재 노드이므로 node_rank=1 이고, gpu가 2개 이므로 nproc_per_node=2 임에 주의 - 자신의 환경에 따라서 설정)

 

python -m torch.distributed.launch --nnodes=2 --nproc_per_node=2 --node_rank=1 --master_addr=123.456.789.123 --master_port=23456 mm.py

 

노드 2개가 다 통신이 됨을 확인하면 이후에 학습을 진행합니다. 

다시 첫번째 노드를 가 보면 학습이 진행 됨을 볼 수 있습니다. 

 

이렇게 2개의 노드의 GPU를 모두 효율적으로 사용할 수 있게 되었습니다. 

 

node1

 

node2

 


6. trouble shooting 

 

위의 방법을 수행하였는데, 학습이 안되는 경우가 있엇습니다.

이는 pytorch version 이 달라서 안 되는 문제였습니다. (pytorch1.9 vs pytorch1.10)

따라서 두 노드 모두 같은 version의 환경(python 3.8 + pytorch1.10 + cuda11.3/cuda11.4) 으로 만들어 수행했습니다. 


이 방법은 각 노드에 가서 스크립트를 각각 실행 해 주어야 하는데 번거로울 수 있습니다. 

이를 한번에 실행 시켜주는 스케쥴러인 slurm 혹은 submitit등을 다음에 포스팅 하겠습니다. 

 

pytorch 최신버전을 이용하시면 torchrun 을 이용하여도 multi-node multi-gpu 학습이 가능합니다. 

오늘의 포스팅은 여기까지입니다. 감사합니다. 


Reference 

https://www.youtube.com/watch?v=KaAJtI1T2x4&t=266s 

https://leimao.github.io/blog/PyTorch-Distributed-Training/

https://pytorch.org/docs/stable/distributed.html

 

Distributed communication package - torch.distributed — PyTorch 2.0 documentation

Shortcuts

pytorch.org

https://csm-kr.tistory.com/47

 

[pytorch] Distributed package 를 이용한 분산학습으로 Multi-GPU 효율적으로 사용하기

안녕하세요 pulluper 입니다 😁😁 이번 포스팅에서는 pytorch 의 분산(distributed) pakage를 이용해서 multi-gpu 를 모두 효율적으로 사용하는 방법을 알아보겠습니다. 이번 포스팅의 목차는 다음과 같습

csm-kr.tistory.com

 

반응형

댓글