Figure: Illustration of the Distributed Data Parallel (DDP) all-reduce operation in PyTorch. Image by NVIDIA.

Introduction

Training deep learning models on multiple GPUs can significantly speed up your training process, especially for large-scale datasets or complex architectures. PyTorch offers flexible and powerful support for multi-GPU training through both data parallelism and distributed data parallelism. This guide will walk you through setting up multi-GPU training in PyTorch and provide useful tips to enhance performance.

Data parallelism in PyTorch

The simplest way to train a model across multiple GPUs is to use torch.nn.DataParallel. This method replicates the model on each GPU, splits the input batch across them, runs the forward and backward passes in parallel, and combines the gradients on the default GPU before the optimizer updates the model parameters.

Example:

import torch
import torch.nn as nn
import torch.optim as optim

# Define the model
model = MyModel()

# Move the model to the available GPUs
if torch.cuda.device_count() > 1:
    model = nn.DataParallel(model)

model = model.cuda()

# Define optimizer and loss function
optimizer = optim.Adam(model.parameters())
criterion = nn.CrossEntropyLoss()

# Example training loop
for data, target in train_loader:
    data, target = data.cuda(), target.cuda()
    optimizer.zero_grad()
    output = model(data)
    loss = criterion(output, target)
    loss.backward()
    optimizer.step()

Tip: Use DataParallel only when necessary

  • Overhead: While DataParallel is easy to use, it has some overhead due to scattering inputs and gathering outputs across GPUs on every iteration. For better performance, especially when scaling to multiple nodes, use DistributedDataParallel (DDP) instead.
  • Gradients: Keep in mind that DataParallel averages the gradients across GPUs after backpropagation. If you grow the global batch size with the number of GPUs, scale the learning rate accordingly (multiply by the number of GPUs) to maintain a comparable effective learning rate, as shown in the sketch below.
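As a rough illustration of this scaling heuristic, here is a minimal sketch (base_lr and base_batch_size are placeholder values, and whether linear scaling suits your model and optimizer should be verified empirically):

import torch
import torch.optim as optim

n_gpus = max(torch.cuda.device_count(), 1)

# Hyperparameters tuned for single-GPU training (placeholders)
base_lr = 1e-3
base_batch_size = 32

# Linear scaling heuristic: the global batch size grows with the number of
# GPUs, so the learning rate is scaled by the same factor
effective_batch_size = base_batch_size * n_gpus
scaled_lr = base_lr * n_gpus

# model as defined in the example above
optimizer = optim.Adam(model.parameters(), lr=scaled_lr)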

Distributed Data Parallelism (DDP)

For better performance, PyTorch provides torch.nn.parallel.DistributedDataParallel (DDP), which is more efficient for multi-GPU training, especially in multi-node setups. With DDP, each GPU runs its own process with its own copy of the model and optimizer and communicates only when necessary, which reduces communication overhead. During the backward pass, as soon as a bucket of gradients is ready, each GPU launches an all-reduce* to average those gradients across all GPUs, overlapping communication with the remaining backward computation.

  • *All-reduce: a collective operation in which every process contributes its local data and every process receives the reduced result (e.g., the sum) computed over all processes.
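To make the all-reduce operation more concrete, here is a minimal sketch (assuming a process group is already initialized and each process has selected its own GPU, as in the train function below):

import torch
import torch.distributed as dist

# Each process contributes a tensor holding its own rank
local_tensor = torch.tensor([float(dist.get_rank())], device='cuda')

# In-place all-reduce: every process ends up with the sum over all ranks
dist.all_reduce(local_tensor, op=dist.ReduceOp.SUM)

# With world_size = 4, every process now holds 0 + 1 + 2 + 3 = 6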

Example:

import os

import torch
import torch.distributed as dist
import torch.multiprocessing as mp
from torch.nn.parallel import DistributedDataParallel as DDP
import torch.nn as nn

def train(rank, world_size):
    """Training function for each GPU process.

    Args:
        rank (int): The rank of the current process (one per GPU).
        world_size (int): Total number of processes.
    """
    # Initialize the process group for distributed training
    dist.init_process_group(backend="nccl", rank=rank, world_size=world_size)
    device = torch.device(f"cuda:{rank}")  # Set device to current GPU

    # Instantiate the model and move it to the current GPU
    model = MyModel().to(device)
    # Wrap the model with DDP
    ddp_model = DDP(model, device_ids=[rank])

    optimizer = torch.optim.Adam(ddp_model.parameters())
    criterion = nn.CrossEntropyLoss()

    # Example training loop (build train_loader with a DistributedSampler, see below)
    for data, target in train_loader:
        data, target = data.to(device), target.to(device)
        optimizer.zero_grad()
        output = ddp_model(data)
        loss = criterion(output, target)
        loss.backward()
        optimizer.step()

    # Clean up
    dist.destroy_process_group()

if __name__ == "__main__":
    # Rendezvous settings for single-node training (the launcher sets these otherwise)
    os.environ.setdefault("MASTER_ADDR", "localhost")
    os.environ.setdefault("MASTER_PORT", "29500")

    # Get the number of GPUs
    world_size = torch.cuda.device_count()

    # Launch one process per GPU; mp.spawn passes the process index (rank) as the first argument
    mp.spawn(train, args=(world_size,), nprocs=world_size)
  • Important: mp.spawn launches one training process per GPU, so you run this script once per node rather than once per GPU.

Tip: nccl as Backend

  • NCCL Backend: For managing communication between GPUs during multi-GPU training, always prefer the nccl (NVIDIA Collective Communication Library) backend over gloo for GPU-based operations. NCCL is optimized for NVIDIA GPUs and ensures better performance and scalability.
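Building on this tip, here is a minimal sketch (not from the original post) of choosing the backend at runtime so the same script also runs on CPU-only machines with gloo:

import torch
import torch.distributed as dist

# rank and world_size are provided by the launcher, as in the train function above
backend = 'nccl' if torch.cuda.is_available() else 'gloo'
dist.init_process_group(backend=backend, rank=rank, world_size=world_size)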
Launching Multi-GPU Training
Using torch.distributed.launch

To launch multi-GPU training with DDP, you can use the following command (once per node):

python -m torch.distributed.launch --nproc_per_node=4 train.py

  • --nproc_per_node: Specifies the number of GPUs (processes) to use per node.
  • The launcher spawns one process per GPU and passes --local_rank (0, 1, 2, 3) to each copy of the script, so train.py must accept this argument.

torch.distributed.launch is the legacy launcher; the recommended way is now torchrun:

Using torchrun
torchrun --nproc_per_node=4 train.py

Here you don’t need to pass --local_rank yourself, as torchrun exposes it as an environment variable. The main script becomes:

if __name__ == "__main__":
    world_size = int(os.environ['WORLD_SIZE'])  # set by torchrun (nproc_per_node x num_nodes)
    local_rank = int(os.environ['LOCAL_RANK'])  # set by torchrun, one value per spawned process
    train(local_rank, world_size)               # no need for mp.spawn here
Using srun

You can also use srun to launch multi-GPU training on a Slurm cluster. Inside a batch script, replace torchrun with srun; the number of processes per node is set by the Slurm allocation (e.g. #SBATCH --ntasks-per-node=4) rather than by a command-line flag:

srun python train.py

However, the environment variables set by srun have different names than those set by torchrun. You can use the following code to get the local_rank and world_size:

if __name__ == "__main__":
    world_size = int(os.environ['SLURM_NTASKS'])   # total number of tasks across all nodes
    local_rank = int(os.environ['SLURM_LOCALID'])  # rank within the node (SLURM_PROCID gives the global rank)
    train(local_rank, world_size)

Efficient Data Loading

Tip: Use DistributedSampler for Data Loaders

When training with multiple GPUs using DDP, ensure that each GPU receives a unique mini-batch. This is achieved by using torch.utils.data.DistributedSampler:

from torch.utils.data import DataLoader, DistributedSampler

# Create a DistributedSampler to ensure each GPU gets a different mini-batch
train_sampler = DistributedSampler(dataset)

# Create a DataLoader with the DistributedSampler
train_loader = DataLoader(
    dataset,
    batch_size=batch_size,
    shuffle=False,  # Don't shuffle for distributed training (handled by the sampler)
    sampler=train_sampler
)
  • Important: Call train_sampler.set_epoch(epoch) at the beginning of each epoch to ensure proper shuffling.
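A minimal sketch of where set_epoch fits in the training loop (the epoch count is a placeholder):

num_epochs = 10  # placeholder

for epoch in range(num_epochs):
    # Re-seed the sampler so each epoch shuffles the data differently across GPUs
    train_sampler.set_epoch(epoch)
    for data, target in train_loader:
        ...  # forward / backward / optimizer step as in the DDP example above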

Tip: Pin Memory and Pre-fetch Data

Maximizing data throughput is critical for fast training. Use pin_memory=True and increase the num_workers in your DataLoader:

train_loader = DataLoader(
    dataset,
    batch_size=batch_size,
    sampler=train_sampler,
    num_workers=4,
    pin_memory=True,
    prefetch_factor=2
)
  • pin_memory=True: Copies data to pinned memory, which is faster to transfer to the GPU.
  • num_workers: Increases the number of processes to load data in parallel, avoiding I/O bottlenecks.
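One reason pinned memory pays off is that it enables asynchronous host-to-device copies. A minimal sketch (non_blocking=True only has an effect when the source tensor lives in pinned memory):

for data, target in train_loader:
    # Asynchronous copies from pinned host memory to the GPU; they can overlap
    # with other work already queued on the CUDA stream
    data = data.cuda(non_blocking=True)
    target = target.cuda(non_blocking=True)
    ...  # forward / backward / optimizer step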

Gradient Accumulation for Large Batch Sizes

If your desired batch size is too large to fit into GPU memory, consider gradient accumulation. Instead of updating the model’s weights after each mini-batch, accumulate gradients over several smaller mini-batches before performing a weight update, which emulates the larger effective batch size.

Example:

from torch.cuda.amp import autocast, GradScaler  # only needed if using AMP

scaler = GradScaler()  # gradient scaler for AMP (see the Mixed Precision section below)
accumulation_steps = 4
optimizer.zero_grad(set_to_none=True)  # Reset gradients at the beginning

for batch_idx, (data, target) in enumerate(train_loader):
    data, target = data.cuda(), target.cuda()

    with autocast():  # Enable mixed precision if using AMP
        output = model(data)
        loss = criterion(output, target)
        loss = loss / accumulation_steps  # Normalize loss

    scaler.scale(loss).backward()  # Scale loss if using AMP

    if (batch_idx + 1) % accumulation_steps == 0:
        scaler.step(optimizer)  # Update weights if using AMP
        scaler.update()  # Update scaler if using AMP
        optimizer.zero_grad(set_to_none=True)  # Reset gradients after each update

Tip: Use AMP for Gradient Accumulation

  • AMP: Automatic Mixed Precision (AMP) can be combined with gradient accumulation to further optimize memory usage and training speed.
  • Normalization: Ensure loss normalization by dividing by accumulation_steps to maintain gradient scale.
  • Zero Gradients: Always reset gradients after the specified number of accumulation steps to prevent gradient accumulation across updates. You can also use optimizer.zero_grad(set_to_none=True) for less memory usage.

Tip: Monitor GPU Memory

  • Memory Efficiency: When working with large models or datasets, use torch.cuda.memory_summary() to monitor and optimize GPU memory usage. Tools like torch.cuda.amp for mixed precision training can also help reduce memory footprint.
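A minimal sketch of the memory-inspection calls mentioned above (the numbers reported depend entirely on your model and hardware):

import torch

# Current and peak memory allocated by tensors on the current GPU, in MiB
print(torch.cuda.memory_allocated() / 1024**2, 'MiB allocated')
print(torch.cuda.max_memory_allocated() / 1024**2, 'MiB peak')

# Detailed allocator report
print(torch.cuda.memory_summary())

# Reset the peak counters, e.g. at the start of each epoch
torch.cuda.reset_peak_memory_stats()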

Mixed Precision Training

Mixed precision training allows models to use lower precision (e.g., float16) for forward and backward passes, reducing memory consumption and speeding up training.

Example:

from torch.cuda.amp import autocast, GradScaler

scaler = GradScaler()  # Automatically scales gradients

for data, target in train_loader:
    data, target = data.cuda(), target.cuda()

    with autocast():  # Enable mixed precision
        output = model(data)
        loss = criterion(output, target)

    # Scale the loss and perform backpropagation
    scaler.scale(loss).backward()

    # Update the weights
    scaler.step(optimizer)
    scaler.update()
    optimizer.zero_grad()

Tip: Use AMP for Faster Training

  • AMP: Automatic Mixed Precision (AMP) can significantly speed up training by using lower precision (e.g., float16) for forward and backward passes, reducing memory consumption and improving throughput.
  • GradScaler: Automatically scales the loss (and therefore the gradients) to prevent underflow during backpropagation, which is especially useful when combined with gradient accumulation.
  • Zero Gradients: Always reset gradients after each update to avoid accumulating gradients across updates.

Profiling and Optimizing Performance

Tip: Use torch.profiler for Bottleneck Identification

PyTorch’s built-in profiler can help identify performance bottlenecks in your training pipeline:

import torch.profiler

with torch.profiler.profile(
    schedule=torch.profiler.schedule(wait=1, warmup=1, active=3, repeat=2),
    on_trace_ready=torch.profiler.tensorboard_trace_handler('./log'),
    record_shapes=True,
    profile_memory=True,
    with_stack=True
) as prof:
    for i, (data, target) in enumerate(train_loader):
        data, target = data.cuda(), target.cuda()
        optimizer.zero_grad()
        output = model(data)
        loss = criterion(output, target)
        loss.backward()
        optimizer.step()
        prof.step()
  • Use TensorBoard to visualize profiling results and optimize accordingly.

Tip: Use NVIDIA Nsight Systems for GPU Profiling

For more detailed GPU profiling, consider using NVIDIA Nsight Systems to analyze GPU utilization, memory usage, and kernel performance. Refer to the NVIDIA Nsight Systems documentation and to this post for more information.

The easy way: PyTorch Lightning

PyTorch Lightning is a lightweight PyTorch wrapper that simplifies the training process and provides built-in support for multi-GPU training, mixed precision, and distributed training. In my own experience, I ran into fewer issues with multi-node training using PyTorch Lightning than with plain PyTorch; if you face such difficulties, it is worth a try.

Example:

import pytorch_lightning as pl
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader

class LitModel(pl.LightningModule):
    def __init__(self):
        super().__init__()
        self.model = MyModel()
        self.criterion = nn.CrossEntropyLoss()
        self.optimizer = optim.Adam(self.model.parameters())

    def forward(self, x):
        return self.model(x)

    def training_step(self, batch, batch_idx):
        data, target = batch
        output = self(data)
        loss = self.criterion(output, target)
        return loss

    def configure_optimizers(self):
        return self.optimizer

if __name__ == "__main__":
    train_loader = DataLoader(...)
    # Initialize the Lightning Trainer
    trainer = pl.Trainer(devices=4, precision=16, strategy='ddp', num_nodes=2)
    model = LitModel()

    # Train the model
    trainer.fit(model, train_loader)

What is great about PyTorch Lightning is that the wrapper takes care of all the distributed setup, so the same code works for single-GPU, multi-GPU, and multi-node training. The only thing you need to change is the Trainer configuration:

  • devices: Specify the number of GPUs to use. It can be set to 'auto' for automatic detection.
  • precision: Enable mixed precision training.
  • strategy: Use Distributed Data Parallel (DDP) for multi-GPU training (more strategies).
  • num_nodes: Specify the number of nodes for multi-node training.

Slurm Script example

If you are using Slurm to manage your training jobs, you can use the following script to launch multi-GPU training:

#!/bin/bash
#SBATCH --job-name=my_job
#SBATCH --nodes=1
#SBATCH --gres=gpu:4
#SBATCH --ntasks-per-node=4 # Important to set the number of tasks per node
#SBATCH --cpus-per-task=10 # Adjust as needed, useful for data loading (num_workers)
#SBATCH --time=01:00:00
#SBATCH --output=%x-%j.out # Standard output
#SBATCH --error=%x-%j.err # Standard error output
#SBATCH --comment="#tag"

# Load the required modules
module load cuda/version

# Activate your python environment
source activate my_env

# Run the Lightning training script
srun python main.py --num_gpus $SLURM_GPUS_ON_NODE \
                    --num_nodes $SLURM_NNODES \
                    <other_args>  
  • Important: Your main script should be able to handle the --num_gpus and --num_nodes arguments (using argparse, refer to this post).
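A minimal sketch of how main.py might parse these arguments and forward them to the Lightning Trainer (the argument names follow the srun command above; everything else is an assumption):

import argparse
import pytorch_lightning as pl

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--num_gpus", type=int, default=1)
    parser.add_argument("--num_nodes", type=int, default=1)
    args = parser.parse_args()

    trainer = pl.Trainer(
        devices=args.num_gpus,
        num_nodes=args.num_nodes,
        strategy='ddp',
        precision=16,
    )
    # LitModel and train_loader as in the PyTorch Lightning example above
    trainer.fit(LitModel(), train_loader)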

Keep it clean and efficient!
