Distributed Data Parallel in PyTorch

Notes for the Distributed Data Parallel in PyTorch Tutorial Series


Collective Communication

As opposed to point-to-point communication, collectives allow for communication patterns across all processes in a group. A group is a subset of all our processes. By default, collectives are executed on all processes, also known as the world.


From Collective Communication in Distributed Systems with PyTorch

  • Peer to Peer: one process sends data to another process.
  • Reduce: The reduce operation in torch.distributed is used to combine tensors from multiple GPUs or processes into a single tensor on one of the GPUs or processes.
  • All Reduce: The all_reduce operation in torch.distributed is similar to the reduce operation, but instead of returning the result on a single GPU or process, it returns the result on all GPUs or processes (a minimal example follows this list).
  • Scatter: The scatter operation in torch.distributed is used to divide a tensor on one GPU or process, known as the root rank, and send a portion of it to each other GPU or process. The root rank is specified as an argument when calling the scatter function.
  • Gather: The gather operation in torch.distributed is used to collect tensors from multiple GPUs or processes and concatenate them into a single tensor on one of the GPUs or processes, known as the root rank. The root rank is specified as an argument when calling the gather function.
  • Broadcast: The broadcast operation in torch.distributed is used to send a tensor from one GPU or process, known as the root rank, to all other GPUs or processes. The root rank is specified as an argument when calling the broadcast function.
  • All Gather: The all_gather operation in torch.distributed is similar to the gather operation, but instead of returning the concatenated tensor on a single GPU or process, it returns the concatenated tensor on all GPUs or processes.
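To make these operations concrete, here is a minimal, hedged sketch that spawns a few CPU processes with the gloo backend and runs all_reduce. The address, port, and world size are arbitrary choices for illustration, not values from the tutorial.

import os
import torch
import torch.distributed as dist
import torch.multiprocessing as mp


def run(rank: int, world_size: int):
    # Each process joins the default group (the "world") via a common address/port.
    os.environ["MASTER_ADDR"] = "localhost"
    os.environ["MASTER_PORT"] = "29500"  # arbitrary free port
    dist.init_process_group("gloo", rank=rank, world_size=world_size)

    # Every rank contributes its own tensor; after all_reduce each rank
    # holds the element-wise sum across all ranks.
    t = torch.ones(3) * rank
    dist.all_reduce(t, op=dist.ReduceOp.SUM)
    print(f"rank {rank}: {t.tolist()}")

    dist.destroy_process_group()


if __name__ == "__main__":
    world_size = 4
    mp.spawn(run, args=(world_size,), nprocs=world_size)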


Ring All Reduce

The ring implementation of Allreduce has two phases.

  1. Share-reduce phase
  2. Share-only phase

Ring All-Reduce arranges the workers in a ring and splits each worker's data into as many segments as there are workers. During the share-reduce phase, each worker sends a segment to its neighbour, which reduces (e.g. sums) the received segment with its own copy of that segment and forwards the partial result along the ring. In the indexing used here, the process begins with worker 0 at index 0: the data at index 1 is reduced by worker 0, index 2 by worker 1, and so forth. After enough steps, each worker holds one segment that has been fully reduced across all workers. The share-only phase then circulates these fully reduced segments around the ring, without further reduction, so that every worker ends up with the complete result.
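The following is a minimal pure-Python simulation of the two phases. It is a sketch under the assumption of a sum reduction and equal-sized chunks, and it only illustrates the data movement, not real communication.

def ring_all_reduce(workers):
    """Simulate ring all-reduce. `workers` is a list of equal-length lists;
    workers[r][c] is worker r's value for chunk c. Returns the data every
    worker ends up holding (the per-chunk sum over all workers)."""
    p = len(workers)  # number of workers == number of chunks

    # Share-reduce phase: after p-1 steps each worker holds one fully summed chunk.
    for step in range(p - 1):
        for src in range(p):
            dst = (src + 1) % p
            c = (src - step) % p        # chunk that worker src forwards at this step
            workers[dst][c] += workers[src][c]

    # Share-only phase: circulate the fully reduced chunks for p-1 more steps.
    for step in range(p - 1):
        for src in range(p):
            dst = (src + 1) % p
            c = (src + 1 - step) % p    # fully reduced chunk held by worker src
            workers[dst][c] = workers[src][c]

    return workers


# Example: 4 workers, each starting with [rank, rank, rank, rank].
print(ring_all_reduce([[r] * 4 for r in range(4)]))  # every worker holds [6, 6, 6, 6]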


Data parallel

Gradients from all the replicas are aggregated using the bucketed Ring-AllReduce algorithm

  • Host: the host is the main machine (the master node) in the communication network. Its address is often required as an argument when initializing the distributed environment.
  • Port: the port here mainly refers to the master port on the host used for communication.
  • Rank: a unique identifier assigned to each process (usually ranging from \(0\) to \(worldSize - 1\)).
  • World Size: the total number of processes in a group.
  • Process Group: a set of processes that communicate with each other through collectives; the default process group consists of all the processes running on our GPUs.

From Distributed Training by Shenggui Li, Siqi Mai

To illustrate these concepts, let's assume we have 2 machines (also called nodes), and each machine has 4 GPUs. When we initialize the distributed environment over these two machines, we essentially launch 8 processes (4 processes on each machine), and each process is bound to a GPU.

  • Before initializing the distributed environment, we need to specify the host (master address) and port (master port). In this example, we can let host be node 0 and port be a number such as 29500. All the 8 processes will then look for the address and port and connect to one another. The default process group will then be created. The default process group has a world size of 8 and details are as follows: \[ \begin{array}{|c|c|c|c|} \hline \text { process ID } & \text { rank } & \text { Node index } & \text { GPU index } \\ \hline 0 & 0 & 0 & 0 \\ \hline 1 & 1 & 0 & 1 \\ \hline 2 & 2 & 0 & 2 \\ \hline 3 & 3 & 0 & 3 \\ \hline 4 & 4 & 1 & 0 \\ \hline 5 & 5 & 1 & 1 \\ \hline 6 & 6 & 1 & 2 \\ \hline 7 & 7 & 1 & 3 \\ \hline \end{array} \]

  • We can also create a new process group. This new process group can contain any subset of the processes. For example, we can create one containing only the even-numbered processes, and the details of this new group will be: \[ \begin{array}{|c|c|c|c|} \hline \text { process ID } & \text { rank } & \text { Node index } & \text { GPU index } \\ \hline 0 & 0 & 0 & 0 \\ \hline 2 & 1 & 0 & 2 \\ \hline 4 & 2 & 1 & 0 \\ \hline 6 & 3 & 1 & 2 \\ \hline \end{array} \] Please note that rank is relative to the process group and one process can have a different rank in different process groups. The max rank is always the world size of the process group minus 1.
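A hedged sketch of what this looks like in code: the master address, port, and backend are illustrative choices, and in the 2-node example each of the 8 processes would run something like this with its own rank.

import os
import torch.distributed as dist


def init_distributed(rank: int, world_size: int):
    # Master address/port: node 0's address and an arbitrary free port.
    os.environ["MASTER_ADDR"] = "10.0.0.1"   # hypothetical address of node 0
    os.environ["MASTER_PORT"] = "29500"

    # Creates the default process group (the "world") with world_size == 8.
    dist.init_process_group(backend="nccl", rank=rank, world_size=world_size)

    # A new group containing only the even-numbered processes.
    # Inside this group, process 4 has rank 2, and the max rank is 3 (world size 4 minus 1).
    even_group = dist.new_group(ranks=[0, 2, 4, 6])
    return even_group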



Multi-GPU training with DDP (code walkthrough)


Single GPU

datautils.py

import torch
from torch.utils.data import Dataset

class MyTrainDataset(Dataset):
    def __init__(self, size):
        self.size = size
        self.data = [(torch.rand(20), torch.rand(1)) for _ in range(size)]

    def __len__(self):
        return self.size

    def __getitem__(self, index):
        return self.data[index]

single_gpu.py

import torch
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from datautils import MyTrainDataset


class Trainer:
    def __init__(
        self,
        model: torch.nn.Module,
        train_data: DataLoader,
        optimizer: torch.optim.Optimizer,
        gpu_id: int,
        save_every: int,
    ) -> None:
        self.gpu_id = gpu_id
        self.model = model.to(gpu_id)
        self.train_data = train_data
        self.optimizer = optimizer
        self.save_every = save_every

    def _run_batch(self, source, targets):
        self.optimizer.zero_grad()
        output = self.model(source)
        loss = F.cross_entropy(output, targets)
        loss.backward()
        self.optimizer.step()

    def _run_epoch(self, epoch):
        b_sz = len(next(iter(self.train_data))[0])
        print(f"[GPU{self.gpu_id}] Epoch {epoch} | Batchsize: {b_sz} | Steps: {len(self.train_data)}")
        for source, targets in self.train_data:
            source = source.to(self.gpu_id)
            targets = targets.to(self.gpu_id)
            self._run_batch(source, targets)

    def _save_checkpoint(self, epoch):
        ckp = self.model.state_dict()
        PATH = "checkpoint.pt"
        torch.save(ckp, PATH)
        print(f"Epoch {epoch} | Training checkpoint saved at {PATH}")

    def train(self, max_epochs: int):
        for epoch in range(max_epochs):
            self._run_epoch(epoch)
            if epoch % self.save_every == 0:
                self._save_checkpoint(epoch)


def load_train_objs():
    train_set = MyTrainDataset(2048)  # load your dataset
    model = torch.nn.Linear(20, 1)  # load your model
    optimizer = torch.optim.SGD(model.parameters(), lr=1e-3)
    return train_set, model, optimizer


def prepare_dataloader(dataset: Dataset, batch_size: int):
    return DataLoader(
        dataset,
        batch_size=batch_size,
        pin_memory=True,
        shuffle=True
    )


def main(device, total_epochs, save_every, batch_size):
    dataset, model, optimizer = load_train_objs()
    train_data = prepare_dataloader(dataset, batch_size)
    trainer = Trainer(model, train_data, optimizer, device, save_every)
    trainer.train(total_epochs)


if __name__ == "__main__":
    import argparse
    parser = argparse.ArgumentParser(description='simple distributed training job')
    parser.add_argument('total_epochs', type=int, help='Total epochs to train the model')
    parser.add_argument('save_every', type=int, help='How often to save a snapshot')
    parser.add_argument('--batch_size', default=32, type=int, help='Input batch size on each device (default: 32)')
    args = parser.parse_args()

    device = 0  # shorthand for cuda:0
    main(device, args.total_epochs, args.save_every, args.batch_size)
python single_gpu.py 50 10
[GPU0] Epoch 0 | Batchsize: 32 | Steps: 64
Epoch 0 | Training checkpoint saved at checkpoint.pt
[GPU0] Epoch 1 | Batchsize: 32 | Steps: 64
[GPU0] Epoch 2 | Batchsize: 32 | Steps: 64
[GPU0] Epoch 3 | Batchsize: 32 | Steps: 64
[GPU0] Epoch 4 | Batchsize: 32 | Steps: 64
[GPU0] Epoch 5 | Batchsize: 32 | Steps: 64
[GPU0] Epoch 6 | Batchsize: 32 | Steps: 64
[GPU0] Epoch 7 | Batchsize: 32 | Steps: 64
[GPU0] Epoch 8 | Batchsize: 32 | Steps: 64
[GPU0] Epoch 9 | Batchsize: 32 | Steps: 64
[GPU0] Epoch 10 | Batchsize: 32 | Steps: 64
Epoch 10 | Training checkpoint saved at checkpoint.pt
[GPU0] Epoch 11 | Batchsize: 32 | Steps: 64
[GPU0] Epoch 12 | Batchsize: 32 | Steps: 64
[GPU0] Epoch 13 | Batchsize: 32 | Steps: 64
[GPU0] Epoch 14 | Batchsize: 32 | Steps: 64
[GPU0] Epoch 15 | Batchsize: 32 | Steps: 64
[GPU0] Epoch 16 | Batchsize: 32 | Steps: 64
[GPU0] Epoch 17 | Batchsize: 32 | Steps: 64
[GPU0] Epoch 18 | Batchsize: 32 | Steps: 64
[GPU0] Epoch 19 | Batchsize: 32 | Steps: 64
[GPU0] Epoch 20 | Batchsize: 32 | Steps: 64
Epoch 20 | Training checkpoint saved at checkpoint.pt
[GPU0] Epoch 21 | Batchsize: 32 | Steps: 64
[GPU0] Epoch 22 | Batchsize: 32 | Steps: 64
[GPU0] Epoch 23 | Batchsize: 32 | Steps: 64
[GPU0] Epoch 24 | Batchsize: 32 | Steps: 64
[GPU0] Epoch 25 | Batchsize: 32 | Steps: 64
[GPU0] Epoch 26 | Batchsize: 32 | Steps: 64
[GPU0] Epoch 27 | Batchsize: 32 | Steps: 64
[GPU0] Epoch 28 | Batchsize: 32 | Steps: 64
[GPU0] Epoch 29 | Batchsize: 32 | Steps: 64
[GPU0] Epoch 30 | Batchsize: 32 | Steps: 64
Epoch 30 | Training checkpoint saved at checkpoint.pt
[GPU0] Epoch 31 | Batchsize: 32 | Steps: 64
[GPU0] Epoch 32 | Batchsize: 32 | Steps: 64
[GPU0] Epoch 33 | Batchsize: 32 | Steps: 64
[GPU0] Epoch 34 | Batchsize: 32 | Steps: 64
[GPU0] Epoch 35 | Batchsize: 32 | Steps: 64
[GPU0] Epoch 36 | Batchsize: 32 | Steps: 64
[GPU0] Epoch 37 | Batchsize: 32 | Steps: 64
[GPU0] Epoch 38 | Batchsize: 32 | Steps: 64
[GPU0] Epoch 39 | Batchsize: 32 | Steps: 64
[GPU0] Epoch 40 | Batchsize: 32 | Steps: 64
Epoch 40 | Training checkpoint saved at checkpoint.pt
[GPU0] Epoch 41 | Batchsize: 32 | Steps: 64
[GPU0] Epoch 42 | Batchsize: 32 | Steps: 64
[GPU0] Epoch 43 | Batchsize: 32 | Steps: 64
[GPU0] Epoch 44 | Batchsize: 32 | Steps: 64
[GPU0] Epoch 45 | Batchsize: 32 | Steps: 64
[GPU0] Epoch 46 | Batchsize: 32 | Steps: 64
[GPU0] Epoch 47 | Batchsize: 32 | Steps: 64
[GPU0] Epoch 48 | Batchsize: 32 | Steps: 64
[GPU0] Epoch 49 | Batchsize: 32 | Steps: 64



Multi GPU (mp.spawn)

Key changes relative to the single-GPU script (a minimal sketch follows this list):

  • os.environ:
    • MASTER_ADDR
    • MASTER_PORT
  • init_process_group
  • DDP(model, device_ids=[gpu_id])
  • Sampler:
    • DataLoader with shuffle=False and sampler=DistributedSampler(dataset)
    • call train_data.sampler.set_epoch(epoch) at the start of every epoch
  • save the checkpoint only on gpu_id == 0
  • destroy_process_group()
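The sketch below shows how these pieces fit together in an mp.spawn-launched script. It mirrors the structure of single_gpu.py, but the tutorial's exact multigpu.py may differ in details; the port number and the inlined training loop are illustrative.

import os
import torch
import torch.nn.functional as F
import torch.multiprocessing as mp
from torch.utils.data import DataLoader
from torch.utils.data.distributed import DistributedSampler
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.distributed import init_process_group, destroy_process_group
from datautils import MyTrainDataset


def ddp_setup(rank: int, world_size: int):
    os.environ["MASTER_ADDR"] = "localhost"
    os.environ["MASTER_PORT"] = "12355"   # arbitrary free port
    torch.cuda.set_device(rank)
    init_process_group(backend="nccl", rank=rank, world_size=world_size)


def main(rank: int, world_size: int, total_epochs: int, save_every: int, batch_size: int):
    ddp_setup(rank, world_size)

    dataset = MyTrainDataset(2048)
    train_data = DataLoader(
        dataset,
        batch_size=batch_size,
        pin_memory=True,
        shuffle=False,                          # the DistributedSampler handles shuffling
        sampler=DistributedSampler(dataset),    # each rank sees a disjoint shard of the data
    )
    model = DDP(torch.nn.Linear(20, 1).to(rank), device_ids=[rank])  # gradients sync via bucketed ring all-reduce
    optimizer = torch.optim.SGD(model.parameters(), lr=1e-3)

    for epoch in range(total_epochs):
        train_data.sampler.set_epoch(epoch)     # reshuffle the shards each epoch
        for source, targets in train_data:
            source, targets = source.to(rank), targets.to(rank)
            optimizer.zero_grad()
            loss = F.cross_entropy(model(source), targets)
            loss.backward()                     # gradient all-reduce happens here
            optimizer.step()
        if rank == 0 and epoch % save_every == 0:
            torch.save(model.module.state_dict(), "checkpoint.pt")   # unwrap DDP with .module

    destroy_process_group()


if __name__ == "__main__":
    world_size = torch.cuda.device_count()
    # mp.spawn passes the process index (the rank) as the first argument of main.
    mp.spawn(main, args=(world_size, 50, 10, 32), nprocs=world_size)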

python multigpu.py 50 10
[GPU0] Epoch 0 | Batchsize: 32 | Steps: 32
[GPU1] Epoch 0 | Batchsize: 32 | Steps: 32
Epoch 0 | Training checkpoint saved at checkpoint.pt
[GPU1] Epoch 1 | Batchsize: 32 | Steps: 32
[GPU0] Epoch 1 | Batchsize: 32 | Steps: 32
[GPU0] Epoch 2 | Batchsize: 32 | Steps: 32
[GPU1] Epoch 2 | Batchsize: 32 | Steps: 32
[GPU0] Epoch 3 | Batchsize: 32 | Steps: 32
[GPU1] Epoch 3 | Batchsize: 32 | Steps: 32
[GPU0] Epoch 4 | Batchsize: 32 | Steps: 32
[GPU1] Epoch 4 | Batchsize: 32 | Steps: 32
[GPU0] Epoch 5 | Batchsize: 32 | Steps: 32
[GPU1] Epoch 5 | Batchsize: 32 | Steps: 32
[GPU0] Epoch 6 | Batchsize: 32 | Steps: 32
[GPU1] Epoch 6 | Batchsize: 32 | Steps: 32
[GPU0] Epoch 7 | Batchsize: 32 | Steps: 32
[GPU1] Epoch 7 | Batchsize: 32 | Steps: 32
[GPU0] Epoch 8 | Batchsize: 32 | Steps: 32
[GPU1] Epoch 8 | Batchsize: 32 | Steps: 32
[GPU0] Epoch 9 | Batchsize: 32 | Steps: 32
[GPU1] Epoch 9 | Batchsize: 32 | Steps: 32
[GPU0] Epoch 10 | Batchsize: 32 | Steps: 32
[GPU1] Epoch 10 | Batchsize: 32 | Steps: 32
Epoch 10 | Training checkpoint saved at checkpoint.pt
[GPU0] Epoch 11 | Batchsize: 32 | Steps: 32
[GPU1] Epoch 11 | Batchsize: 32 | Steps: 32
[GPU0] Epoch 12 | Batchsize: 32 | Steps: 32
[GPU1] Epoch 12 | Batchsize: 32 | Steps: 32
[GPU0] Epoch 13 | Batchsize: 32 | Steps: 32
[GPU1] Epoch 13 | Batchsize: 32 | Steps: 32
[GPU0] Epoch 14 | Batchsize: 32 | Steps: 32
[GPU1] Epoch 14 | Batchsize: 32 | Steps: 32
[GPU0] Epoch 15 | Batchsize: 32 | Steps: 32
[GPU1] Epoch 15 | Batchsize: 32 | Steps: 32
[GPU0] Epoch 16 | Batchsize: 32 | Steps: 32
[GPU1] Epoch 16 | Batchsize: 32 | Steps: 32
[GPU0] Epoch 17 | Batchsize: 32 | Steps: 32
[GPU1] Epoch 17 | Batchsize: 32 | Steps: 32
[GPU1] Epoch 18 | Batchsize: 32 | Steps: 32
[GPU0] Epoch 18 | Batchsize: 32 | Steps: 32
[GPU1] Epoch 19 | Batchsize: 32 | Steps: 32
[GPU0] Epoch 19 | Batchsize: 32 | Steps: 32
[GPU1] Epoch 20 | Batchsize: 32 | Steps: 32
[GPU0] Epoch 20 | Batchsize: 32 | Steps: 32
[GPU1] Epoch 21 | Batchsize: 32 | Steps: 32
Epoch 20 | Training checkpoint saved at checkpoint.pt
[GPU0] Epoch 21 | Batchsize: 32 | Steps: 32
[GPU1] Epoch 22 | Batchsize: 32 | Steps: 32
[GPU0] Epoch 22 | Batchsize: 32 | Steps: 32
[GPU0] Epoch 23 | Batchsize: 32 | Steps: 32
[GPU1] Epoch 23 | Batchsize: 32 | Steps: 32
[GPU0] Epoch 24 | Batchsize: 32 | Steps: 32
[GPU1] Epoch 24 | Batchsize: 32 | Steps: 32
[GPU0] Epoch 25 | Batchsize: 32 | Steps: 32
[GPU1] Epoch 25 | Batchsize: 32 | Steps: 32
[GPU0] Epoch 26 | Batchsize: 32 | Steps: 32
[GPU1] Epoch 26 | Batchsize: 32 | Steps: 32
[GPU0] Epoch 27 | Batchsize: 32 | Steps: 32
[GPU1] Epoch 27 | Batchsize: 32 | Steps: 32
[GPU0] Epoch 28 | Batchsize: 32 | Steps: 32
[GPU1] Epoch 28 | Batchsize: 32 | Steps: 32
[GPU0] Epoch 29 | Batchsize: 32 | Steps: 32
[GPU1] Epoch 29 | Batchsize: 32 | Steps: 32
[GPU0] Epoch 30 | Batchsize: 32 | Steps: 32
[GPU1] Epoch 30 | Batchsize: 32 | Steps: 32
Epoch 30 | Training checkpoint saved at checkpoint.pt
[GPU0] Epoch 31 | Batchsize: 32 | Steps: 32
[GPU1] Epoch 31 | Batchsize: 32 | Steps: 32
[GPU0] Epoch 32 | Batchsize: 32 | Steps: 32
[GPU1] Epoch 32 | Batchsize: 32 | Steps: 32
[GPU0] Epoch 33 | Batchsize: 32 | Steps: 32
[GPU1] Epoch 33 | Batchsize: 32 | Steps: 32
[GPU0] Epoch 34 | Batchsize: 32 | Steps: 32
[GPU1] Epoch 34 | Batchsize: 32 | Steps: 32
[GPU0] Epoch 35 | Batchsize: 32 | Steps: 32
[GPU1] Epoch 35 | Batchsize: 32 | Steps: 32
[GPU0] Epoch 36 | Batchsize: 32 | Steps: 32
[GPU1] Epoch 36 | Batchsize: 32 | Steps: 32
[GPU0] Epoch 37 | Batchsize: 32 | Steps: 32
[GPU1] Epoch 37 | Batchsize: 32 | Steps: 32
[GPU0] Epoch 38 | Batchsize: 32 | Steps: 32
[GPU1] Epoch 38 | Batchsize: 32 | Steps: 32
[GPU0] Epoch 39 | Batchsize: 32 | Steps: 32
[GPU1] Epoch 39 | Batchsize: 32 | Steps: 32
[GPU0] Epoch 40 | Batchsize: 32 | Steps: 32
[GPU1] Epoch 40 | Batchsize: 32 | Steps: 32
Epoch 40 | Training checkpoint saved at checkpoint.pt
[GPU0] Epoch 41 | Batchsize: 32 | Steps: 32
[GPU1] Epoch 41 | Batchsize: 32 | Steps: 32
[GPU0] Epoch 42 | Batchsize: 32 | Steps: 32
[GPU1] Epoch 42 | Batchsize: 32 | Steps: 32
[GPU0] Epoch 43 | Batchsize: 32 | Steps: 32
[GPU1] Epoch 43 | Batchsize: 32 | Steps: 32
[GPU0] Epoch 44 | Batchsize: 32 | Steps: 32
[GPU1] Epoch 44 | Batchsize: 32 | Steps: 32
[GPU0] Epoch 45 | Batchsize: 32 | Steps: 32
[GPU1] Epoch 45 | Batchsize: 32 | Steps: 32
[GPU0] Epoch 46 | Batchsize: 32 | Steps: 32
[GPU1] Epoch 46 | Batchsize: 32 | Steps: 32
[GPU0] Epoch 47 | Batchsize: 32 | Steps: 32
[GPU1] Epoch 47 | Batchsize: 32 | Steps: 32
[GPU0] Epoch 48 | Batchsize: 32 | Steps: 32
[GPU1] Epoch 48 | Batchsize: 32 | Steps: 32
[GPU0] Epoch 49 | Batchsize: 32 | Steps: 32
[GPU1] Epoch 49 | Batchsize: 32 | Steps: 32



Fault-Tolerant Training (torchrun)

When scaling your job to multiple devices you, of course, gain more performance, but you also increase the susceptibility to failure. A single process failure can throw your whole training job out of sync. PyTorch provides fault tolerance with torchrun, and the idea is simple: your training script takes snapshots of your training job at regular intervals. If a failure occurs, your job doesn't exit; torchrun gracefully restarts all the processes, which load the latest snapshot, and you continue training from there.


torchrun handles the minutiae of distributed training so that you don't need to. For instance,

  • You don't need to set environment variables or explicitly pass the rank and world_size; torchrun assigns these along with several other environment variables.
  • No need to call mp.spawn in your script; you only need a generic main() entry point and launch the script with torchrun. This way the same script can be run in non-distributed as well as single-node and multi-node setups.
  • Gracefully restarting training from the last saved training snapshot.

Below are the diffs between multigpu.py and multigpu_torchrun.py.

class Trainer

[Screenshot 2024-06-02 17.16.37: Trainer changes, multigpu.py vs multigpu_torchrun.py]
[Screenshot 2024-06-02 17.17.33: Trainer changes, continued]
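Since the screenshots are not reproduced here, the following is a hedged sketch of the kind of snapshot logic the Trainer gains for torchrun. Names such as _save_snapshot, _load_snapshot, and the snapshot dictionary keys are illustrative, not necessarily the tutorial's exact code; the only facts taken from the log above are that a snapshot.pt file is saved periodically.

import os
import torch

class Trainer:
    def __init__(self, model, train_data, optimizer, save_every: int,
                 snapshot_path: str = "snapshot.pt"):
        # torchrun sets LOCAL_RANK (and RANK, WORLD_SIZE) for every process.
        self.gpu_id = int(os.environ["LOCAL_RANK"])
        self.model = model.to(self.gpu_id)
        self.train_data = train_data
        self.optimizer = optimizer
        self.save_every = save_every
        self.epochs_run = 0
        self.snapshot_path = snapshot_path
        if os.path.exists(snapshot_path):
            self._load_snapshot(snapshot_path)       # resume after a restart
        self.model = torch.nn.parallel.DistributedDataParallel(
            self.model, device_ids=[self.gpu_id])

    def _load_snapshot(self, snapshot_path):
        snapshot = torch.load(snapshot_path, map_location=f"cuda:{self.gpu_id}")
        self.model.load_state_dict(snapshot["MODEL_STATE"])
        self.epochs_run = snapshot["EPOCHS_RUN"]

    def _save_snapshot(self, epoch):
        snapshot = {
            "MODEL_STATE": self.model.module.state_dict(),
            "EPOCHS_RUN": epoch,
        }
        torch.save(snapshot, self.snapshot_path)
        print(f"Epoch {epoch} | Training snapshot saved at {self.snapshot_path}")

    def train(self, max_epochs: int):
        for epoch in range(self.epochs_run, max_epochs):  # skip epochs already done
            self._run_epoch(epoch)                        # unchanged from the mp.spawn version
            if self.gpu_id == 0 and epoch % self.save_every == 0:
                self._save_snapshot(epoch)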

main()
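For main(), the torchrun version drops the explicit rank argument and the mp.spawn call. Here is a hedged sketch of roughly what the entry point looks like; load_train_objs, prepare_dataloader (now with a DistributedSampler), and the Trainer are the pieces discussed above and are not redefined here.

import os
import torch
from torch.distributed import init_process_group, destroy_process_group


def ddp_setup():
    # torchrun already exports MASTER_ADDR, MASTER_PORT, RANK, WORLD_SIZE and LOCAL_RANK,
    # so init_process_group can read everything from the environment.
    torch.cuda.set_device(int(os.environ["LOCAL_RANK"]))
    init_process_group(backend="nccl")


def main(total_epochs: int, save_every: int, batch_size: int):
    ddp_setup()
    dataset, model, optimizer = load_train_objs()           # same helper as in single_gpu.py
    train_data = prepare_dataloader(dataset, batch_size)    # now with sampler=DistributedSampler(dataset)
    trainer = Trainer(model, train_data, optimizer, save_every)
    trainer.train(total_epochs)
    destroy_process_group()


if __name__ == "__main__":
    import argparse
    parser = argparse.ArgumentParser(description='simple distributed training job')
    parser.add_argument('total_epochs', type=int)
    parser.add_argument('save_every', type=int)
    parser.add_argument('--batch_size', default=32, type=int)
    args = parser.parse_args()
    main(args.total_epochs, args.save_every, args.batch_size)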

--nproc_per_node=gpu: use all available GPUs; you can replace gpu with an explicit number.

--standalone: single-node job.

torchrun --standalone --nproc_per_node=gpu multigpu_torchrun.py 50 10
W0602 17:29:24.735000 139896606601856 torch/distributed/run.py:757] 
W0602 17:29:24.735000 139896606601856 torch/distributed/run.py:757] *****************************************
W0602 17:29:24.735000 139896606601856 torch/distributed/run.py:757] Setting OMP_NUM_THREADS environment variable for each process to be 1 in default, to avoid your system being overloaded, please further tune the variable for optimal performance in your application as needed.
W0602 17:29:24.735000 139896606601856 torch/distributed/run.py:757] *****************************************
[GPU1] Epoch 0 | Batchsize: 32 | Steps: 32
[GPU0] Epoch 0 | Batchsize: 32 | Steps: 32
Epoch 0 | Training snapshot saved at snapshot.pt
[GPU1] Epoch 1 | Batchsize: 32 | Steps: 32
[GPU0] Epoch 1 | Batchsize: 32 | Steps: 32
[GPU0] Epoch 2 | Batchsize: 32 | Steps: 32
[GPU1] Epoch 2 | Batchsize: 32 | Steps: 32
[GPU0] Epoch 3 | Batchsize: 32 | Steps: 32
[GPU1] Epoch 3 | Batchsize: 32 | Steps: 32
[GPU0] Epoch 4 | Batchsize: 32 | Steps: 32
[GPU1] Epoch 4 | Batchsize: 32 | Steps: 32
[GPU0] Epoch 5 | Batchsize: 32 | Steps: 32
[GPU1] Epoch 5 | Batchsize: 32 | Steps: 32
[GPU0] Epoch 6 | Batchsize: 32 | Steps: 32
[GPU1] Epoch 6 | Batchsize: 32 | Steps: 32
[GPU0] Epoch 7 | Batchsize: 32 | Steps: 32
[GPU1] Epoch 7 | Batchsize: 32 | Steps: 32
[GPU0] Epoch 8 | Batchsize: 32 | Steps: 32
[GPU1] Epoch 8 | Batchsize: 32 | Steps: 32
[GPU0] Epoch 9 | Batchsize: 32 | Steps: 32
[GPU1] Epoch 9 | Batchsize: 32 | Steps: 32
[GPU0] Epoch 10 | Batchsize: 32 | Steps: 32
[GPU1] Epoch 10 | Batchsize: 32 | Steps: 32
[GPU1] Epoch 11 | Batchsize: 32 | Steps: 32
Epoch 10 | Training snapshot saved at snapshot.pt
[GPU0] Epoch 11 | Batchsize: 32 | Steps: 32
[GPU0] Epoch 12 | Batchsize: 32 | Steps: 32
[GPU1] Epoch 12 | Batchsize: 32 | Steps: 32
[GPU0] Epoch 13 | Batchsize: 32 | Steps: 32
[GPU1] Epoch 13 | Batchsize: 32 | Steps: 32
[GPU0] Epoch 14 | Batchsize: 32 | Steps: 32
[GPU1] Epoch 14 | Batchsize: 32 | Steps: 32
[GPU0] Epoch 15 | Batchsize: 32 | Steps: 32
[GPU1] Epoch 15 | Batchsize: 32 | Steps: 32
[GPU0] Epoch 16 | Batchsize: 32 | Steps: 32
[GPU1] Epoch 16 | Batchsize: 32 | Steps: 32
[GPU0] Epoch 17 | Batchsize: 32 | Steps: 32
[GPU1] Epoch 17 | Batchsize: 32 | Steps: 32
[GPU0] Epoch 18 | Batchsize: 32 | Steps: 32
[GPU1] Epoch 18 | Batchsize: 32 | Steps: 32
[GPU0] Epoch 19 | Batchsize: 32 | Steps: 32
[GPU1] Epoch 19 | Batchsize: 32 | Steps: 32
[GPU0] Epoch 20 | Batchsize: 32 | Steps: 32
[GPU1] Epoch 20 | Batchsize: 32 | Steps: 32
Epoch 20 | Training snapshot saved at snapshot.pt
[GPU0] Epoch 21 | Batchsize: 32 | Steps: 32
[GPU1] Epoch 21 | Batchsize: 32 | Steps: 32
[GPU1] Epoch 22 | Batchsize: 32 | Steps: 32
[GPU0] Epoch 22 | Batchsize: 32 | Steps: 32
[GPU1] Epoch 23 | Batchsize: 32 | Steps: 32
[GPU0] Epoch 23 | Batchsize: 32 | Steps: 32
[GPU1] Epoch 24 | Batchsize: 32 | Steps: 32
[GPU0] Epoch 24 | Batchsize: 32 | Steps: 32
[GPU1] Epoch 25 | Batchsize: 32 | Steps: 32
[GPU0] Epoch 25 | Batchsize: 32 | Steps: 32
[GPU1] Epoch 26 | Batchsize: 32 | Steps: 32
[GPU0] Epoch 26 | Batchsize: 32 | Steps: 32
[GPU1] Epoch 27 | Batchsize: 32 | Steps: 32
[GPU0] Epoch 27 | Batchsize: 32 | Steps: 32
[GPU1] Epoch 28 | Batchsize: 32 | Steps: 32
[GPU0] Epoch 28 | Batchsize: 32 | Steps: 32
[GPU1] Epoch 29 | Batchsize: 32 | Steps: 32
[GPU0] Epoch 29 | Batchsize: 32 | Steps: 32
[GPU1] Epoch 30 | Batchsize: 32 | Steps: 32
[GPU0] Epoch 30 | Batchsize: 32 | Steps: 32
[GPU1] Epoch 31 | Batchsize: 32 | Steps: 32
Epoch 30 | Training snapshot saved at snapshot.pt
[GPU0] Epoch 31 | Batchsize: 32 | Steps: 32
[GPU0] Epoch 32 | Batchsize: 32 | Steps: 32
[GPU1] Epoch 32 | Batchsize: 32 | Steps: 32
[GPU0] Epoch 33 | Batchsize: 32 | Steps: 32
[GPU1] Epoch 33 | Batchsize: 32 | Steps: 32
[GPU1] Epoch 34 | Batchsize: 32 | Steps: 32
[GPU0] Epoch 34 | Batchsize: 32 | Steps: 32
[GPU0] Epoch 35 | Batchsize: 32 | Steps: 32
[GPU1] Epoch 35 | Batchsize: 32 | Steps: 32
[GPU0] Epoch 36 | Batchsize: 32 | Steps: 32
[GPU1] Epoch 36 | Batchsize: 32 | Steps: 32
[GPU0] Epoch 37 | Batchsize: 32 | Steps: 32
[GPU1] Epoch 37 | Batchsize: 32 | Steps: 32
[GPU0] Epoch 38 | Batchsize: 32 | Steps: 32
[GPU1] Epoch 38 | Batchsize: 32 | Steps: 32
[GPU0] Epoch 39 | Batchsize: 32 | Steps: 32
[GPU1] Epoch 39 | Batchsize: 32 | Steps: 32
[GPU0] Epoch 40 | Batchsize: 32 | Steps: 32
[GPU1] Epoch 40 | Batchsize: 32 | Steps: 32
Epoch 40 | Training snapshot saved at snapshot.pt
[GPU0] Epoch 41 | Batchsize: 32 | Steps: 32
[GPU1] Epoch 41 | Batchsize: 32 | Steps: 32
[GPU0] Epoch 42 | Batchsize: 32 | Steps: 32
[GPU1] Epoch 42 | Batchsize: 32 | Steps: 32
[GPU0] Epoch 43 | Batchsize: 32 | Steps: 32
[GPU1] Epoch 43 | Batchsize: 32 | Steps: 32
[GPU0] Epoch 44 | Batchsize: 32 | Steps: 32
[GPU1] Epoch 44 | Batchsize: 32 | Steps: 32
[GPU1] Epoch 45 | Batchsize: 32 | Steps: 32
[GPU0] Epoch 45 | Batchsize: 32 | Steps: 32
[GPU0] Epoch 46 | Batchsize: 32 | Steps: 32
[GPU1] Epoch 46 | Batchsize: 32 | Steps: 32
[GPU0] Epoch 47 | Batchsize: 32 | Steps: 32
[GPU1] Epoch 47 | Batchsize: 32 | Steps: 32
[GPU0] Epoch 48 | Batchsize: 32 | Steps: 32
[GPU1] Epoch 48 | Batchsize: 32 | Steps: 32
[GPU0] Epoch 49 | Batchsize: 32 | Steps: 32
[GPU1] Epoch 49 | Batchsize: 32 | Steps: 32



Multi-node

Our training code remains essentially the same as the torchrun version; however, I'll make a couple of tiny changes for clarity.

Each GPU will have a local rank starting from 0

<img src="https://minio.yixingfu.net/blog/2024-06-02/截屏2024-06-02 22.00.40.png" width="50%">

But the global rank is unique

<img src="https://minio.yixingfu.net/blog/2024-06-02/截屏2024-06-02 22.04.16.png" width="50%">

In the code, we only add a self.global_rank to the Trainer and use it in place of self.gpu_id in the log messages; saving the checkpoint only when local_rank == 0 is unchanged.
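A hedged sketch of the relevant lines (torchrun exports both LOCAL_RANK and RANK; the rest of the Trainer stays as before):

import os

class Trainer:
    def __init__(self, model, train_data, optimizer, save_every: int):
        self.local_rank = int(os.environ["LOCAL_RANK"])   # GPU index within this node: 0 .. nproc_per_node-1
        self.global_rank = int(os.environ["RANK"])        # unique across all nodes: 0 .. world_size-1
        self.model = model.to(self.local_rank)            # the model still lives on the local GPU
        ...                                               # everything else unchanged

    def _run_epoch(self, epoch):
        ...
        print(f"[GPU{self.global_rank}] Epoch {epoch} | ...")  # log messages use the global rank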


Method 1: Running torchrun on each machine

For machine 0:

# Use the torchrun command to launch a distributed training task with PyTorch.
# --nproc_per_node=4                   : 4 processes on this node, one per GPU
# --nnodes=2                           : 2 nodes participate in the job
# --node_rank=0                        : this command runs on the node with rank 0
# --rdzv_id=456                        : rendezvous ID used for initialization and discovery among nodes
# --rdzv_backend=c10d                  : rendezvous backend, PyTorch's c10d component
# --rdzv_endpoint=172.31.43.139:29603  : central address:port the nodes connect to for initialization
torchrun \
    --nproc_per_node=4 \
    --nnodes=2 \
    --node_rank=0 \
    --rdzv_id=456 \
    --rdzv_backend=c10d \
    --rdzv_endpoint=172.31.43.139:29603 \
    multinode_torchrun.py 50 10

For machine 1:

# Use the torchrun command to launch a distributed training task with PyTorch.
# --nproc_per_node=2                   : 2 processes on this node, since it only has 2 GPUs
# --nnodes=2                           : 2 nodes participate in the job
# --node_rank=1                        : this command runs on the node with rank 1
# --rdzv_id=456                        : rendezvous ID used for initialization and discovery among nodes
# --rdzv_backend=c10d                  : rendezvous backend, PyTorch's c10d component
# --rdzv_endpoint=172.31.43.139:29603  : central address:port the nodes connect to for initialization
torchrun \
    --nproc_per_node=2 \
    --nnodes=2 \
    --node_rank=1 \
    --rdzv_id=456 \
    --rdzv_backend=c10d \
    --rdzv_endpoint=172.31.43.139:29603 \
    multinode_torchrun.py 50 10

  • Common troubleshooting:
    • Check that your nodes can communicate with each other.
    • Set a debug flag for verbose logs: export NCCL_DEBUG=INFO
    • Explicitly pass the network interface name: export NCCL_SOCKET_IFNAME=eth0


Method 2: Running torchrun with Slurm

Using a workload manager (since I probably won't use it for quite a while, this part is skipped).



Training a GPT-like model with DDP

In the mingpt folder, run:

torchrun --standalone --nproc_per_node=gpu main.py

I highly recommend the GitHub page for this section. It provides a detailed implementation of a handwritten GPT model using Distributed Data Parallel (DDP) training. This resource is excellent for understanding DDP, the model structure, and the pretraining process of GPT models.

I also added the hand-written CausalSelfAttention class from minGPT to model.py for a deeper understanding of multi-head self-attention.

"""
Full definition of a GPT Language Model, all of it in this single file.
Adapted from https://github.com/karpathy/minGPT/blob/master/mingpt/model.py
"""

from dataclasses import dataclass
import math

import torch
import torch.nn as nn
from torch.nn import functional as F


@dataclass
class GPTConfig:
model_type: str = 'gpt2'
# model configurations
n_layer: int = None # 8
n_head: int = None # 8
n_embd: int = None # 512
# openai's values for gpt2
vocab_size: int = 50257
block_size: int = 1024 # maximum sequence length that the model can handle
# dropout hyperparameters
embd_pdrop: float = 0.1
resid_pdrop: float = 0.1
attn_pdrop: float = 0.1

@dataclass
class OptimizerConfig:
learning_rate: float = 3e-4
weight_decay: float = 0.1

class CausalSelfAttention(nn.Module):
"""
From original minGPT github page:
A vanilla multi-head masked self-attention layer with a projection at the end.
It is possible to use torch.nn.MultiheadAttention here but I am including an
explicit implementation here to show that there is nothing too scary here.
"""

def __init__(self, config):
super().__init__()
assert config.n_embd % config.n_head == 0
# key, query, value projections for all heads, but in a batch
self.c_attn = nn.Linear(config.n_embd, 3 * config.n_embd)
# output projection
self.c_proj = nn.Linear(config.n_embd, config.n_embd)
# regularization
self.attn_dropout = nn.Dropout(config.attn_pdrop)
self.resid_dropout = nn.Dropout(config.resid_pdrop)
# causal mask to ensure that attention is only applied to the left in the input sequence
self.register_buffer("bias", torch.tril(torch.ones(config.block_size, config.block_size))
.view(1, 1, config.block_size, config.block_size))
self.n_head = config.n_head
self.n_embd = config.n_embd

def forward(self, x):
B, T, C = x.size() # batch size, sequence length, embedding dimensionality (n_embd)

# calculate query, key, values for all heads in batch and move head forward to be the batch dim
q, k ,v = self.c_attn(x).split(self.n_embd, dim=2)
k = k.view(B, T, self.n_head, C // self.n_head).transpose(1, 2) # (B, nh, T, hs)
q = q.view(B, T, self.n_head, C // self.n_head).transpose(1, 2) # (B, nh, T, hs)
v = v.view(B, T, self.n_head, C // self.n_head).transpose(1, 2) # (B, nh, T, hs)

# causal self-attention; Self-attend: (B, nh, T, hs) x (B, nh, hs, T) -> (B, nh, T, T)
att = (q @ k.transpose(-2, -1)) * (1.0 / math.sqrt(k.size(-1)))
att = att.masked_fill(self.bias[:,:,:T,:T] == 0, float('-inf'))
att = F.softmax(att, dim=-1)
att = self.attn_dropout(att)
y = att @ v # (B, nh, T, T) x (B, nh, T, hs) -> (B, nh, T, hs)
y = y.transpose(1, 2).contiguous().view(B, T, C) # re-assemble all head outputs side by side

# output projection
y = self.resid_dropout(self.c_proj(y))
return y

class MultiheadAttentionLayer(nn.Module):
"""
A multi-head masked self-attention layer with a projection at the end.
"""

def __init__(self, config, device="cpu", dtype=torch.float32):
super().__init__()
assert config.n_embd % config.n_head == 0
self.resid_drop = nn.Dropout(config.resid_pdrop)
self.c_proj = nn.Linear(config.n_embd, config.n_embd, device=device, dtype=dtype)
self.register_buffer("mask", torch.tril(torch.ones(config.block_size, config.block_size))
.view(1, 1, config.block_size, config.block_size))
self.attn = torch.nn.MultiheadAttention(
embed_dim=config.n_embd,
num_heads=config.n_head,
dropout=config.attn_pdrop,
batch_first=True,
device=device,
dtype=dtype
)

def forward(self, x):
_, seq_size, _ = x.size()
y = self.attn(x, x, x, attn_mask=self.mask[0, 0, :seq_size, :seq_size])[0]
y = self.resid_drop(self.c_proj(y))
return y

class Block(nn.Module):
""" an unassuming Transformer block """
def __init__(self, config: GPTConfig):
super().__init__()
self.ln1 = nn.LayerNorm(config.n_embd)
self.ln2 = nn.LayerNorm(config.n_embd)
self.attn = MultiheadAttentionLayer(config)
self.mlp = nn.Sequential(
nn.Linear(config.n_embd, 4 * config.n_embd),
nn.GELU(),
nn.Linear(4 * config.n_embd, config.n_embd),
nn.Dropout(config.resid_pdrop),
)

def forward(self, x):
x = x + self.attn(self.ln1(x))
x = x + self.mlp(self.ln2(x))
return x

class EmbeddingStem(nn.Module):
def __init__(self, config: GPTConfig, device="cpu", dtype=torch.float32):
super().__init__()
self.tok_emb = nn.Embedding(config.vocab_size, config.n_embd, device=device, dtype=dtype)
self.pos_emb = nn.Parameter(torch.zeros(1, config.block_size, config.n_embd, device=device, dtype=dtype))
self.drop = nn.Dropout(config.embd_pdrop)
self.block_size = config.block_size

def reset_parameters(self):
self.tok_emb.reset_parameters()

def forward(self, idx):
b, t = idx.size()
assert t <= self.block_size, f"Cannot forward sequence of length {t}, block size is only {self.block_size}"

token_embeddings = self.tok_emb(idx) # each index maps to a (learnable) embedding vector
position_embeddings = self.pos_emb[:, :t, :] # each position maps to a (learnable) position vector
return self.drop(token_embeddings + position_embeddings)

class GPT(nn.Module):
""" GPT Language Model """

def __init__(self, config: GPTConfig):
super().__init__()
self.block_size = config.block_size # 128
config = self._set_model_config(config)

# input embedding stem
self.emb_stem = EmbeddingStem(config)
# transformer
self.blocks = nn.Sequential(*[Block(config) for _ in range(config.n_layer)])
# decoder head
self.ln_f = nn.LayerNorm(config.n_embd)
self.head = nn.Linear(config.n_embd, config.vocab_size, bias=False)

# init all weights, and apply a special scaled init to the residual projections, per GPT-2 paper
self.apply(self._init_weights)
for pn, p in self.named_parameters():
if pn.endswith('c_proj.weight'):
p.data.normal_(mean=0.0, std=0.02/math.sqrt(2 * config.n_layer))

# report number of parameters (note we don't count the decoder parameters in lm_head)
n_params = sum(p.numel() for p in self.blocks.parameters())
print("number of parameters: %.2fM" % (n_params/1e6,))

def _set_model_config(self, config):
type_given = config.model_type is not None # gpt2
params_given = all([config.n_layer is not None, config.n_head is not None, config.n_embd is not None])
# assert type_given ^ params_given # exactly one of these (XOR)
if type_given and not params_given:
# translate from model_type to detailed configuration
config.__dict__.update({
# names follow the huggingface naming conventions
# GPT-1
'openai-gpt': dict(n_layer=12, n_head=12, n_embd=768), # 117M params
# GPT-2 configs
'gpt2': dict(n_layer=12, n_head=12, n_embd=768), # 124M params
'gpt2-medium': dict(n_layer=24, n_head=16, n_embd=1024), # 350M params
'gpt2-large': dict(n_layer=36, n_head=20, n_embd=1280), # 774M params
'gpt2-xl': dict(n_layer=48, n_head=25, n_embd=1600), # 1558M params
# Gophers
'gopher-44m': dict(n_layer=8, n_head=16, n_embd=512),
# (there are a number more...)
# I made these tiny models up
'gpt-mini': dict(n_layer=6, n_head=6, n_embd=192),
'gpt-micro': dict(n_layer=4, n_head=4, n_embd=128),
'gpt-nano': dict(n_layer=3, n_head=3, n_embd=48),
}[config.model_type])
return config

def _init_weights(self, module):
if isinstance(module, (nn.Linear, nn.Embedding)):
module.weight.data.normal_(mean=0.0, std=0.02)
if isinstance(module, nn.Linear) and module.bias is not None:
module.bias.data.zero_()
elif isinstance(module, nn.LayerNorm):
module.bias.data.zero_()
module.weight.data.fill_(1.0)

def forward(self, idx, targets=None):
x = self.emb_stem(idx)
x = self.blocks(x)
x = self.ln_f(x)
logits = self.head(x)

# if we are given some desired targets also calculate the loss
loss = None
if targets is not None:
loss = F.cross_entropy(logits.view(-1, logits.size(-1)), targets.view(-1), ignore_index=-1)

return logits, loss

@torch.no_grad()
def generate(self, idx, max_new_tokens, temperature=1.0, do_sample=False, top_k=None):
"""
Take a conditioning sequence of indices idx (LongTensor of shape (b,t)) and complete
the sequence max_new_tokens times, feeding the predictions back into the model each time.
Most likely you'll want to make sure to be in model.eval() mode of operation for this.
"""
for _ in range(max_new_tokens):
# if the sequence context is growing too long we must crop it at block_size
idx_cond = idx if idx.size(1) <= self.block_size else idx[:, -self.block_size:]
# forward the model to get the logits for the index in the sequence
logits, _ = self(idx_cond)
# pluck the logits at the final step and scale by desired temperature
logits = logits[:, -1, :] / temperature
# optionally crop the logits to only the top k options
if top_k is not None:
v, _ = torch.topk(logits, top_k)
logits[logits < v[:, [-1]]] = -float('Inf')
# apply softmax to convert logits to (normalized) probabilities
probs = F.softmax(logits, dim=-1)
# either sample from the distribution or take the most likely element
if do_sample:
idx_next = torch.multinomial(probs, num_samples=1)
else:
_, idx_next = torch.topk(probs, k=1, dim=-1)
# append sampled index to the running sequence and continue
idx = torch.cat((idx, idx_next), dim=1)

return idx


def create_optimizer(model: torch.nn.Module, opt_config: OptimizerConfig):
"""
This long function is unfortunately doing something very simple and is being very defensive:
We are separating out all parameters of the model into two buckets: those that will experience
weight decay for regularization and those that won't (biases, and layernorm/embedding weights).
We are then returning the PyTorch optimizer object.
"""

# separate out all parameters to those that will and won't experience regularizing weight decay
decay = set()
no_decay = set()
whitelist_weight_modules = (torch.nn.Linear, )
blacklist_weight_modules = (torch.nn.LayerNorm, torch.nn.Embedding)
for mn, m in model.named_modules():
for pn, p in m.named_parameters():
fpn = '%s.%s' % (mn, pn) if mn else pn # full param name
# random note: because named_modules and named_parameters are recursive
# we will see the same tensors p many many times. but doing it this way
# allows us to know which parent module any tensor p belongs to...
if pn.endswith('bias'):
# all biases will not be decayed
no_decay.add(fpn)
elif pn.endswith('weight') and isinstance(m, whitelist_weight_modules):
# weights of whitelist modules will be weight decayed
decay.add(fpn)
elif pn.endswith('in_proj_weight'):
# MHA projection layer
decay.add(fpn)
elif pn.endswith('weight') and isinstance(m, blacklist_weight_modules):
# weights of blacklist modules will NOT be weight decayed
no_decay.add(fpn)
elif pn.endswith('pos_emb'):
# positional embedding shouldn't be decayed
no_decay.add(fpn)

# validate that we considered every parameter
param_dict = {pn: p for pn, p in model.named_parameters()}
inter_params = decay & no_decay
union_params = decay | no_decay
assert len(inter_params) == 0, "parameters %s made it into both decay/no_decay sets!" % (str(inter_params), )
assert len(param_dict.keys() - union_params) == 0, "parameters %s were not separated into either decay/no_decay set!" \
% (str(param_dict.keys() - union_params), )

# create the pytorch optimizer object
optim_groups = [
{"params": [param_dict[pn] for pn in sorted(list(decay))], "weight_decay": opt_config.weight_decay},
{"params": [param_dict[pn] for pn in sorted(list(no_decay))], "weight_decay": 0.0},
]
optimizer = torch.optim.AdamW(optim_groups, lr=opt_config.learning_rate, betas=(0.9, 0.95))
return optimizer