---
layout: distill
title: "How to Parallelize a Transformer for Training"
description: "Here we discuss four main parallelism schemes used during LLM training: data parallelism, fully-sharded data parallelism (FSDP), tensor parallelism, and pipeline parallelism. For each, we calculate at what point we become bottlenecked by communication."
date: 2025-02-04
future: true
htmlwidgets: true
hidden: false
section_number: 5
previous_section_url: "../transformers"
previous_section_name: "Part 4: Transformers"
next_section_url: "../applied-training"
next_section_name: "Part 6: Training LLaMA"
bibliography: main.bib
giscus_comments: true
authors:
toc:
_styles: >
  .fake-img {
    background: #bbb;
    border: 1px solid rgba(0, 0, 0, 0.1);
    box-shadow: 0 0px 4px rgba(0, 0, 0, 0.1);
    margin-bottom: 12px;
  }
  .fake-img p {
    font-family: monospace;
    color: white;
    text-align: center;
    margin: 12px 0;
    font-size: 16px;
  }
---
The goal of “model scaling” is to be able to increase the number of chips used for training or inference while achieving a proportional, linear increase in throughput (we call this strong scaling). While performance on a single chip depends on the trade-off between memory bandwidth and FLOPs, performance at the cluster level depends on hiding inter-chip communication by overlapping it with useful FLOPs. This is non-trivial, because increasing the number of chips increases the communication load while reducing the amount of per-device computation we can use to hide it. As we saw in Section 3, sharded matrix multiplications often require expensive AllGathers or ReduceScatters that can block the TPUs from doing useful work. The goal of this section is to find out when these become too expensive.
In this section, we'll discuss four common parallelism schemes: (pure) data parallelism, fully-sharded data parallelism (FSDP / ZeRO sharding), tensor parallelism (also known as model parallelism), and (briefly) pipeline parallelism. For each, we'll show what communication cost we incur and at what point that cost starts to bottleneck our compute cost. We focus on communication bounds since, while memory capacity constraints are important, they typically do not bind when using rematerialization (activation checkpointing) and a very large number of chips during pre-training. We also do not discuss expert parallelism for MoEs, which expands the design space substantially; we consider only the base case of a dense Transformer. Throughout this section, you can focus solely on inter-chip communication costs since, as long as we have a large enough single-chip batch size, the transfer of data from HBM to the MXU is already overlapped with computation.
We'll use the following notation to simplify calculations throughout this section.
| Notation | Meaning (model parameters) |
|:---:|:---|
| D | $d_\text{model}$ (the hidden dimension / residual stream dimension) |
| F | $d_\text{ff}$ (the feed-forward dimension) |
| B | Batch dimension (number of tokens in the batch; total, not per-device) |
| T | Sequence length |
| L | Number of layers in the model |
| Notation | Meaning (hardware characteristic) |
|:---:|:---|
| C | FLOPs/s per chip |
| W | Network bandwidth (bidirectional, often subscripted, e.g. $W_\text{ici}$ or $W_\text{dcn}$) |
| X | Number of chips along mesh axis X |
| Y | Number of chips along an alternate mesh axis, labeled Y |
| Z | Number of chips along a third mesh axis, labeled Z |
For simplicity's sake, we'll approximate a Transformer as a stack of MLP blocks — attention is a comparatively small fraction of the FLOPs for larger models as we saw in Section 4. We will also ignore the gating matmul, leaving us with the following simple structure for each layer:
{% include figure.liquid path="assets/img/simple-transformer.png" class="img-fluid" caption="Figure: a simplified Transformer layer. We treat each FFW block as a stack of two matrices Win: bf16[D, F]
(up-projection) and Wout: bf16[F, D]
(down-projection) with an input In: bf16[B, D]
." %}
Here are the four parallelism schemes we will discuss. Each scheme can be thought of as uniquely defined by a sharding for In, Win, Wout, and Out in the above diagram (made concrete in the JAX sketch after the list below).
1. Data parallelism: activations sharded along batch, parameters and optimizer state are replicated on each device. Communication only occurs during the backwards pass.
2. Fully-sharded data parallelism (FSDP or ZeRO-3): activations sharded along batch (like pure data parallelism), parameters sharded along same mesh axis and AllGathered just-in-time before use in forward pass. Optimizer state also sharded along batch. Reduces duplicated memory.
3. Tensor parallelism (also called Megatron sharding or model parallelism): activations sharded along D ($d_\text{model}$), parameters sharded along F ($d_{ff}$). AllGather and ReduceScatter activations before and after each block. Compatible with FSDP.
4. Pipeline parallelism: weights sharded along the layer dimension, activations microbatched and rolled along the layer dimension. Communication between pipeline stages is minimal (just moving activations over a single hop). Abusing notation slightly, we summarize each scheme below with a one-line "syntax" showing how In, Win, and Wout are sharded.
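To make these shardings concrete, here is a minimal JAX sketch (our own illustration, not code from this series; the `schemes` dictionary and the toy dimension sizes are made up) that declares the In/Win/Wout shardings for each scheme on a small "data" x "model" device mesh and lets the XLA compiler insert the corresponding collectives:

```python
# Sketch: declaring the four parallelism schemes as named shardings in JAX.
# Run on CPU with e.g. XLA_FLAGS=--xla_force_host_platform_device_count=8
import jax
import numpy as np
from jax.sharding import Mesh, NamedSharding, PartitionSpec as P

devices = np.array(jax.devices()).reshape(4, 2)      # 4-way data (X), 2-way model (Y)
mesh = Mesh(devices, axis_names=("data", "model"))   # X = "data", Y = "model"

# (In, Win, Wout) shardings for each scheme:
schemes = {
    "data_parallel": (P("data", None),    P(None, None),      P(None, None)),
    "fsdp":          (P("data", None),    P("data", None),    P(None, "data")),
    "tensor":        (P(None, "model"),   P(None, "model"),   P("model", None)),
    "fsdp+tensor":   (P("data", "model"), P("data", "model"), P("model", "data")),
}

B, D, F = 32, 64, 256
x = jax.random.normal(jax.random.PRNGKey(0), (B, D))
w_in = jax.random.normal(jax.random.PRNGKey(1), (D, F))
w_out = jax.random.normal(jax.random.PRNGKey(2), (F, D))

for name, (s_in, s_win, s_wout) in schemes.items():
    xs = jax.device_put(x, NamedSharding(mesh, s_in))
    wi = jax.device_put(w_in, NamedSharding(mesh, s_win))
    wo = jax.device_put(w_out, NamedSharding(mesh, s_wout))
    # XLA inserts the AllGathers/ReduceScatters described below automatically.
    out = jax.jit(lambda a, b, c: (a @ b) @ c)(xs, wi, wo)
    print(name, out.sharding)
```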
**Data parallelism syntax:** In[BX, D] *D Win[D, F] *F Wout[F, D] -> Out[BX, D]
When your model fits on a single chip with even a tiny batch size (>240 tokens, so as to be compute-bound), you should always use simple data parallelism. Pure data parallelism splits our activations across any number of TPUs so long as the number of TPUs is smaller than our batch size. The forward pass involves no communication, but at the end of every step each device performs an AllReduce on its gradients to synchronize them before updating the parameters.
{% include figure.liquid path="assets/img/data-parallelism.png" class="img-fluid" caption="Figure: a diagram of pure data parallelism (forward pass). Our activations (left) are fully sharded along the batch dimension and our weights are fully replicated, so each TPU has an identical copy of the weights. This means the total memory of our weights is increased by a factor of N, but no communication is required on the forward-pass." %}
{% details Here's the full algorithm for the forward and backwards pass. We abuse notation to write dL/dOut as dOut, purely for compactness. %}
Pure Data Parallelism Algorithm:
Forward pass: need to compute Loss[BX]
- Tmp[BX, F] = In[BX, D] *D Win[D, F]
- Out[BX, D] = Tmp[BX, F] *F Wout[F, D]
- Loss[BX] = ...
Backward pass: need to compute dWout[F, D], dWin[D, F]
- dOut[BX, D] = ...
- dWout[F, D] {UX} = Tmp[BX, F] *B dOut[BX, D]
- dWout[F, D] = AllReduce(dWout[F, D] {UX}) (not on critical path, can be done async)
- dTmp[BX, F] = dOut[BX, D] *D Wout[F, D]
- dWin[D, F] {UX} = In[BX, D] *B dTmp[BX, F]
- dWin[D, F] = AllReduce(dWin[D, F] {UX}) (not on critical path, can be done async)
- dIn[BX, D] = dTmp[BX, F] *F Win[D, F] (needed for previous layers)
We ignore the details of the loss function and its derivative, abbreviating them as Loss[BX] = ... and dOut[BX, D] = ... above.
{% enddetails %}
Note that the forward pass has no communication — it's all in the backward pass! The backward pass also has the great property that the AllReduces aren't in the "critical path", meaning that each AllReduce can be performed whenever it's convenient and doesn't block you from performing subsequent operations. The overall communication cost can still bottleneck us if it exceeds our total compute cost, but it is much more forgiving from an implementation standpoint. We'll see that model/tensor parallelism doesn't have this property.
Why do this? Pure data parallelism reduces activation memory pressure by splitting our activations over the batch dimension, allowing us to almost arbitrarily increase batch size as long as we have more chips to split the batch dimension over. Especially during training when our activations often dominate our memory usage, this is very helpful.
Why not do this? Pure data parallelism does nothing to reduce memory pressure from model parameters or optimizer states, which means it is rarely useful for interesting models at scale, where parameters + optimizer state don't fit in a single TPU. To give a sense of scale, if we train with parameters in bf16 and optimizer state in fp32 with Adam (Adam stores the parameters plus first- and second-moment accumulators; with bfloat16 params and float32 optimizer state, this gives us 2 + 8 = 10 bytes per parameter), the largest model we can fit on a 96GB TPU v5p chip has roughly 96e9 / 10 ≈ 9B parameters.

**Takeaway**: the largest model we can train with Adam and pure data parallelism has roughly $\text{HBM per chip} / 10$ parameters, about 9B parameters on a TPU v5p.
To make this useful for real models during training, we'll need to at least partly shard the model parameters or optimizer.
When do we become bottlenecked by communication? As we can see above, we have two AllReduces per layer, one for dWin[D, F] and one for dWout[F, D], each over an array of $2DF$ bytes (bf16).

As in the table above, let $X$ be the number of chips along the data-parallel mesh axis, $C$ the per-chip FLOPs/s, and $W_\text{ici}$ the bidirectional ICI bandwidth along that axis. Since the forward pass involves no communication, we compare the backward-pass compute and communication per layer.

Communication time: From a previous section we know that the time required to perform an AllReduce in a 1D mesh depends only on the total bytes of the array being AllReduced and the ICI bandwidth $W_\text{ici}$, not on the number of chips: an AllReduce of $V$ bytes takes roughly $2V / W_\text{ici}$ seconds. With two AllReduces of $2DF$ bytes each per layer, the backward-pass communication time per layer is

$$T_\text{comms} = \frac{2 \cdot 2 \cdot 2DF}{W_\text{ici}} = \frac{8DF}{W_\text{ici}}$$

Matmul time: Each layer comprises two matmuls in the forward pass, or four matmuls in the backwards pass, each of which requires $2 \cdot (B / X) \cdot D \cdot F$ FLOPs per shard. Thus the backward-pass compute time per layer is

$$T_\text{math} = \frac{4 \cdot 2BDF}{X \cdot C} = \frac{8BDF}{X \cdot C}$$

Since we overlap, the total time per layer is the max of these two quantities:

$$T \approx \max\left(\frac{8BDF}{X \cdot C}, \frac{8DF}{W_\text{ici}}\right)$$

We become compute-bound when $T_\text{math} > T_\text{comms}$, i.e. when

$$\frac{8BDF}{X \cdot C} > \frac{8DF}{W_\text{ici}} \iff \frac{B}{X} > \frac{C}{W_\text{ici}}$$

The upshot is that, to remain compute-bound with data parallelism, we need the per-device batch size $B / X$ to exceed the "ICI arithmetic intensity" $C / W_\text{ici}$.
Let's put in some real numbers to get a sense of scale. For TPUv5p, C=4.59e14 FLOPs/s and W=2 * 9e10 bytes/s for 1D data parallelism over ICI, so our batch size per chip must be at least 4.59e14 / 1.8e11 ≈ 2,550 tokens to avoid being communication-bound. Since we can do data parallelism over multiple axes, if we dedicate all three axes of a TPUv5p pod to pure data parallelism, we 3x our bandwidth and the requirement drops to roughly 850 tokens per chip.
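As a quick sanity check, here is the arithmetic behind those numbers (our own back-of-the-envelope script, not from the original text):

```python
# Roofline for pure data parallelism on TPU v5p, using the numbers quoted above.
C = 4.59e14        # bf16 FLOPs/s per chip
W_ici = 2 * 9e10   # bidirectional ICI bandwidth per axis, bytes/s

min_batch_per_chip_1_axis = C / W_ici        # ~2550 tokens
min_batch_per_chip_3_axes = C / (3 * W_ici)  # ~850 tokens if all 3 ICI axes do data parallelism

print(round(min_batch_per_chip_1_axis), round(min_batch_per_chip_3_axes))  # 2550 850
```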
**Note on context parallelism:** throughout this section, we use $B$ to denote the batch size *in tokens*. Because the MLP blocks treat every token identically, we are free to shard over the batch dimension, the sequence dimension, or both; sharding over the sequence dimension is often called "context parallelism", but from the standpoint of these rooflines it behaves just like data parallelism (attention requires some extra communication to gather keys and values, which we ignore here).
**FSDP syntax:** In[BX, D] *D Win[DX, F] *F Wout[F, DX] -> Out[BX, D]
Fully-sharded data parallelism (often called FSDP or ZeRO-sharding) splits the model optimizer states and weights across the data parallel shards and efficiently gathers and scatters them as needed. Compared to pure data parallelism, FSDP drastically reduces per-device memory usage and saves on backward pass FLOPs, with very minimal overhead.
{% include figure.liquid path="assets/img/fsdp.png" class="img-fluid" caption="Figure: FSDP shards the contracting dimension of the MLP weights along the data dimension. This reduces memory but (from Section 3) requires us to gather the weights for W before we perform the matmul. Note that the activations (left) are not sharded along the contracting dimension, which is what forces us to gather. Note that our weight optimizer state is likewise sharded along the contracting dimension." %}
You'll remember (from Section 3) that an AllReduce can be decomposed into an AllGather and a ReduceScatter. This means that, instead of doing the full gradient AllReduce as in standard data parallelism, we can shard the weights and optimizer states across chips, AllGather the weights at each layer during the forward pass, and ReduceScatter the gradients during the backward pass, at no extra cost.
{% details Here's the full algorithm for FSDP. %}
Fully-Sharded Data Parallelism (FSDP):
Forward pass: need to compute Loss[BX]
- Win[D, F] = AllGather(Win[DX, F]) (not on critical path, can do it during previous layer)
- Tmp[BX, F] = In[BX, D] *D Win[D, F] (can throw away Win[D, F] now)
- Wout[F, D] = AllGather(Wout[F, DX]) (not on critical path, can do it during previous layer)
- Out[BX, D] = Tmp[BX, F] *F Wout[F, D]
- Loss[BX] = ...
Backward pass: need to compute dWout[F, DX], dWin[DX, F]
- dOut[BX, D] = ...
- dWout[F, D] {UX} = Tmp[BX, F] *B dOut[BX, D]
- dWout[F, DX] = ReduceScatter(dWout[F, D] {UX}) (not on critical path, can be done async)
- Wout[F, D] = AllGather(Wout[F, DX]) (can be done ahead of time)
- dTmp[BX, F] = dOut[BX, D] *D Wout[F, D] (can throw away Wout[F, D] here)
- dWin[D,F] {UX} = dTmp[BX, F] *B In[BX, D]
- dWin[DX, F] = ReduceScatter(dWin[D, F] {UX}) (not on critical path, can be done async)
- Win[D, F] = AllGather(Win[DX, F]) (can be done ahead of time)
- dIn[BX, D] = dTmp[BX, F] *F Win[D, F] (needed for previous layers) (can throw away Win[D, F] here)
{% enddetails %}
This is also called "ZeRO sharding", from the ZeRO ("Zero Redundancy Optimizer") papers, since we don't perform any unnecessary compute or store any unnecessary state. ZeRO-{1,2,3} refer to sharding the optimizer states, gradients, and weights in this way, respectively. Since all have essentially the same communication cost (technically, FSDP adds communication in the forward pass that pure DP doesn't have, but it is in the same proportion as the forward FLOPs, so it has no effect on the comms roofline; the key is that ZeRO-3 turns a backward-pass AllReduce into an AllGather and a ReduceScatter, which have the same total comms volume), we can basically always do ZeRO-3 sharding, which shards the parameters, gradients, and optimizer states across a set of devices.
Why would we do this? Standard data parallelism involves a lot of duplicated work. Each TPU AllReduces the full gradient, then updates the full optimizer state (identical work on all TPUs), then updates the parameters (again, fully duplicated). For ZeRO sharding (sharding the gradients/optimizer state), instead of an AllReduce, you can ReduceScatter the gradients, update only your shard of the optimizer state, update a shard of the parameters, then AllGather the parameters as needed for your forward pass.
When do we become bottlenecked by communication? Our relative FLOPs and comms costs are exactly the same as for pure data parallelism, since each backward-pass AllReduce has become an AllGather + ReduceScatter (recall that an AllReduce is implemented as an AllGather and a ReduceScatter, each with half the cost). Here we model the forward pass, which has the same FLOPs-to-comms ratio as the backward pass: per layer we have two matmuls and two weight AllGathers, so

$$T_\text{math} = \frac{2 \cdot 2BDF}{X \cdot C} = \frac{4BDF}{X \cdot C}, \qquad T_\text{comms} = \frac{2 \cdot 2DF}{W_\text{ici}} = \frac{4DF}{W_\text{ici}}$$

Therefore, as with pure data-parallelism, we are compute bound when $B / X > C / W_\text{ici}$ (about 4.59e14 / 1.8e11 = 2550 for v5p). This is great for us, because it means if our per-device batch size is big enough to be compute-bound for pure data parallelism, we can simply upgrade to FSDP without leaving the compute-bound regime, saving ourselves a massive amount of parameter and optimizer state memory! Though we did have to add communication to the forward pass, this cost is immaterial since it just overlaps with forward-pass FLOPs.
**Takeaway:** both FSDP and pure data parallelism become bandwidth bound on TPUv5p when the batch size per device is less than $C / W_\text{ici} \approx 2{,}550$ tokens (or $2{,}550 / n_\text{axes}$ when parallelizing over multiple axes).
For example, DeepSeek-V2 (one of the only recent strong models to release information about its training batch size) used a batch size of ~40M tokens. This would allow us to scale to roughly 47,000 chips, or around 5 TPUv5 pods, before we hit a bandwidth limit.
For LLaMA-3 70B, which was trained for approximately 6.3e24 (15e12 * 70e9 * 6) FLOPs, we could split a batch of 16M tokens over roughly 16e6 / (2550 / 3) = 18,823 chips (roughly 2 pods of 8960 chips), each delivering 4.59e14 FLOPs/s at 50% peak FLOPs utilization (often called MFU), and train it in approximately 17 days. Not bad! But let's explore how we can do better.
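Here's the same estimate as a small script (again our own check, with the assumptions stated in the comments):

```python
# Rough check of the LLaMA-3 70B data-parallel training estimate above.
C = 4.59e14                     # FLOPs/s per TPU v5p chip
mfu = 0.5                       # assumed utilization
total_flops = 6 * 70e9 * 15e12  # ~6.3e24 FLOPs for 70B params, 15T tokens

batch = 16e6                    # tokens per step
min_per_chip = 2550 / 3         # ~850 tokens/chip with 3 ICI axes of FSDP
n_chips = batch / min_per_chip  # ~18,800 chips before we go comms-bound

train_days = total_flops / (n_chips * C * mfu) / 86400
print(int(n_chips), round(train_days, 1))  # ~18823 chips, ~17 days
```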
**Note on critical batch size**: somewhat unintuitively, we become more communication bottlenecked as our total batch size decreases (with fixed chip number). Data parallelism and FSDP let us scale to arbitrarily many chips so long as we can keep increasing our batch size! However, in practice, as our batch size increases, we tend to see diminishing returns in training since our gradients become almost noise-free. We also sometimes see training instability. Thus, the game of finding an optimal sharding scheme in the "unlimited compute regime" often starts from a fixed batch size, determined by scaling laws, and a known (large) number of chips, and then aims to find a partitioning that allows us to fit that small batch size on so many chips.
**Tensor parallelism syntax:** In[B, DY] *D Win[D, FY] *F Wout[FY, D] -> Out[B, DY]
With fully-sharded data parallelism, the AllGathers and ReduceScatters move the weights across chips. We can also shard the feed-forward dimension of the model and move the activations during each layer instead; this is called "1D model parallelism" or Megatron sharding. It can unlock a smaller efficient batch size per pod. The figure below shows an example of a single matrix sharded in this way:
{% include figure.liquid path="assets/img/model-parallelism.png" class="img-fluid" caption="Figure: an example of basic tensor parallelism. Since we're only sharding our activations over Y (unlike in FSDP where we shard over X), we replicate our activations over X. Using our standard syntax, this is A[B, DY] * B[D, FY] -> C[B, FY]. Because we're only sharding over one of the contracting dimensions, we typically AllGather the activations A before the matmul." %}
As noted, In[B, DY] *D Win[D, FY] *F Wout[FY, D] -> Out[B, DY] means we have to gather our activations before the first matmul. This is cheaper than ZeRO sharding when the activations are smaller than the weights. This is typically true only with some amount of ZeRO sharding added (which reduces the size of the gather). This is one of the reasons we tend to mix ZeRO sharding and model parallelism.
{% details Here's the algorithm for tensor parallelism! %}
Tensor Parallelism:
Forward pass: need to compute Loss[B]
- In[B, D] = AllGather(In[B, DY]) (on critical path)
- Tmp[B, FY] = In[B, D] *D Win[D, FY] (not sharded along contracting, so no comms)
- Out[B, D] {UY} = Tmp[B, FY] *F Wout[FY, D]
- Out[B, DY] = ReduceScatter(Out[B, D] {UY}) (on critical path)
- Loss[B] = ...
Backward pass: need to compute dWout[FY, D], dWin[D, FY]
- dOut[B, DY] = ...
- dOut[B, D] = AllGather(dOut[B, DY]) (on critical path)
- dWout[FY, D] = Tmp[B, FY] *B dOut[B, D]
- dTmp[B, FY] = dOut[B, D] *D Wout[FY, D] (can throw away dOut[B, D] here)
- In[B, D] = AllGather(In[B, DY]) (this can be skipped by sharing with (1) from the forward pass)
- dWin[D, FY] = dTmp[B, FY] *B In[B, D]
- dIn[B, D] {U.Y} = dTmp[B, FY] *F Win[D, FY] (needed for previous layers)
- dIn[B, DY] = ReduceScatter(dIn[B, D] {U.Y}) (on critical path)
{% enddetails %}
One nice thing about tensor parallelism is that it interacts nicely with the two matrices in our Transformer forward pass. Naively, we would do an AllReduce after each of the two matrices. But here we first do In[B, DY] * Win[D, FY] -> Tmp[B, FY] and then Tmp[B, FY] * Wout[FY, D] -> Out[B, DY]. This means we AllGather In at the beginning, and ReduceScatter Out at the end, rather than doing an AllReduce.
How costly is this? Let's only model the forward pass - the backwards pass is just the transpose of each operation here. In 1D model parallelism we AllGather the activations before the first matmul, and ReduceScatter them after the second, sending two bytes at a time (bf16). Let's figure out when we're bottlenecked by communication.
Noting that we want the compute cost to be greater than the comms cost, and that per layer the forward pass does $2 \cdot 2BDF / Y$ FLOPs per shard while moving $2 \cdot 2BD$ bytes (one activation AllGather plus one ReduceScatter), we get:

$$\frac{4BDF}{Y \cdot C} > \frac{4BD}{W_\text{ici}} \iff Y < \frac{F \cdot W_\text{ici}}{C}$$

Thus for instance, for TPUv5p, $C / W_\text{ici} \approx 2550$ per ICI axis, so we can only do model parallelism up to roughly $Y < F / 2550$ before becoming communication bound (or $n_\text{axes} \cdot F / 2550$ if we dedicate several ICI axes to it). Note that, unlike data parallelism, this bound does not depend on the batch size $B$, since both the FLOPs and the communicated bytes scale with $B$.

**Takeaway**: model parallelism becomes communication bound when $Y > n_\text{axes} \cdot F / 2550$ on TPUv5p, which caps us at roughly 8-16-way model parallelism for most models.

Note that this doesn't depend on the precision of the computation: in int8, for example, we halve the bytes we communicate but roughly double the available FLOPs/s on TPUv5p, so the ratio $C / W_\text{ici}$ (and hence the bound on $Y$) is essentially unchanged.
Let's think about some examples:

- On TPUv5p with LLaMA 3-70B, where $$D = 8192$$ and $$F \approx 30{,}000$$, we can comfortably do 8-way model parallelism but will be communication bound at 16-way model parallelism: the F required for 8-way model sharding on one axis is only about 20k, while 16-way would need about 41k.
- For Gemma 7B, $$F \approx 50k$$, so we become communication bound at about 19-way model parallelism. That means we could likely do 16-way and still see good performance.
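A small helper (ours, not the book's) makes the examples above easy to reproduce:

```python
# Maximum model parallelism before going comms-bound: Y < n_axes * F / (C / W_ici).
alpha = 4.59e14 / 1.8e11   # ICI arithmetic intensity for TPU v5p, ~2550

def max_model_parallelism(F, n_axes=1):
    return n_axes * F / alpha

print(max_model_parallelism(30_000))  # ~11.8: 8-way is fine, 16-way is comms-bound (LLaMA-3 70B)
print(max_model_parallelism(50_000))  # ~19.6: 16-way is fine (Gemma 7B)
```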
**Mixed FSDP + tensor parallelism syntax:** In[BX, DY] *D Win[DX, FY] *F Wout[FY, DX] -> Out[BX, DY]
The nice thing about FSDP and tensor parallelism is that they can be combined. By sharding Win and Wout along both axes we both save memory and compute. Because we shard B along X, we reduce the size of the model-parallel AllGathers, and because we shard F along Y, we reduce the communication overhead of FSDP. This means a combination of the two can get us to an even lower effective batch size than we saw above.
{% include figure.liquid path="assets/img/mixed-fsdp-model-parallelism.png" class="img-fluid" caption="Figure: a diagram combining FSDP and tensor parallelism. Unlike the other cases, there is no duplication of model parameters." %}
{% details Here's the full algorithm for mixed FSDP + tensor parallelism. While we have a lot of communication, all our AllGathers and ReduceScatters are smaller because we have batch-sharded our activations and tensor sharded our weights much more! %}
Forward pass: need to compute Loss[B]
- In[BX, D] = AllGatherY(In[BX, DY]) (on critical path)
- Win[D, FY] = AllGatherX(Win[DX, FY]) (can be done ahead of time)
- Tmp[BX, FY] = In[BX, D] *D Win[D, FY]
- Wout[FY, D] = AllGatherX(Wout[FY, DX]) (can be done ahead of time)
- Out[BX, D] {U.Y} = Tmp[BX, FY] *F Wout[FY, D]
- Out[BX, DY] = ReduceScatterY(Out[BX, D] {U.Y}) (on critical path)
- Loss[BX] = ...
Backward pass: need to compute dWout[FY, DX], dWin[DX, FY]
- dOut[BX, DY] = ...
- dOut[BX, D] = AllGatherY(dOut[BX, DY]) (on critical path)
- dWout[FY, D] {U.X} = Tmp[BX, FY] *B dOut[BX, D]
- dWout[FY, DX] = ReduceScatterX(dWout[FY, D] {U.X})
- Wout[FY, D] = AllGatherX(Wout[FY, DX]) (can be done ahead of time)
- dTmp[BX, FY] = dOut[BX, D] *D Wout[FY, D] (can throw away dOut[B, D] here)
- In[BX, D] = AllGatherY(In[BX, DY]) (not on critical path + this can be shared with (2) from the previous layer)
- dWin[D, FY] {U.X} = dTmp[BX, FY] *B In[BX, D]
- dWin[DX, FY] = ReduceScatterX(dWin[D, FY] {U.X})
- Win[D, FY] = AllGatherX(Win[DX, FY]) (can be done ahead of time)
- dIn[BX, D] {U.Y} = dTmp[BX, FY] *F Win[D, FY] (needed for previous layers)
- dIn[BX, DY] = ReduceScatterY(dIn[BX, D] {U.Y}) (on critical path)
{% enddetails %}
What's the right combination of FSDP and MP? A simple but key maxim is that FSDP moves weights and model parallelism moves activations. That means as our batch size shrinks (especially as we do more data parallelism), model parallelism becomes cheaper because our activations per-shard are smaller.
- Model parallelism performs $$\mathbf{AllGather}_Y([B_X, D_Y])$$, which shrinks as $$X$$ grows.
- FSDP performs $$\mathbf{AllGather}_X([D_X, F_Y])$$, which shrinks as $$Y$$ grows.
Thus by combining both we can push our minimum batch size per replica down even more. We can calculate the optimal amount of FSDP and MP in the same way as above:
Let $X$ be the number of chips dedicated to FSDP and $Y$ the number dedicated to model parallelism, so $N = XY$ is the total number of chips, and let $M_X$ and $M_Y$ be the number of ICI mesh axes dedicated to each. Then the total forward-pass communication time per layer is

$$T_\text{comms} = \underbrace{\frac{4DF}{Y \cdot M_X \cdot W_\text{ici}}}_{\text{FSDP weight AllGathers}} + \underbrace{\frac{4BD}{X \cdot M_Y \cdot W_\text{ici}}}_{\text{MP activation AllGather + ReduceScatter}}$$

And likewise our total FLOPs time is

$$T_\text{math} = \frac{2 \cdot 2BDF}{X \cdot Y \cdot C} = \frac{4BDF}{N \cdot C}$$

since each chip performs $1 / N$ of each of the two matmuls.

To simplify the analysis, we make two simplifications: first, we allow $X$ and $Y$ to vary continuously (ignoring that they must be integer factors of $N$); second, we fix the axis assignment to $M_X = 2$ ICI axes for FSDP and $M_Y = 1$ axis for model parallelism, the typical configuration on a 3D TPU mesh.

Before we ask under what conditions we'll be compute-bound, let's find the optimal values for $X$ and $Y$, i.e. the split that minimizes communication. Writing $Y = N / X$,

$$T_\text{comms}(X) = \frac{4DFX}{N \cdot M_X \cdot W_\text{ici}} + \frac{4BD}{X \cdot M_Y \cdot W_\text{ici}}$$

Differentiating this expression wrt $X$ and setting the derivative to zero gives

$$\frac{4DF}{N \cdot M_X \cdot W_\text{ici}} = \frac{4BD}{X^2 \cdot M_Y \cdot W_\text{ici}} \implies X_{opt} = \sqrt{\frac{M_X}{M_Y} \cdot \frac{BN}{F}} = \sqrt{\frac{2BN}{F}}$$

This is super useful! This tells us, for a given batch size $B$, chip count $N$, and feed-forward dimension $F$, how much FSDP to do to minimize communication.

**Takeaway:** in general, during training, the optimal amount of FSDP is $X_{opt} \approx \sqrt{2BN / F}$, with the remaining $Y = N / X_{opt}$ chips dedicated to model parallelism.
Now let's return to the question we've been asking of all our parallelism strategies: under what conditions will we be compute-bound? Since we can overlap FLOPs and comms, we are compute-bound when $T_\text{math} > T_\text{comms}$,

which gives us

$$\frac{4BDF}{N \cdot C} > \frac{4DFX}{N \cdot M_X \cdot W_\text{ici}} + \frac{4BD}{X \cdot M_Y \cdot W_\text{ici}}$$

Letting $\alpha = C / W_\text{ici}$ (the ICI arithmetic intensity, roughly 2550 for TPUv5p) and cancelling common factors, this becomes

$$\frac{BF}{N \cdot \alpha} > \frac{FX}{N \cdot M_X} + \frac{B}{X \cdot M_Y}$$

Plugging in our calculated $X_{opt} = \sqrt{2BN / F}$ (with $M_X = 2$, $M_Y = 1$), both terms on the right become $\sqrt{BF / 2N}$, so the condition is

$$\frac{BF}{N \cdot \alpha} > 2\sqrt{\frac{BF}{2N}}$$

where the left-hand side is proportional to the computation time and the right-hand side is proportional to the communication time. Note that while the computation time scales linearly with the batch size (as it does regardless of parallelism), the communication time scales only as the square root of the batch size. The ratio of the computation to communication time thus scales as the square root of the batch size:

$$\frac{T_\text{math}}{T_\text{comms}} = \frac{1}{\alpha}\sqrt{\frac{BF}{2N}}$$

To ensure that this ratio is greater than one, so we are compute bound, we require

$$\frac{B}{N} > \frac{2\alpha^2}{F} = \frac{2C^2}{W_\text{ici}^2 \cdot F}$$

See Appendix C for an alternate derivation of this relation. To get approximate numbers, again plug in $\alpha \approx 2550$ for TPUv5p: for $F \approx 30{,}000$ (LLaMA-3 70B), we need a per-chip batch size of at least $2 \cdot 2550^2 / 30{,}000 \approx 430$ tokens.

**Takeaway:** combining tensor parallelism with FSDP allows us to drop to a per-chip batch size of roughly $2 \cdot 2550^2 / F \approx 400$-$500$ tokens on TPUv5p, about half of what FSDP alone allows.
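To make this concrete, here's a short script (our own, using the $M_X = 2$, $M_Y = 1$ assumption from the derivation above) that computes the optimal split and the minimum per-chip batch size:

```python
import math

alpha = 2550  # C / W_ici per ICI axis on TPU v5p

def optimal_fsdp(batch, n_chips, F):
    """X_opt = sqrt(2 * B * N / F); the remaining Y = N / X goes to model parallelism."""
    return math.sqrt(2 * batch * n_chips / F)

def min_batch_per_chip(F):
    """Smallest per-chip batch size that stays compute-bound with mixed FSDP + MP."""
    return 2 * alpha**2 / F

print(round(min_batch_per_chip(30_000)))  # ~430 tokens per chip
X = optimal_fsdp(3.5e6, 8192, 30_000)
print(round(X), round(8192 / X, 1))       # ~1383-way FSDP, ~5.9-way MP (round to 1024 x 8 in practice)
```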
Below we plot the ratio of FLOPs to comms time for mixed FSDP + MP, comparing it both to only model parallelism and only data parallelism (FSDP), on a representative 4x4x4 chip array. While pure FSDP parallelism dominates for very large batch sizes, in the regime where batch size over number of chips is between roughly 400 and 850, a mixed FSDP + MP strategy is required in order to be compute-bound.
{% include figure.liquid path="assets/img/mixed-fsdp-comms-2.png" class="img-fluid" caption="Figure: ratio of FLOPs to comms time for optimal mixed FSDP/MP on a TPUv5p 4x4x4 slice with F=30k. As expected, model parallelism has a fixed ratio with batch size; ideal mixed FSDP + MP scales with
Here's another example of TPU v5p 16x16x16 showing the FLOPs and comms time as a function of batch size for different sharding schemes.
{% include figure.liquid path="assets/img/comms-flops-time.png" class="img-fluid" caption="Figure: time taken for communication with different parallelism schemes. The black dashed line is the time taken by the matrix multiplication FLOPs, so any curve above this line is comms-bound. We note that all strategies become comms-bound below batch size 1.5e6, which is in line with our expected 4096 * 2 * 2550^2 / (8192 * 4) = 1.6e10." %}
The black curve is the amount of time spent on model FLOPs, meaning any batch size where this is lower than all comms costs is strictly comms-bound. You'll notice the black curve intersects the green curve at a batch size of about 1.6e6, as predicted.
Zooming in, we can see that devoting two axes to FSDP and using the optical switches to reconfigure the topology to have an 8-long axis for model sharding gives us the lowest communication volume between 1M and 6M batch size per slice, while pure FSDP is best between 6M and 100M. This agrees with our calculations above!
{% include figure.liquid path="assets/img/comms-flops-time-zoom.png" class="img-fluid" %}
Here's an interactive animation to play with this, showing the total compute time and communication time for different batch sizes:
You'll notice this generally agrees with the above (minimum around FSDP=256, MP=16), plus or minus some wiggle factor for some slight differences in the number of axes for each.
You'll probably notice we've avoided talking about pipelining at all in the previous sections. Pipelining is a dominant strategy for GPU parallelism that is somewhat less essential on TPUs. Briefly, pipelined training involves splitting the layers of a model across multiple devices and passing the activations between pipeline stages during the forward and backward pass. The algorithm is something like:
- Initialize your data on TPU 0 with your weights sharded across the layer dimension ($W_\text{in}[L_Z, D_X, F_Y]$ for pipelining combined with FSDP and tensor parallelism).
- Perform the first layer on TPU 0, then copy the resulting activations to TPU 1, and repeat until you get to the last TPU.
- Compute the loss function and its derivative $\partial L / \partial x_L$.
- For the last pipeline stage, compute the derivatives $\partial L / \partial W_L$ and $\partial L / \partial x_{L-1}$, then copy $\partial L / \partial x_{L-1}$ to the previous pipeline stage and repeat until you reach TPU 0.
{% details Here is some (working) Python pseudo-code %}
This pseudocode should run on a Cloud TPU VM. While it's not very efficient or realistic, it gives you a sense of how data is propagated across devices.

```python
import jax
import jax.numpy as jnp

batch_size = 32
d_model = 128
d_ff = 4 * d_model

# Assume we have num_layers == num_pipeline_stages, one layer per device.
num_layers = len(jax.devices())

key = jax.random.PRNGKey(0)

# Pretend each layer is just a single matmul, with layer i's weight living on device i.
x = jax.random.normal(key, (batch_size, d_model))
weights = [
    jax.device_put(jax.random.normal(key, (d_model, d_model)), jax.devices()[i])
    for i in range(num_layers)
]

def layer_fn(x, weight):
  return x @ weight

# Forward pass: layer i runs on device i. Save each layer's input for the backward pass.
inputs = []
x = jax.device_put(x, jax.devices()[0])
for i in range(num_layers):
  inputs.append(x)
  x = layer_fn(x, weights[i])
  if i != num_layers - 1:
    x = jax.device_put(x, jax.devices()[i + 1])  # send activations to the next stage

def loss_fn(batch):
  return jnp.mean(batch ** 2)  # make up some fake loss function

loss, dx = jax.value_and_grad(loss_fn)(x)  # dx lives on the last device

# Backward pass: walk the stages in reverse, passing dL/dx back one hop at a time.
for i in range(num_layers - 1, -1, -1):
  _, f_vjp = jax.vjp(layer_fn, inputs[i], weights[i])
  dx, dw = f_vjp(dx)  # the VJP dx @ J(layer_fn)(x[i], W[i])
  weights[i] = weights[i] - 0.01 * dw  # SGD update for this stage's weights
  if i != 0:
    dx = jax.device_put(dx, jax.devices()[i - 1])  # send gradients to the previous stage
```
{% enddetails %}
Why is this a good idea? Pipelining is great for many reasons: it has a low communication cost between pipeline stages, meaning you can train very large models even with low bandwidth interconnects. This is often very useful on GPUs since they are not densely connected by ICI in the way TPUs are.
Why is this difficult/annoying? You might have noticed in the pseudocode above that TPU 0 is almost always idle! It's only doing work on the very first and last step of the pipeline. The period of idleness is called a pipeline bubble and is very annoying to deal with. Typically we try to mitigate this first with microbatching, which sends multiple small batches through the pipeline in sequence, keeping TPU 0 busy for a larger fraction of the total step time.
A second approach is to carefully overlap the forward matmuls with the backward dL/dx and dL/dW matmuls of other microbatches. Since the dL/dW matmul of a layer is not needed to propagate gradients to earlier stages, it can be deferred and used to fill what would otherwise be bubble time. The DeepSeek v3 schedule below is a good example of this:
{% include figure.liquid path="assets/img/deepseek-pipeline.png" class="img-fluid" caption="Figure: the DeepSeek v3 pipeline schedule (from their <a href="https://github.com/deepseek-ai/DeepSeek-V3/blob/main/DeepSeek_V3.pdf\">recent paper). Orange is the forward matmul, green is the dL/dx matmul, and blue is the dL/dW matmul. By prioritizing the backwards dL/dx multiplications, we can avoid "stranding" FLOPs." %}
Because it is less critical for TPUs (which have larger interconnected pods), we won't delve into this as deeply, but it's a good exercise to understand the key pipelining bottlenecks.
Let's take a step back and look at a specific example, say training LLaMA-3 70B on TPU v5p. LLaMA-3 70B has $D = 8192$ and $F \approx 30{,}000$. Applying the rules above:
- We'll be ICI bound when we do model parallelism greater than $$Y > n_\text{axes} \cdot F / 2550 \approx n_\text{axes} \cdot 11$$.
- Pure FSDP becomes ICI bound when we have a $$\text{batch size per chip} < 2550 / n_\text{axes}$$. Here that means if we wanted to train with BS=2M, we'd at most be able to use $\approx 2400$ chips, which is roughly a quarter of a TPU v5p pod.
- Mixed FSDP + model parallelism becomes ICI bound when we have a $$\text{batch size per chip} < 2 \cdot 2550^2 / 30{,}000 = 432$$, so with a ~4M token batch this lets us scale to roughly 9k chips! However, the maximum size of a TPU v5p pod is 8k chips (8960 to be exact), and beyond that we have to scale over lower-bandwidth data-center networking (DCN).
So this gives us a nice recipe to fit on a single pod with BS=3.5M. We'd use the equation above, which gives roughly X (FSDP) = 1024 and Y (MP) = 8. If the model was larger, there would be room to expand the model sharding to 16. We have a bit of room to drop the batch size as low as BS=1.5M on that pod and still be compute bound, but we're close to the lower bound there.
To go larger than one pod, we need to scale over DCN. Because DCN has lower bandwidth, it's typically too slow to do much useful FSDP. Instead, we do pure data parallelism over the DCN axis and FSDP within a pod. Let's calculate whether the Data Center Network (DCN) holds up.
With pure data parallelism over DCN, we need to sync the weights and optimizer states during each step (as the model completes its backward pass we need to complete the AllReduce). We can borrow the math from the pure data parallelism section above, which tells us that we become comms bound when the batch size per pod drops below the pod-level arithmetic intensity $C_\text{pod} / W_\text{dcn}$. Here:

- Our total DCN ingress+egress bandwidth is 2.5e10 bytes/s per host, with 4 chips per host. This gives us ~2000 hosts in the pod, and a total of roughly 5e13 bytes/s of bandwidth.
- $$C_\text{pod}$$ here is the pod size times the per-chip compute, which is 8k * 4.5e14 = 3.8e18 FLOPs/s.

As before, we become bottlenecked when the batch size per pod drops below $C_\text{pod} / W_\text{dcn} \approx 3.8e18 / 5e13 \approx 75{,}000$ tokens, which is easy to satisfy with any reasonable batch size.
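And the corresponding arithmetic for the cross-pod DCN bound (our own check; the text rounds slightly differently):

```python
# Minimum batch size per pod for pure data parallelism over DCN.
dcn_bw_per_host = 2.5e10                 # bytes/s, ingress + egress
hosts_per_pod = 8960 // 4                # 4 chips per host -> 2240 hosts
W_dcn = dcn_bw_per_host * hosts_per_pod  # ~5.6e13 bytes/s per pod
C_pod = 8960 * 4.59e14                   # ~4.1e18 FLOPs/s per pod

print(C_pod / W_dcn)  # ~73,000 tokens per pod, i.e. roughly the 75,000 quoted above
```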
**Takeaway:** This means we can scale fairly arbitrarily across pods, so e.g. with 10 pods of 8960 chips we could do a global batch size of about 40M tokens on 89,600 chips, training LLaMA-3 70B in roughly 3.5 days at the same 50% MFU.
- Increasing parallelism or reducing the batch size both tend to make us more communication-bound, because they reduce the amount of compute performed per chip.
- Up to a reasonable context length (~32k), we can get away with modeling a Transformer as a stack of MLP blocks and define each of several parallelism schemes by how they shard the two/three main matmuls per layer.
- During training there are four main parallelism schemes we consider, each of which has its own bandwidth and compute requirements: data parallelism, FSDP, model (tensor) parallelism, and mixed FSDP + model parallelism.
| Strategy | Description |
|:---|:---|
| Data Parallelism | Activations are batch sharded, everything else is fully replicated, and we AllReduce gradients during the backward pass. |
| FSDP | Activations, weights, and optimizer state are batch sharded; weights are gathered just before use and gradients are reduce-scattered. |
| Model Parallelism (aka Megatron, Tensor) | Activations are sharded along $$d_\text{model}$$, weights are sharded along $$d_\text{ff}$$; activations are AllGathered before each block and ReduceScattered after. |
| Mixed FSDP + Model Parallelism | Both of the above, where FSDP gathers the model-sharded weights. |
Each of these strategies has a limit at which it becomes network/communication bound, based on its per-device compute and comms. Here are the per-layer compute and comms "formulas" for each method, assuming $$X$$ is FSDP and $$Y$$ is model parallelism:

| Strategy | Compute per layer (per chip) | Comms per layer |
|:---|:---|:---|
| Data Parallelism | $4BDF / (X \cdot C)$ forward, $8BDF / (X \cdot C)$ backward | none forward, $8DF / W_\text{ici}$ backward |
| FSDP | same as above | $4DF / W_\text{ici}$ forward, $8DF / W_\text{ici}$ backward |
| Model Parallelism | $4BDF / (Y \cdot C)$ forward | $4BD / W_\text{ici}$ forward |
| Mixed FSDP + Model Parallelism | $4BDF / (X \cdot Y \cdot C)$ forward | $4DF / (Y \cdot W_\text{ici}) + 4BD / (X \cdot W_\text{ici})$ forward |
- Pure data parallelism is rarely useful because the model and its optimizer state take roughly 10 bytes per parameter (10x the parameter count), which means we can rarely fit more than a few billion parameters in memory.
- Data parallelism and FSDP become comms bound when the $$\text{batch size per shard} < C / W$$, the arithmetic intensity of the network. For ICI this is about 2,550 and for DCN this is about 75,000. Dedicating more parallel axes to the sharding divides this threshold accordingly.
- Model parallelism becomes comms bound when $$Y > n_\text{axes} \cdot F / 2550$$. This is around 8-16 way for most models and is independent of the batch size.
- Mixed FSDP + model parallelism allows us to drop the per-chip batch size to as low as $$2 \cdot 2550^2 / F \approx 400$$. This is fairly close to the point (~200 tokens per chip) where we become HBM bandwidth bound anyway.
- Data parallelism across pods requires a minimum batch size per pod of roughly 75,000 tokens before becoming DCN-bound.
- Basically, if your batch size is big or your model is small, things are simple: you can do either data parallelism or FSDP + data parallelism across DCN. The middle regime is where things get interesting.
Let's use LLaMA-2 13B as a basic model for this section. Here are some details:
| hyperparam | value |
|:---|:---|
| n_layers (L) | 40 |
| d_model (D) | 5,120 |
| ffw_multiplier (F / D) | 2.7 |
| n_heads (N) | 40 |
| n_kv_heads (K) | 40 |
| d_qkv (H) | 128 |
| n_embeddings (V) | 32,000 |
Question 1: How many parameters does LLaMA-2 13B have (I know that's silly, but do the math)? Note that, as in the Transformer Math section, LLaMA models have 3 big FFW matrices: two up-projections and one down-projection. We ignored the gating einsum in this section, but it behaves the same as Win.
{% details Click here for the answer. %}
- FFW parameters: $$3LDF$$ = 8.5e9
- Attention parameters: $$4DNHL$$ = 4.2e9
- Vocabulary parameters: $$2VD$$ = 0.3e9
- Total: 8.5e9 + 4.2e9 + 0.3e9 ≈ 13e9, as expected!
{% enddetails %}
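A quick script (ours) to check the arithmetic:

```python
# Parameter count for LLaMA-2 13B from the hyperparameters above.
L, D, N, H, V = 40, 5120, 40, 128, 32_000
F = round(2.7 * D)        # 13,824

ffw = 3 * L * D * F       # gate + up + down projections
attn = 4 * D * N * H * L  # Q, K, V and output projections
vocab = 2 * V * D         # input embedding + output head

print(ffw / 1e9, attn / 1e9, vocab / 1e9, (ffw + attn + vocab) / 1e9)
# ~8.5, ~4.2, ~0.3, ~13.0 (billions)
```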
Question 2: Let's assume we're training with BS=16M tokens and using Adam. Ignoring parallelism for a moment, how much total memory is used by the model's parameters, optimizer state, and activations? Assume we store the parameters in bf16 and the optimizer state in fp32 and checkpoint activations three times per layer (after the three big matmuls).
{% details Click here for the answer. %}
The total memory used for the parameters (bf16) and the two optimizer states (fp32, the first and second moment accumulators) is (2 + 4 + 4) * 13e9 ~ 130GB. The checkpointed activations are two of shape [B, F] and one of shape [B, D] per layer, stored in bf16, for a total of 2 * 40 * 16e6 * 5,120 * (1 + 2 * 2.7) ~ 4.2e13 bytes = 42TB, since B=16e6 tokens. All other activations are more or less negligible.
{% enddetails %}
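The same numbers as a script (ours):

```python
# Memory for Question 2: bf16 params, fp32 Adam moments, bf16 activation checkpoints.
n_params = 13e9
B, L, D, ffw_multiplier = 16e6, 40, 5120, 2.7

params_and_optimizer = (2 + 4 + 4) * n_params           # ~1.3e11 bytes = 130GB
checkpoints = 2 * L * B * D * (1 + 2 * ffw_multiplier)  # ~4.2e13 bytes = 42TB

print(params_and_optimizer / 1e9, checkpoints / 1e12)   # ~130 GB, ~42 TB
```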
Question 3: Assume we want to train with 32k sequence length and a total batch size of 3M tokens on a TPUv5p 16x16x16 slice. Assume we want to use bfloat16 weights and a float32 optimizer, as above.
- Can we use pure data parallelism? Why or why not?
- Can we use pure FSDP? Why or why not? With pure FSDP, how much memory will be used per device (assume we do gradient checkpointing only after the 3 big FFW matrices).
- Can we use mixed FSDP + model parallelism? Why or why not? If so, what should $X$ and $Y$ be? How much memory will be stored per device? Using only roofline FLOPs estimates and ignoring attention, how long will each training step take?
{% details Click here for the answer. %}
First, let's write down some numbers. With 32k sequence length and a 3M token batch size, we have a sequence batch size of 96. On a TPU v5p 16x16x16 slice, we have 393TB of HBM.

- We can't use pure data parallelism, because it replicates the parameters and optimizer states on each chip, which are already around 130GB (from Q2), more HBM than we have per chip (96GB).
- Let's start by looking purely at memory. Replacing BS=16M with 3M in Q2, we get ~7.86e12 bytes of checkpointed activations, and with the 1.3e11 bytes of parameters and optimizer state this brings us to almost exactly 8e12 = 8TB. The TPUv5p slice has 393TB of HBM in total, so we are safely under the HBM limit. Next let's look at whether we'll be comms- or compute-bound. With 4096 chips and 3 axes of parallelism, we can do a minimum batch size of 850 * 4096 = 3.48M tokens. That's slightly above our 3M batch size, so we're actually comms-bound, which is sad. So the general answer is no, we cannot do FSDP alone.
- Now we know our primary concern is being comms-bound, so let's plug in some numbers. First of all, from the discriminant above, we know our per-chip batch size with mixed FSDP + model parallelism needs to be above $2 \cdot 2550^2 / F = 940$ here, which is actually slightly worse than pure FSDP. Obviously that's partly an artifact of some of the approximations we made, but it suggests mixed FSDP + model parallelism isn't actually much better here. Partly this is because $F$ is so small that we can't do a full axis worth of model parallelism. One way around this is to do small subrings of 4 chips of tensor parallelism and dedicate the remaining bandwidth of the first axis to FSDP. We won't do the math out, but it's worth checking that we can probably do this without being comms-bound.
{% enddetails %}
Question 4: What if we wanted to drop to batch size 1M? How does this affect the answers to question 3? What about batch size 10M?
That's it for Part 5! For Part 6, which applies this content to real LLaMA models, [click here](../applied-training)!
Here's a nice extra figure showing how FSDP shards parameters/gradients. The rows are, in order, pure data parallelism, ZeRO-1/2/3. There's not much reason not to do ZeRO-3 since it has effectively the same communication load.
{% include figure.liquid path="assets/img/fsdp-figure.png" class="img-fluid" %}
Figure: diagram showing parameter, gradient, and optimizer state memory for pure data parallelism, ZeRO-1/2/3 respectively. Source.
Above, we simplified the Transformer layer forward pass as Out[B, D] = In[B, D] *D Win[D, F] *F Wout[F, D]. How do we derive the comms necessary for the backwards pass?
This follows fairly naturally from the rule in the previous section for a single matmul Y[B, N] = X[B, M] *M A[M, N], whose backward pass is

- dA[M, N] = X[B, M] *B dY[B, N]
- dX[B, M] = dY[B, N] *N A[M, N]
Using this, we get the following formulas (letting Tmp[B, F] stand for In[B, D] * Win[D, F]):
- dWout[F, D] = Tmp[B, F] *B dOut[B, D]
- dTmp[B, F] = dOut[B, D] *D Wout[F, D]
- dWin[D, F] = In[B, D] *B dTmp[B, F]
- dIn[B, D] = dTmp[B, F] *F Win[D, F]
Note that these formulas are mathematical statements, with no mention of sharding. The job of the backwards pass is to compute these four quantities. So to figure out the comms necessary, we just take the shardings of all the quantities being matmulled in the four equations above (Tmp, dOut, Wout, In, and Win), which are specified by our parallelization scheme, and use the rules of sharded matmuls to figure out what comms we have to do. Note that dOut is sharded in the same way as Out.
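Here's a small numerical check of these backward-pass formulas using jax.vjp (our own sketch, unsharded for simplicity):

```python
import jax
import jax.numpy as jnp

B, D, F = 4, 8, 16
k1, k2, k3 = jax.random.split(jax.random.PRNGKey(0), 3)
In = jax.random.normal(k1, (B, D))
Win = jax.random.normal(k2, (D, F))
Wout = jax.random.normal(k3, (F, D))

def f(In, Win, Wout):
    return (In @ Win) @ Wout  # Out[B, D]

Out, f_vjp = jax.vjp(f, In, Win, Wout)
dOut = jnp.ones_like(Out)          # pretend dL/dOut is all ones
dIn, dWin, dWout = f_vjp(dOut)

Tmp = In @ Win                     # Tmp[B, F]
dTmp = dOut @ Wout.T               # dTmp[B, F] = dOut *D Wout

assert jnp.allclose(dWout, Tmp.T @ dOut, atol=1e-5)  # dWout = Tmp *B dOut
assert jnp.allclose(dWin, In.T @ dTmp, atol=1e-5)    # dWin  = In *B dTmp
assert jnp.allclose(dIn, dTmp @ Win.T, atol=1e-5)    # dIn   = dTmp *F Win
print("backward-pass formulas verified")
```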
Above we derived that when using a combination of FSDP + model parallelism, we can only be compute-bound when

$$\frac{B}{N} > \frac{2 \cdot 2550^2}{F}$$

Here we present an alternate derivation of this fact. We start by setting the communication time equal to the computation time, and look for a condition under which this equality has no solution (meaning communication always dominates).

Since $T_\text{math} = 4BDF / (NC)$ and $T_\text{comms} = 4DFX / (N M_X W_\text{ici}) + 4BD / (X M_Y W_\text{ici})$, setting them equal and multiplying through by $X$ gives

$$\frac{4DF}{N \cdot M_X \cdot W_\text{ici}} X^2 - \frac{4BDF}{N \cdot C} X + \frac{4BD}{M_Y \cdot W_\text{ici}} = 0$$

As this is a quadratic in $X$, it has a real solution only if the discriminant is non-negative:

$$\left(\frac{4BDF}{N \cdot C}\right)^2 - 4 \cdot \frac{4DF}{N \cdot M_X \cdot W_\text{ici}} \cdot \frac{4BD}{M_Y \cdot W_\text{ici}} \geq 0$$

or by simplifying

$$\frac{B^2F^2}{N^2C^2} \geq \frac{4BF}{N \cdot M_X \cdot M_Y \cdot W_\text{ici}^2}$$

which gives us

$$\frac{BF}{N} \geq \frac{4C^2}{M_X \cdot M_Y \cdot W_\text{ici}^2}$$

so our total batch size divided by the total number of chips cannot drop below

$$\frac{B}{N} \geq \frac{4C^2}{M_X \cdot M_Y \cdot W_\text{ici}^2 \cdot F} = \frac{2 \cdot 2550^2}{F} \quad \text{(with } M_X = 2,\ M_Y = 1\text{)}$$

as we had derived above.