Stanford | CS336 | Building Language Models from Scratch | Spring 2025 | Notes | Assignment 1: Transformer LM Architecture Implementation



Preface

In the previous post, Stanford | CS336 | Building Language Models from Scratch | Spring 2025 | Notes | Assignment 1: Transformer Language Model Architecture, we went through the requirements of the Transformer Language Model Architecture assignment. This post walks through how to implement those problems, recording my implementation of the Transformer Language Model Architecture portion of CS336 Assignment 1: Basics. It is primarily for my own reference 😄

Note: I did not follow the from-scratch spirit here; nearly all of the code was produced with ChatGPT.

Assignment 1: https://github.com/stanford-cs336/assignment1-basics

Reference: https://chatgpt.com/

Reference: https://github.com/donglinkang2021/cs336-assignment1-basics

Reference: https://github.com/Louisym/Stanford-CS336-spring25

1. Problem (linear): Implementing the linear module (1 point)

Deliverable: Implement a Linear class that subclasses torch.nn.Module and performs a linear transformation. Your implementation should follow the interface of PyTorch's built-in nn.Linear module, except that it should have no bias parameter or bias term. We recommend the following interface:

def __init__(self, in_features, out_features, device=None, dtype=None)

Construct a linear transformation module. This function should accept the following parameters:

  • in_features: int: final dimension of the input
  • out_features: int: final dimension of the output
  • device: torch.device | None = None: device to store the parameters on
  • dtype: torch.dtype | None = None: data type of the parameters

def forward(self, x: torch.Tensor) -> torch.Tensor

Apply the linear transformation to the input tensor.

When implementing, be sure to:

  • subclass nn.Module
  • call the superclass constructor (super().__init__())
  • construct and store the parameter matrix as W (rather than W^T) for memory-layout reasons, wrapped in an nn.Parameter
  • not use nn.Linear or nn.functional.linear

For parameter initialization, use the settings given earlier together with torch.nn.init.trunc_normal_ to initialize the weights.
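A standalone sketch of that initialization, following the setting used in the code below (std σ = sqrt(2/(d_in + d_out)), truncated at ±3σ):

```python
import math
import torch

d_in, d_out = 512, 1024
sigma = math.sqrt(2.0 / (d_in + d_out))
w = torch.empty(d_out, d_in)  # stored as (d_out, d_in), i.e. W, not W^T
torch.nn.init.trunc_normal_(w, mean=0.0, std=sigma, a=-3.0 * sigma, b=3.0 * sigma)
# trunc_normal_ resamples out-of-range draws, so every entry lies in [-3σ, 3σ]
assert float(w.abs().max()) <= 3.0 * sigma + 1e-6
```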

To test your Linear module, first implement the test adapter [adapters.run_linear], which should load the given weights into your Linear module; you can use Module.load_state_dict for this.

Then run the following command:

uv run pytest -k test_linear 

The implementation is as follows:

import math
import torch
from torch import nn

class Linear(nn.Module):
    """
    A bias-free Linear layer that matches torch.nn.Linear's interface
    (except it has no bias). Stores weight as W with shape (out_features, in_features).
    """
    def __init__(self, in_features: int, out_features: int,
                 device: torch.device | None = None,
                 dtype: torch.dtype | None = None):
        super().__init__()
        self.in_features = int(in_features)
        self.out_features = int(out_features)
        # Store W (NOT W^T): shape (d_out, d_in)
        self.weight = nn.Parameter(
            torch.empty((self.out_features, self.in_features), device=device, dtype=dtype))
        # Init: N(0, 2/(d_in+d_out)), truncated to [-3σ, 3σ]
        sigma = math.sqrt(2.0 / (self.in_features + self.out_features))
        nn.init.trunc_normal_(self.weight, mean=0.0, std=sigma, a=-3.0 * sigma, b=3.0 * sigma)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        # x: (..., d_in) -> (..., d_out)
        # Because weight is (d_out, d_in), we need x @ weight.T.
        # Use einsum to make the intended dims explicit.
        return torch.einsum("... i, o i -> ... o", x, self.weight)

The test adapter [adapters.run_linear] is implemented as follows:

def run_linear(
    d_in: int,
    d_out: int,
    weights: Float[Tensor, " d_out d_in"],
    in_features: Float[Tensor, " ... d_in"],
) -> Float[Tensor, " ... d_out"]:
    """
    Given the weights of a Linear layer, compute the transformation of a batched input.

    Args:
        d_in (int): The size of the input dimension
        d_out (int): The size of the output dimension
        weights (Float[Tensor, "d_out d_in"]): The linear weights to use
        in_features (Float[Tensor, "... d_in"]): The input tensor to apply the function to

    Returns:
        Float[Tensor, "... d_out"]: The transformed output of your linear module.
    """
    from cs336_basics.modules import Linear

    layer = Linear(d_in, d_out, device=in_features.device, dtype=in_features.dtype)
    layer.load_state_dict({"weight": weights.to(device=in_features.device, dtype=in_features.dtype)})
    return layer(in_features)

The output of uv run pytest -k test_linear:

(screenshot: pytest output)

2. Problem (embedding): Implement the embedding module (1 point)

Deliverable: Implement an Embedding class that subclasses torch.nn.Module and performs an embedding lookup. Your implementation should follow the interface of PyTorch's built-in nn.Embedding module. We recommend the following interface:

def __init__(self, num_embeddings, embedding_dim, device=None, dtype=None)

Construct an embedding module. This function should accept the following parameters:

  • num_embeddings: int: vocabulary size
  • embedding_dim: int: dimension of the embedding vectors, i.e. d_model
  • device: torch.device | None = None: device to store the parameters on
  • dtype: torch.dtype | None = None: data type of the parameters

def forward(self, token_ids: torch.Tensor) -> torch.Tensor

Look up and return the embedding vectors for the given token IDs.

When implementing, be sure to:

  • subclass nn.Module
  • call the superclass constructor (super().__init__())
  • initialize and store the embedding matrix as an nn.Parameter
  • make sure the last dimension of the embedding matrix is d_model
  • not use nn.Embedding or nn.functional.embedding

For parameter initialization, again use the settings given earlier and torch.nn.init.trunc_normal_ to initialize the embedding weights.

To test your implementation, first implement the test adapter [adapters.run_embedding], then run the following command:

uv run pytest -k test_embedding 

The implementation is as follows:

import torch
from torch import nn

class Embedding(nn.Module):
    """
    A learnable embedding lookup table, equivalent to torch.nn.Embedding.

    This module maps integer token IDs to continuous vectors of fixed
    dimensionality (embedding_dim). The embedding matrix is stored as a
    learnable parameter of shape (num_embeddings, embedding_dim).
    """
    def __init__(self, num_embeddings: int, embedding_dim: int,
                 device: torch.device | None = None,
                 dtype: torch.dtype | None = None):
        super().__init__()
        self.num_embeddings = int(num_embeddings)
        self.embedding_dim = int(embedding_dim)
        # Weight matrix with shape (vocab_size, d_model)
        self.weight = nn.Parameter(
            torch.empty((self.num_embeddings, self.embedding_dim), device=device, dtype=dtype))
        # Init: N(0, 1), truncated to [-3, 3]
        nn.init.trunc_normal_(self.weight, mean=0.0, std=1.0, a=-3.0, b=3.0)

    def forward(self, token_ids: torch.Tensor) -> torch.Tensor:
        # token_ids: (...) int -> output: (..., d_model)
        return self.weight[token_ids]
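The forward pass is nothing more than advanced indexing into the weight matrix; a standalone sketch of the same lookup (toy sizes chosen for illustration):

```python
import torch

vocab, d_model = 10, 4
W = torch.randn(vocab, d_model)              # embedding table, (vocab_size, d_model)
ids = torch.tensor([[1, 2, 3], [4, 5, 6]])   # token IDs, (batch, seq)
out = W[ids]                                 # advanced indexing = embedding lookup
assert out.shape == (2, 3, 4)                # (batch, seq, d_model)
assert torch.equal(out[0, 1], W[2])          # row 2 of the table, since ids[0, 1] == 2
```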

The test adapter [adapters.run_embedding] is implemented as follows:

def run_embedding(
    vocab_size: int,
    d_model: int,
    weights: Float[Tensor, " vocab_size d_model"],
    token_ids: Int[Tensor, " ..."],
) -> Float[Tensor, " ... d_model"]:
    """
    Given the weights of an Embedding layer, get the embeddings for a batch of token ids.

    Args:
        vocab_size (int): The number of embeddings in the vocabulary
        d_model (int): The size of the embedding dimension
        weights (Float[Tensor, "vocab_size d_model"]): The embedding vectors to fetch from
        token_ids (Int[Tensor, "..."]): The set of token ids to fetch from the Embedding layer

    Returns:
        Float[Tensor, "... d_model"]: Batch of embeddings returned by your Embedding layer.
    """
    from cs336_basics.modules import Embedding

    layer = Embedding(vocab_size, d_model, device=weights.device, dtype=weights.dtype)
    layer.load_state_dict({"weight": weights.to(device=weights.device, dtype=weights.dtype)})
    return layer(token_ids)

The output of uv run pytest -k test_embedding:

(screenshot: pytest output)

3. Problem (rmsnorm): Root Mean Square Layer Normalization (1 point)

Deliverable: Implement RMSNorm as a torch.nn.Module. We recommend the following interface:

def __init__(self, d_model: int, eps: float = 1e-5, device=None, dtype=None)

Construct the RMSNorm module. This function should accept the following parameters:

  • d_model: int: hidden dimension of the model
  • eps: float = 1e-5: the ε value used for numerical stability
  • device: torch.device | None = None: device to store the parameters on
  • dtype: torch.dtype | None = None: data type of the parameters

def forward(self, x: torch.Tensor) -> torch.Tensor

Process an input tensor of shape (batch_size, sequence_length, d_model) and return a tensor of the same shape.

Note: remember to upcast the input to torch.float32 before normalizing, and cast back to the original dtype afterwards, as discussed earlier.
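A standalone sketch of the float32 round-trip described in the note, using the RMS formula from the implementation below:

```python
import torch

x = torch.randn(2, 3, 8, dtype=torch.float16)  # low-precision input
eps = 1e-5

x32 = x.to(torch.float32)                      # upcast before normalizing
rms = torch.sqrt(x32.pow(2).mean(dim=-1, keepdim=True) + eps)
y = (x32 / rms).to(x.dtype)                    # cast back to the original dtype

# each position now has (approximately) unit RMS
check = y.to(torch.float32).pow(2).mean(dim=-1)
assert torch.allclose(check, torch.ones_like(check), atol=1e-2)
```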

To test your implementation, first implement the test adapter [adapters.run_rmsnorm], then run:

uv run pytest -k test_rmsnorm 

The implementation is as follows:

import torch
from torch import nn

class RMSNorm(nn.Module):
    """
    Root Mean Square Layer Normalization (RMSNorm).

    For an input vector a in R^{d_model}:
        RMS(a) = sqrt(mean(a^2) + eps)
        RMSNorm(a) = (a / RMS(a)) * g
    """
    def __init__(self, d_model: int, eps: float = 1e-5,
                 device: torch.device | None = None,
                 dtype: torch.dtype | None = None):
        super().__init__()
        self.d_model = int(d_model)
        self.eps = float(eps)
        # Learnable gain parameter (g), shape (d_model,)
        self.weight = nn.Parameter(torch.ones((self.d_model,), device=device, dtype=dtype))

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        in_dtype = x.dtype
        x_fp32 = x.to(torch.float32)
        # Compute RMS over the last dimension: sqrt(mean(x^2) + eps)
        rms = torch.sqrt(x_fp32.pow(2).mean(dim=-1, keepdim=True) + self.eps)
        # Normalize and apply gain; do the math in float32, then cast back
        y = (x_fp32 / rms) * self.weight.to(torch.float32)
        return y.to(in_dtype)

The test adapter [adapters.run_rmsnorm] is implemented as follows:

def run_rmsnorm(
    d_model: int,
    eps: float,
    weights: Float[Tensor, " d_model"],
    in_features: Float[Tensor, " ... d_model"],
) -> Float[Tensor, " ... d_model"]:
    """
    Given the weights of a RMSNorm affine transform,
    return the output of running RMSNorm on the input features.

    Args:
        d_model (int): The dimensionality of the RMSNorm input.
        eps (float): A value added to the denominator for numerical stability.
        weights (Float[Tensor, "d_model"]): RMSNorm weights.
        in_features (Float[Tensor, "... d_model"]): Input features to run RMSNorm on. Can have
            arbitrary leading dimensions.

    Returns:
        Float[Tensor, "... d_model"]: Tensor with the same shape as `in_features` with the
        output of running RMSNorm on `in_features`.
    """
    from cs336_basics.modules import RMSNorm

    layer = RMSNorm(d_model=d_model, eps=eps, device=in_features.device, dtype=weights.dtype)
    layer.load_state_dict({"weight": weights.to(device=in_features.device, dtype=weights.dtype)})
    return layer(in_features)

The output of uv run pytest -k test_rmsnorm:

(screenshot: pytest output)

4. Problem (positionwise_feedforward): Implement the position-wise feed-forward network (2 points)

Deliverable: Implement a SwiGLU feed-forward network, composed of the SiLU activation function and a GLU (gated linear unit).

Note: in this particular implementation, you may use torch.sigmoid directly for numerical stability.

In your implementation, set the feed-forward inner dimension to approximately d_ff ≈ (8/3) × d_model, while making sure the inner dimension is a multiple of 64 to make good use of the hardware.
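A minimal sketch of this sizing rule (the helper name `swiglu_d_ff` is illustrative; it mirrors the rounding performed in the implementation below):

```python
import math

def swiglu_d_ff(d_model: int, multiple: int = 64) -> int:
    # Take ceil((8/3) * d_model), then round up to a multiple of 64.
    raw = math.ceil(8 * d_model / 3)
    return ((raw + multiple - 1) // multiple) * multiple

# ceil(8*512/3) = 1366, rounded up to the next multiple of 64 -> 1408
assert swiglu_d_ff(512) == 1408
# 8*768/3 = 2048 exactly, already a multiple of 64
assert swiglu_d_ff(768) == 2048
```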

To validate your implementation against our provided tests, you first need to implement the test adapter [adapters.run_swiglu], then run:

uv run pytest -k test_swiglu 

The implementation is as follows:

import math
import torch
from torch import nn

def round_up_to_multiple(x: int, multiple: int) -> int:
    """Round x up to the nearest positive multiple of `multiple`."""
    if multiple <= 0:
        raise ValueError("multiple must be a positive integer")
    return int(((x + multiple - 1) // multiple) * multiple)

def default_d_ff(d_model: int, multiple_of: int = 64) -> int:
    """
    Compute the recommended SwiGLU hidden size.

    We use d_ff ~= (8/3) * d_model and then round up to a
    hardware-friendly multiple (typically 64).
    """
    raw = int(math.ceil((8.0 * d_model) / 3.0))
    return round_up_to_multiple(raw, multiple_of)

class SwiGLU(nn.Module):
    """
    Position-wise feed-forward network using the SwiGLU nonlinearity.

    The transformation is:
        FFN(x) = W2( SiLU(W1 x) ⊙ (W3 x) )
    where SiLU(z) = z * sigmoid(z), and ⊙ is elementwise multiplication.

    Shapes:
        input:  (..., d_model)
        W1, W3: (d_ff, d_model) implemented as Linear(d_model -> d_ff)
        W2:     (d_model, d_ff) implemented as Linear(d_ff -> d_model)
        output: (..., d_model)
    """
    def __init__(self, d_model: int, d_ff: int | None = None, *,
                 multiple_of: int = 64,
                 device: torch.device | None = None,
                 dtype: torch.dtype | None = None):
        super().__init__()
        self.d_model = int(d_model)
        self.d_ff = int(d_ff) if d_ff is not None else default_d_ff(self.d_model, multiple_of)
        # Two up-projections and one down-projection (no bias)
        self.w1 = Linear(self.d_model, self.d_ff, device=device, dtype=dtype)
        self.w2 = Linear(self.d_ff, self.d_model, device=device, dtype=dtype)
        self.w3 = Linear(self.d_model, self.d_ff, device=device, dtype=dtype)

    @staticmethod
    def silu(x: torch.Tensor) -> torch.Tensor:
        return x * torch.sigmoid(x)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        a = self.w1(x)
        b = self.w3(x)
        gated = self.silu(a) * b
        return self.w2(gated)

The test adapter [adapters.run_swiglu] is implemented as follows:

def run_swiglu(
    d_model: int,
    d_ff: int,
    w1_weight: Float[Tensor, " d_ff d_model"],
    w2_weight: Float[Tensor, " d_model d_ff"],
    w3_weight: Float[Tensor, " d_ff d_model"],
    in_features: Float[Tensor, " ... d_model"],
) -> Float[Tensor, " ... d_model"]:
    """Given the weights of a SwiGLU network, return
    the output of your implementation with these weights.

    Args:
        d_model (int): Dimensionality of the feedforward input and output.
        d_ff (int): Dimensionality of the up-project happening internally to your swiglu.
        w1_weight (Float[Tensor, "d_ff d_model"]): Stored weights for W1
        w2_weight (Float[Tensor, "d_model d_ff"]): Stored weights for W2
        w3_weight (Float[Tensor, "d_ff d_model"]): Stored weights for W3
        in_features (Float[Tensor, "... d_model"]): Input embeddings to the feed-forward layer.

    Returns:
        Float[Tensor, "... d_model"]: Output embeddings of the same shape as the input embeddings.
    """
    # Example:
    # If your state dict keys match, you can use `load_state_dict()`
    #     swiglu.load_state_dict(weights)
    # You can also manually assign the weights
    #     swiglu.w1.weight.data = w1_weight
    #     swiglu.w2.weight.data = w2_weight
    #     swiglu.w3.weight.data = w3_weight
    from cs336_basics.modules import SwiGLU

    swiglu = SwiGLU(d_model=d_model, d_ff=d_ff, device=in_features.device, dtype=w1_weight.dtype)
    swiglu.load_state_dict({
        "w1.weight": w1_weight.to(device=in_features.device, dtype=w1_weight.dtype),
        "w2.weight": w2_weight.to(device=in_features.device, dtype=w2_weight.dtype),
        "w3.weight": w3_weight.to(device=in_features.device, dtype=w3_weight.dtype),
    })
    return swiglu(in_features)

The output of uv run pytest -k test_swiglu:

(screenshot: pytest output)

5. Problem (rope): Implement RoPE (2 points)

Deliverable: Implement a RotaryPositionalEmbedding class that applies RoPE (rotary positional embeddings) to an input tensor. We recommend the following interface:

def __init__(self, theta: float, d_k: int, max_seq_len: int, device=None)

Construct the RoPE module, creating buffers if needed. The constructor should accept the following parameters:

  • theta: float: the constant Θ used in RoPE
  • d_k: int: dimension of the query and key vectors
  • max_seq_len: int: maximum sequence length that will be input
  • device: torch.device | None = None: device to store the buffers on

def forward(self, x: torch.Tensor, token_positions: torch.Tensor) -> torch.Tensor

Process an input tensor of shape (..., seq_len, d_k) and return a tensor of the same shape.

Note the following:

  • your implementation should support an arbitrary number of batch dimensions, i.e. x may have any number of batch dimensions before seq_len
  • you may assume token_positions is a tensor of shape (..., seq_len) giving the position of each token along the sequence dimension
  • you should use token_positions to slice your (possibly precomputed) cos and sin tensors along the sequence dimension
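As a standalone sanity check on the pairwise-rotation view of RoPE (a sketch, not the assignment's API): rotating each (even, odd) pair of channels by a position-dependent angle never changes the vector's norm:

```python
import torch

theta, d_k, seq_len = 10000.0, 8, 5
# inv_freq[j] = theta^(-2j/d_k) for pair index j
inv_freq = theta ** (-torch.arange(0, d_k, 2).float() / d_k)
pos = torch.arange(seq_len).float()
ang = pos[:, None] * inv_freq[None, :]       # (seq_len, d_k/2)
cos, sin = ang.cos(), ang.sin()

x = torch.randn(seq_len, d_k)
xe, xo = x[:, ::2], x[:, 1::2]               # even/odd channel pairs
# 2D rotation per pair, interleaved back to (seq_len, d_k)
out = torch.stack((xe * cos - xo * sin, xe * sin + xo * cos), dim=-1).flatten(-2)

# rotations are length-preserving
assert torch.allclose(out.norm(dim=-1), x.norm(dim=-1), atol=1e-4)
```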

To test your implementation, complete the adapter [adapters.run_rope] and make sure the following test passes:

uv run pytest -k test_rope 

The implementation is as follows:

import torch
from torch import nn

class RoPE(nn.Module):
    """
    Rotary Positional Embeddings (RoPE).

    Applies a position-dependent rotation to the last dimension (d_k) of an
    input tensor. The rotation is applied pairwise on
    (x[..., 0], x[..., 1]), (x[..., 2], x[..., 3]), ...

    This module has no learnable parameters. It precomputes and caches
    the cos/sin tables.
    """
    def __init__(self, theta: float, d_k: int, max_seq_len: int,
                 device: torch.device | None = None):
        super().__init__()
        if d_k % 2 != 0:
            raise ValueError(f"d_k must be even for RoPE, got d_k={d_k}")
        if max_seq_len <= 0:
            raise ValueError(f"max_seq_len must be positive, got {max_seq_len}")
        self.theta = float(theta)
        self.d_k = int(d_k)
        self.max_seq_len = int(max_seq_len)
        # Precompute inverse frequencies for even indices:
        #   inv_freq[j] = theta^(-2j/d_k), where j indexes pairs (0, 1, ..., d_k/2 - 1).
        pair_idx = torch.arange(0, self.d_k, 2, device=device, dtype=torch.float32)
        inv_freq = self.theta ** (-pair_idx / self.d_k)
        # Positions [0, 1, ..., max_seq_len-1]
        positions = torch.arange(self.max_seq_len, device=device, dtype=torch.float32)
        # Angles: (max_seq_len, d_k/2)
        angles = positions[:, None] * inv_freq[None, :]
        cos = torch.cos(angles)  # (max_seq_len, d_k/2)
        sin = torch.sin(angles)  # (max_seq_len, d_k/2)
        # Cache as non-persistent buffers (not saved in state_dict)
        self.register_buffer("cos", cos, persistent=False)
        self.register_buffer("sin", sin, persistent=False)

    def forward(self, x: torch.Tensor, token_positions: torch.Tensor) -> torch.Tensor:
        """
        Args:
            x: Tensor of shape (..., seq_len, d_k)
            token_positions: Tensor of shape (..., seq_len) with integer positions

        Returns:
            Tensor of shape (..., seq_len, d_k) after applying RoPE.
        """
        if x.size(-1) != self.d_k:
            raise ValueError(f"Expected x.size(-1)==d_k=={self.d_k}, got {x.size(-1)}")
        # token_positions is used to slice the cached cos/sin along the sequence dimension.
        # Shapes after indexing: (..., seq_len, d_k/2)
        pos = token_positions.to(device=x.device)
        cos = self.cos.index_select(0, pos.reshape(-1)).reshape(*pos.shape, -1)
        sin = self.sin.index_select(0, pos.reshape(-1)).reshape(*pos.shape, -1)
        # Promote to float32 for numerical stability, then cast back
        x_fp32 = x.to(torch.float32)
        cos = cos.to(torch.float32)
        sin = sin.to(torch.float32)
        x_even = x_fp32[..., ::2]   # (..., seq_len, d_k/2)
        x_odd = x_fp32[..., 1::2]   # (..., seq_len, d_k/2)
        # Make cos/sin broadcastable for inputs like (B, H, S, d_k)
        while cos.dim() < x_even.dim():
            cos = cos.unsqueeze(cos.dim() - 2)
            sin = sin.unsqueeze(sin.dim() - 2)
        # Apply a 2D rotation to each pair.
        out_even = x_even * cos - x_odd * sin
        out_odd = x_even * sin + x_odd * cos
        # Interleave even/odd back to (..., seq_len, d_k)
        out = torch.stack((out_even, out_odd), dim=-1).flatten(-2)
        return out.to(dtype=x.dtype)

The test adapter [adapters.run_rope] is implemented as follows:

def run_rope(
    d_k: int,
    theta: float,
    max_seq_len: int,
    in_query_or_key: Float[Tensor, " ... sequence_length d_k"],
    token_positions: Int[Tensor, " ... sequence_length"],
) -> Float[Tensor, " ... sequence_length d_k"]:
    """
    Run RoPE for a given input tensor.

    Args:
        d_k (int): Embedding dimension size for the query or key tensor.
        theta (float): RoPE parameter.
        max_seq_len (int): Maximum sequence length to pre-cache if your implementation does that.
        in_query_or_key (Float[Tensor, "... sequence_length d_k"]): Input tensor to run RoPE on.
        token_positions (Int[Tensor, "... sequence_length"]): Tensor of shape
            (batch_size, sequence_length) with the token positions

    Returns:
        Float[Tensor, " ... sequence_length d_k"]: Tensor with RoPEd input.
    """
    from cs336_basics.modules import RoPE

    rope = RoPE(theta=theta, d_k=d_k, max_seq_len=max_seq_len, device=in_query_or_key.device)
    return rope(in_query_or_key, token_positions)

The output of uv run pytest -k test_rope:

(screenshot: pytest output)

6. Problem (softmax): Implement softmax (1 point)

Deliverable: Write a function that applies the softmax operation to a tensor. Your function should take two parameters: an input tensor and a dimension index i, and apply softmax along the i-th dimension of the input tensor.

The output tensor should have the same shape as the input, but the values along its i-th dimension will form a normalized probability distribution. To avoid numerical stability issues, use the following trick: subtract the maximum value along the i-th dimension from every element of that dimension before computing the softmax.
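A quick demonstration of why the max-subtraction trick matters: without it, exp overflows to infinity in float32 and the result degenerates to NaN, while the shifted version is exact:

```python
import torch

x = torch.tensor([1000.0, 1001.0, 1002.0])

# Naive softmax: exp(1000) overflows float32 to inf, so inf/inf = nan
naive = torch.exp(x) / torch.exp(x).sum()
assert bool(torch.isnan(naive).all())

# Stable softmax: shift by the max first; exp arguments are now <= 0
shifted = x - x.max()
stable = torch.exp(shifted) / torch.exp(shifted).sum()
assert abs(float(stable.sum()) - 1.0) < 1e-5
```

Subtracting the max leaves the result mathematically unchanged, since the common factor exp(-max) cancels between numerator and denominator.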

To test your implementation, complete the adapter [adapters.run_softmax] and make sure the following test passes:

uv run pytest -k test_softmax_matches_pytorch 

The implementation is as follows:

import torch

def softmax(x: torch.Tensor, dim: int) -> torch.Tensor:
    """
    Numerically stable softmax over a given dimension.

    This implementation subtracts the maximum value along `dim` before
    exponentiation to improve numerical stability.

    Args:
        x (torch.Tensor): Input tensor.
        dim (int): Dimension over which to apply softmax.

    Returns:
        torch.Tensor: Softmax output with the same shape/dtype/device as `x`.
    """
    # Subtract max for numerical stability (keepdim for correct broadcasting)
    x_max = torch.amax(x, dim=dim, keepdim=True)
    z = x - x_max
    exp_z = torch.exp(z)
    sum_exp = torch.sum(exp_z, dim=dim, keepdim=True)
    return exp_z / sum_exp

The test adapter [adapters.run_softmax] is implemented as follows:

def run_softmax(in_features: Float[Tensor, " ..."], dim: int) -> Float[Tensor, " ..."]:
    """
    Given a tensor of inputs, return the output of softmaxing the given `dim`
    of the input.

    Args:
        in_features (Float[Tensor, "..."]): Input features to softmax. Shape is arbitrary.
        dim (int): Dimension of the `in_features` to apply softmax to.

    Returns:
        Float[Tensor, "..."]: Tensor with the same shape as `in_features` with the output of
        softmax normalizing the specified `dim`.
    """
    from cs336_basics.modules import softmax

    return softmax(in_features, dim)

The output of uv run pytest -k test_softmax:

(screenshot: pytest output)

7. Problem (scaled_dot_product_attention): Implement scaled dot-product attention (5 points)

Deliverable: Implement the scaled dot-product attention function. Your implementation must support inputs of the following shapes:

  • queries and keys of shape (batch_size, ..., seq_len, d_k)
  • values of shape (batch_size, ..., seq_len, d_v)

Here "..." denotes any number of other batch-like dimensions. The function should return an output of shape (batch_size, ..., d_v); see Section 1.3 for a detailed discussion of batch-like dimensions.

Your implementation must also support an optional user-provided boolean mask of shape (seq_len, seq_len). The attention probabilities at positions where the mask is True should collectively normalize to 1 along that dimension, while positions where the mask is False should receive attention probability 0.
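The mask semantics can be checked in isolation with a causal lower-triangular mask (True = keep), using the standard fill-with-negative-infinity approach:

```python
import torch

seq_len = 4
scores = torch.randn(seq_len, seq_len)                       # raw attention logits
mask = torch.tril(torch.ones(seq_len, seq_len, dtype=torch.bool))  # causal: True = allowed

# Masked positions get -inf, so softmax assigns them exactly zero probability
masked = scores.masked_fill(~mask, float("-inf"))
probs = torch.softmax(masked, dim=-1)

assert bool((probs[~mask] == 0).all())                 # False positions -> probability 0
assert torch.allclose(probs.sum(dim=-1), torch.ones(seq_len))  # True positions sum to 1
assert float(probs[0, 0]) == 1.0                       # row 0 can only attend to itself
```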

To verify that your implementation is correct, complete the test adapter in [adapters.run_scaled_dot_product_attention], then run:

uv run pytest -k test_scaled_dot_product_attention

to test your implementation on third-order input tensors.

Run:

uv run pytest -k test_4d_scaled_dot_product_attention

to test your implementation on fourth-order input tensors.

The implementation is as follows:

import math
import torch

def scaled_dot_product_attention(
    query: torch.Tensor,
    key: torch.Tensor,
    value: torch.Tensor,
    mask: torch.Tensor | None = None,
) -> torch.Tensor:
    """
    Scaled dot-product attention.

    Args:
        query: Tensor of shape (..., seq_len, d_k)
        key: Tensor of shape (..., seq_len, d_k)
        value: Tensor of shape (..., seq_len, d_v)
        mask: Optional bool tensor of shape (seq_len, seq_len), where True means
            the position is allowed and False means it is masked out.

    Returns:
        Tensor of shape (..., seq_len, d_v)
    """
    if query.dim() < 2 or key.dim() < 2 or value.dim() < 2:
        raise ValueError("query/key/value must have shape (..., seq_len, d_*)")
    if query.shape[:-2] != key.shape[:-2] or query.shape[:-2] != value.shape[:-2]:
        raise ValueError("batch dimensions of query, key, value must match")
    d_k = query.shape[-1]
    if d_k != key.shape[-1]:
        raise ValueError("query and key must have the same d_k")
    # Compute attention logits in float32 for stability
    q = query.to(torch.float32)
    k = key.to(torch.float32)
    v = value.to(torch.float32)
    scale = 1.0 / math.sqrt(d_k)
    # logits: (..., seq_len, seq_len)
    logits = torch.einsum("... s d, ... t d -> ... s t", q, k) * scale
    if mask is not None:
        if mask.dtype != torch.bool:
            raise TypeError("mask must be a boolean tensor")
        # Broadcast mask to logits shape: (..., seq_len, seq_len)
        # True = keep, False = mask out.
        neg_inf = torch.finfo(torch.float32).min
        logits = torch.where(mask.to(device=logits.device), logits, neg_inf)
    # probs: (..., seq_len, seq_len)
    probs = softmax(logits, dim=-1)
    if mask is not None:
        # Ensure exact zeros on masked positions (softmax(-inf) should already be 0,
        # but this makes behavior robust under extreme values).
        probs = probs * mask.to(device=probs.device, dtype=probs.dtype)
    # out: (..., seq_len, d_v)
    out = torch.einsum("... s t, ... t d -> ... s d", probs, v)
    # Cast back to the original value dtype
    return out.to(dtype=value.dtype)

The test adapter [adapters.run_scaled_dot_product_attention] is implemented as follows:

def run_scaled_dot_product_attention(
    Q: Float[Tensor, " ... queries d_k"],
    K: Float[Tensor, " ... keys d_k"],
    V: Float[Tensor, " ... values d_v"],
    mask: Bool[Tensor, " ... queries keys"] | None = None,
) -> Float[Tensor, " ... queries d_v"]:
    """
    Given key (K), query (Q), and value (V) tensors, return
    the output of your scaled dot product attention implementation.

    Args:
        Q (Float[Tensor, " ... queries d_k"]): Query tensor
        K (Float[Tensor, " ... keys d_k"]): Key tensor
        V (Float[Tensor, " ... values d_v"]): Values tensor
        mask (Bool[Tensor, " ... queries keys"] | None): Mask tensor

    Returns:
        Float[Tensor, " ... queries d_v"]: Output of SDPA
    """
    from cs336_basics.modules import scaled_dot_product_attention

    return scaled_dot_product_attention(query=Q, key=K, value=V, mask=mask)

The output of uv run pytest -k test_scaled_dot_product_attention and uv run pytest -k test_4d_scaled_dot_product_attention:

(screenshot: pytest output)

8. Problem (multihead_self_attention): Implement causal multi-head self-attention (5 points)

Deliverable: Implement a causal multi-head self-attention module as a torch.nn.Module. Your implementation should accept at least the following parameters:

  • d_model: int: dimensionality of the Transformer block inputs
  • num_heads: int: number of heads to use in multi-head self-attention

Following [Vaswani+ 2017], set d_k = d_v = d_model / h, where h is the number of attention heads.
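The head split this implies is just a reshape plus a transpose that moves heads into a batch-like dimension, and it is lossless; a standalone sketch with toy sizes:

```python
import torch

d_model, h = 16, 4
head_dim = d_model // h                              # d_k = d_v = d_model / h = 4

x = torch.randn(2, 5, d_model)                       # (batch, seq, d_model)
# Split: (batch, seq, d_model) -> (batch, seq, h, head_dim) -> (batch, h, seq, head_dim)
heads = x.view(2, 5, h, head_dim).transpose(-3, -2)
assert heads.shape == (2, 4, 5, 4)

# Merge: the exact inverse, recovering the original tensor bit-for-bit
back = heads.transpose(-3, -2).contiguous().view(2, 5, d_model)
assert torch.equal(back, x)
```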

To verify your implementation against the provided tests, implement the test adapter in [adapters.run_multihead_self_attention], then run:

uv run pytest -k test_multihead_self_attention

to check that your implementation is correct.

The implementation is as follows:

import math
import torch
from torch import nn

class CausalMultiHeadSelfAttention(nn.Module):
    """
    Causal multi-head self-attention (no RoPE).

    This module computes:
        Q = W_Q x, K = W_K x, V = W_V x
        heads = SDPA(Q_heads, K_heads, V_heads, causal_mask)
        out = W_O concat(heads)

    Shapes:
        x: (..., seq_len, d_model)
        QKV: (..., seq_len, d_model)
        heads view: (..., num_heads, seq_len, head_dim)
        output: (..., seq_len, d_model)
    """
    def __init__(self, d_model: int, num_heads: int,
                 device: torch.device | None = None,
                 dtype: torch.dtype | None = None):
        super().__init__()
        self.d_model = int(d_model)
        self.num_heads = int(num_heads)
        if self.d_model % self.num_heads != 0:
            raise ValueError("d_model must be divisible by num_heads")
        self.head_dim = self.d_model // self.num_heads  # d_k = d_v = d_model / h
        # Separate projections (one matmul each). Combining into one is an optional optimization
        self.q_proj = Linear(self.d_model, self.d_model, device=device, dtype=dtype)
        self.k_proj = Linear(self.d_model, self.d_model, device=device, dtype=dtype)
        self.v_proj = Linear(self.d_model, self.d_model, device=device, dtype=dtype)
        self.o_proj = Linear(self.d_model, self.d_model, device=device, dtype=dtype)

    @staticmethod
    def _causal_mask(seq_len: int, device: torch.device) -> torch.Tensor:
        """Build a (seq_len, seq_len) causal mask where True means 'allowed'."""
        return torch.tril(torch.ones((seq_len, seq_len), device=device, dtype=torch.bool))

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """
        Args:
            x: Tensor of shape (..., seq_len, d_model)

        Returns:
            Tensor of shape (..., seq_len, d_model)
        """
        if x.size(-1) != self.d_model:
            raise ValueError(f"Expected last dim {self.d_model}, got {x.size(-1)}")
        seq_len = x.size(-2)
        device = x.device
        # Project to Q, K, V: (..., seq_len, d_model)
        q = self.q_proj(x)
        k = self.k_proj(x)
        v = self.v_proj(x)
        # Reshape into heads: (..., seq_len, num_heads, head_dim)
        # Then move heads into a batch-like dimension: (..., num_heads, seq_len, head_dim)
        new_shape = q.shape[:-1] + (self.num_heads, self.head_dim)
        q = q.view(new_shape).transpose(-3, -2)
        k = k.view(new_shape).transpose(-3, -2)
        v = v.view(new_shape).transpose(-3, -2)
        # Causal mask shared across heads and batches
        mask = self._causal_mask(seq_len, device=device)
        # SDPA: (..., num_heads, seq_len, head_dim)
        out = scaled_dot_product_attention(q, k, v, mask=mask)
        # Merge heads: (..., seq_len, d_model)
        out = out.transpose(-3, -2).contiguous().view(x.shape[:-1] + (self.d_model,))
        # Output projection: (..., seq_len, d_model)
        return self.o_proj(out)

class CausalMultiHeadSelfAttentionWithRoPE(nn.Module):
    """
    Causal multi-head self-attention with RoPE applied to Q and K (not V).

    This version uses separate Q/K/V projections so that its state_dict keys
    match the reference implementation.
    """
    def __init__(self, d_model: int, num_heads: int, theta: float, max_seq_len: int,
                 device: torch.device | None = None,
                 dtype: torch.dtype | None = None):
        super().__init__()
        self.d_model = int(d_model)
        self.num_heads = int(num_heads)
        if self.d_model % self.num_heads != 0:
            raise ValueError("d_model must be divisible by num_heads")
        self.head_dim = self.d_model // self.num_heads
        # Separate projections (matches reference state_dict keys).
        self.q_proj = Linear(self.d_model, self.d_model, device=device, dtype=dtype)
        self.k_proj = Linear(self.d_model, self.d_model, device=device, dtype=dtype)
        self.v_proj = Linear(self.d_model, self.d_model, device=device, dtype=dtype)
        self.output_proj = Linear(self.d_model, self.d_model, device=device, dtype=dtype)
        # RoPE operates on the per-head dimension
        self.rope = RoPE(theta=theta, d_k=self.head_dim, max_seq_len=max_seq_len, device=device)

    @staticmethod
    def _causal_mask(seq_len: int, device: torch.device) -> torch.Tensor:
        """Build a (seq_len, seq_len) causal mask where True means 'allowed'."""
        return torch.tril(torch.ones((seq_len, seq_len), device=device, dtype=torch.bool))

    def forward(self, x: torch.Tensor, token_positions: torch.Tensor) -> torch.Tensor:
        """
        Args:
            x: Tensor of shape (..., seq_len, d_model)
            token_positions: Tensor of shape (..., seq_len)

        Returns:
            Tensor of shape (..., seq_len, d_model)
        """
        if x.size(-1) != self.d_model:
            raise ValueError(f"Expected last dim {self.d_model}, got {x.size(-1)}")
        seq_len = x.size(-2)
        device = x.device
        # Project to Q, K, V: (..., seq_len, d_model)
        q = self.q_proj(x)
        k = self.k_proj(x)
        v = self.v_proj(x)
        # Reshape into heads: (..., seq_len, num_heads, head_dim)
        # Then transpose to (..., num_heads, seq_len, head_dim)
        new_shape = q.shape[:-1] + (self.num_heads, self.head_dim)
        q = q.view(new_shape).transpose(-3, -2)
        k = k.view(new_shape).transpose(-3, -2)
        v = v.view(new_shape).transpose(-3, -2)
        # Apply RoPE to Q and K for each head (heads are treated as batch-like dims)
        q = self.rope(q, token_positions)
        k = self.rope(k, token_positions)
        # Causal mask shared across heads and batches
        mask = self._causal_mask(seq_len, device=device)
        # Attention: (..., num_heads, seq_len, head_dim)
        out = scaled_dot_product_attention(q, k, v, mask=mask)
        # Merge heads back: (..., seq_len, d_model)
        out = out.transpose(-3, -2).contiguous().view(x.shape[:-1] + (self.d_model,))
        return self.output_proj(out)

The test adapters [adapters.run_multihead_self_attention] and [adapters.run_multihead_self_attention_with_rope] are implemented as follows:

def run_multihead_self_attention(
    d_model: int,
    num_heads: int,
    q_proj_weight: Float[Tensor, " d_k d_in"],
    k_proj_weight: Float[Tensor, " d_k d_in"],
    v_proj_weight: Float[Tensor, " d_v d_in"],
    o_proj_weight: Float[Tensor, " d_model d_v"],
    in_features: Float[Tensor, " ... sequence_length d_in"],
) -> Float[Tensor, " ... sequence_length d_out"]:
    """
    Given the key, query, and value projection weights of a naive unbatched
    implementation of multi-head attention, return the output of an optimized batched
    implementation. This implementation should handle the key, query, and value projections
    for all heads in a single matrix multiply.
    This function should not use RoPE.
    See section 3.2.2 of Vaswani et al., 2017.

    Args:
        d_model (int): Dimensionality of the feedforward input and output.
        num_heads (int): Number of heads to use in multi-headed attention.
        q_proj_weight (Float[Tensor, "d_k d_in"]): Weights for the Q projection
        k_proj_weight (Float[Tensor, "d_k d_in"]): Weights for the K projection
        v_proj_weight (Float[Tensor, "d_k d_in"]): Weights for the V projection
        o_proj_weight (Float[Tensor, "d_model d_v"]): Weights for the output projection
        in_features (Float[Tensor, "... sequence_length d_in"]): Tensor to run your implementation on.

    Returns:
        Float[Tensor, " ... sequence_length d_out"]: Tensor with the output of running your
        optimized, batched multi-headed attention implementation with the given QKV projection
        weights and input features.
    """
    from cs336_basics.modules import CausalMultiHeadSelfAttention

    mha = CausalMultiHeadSelfAttention(d_model=d_model, num_heads=num_heads,
                                       device=in_features.device, dtype=q_proj_weight.dtype)
    mha.load_state_dict({
        "q_proj.weight": q_proj_weight.to(device=in_features.device, dtype=q_proj_weight.dtype),
        "k_proj.weight": k_proj_weight.to(device=in_features.device, dtype=k_proj_weight.dtype),
        "v_proj.weight": v_proj_weight.to(device=in_features.device, dtype=v_proj_weight.dtype),
        "o_proj.weight": o_proj_weight.to(device=in_features.device, dtype=o_proj_weight.dtype),
    })
    return mha(in_features)

def run_multihead_self_attention_with_rope(
    d_model: int,
    num_heads: int,
    max_seq_len: int,
    theta: float,
    q_proj_weight: Float[Tensor, " d_k d_in"],
    k_proj_weight: Float[Tensor, " d_k d_in"],
    v_proj_weight: Float[Tensor, " d_v d_in"],
    o_proj_weight: Float[Tensor, " d_model d_v"],
    in_features: Float[Tensor, " ... sequence_length d_in"],
    token_positions: Int[Tensor, " ... sequence_length"] | None = None,
) -> Float[Tensor, " ... sequence_length d_out"]:
    """
    Given the key, query, and value projection weights of a naive unbatched
    implementation of multi-head attention, return the output of an optimized batched
    implementation. This implementation should handle the key, query, and value projections
    for all heads in a single matrix multiply.
    This version of MHA should include RoPE.
    In this case, the RoPE embedding dimension must be the head embedding
    dimension (d_model // num_heads).
    See section 3.2.2 of Vaswani et al., 2017.

    Args:
        d_model (int): Dimensionality of the feedforward input and output.
        num_heads (int): Number of heads to use in multi-headed attention.
        max_seq_len (int): Maximum sequence length to pre-cache if your implementation does that.
        theta (float): RoPE parameter.
        q_proj_weight (Float[Tensor, "d_k d_in"]): Weights for the Q projection
        k_proj_weight (Float[Tensor, "d_k d_in"]): Weights for the K projection
        v_proj_weight (Float[Tensor, "d_k d_in"]): Weights for the V projection
        o_proj_weight (Float[Tensor, "d_model d_v"]): Weights for the output projection
        in_features (Float[Tensor, "... sequence_length d_in"]): Tensor to run your implementation on.
        token_positions (Int[Tensor, " ... sequence_length"] | None): Optional tensor with the positions of the tokens

    Returns:
        Float[Tensor, " ... sequence_length d_out"]: Tensor with the output of running your
        optimized, batched multi-headed attention implementation with the given QKV projection
        weights and input features.
    """
    from cs336_basics.modules import CausalMultiHeadSelfAttentionWithRoPE

    if token_positions is None:
        # Default positions: 0..seq_len-1, broadcast to batch-like dims
        seq_len = in_features.size(-2)
        token_positions = torch.arange(seq_len, device=in_features.device, dtype=torch.long)
        token_positions = token_positions.view(*([1] * (in_features.dim() - 2)), seq_len)
    mha = CausalMultiHeadSelfAttentionWithRoPE(
        d_model=d_model, num_heads=num_heads, theta=theta, max_seq_len=max_seq_len,
        device=in_features.device, dtype=q_proj_weight.dtype,
    )
    mha.load_state_dict({
        "q_proj.weight": q_proj_weight.to(device=in_features.device, dtype=q_proj_weight.dtype),
        "k_proj.weight": k_proj_weight.to(device=in_features.device, dtype=k_proj_weight.dtype),
        "v_proj.weight": v_proj_weight.to(device=in_features.device, dtype=v_proj_weight.dtype),
        "output_proj.weight": o_proj_weight.to(device=in_features.device, dtype=o_proj_weight.dtype),
    })
    return mha(in_features, token_positions)

Running uv run pytest -k test_multihead_self_attention shows that all tests pass.

9. Problem (transformer_block): Implement the Transformer block (3 points)

Implement the pre-norm Transformer block described in §1.5 and illustrated in Figure 2. Your Transformer block should accept at least the following parameters:

  • d_model: int: Dimensionality of the Transformer block inputs
  • num_heads: int: Number of heads to use in multi-head self-attention
  • d_ff: int: Dimensionality of the position-wise feed-forward inner layer

To test your implementation, implement the test adapter at [adapters.run_transformer_block], then run:

uv run pytest -k test_transformer_block 

to verify that your implementation is correct.

Deliverable: A Transformer block implementation that passes all the provided tests.

The implementation is as follows:

import torch
from torch import nn

# RMSNorm, CausalMultiHeadSelfAttentionWithRoPE, and SwiGLU are defined
# earlier in this module (cs336_basics.modules).


class TransformerBlock(nn.Module):
    """
    Pre-norm Transformer block.

    Structure (pre-norm):
        y = x + Attn(RMSNorm(x))
        z = y + FFN(RMSNorm(y))

    This block uses causal multi-head self-attention with RoPE.
    """

    def __init__(
        self,
        d_model: int,
        num_heads: int,
        d_ff: int,
        *,
        max_seq_len: int,
        theta: float,
        eps: float = 1e-5,
        device: torch.device | None = None,
        dtype: torch.dtype | None = None,
    ):
        super().__init__()
        self.d_model = int(d_model)
        self.num_heads = int(num_heads)
        self.d_ff = int(d_ff)

        self.ln1 = RMSNorm(self.d_model, eps=eps, device=device, dtype=dtype)
        self.attn = CausalMultiHeadSelfAttentionWithRoPE(
            d_model=self.d_model,
            num_heads=self.num_heads,
            theta=theta,
            max_seq_len=max_seq_len,
            device=device,
            dtype=dtype,
        )
        self.ln2 = RMSNorm(self.d_model, eps=eps, device=device, dtype=dtype)
        self.ffn = SwiGLU(self.d_model, d_ff=self.d_ff, device=device, dtype=dtype)

    def forward(self, x: torch.Tensor, token_positions: torch.Tensor) -> torch.Tensor:
        """
        Args:
            x: Tensor of shape (batch, seq_len, d_model)
            token_positions: Tensor of shape (batch, seq_len) or broadcastable to it

        Returns:
            Tensor of shape (batch, seq_len, d_model)
        """
        # Pre-norm attention + residual
        h = self.ln1(x)
        x = x + self.attn(h, token_positions)
        # Pre-norm FFN + residual
        h = self.ln2(x)
        x = x + self.ffn(h)
        return x

The test adapter [adapters.run_transformer_block] is implemented as follows:

def run_transformer_block(
    d_model: int,
    num_heads: int,
    d_ff: int,
    max_seq_len: int,
    theta: float,
    weights: dict[str, Tensor],
    in_features: Float[Tensor, " batch sequence_length d_model"],
) -> Float[Tensor, " batch sequence_length d_model"]:
    """
    Given the weights of a pre-norm Transformer block and input features,
    return the output of running the Transformer block on the input features.

    This function should use RoPE.
    Depending on your implementation, you may simply need to pass the relevant args
    to your TransformerBlock constructor, or you may need to initialize your own RoPE
    class and pass that instead.

    Args:
        d_model (int): The dimensionality of the Transformer block input.
        num_heads (int): Number of heads to use in multi-headed attention. `d_model` must be
            evenly divisible by `num_heads`.
        d_ff (int): Dimensionality of the feed-forward inner layer.
        max_seq_len (int): Maximum sequence length to pre-cache if your implementation does that.
        theta (float): RoPE parameter.
        weights (dict[str, Tensor]): State dict of our reference implementation.
            The keys of this dictionary are:
            - `attn.q_proj.weight`: The query projections for all `num_heads` attention heads.
              Shape is (d_model, d_model). The rows are ordered by matrices of shape (num_heads, d_k),
              so `attn.q_proj.weight == torch.cat([q_heads.0.weight, ..., q_heads.N.weight], dim=0)`.
            - `attn.k_proj.weight`: The key projections for all `num_heads` attention heads.
              Shape is (d_model, d_model). The rows are ordered by matrices of shape (num_heads, d_k),
              so `attn.k_proj.weight == torch.cat([k_heads.0.weight, ..., k_heads.N.weight], dim=0)`.
            - `attn.v_proj.weight`: The value projections for all `num_heads` attention heads.
              Shape is (d_model, d_model). The rows are ordered by matrices of shape (num_heads, d_v),
              so `attn.v_proj.weight == torch.cat([v_heads.0.weight, ..., v_heads.N.weight], dim=0)`.
            - `attn.output_proj.weight`: Weight of the multi-head self-attention output projection.
              Shape is (d_model, d_model).
            - `ln1.weight`: Weights of affine transform for the first RMSNorm applied in the
              transformer block. Shape is (d_model,).
            - `ffn.w1.weight`: Weight of the first linear transformation in the FFN.
              Shape is (d_model, d_ff).
            - `ffn.w2.weight`: Weight of the second linear transformation in the FFN.
              Shape is (d_ff, d_model).
            - `ffn.w3.weight`: Weight of the third linear transformation in the FFN.
              Shape is (d_model, d_ff).
            - `ln2.weight`: Weights of affine transform for the second RMSNorm applied in the
              transformer block. Shape is (d_model,).
        in_features (Float[Tensor, "batch sequence_length d_model"]): Tensor to run your
            implementation on.

    Returns:
        Float[Tensor, "batch sequence_length d_model"]: Tensor with the output of running
        the Transformer block on the input features while using RoPE.
    """
    from cs336_basics.modules import TransformerBlock

    block = TransformerBlock(
        d_model=d_model,
        num_heads=num_heads,
        d_ff=d_ff,
        max_seq_len=max_seq_len,
        theta=theta,
        device=in_features.device,
        dtype=in_features.dtype,
    )
    # Move weights to the right device/dtype
    sd = {k: v.to(device=in_features.device, dtype=in_features.dtype) for k, v in weights.items()}
    block.load_state_dict(sd)

    # Build token positions: shape (batch, seq_len)
    batch, seq_len, _ = in_features.shape
    token_positions = torch.arange(seq_len, device=in_features.device, dtype=torch.long)
    token_positions = token_positions.view(1, seq_len).expand(batch, seq_len)
    return block(in_features, token_positions)

Running uv run pytest -k test_transformer_block shows that all tests pass.

10. Problem (transformer_lm): Implementing the Transformer LM (3 points)

Now we assemble all the modules, following the high-level structure shown in Figure 1. As described in §1.1.1, first embed the inputs, then pass the result through num_layers Transformer blocks, and finally feed the output through the output layers (final norm, linear projection, and softmax) to obtain a distribution over the entire vocabulary.

It is time to put everything together! Following the description in §1.1 and the structure shown in Figure 1, implement a Transformer language model. At a minimum, your implementation should support all the construction parameters of the Transformer blocks above, plus the following additional parameters:

  • vocab_size: int: Vocabulary size, which determines the dimensions of the token embedding matrix
  • context_length: int: Maximum context length, which determines the dimensions of the position embedding matrix
  • num_layers: int: Number of Transformer blocks to use

To verify your implementation with the provided tests, first implement the test adapter at [adapters.run_transformer_lm], then run:

uv run pytest -k test_transformer_lm 

to test your implementation.

Deliverable: A Transformer LM module that passes the above tests.

The implementation is as follows:

import torch
from torch import nn

from cs336_basics.modules import Embedding, Linear, RMSNorm, TransformerBlock


class TransformerLM(nn.Module):
    """
    A Transformer language model composed of:
    token embedding -> N pre-norm Transformer blocks -> final RMSNorm -> LM head.

    This implementation uses RoPE inside each TransformerBlock's attention module.
    """

    def __init__(
        self,
        vocab_size: int,
        context_length: int,
        d_model: int,
        num_layers: int,
        num_heads: int,
        d_ff: int,
        *,
        rope_theta: float,
        max_seq_len: int | None = None,
        eps: float = 1e-5,
        device: torch.device | None = None,
        dtype: torch.dtype | None = None,
    ):
        super().__init__()
        self.vocab_size = int(vocab_size)
        self.context_length = int(context_length)
        self.d_model = int(d_model)
        self.num_layers = int(num_layers)
        self.max_seq_len = int(max_seq_len if max_seq_len is not None else context_length)

        # Token embedding table: (vocab_size, d_model)
        self.token_embeddings = Embedding(self.vocab_size, self.d_model, device=device, dtype=dtype)

        # Stack of pre-norm Transformer blocks
        self.layers = nn.ModuleList([
            TransformerBlock(
                d_model=self.d_model,
                num_heads=num_heads,
                d_ff=d_ff,
                max_seq_len=self.max_seq_len,
                theta=rope_theta,
                eps=eps,
                device=device,
                dtype=dtype,
            )
            for _ in range(self.num_layers)
        ])

        # Final normalization before the LM head
        self.ln_final = RMSNorm(self.d_model, eps=eps, device=device, dtype=dtype)

        # Output projection to vocabulary logits: weight shape (vocab_size, d_model)
        self.lm_head = Linear(self.d_model, self.vocab_size, device=device, dtype=dtype)

    def forward(self, in_indices: torch.Tensor) -> torch.Tensor:
        """
        Args:
            in_indices: LongTensor of shape (batch, seq_len)

        Returns:
            logits: Tensor of shape (batch, seq_len, vocab_size)
        """
        if in_indices.dim() != 2:
            raise ValueError(f"in_indices must have shape (batch, seq_len), got {tuple(in_indices.shape)}")
        batch, seq_len = in_indices.shape
        if seq_len > self.context_length:
            raise ValueError(f"seq_len={seq_len} exceeds context_length={self.context_length}")

        # Token positions for RoPE: (batch, seq_len)
        token_positions = torch.arange(seq_len, device=in_indices.device, dtype=torch.long).view(1, seq_len)
        token_positions = token_positions.expand(batch, seq_len)

        # Embed tokens: (batch, seq_len, d_model)
        x = self.token_embeddings(in_indices)

        # Apply Transformer blocks
        for block in self.layers:
            x = block(x, token_positions)

        # Final norm and vocabulary projection
        x = self.ln_final(x)
        logits = self.lm_head(x)
        return logits

The test adapter [adapters.run_transformer_lm] is implemented as follows:

def run_transformer_lm(
    vocab_size: int,
    context_length: int,
    d_model: int,
    num_layers: int,
    num_heads: int,
    d_ff: int,
    rope_theta: float,
    weights: dict[str, Tensor],
    in_indices: Int[Tensor, " batch_size sequence_length"],
) -> Float[Tensor, " batch_size sequence_length vocab_size"]:
    r"""
    Given the weights of a Transformer language model and input indices,
    return the output of running a forward pass on the input indices.

    This function should use RoPE.

    Args:
        vocab_size (int): The number of unique items in the output vocabulary to be predicted.
        context_length (int): The maximum number of tokens to process at once.
        d_model (int): The dimensionality of the model embeddings and sublayer outputs.
        num_layers (int): The number of Transformer layers to use.
        num_heads (int): Number of heads to use in multi-headed attention. `d_model` must be
            evenly divisible by `num_heads`.
        d_ff (int): Dimensionality of the feed-forward inner layer (section 3.3).
        rope_theta (float): The RoPE $\Theta$ parameter.
        weights (dict[str, Tensor]): State dict of our reference implementation. {num_layers}
            refers to an integer between `0` and `num_layers - 1` (the layer index). The keys
            of this dictionary are:
            - `token_embeddings.weight`: Token embedding matrix. Shape is (vocab_size, d_model).
            - `layers.{num_layers}.attn.q_proj.weight`: The query projections for all `num_heads`
              attention heads. Shape is (num_heads * (d_model / num_heads), d_model).
              The rows are ordered by matrices of shape (num_heads, d_k), so
              `attn.q_proj.weight == torch.cat([q_heads.0.weight, ..., q_heads.N.weight], dim=0)`.
            - `layers.{num_layers}.attn.k_proj.weight`: The key projections for all `num_heads`
              attention heads. Shape is (num_heads * (d_model / num_heads), d_model).
              The rows are ordered by matrices of shape (num_heads, d_k), so
              `attn.k_proj.weight == torch.cat([k_heads.0.weight, ..., k_heads.N.weight], dim=0)`.
            - `layers.{num_layers}.attn.v_proj.weight`: The value projections for all `num_heads`
              attention heads. Shape is (num_heads * (d_model / num_heads), d_model).
              The rows are ordered by matrices of shape (num_heads, d_v), so
              `attn.v_proj.weight == torch.cat([v_heads.0.weight, ..., v_heads.N.weight], dim=0)`.
            - `layers.{num_layers}.attn.output_proj.weight`: Weight of the multi-head self-attention
              output projection. Shape is ((d_model / num_heads) * num_heads, d_model).
            - `layers.{num_layers}.ln1.weight`: Weights of affine transform for the first RMSNorm
              applied in the transformer block. Shape is (d_model,).
            - `layers.{num_layers}.ffn.w1.weight`: Weight of the first linear transformation in
              the FFN. Shape is (d_model, d_ff).
            - `layers.{num_layers}.ffn.w2.weight`: Weight of the second linear transformation in
              the FFN. Shape is (d_ff, d_model).
            - `layers.{num_layers}.ffn.w3.weight`: Weight of the third linear transformation in
              the FFN. Shape is (d_model, d_ff).
            - `layers.{num_layers}.ln2.weight`: Weights of affine transform for the second RMSNorm
              applied in the transformer block. Shape is (d_model,).
            - `ln_final.weight`: Weights of affine transform for RMSNorm applied to the output of
              the final transformer block. Shape is (d_model,).
            - `lm_head.weight`: Weights of the language model output embedding.
              Shape is (vocab_size, d_model).
        in_indices (Int[Tensor, "batch_size sequence_length"]): Tensor with input indices to run
            the language model on. Shape is (batch_size, sequence_length), where `sequence_length`
            is at most `context_length`.

    Returns:
        Float[Tensor, "batch_size sequence_length vocab_size"]: Tensor with the predicted
        unnormalized next-word distribution for each token.
    """
    from cs336_basics.transformer_lm import TransformerLM

    model = TransformerLM(
        vocab_size=vocab_size,
        context_length=context_length,
        d_model=d_model,
        num_layers=num_layers,
        num_heads=num_heads,
        d_ff=d_ff,
        rope_theta=rope_theta,
        device=in_indices.device,
        dtype=torch.float32,
    )
    # Move weights to the correct device/dtype before loading.
    sd = {k: v.to(device=in_indices.device, dtype=torch.float32) for k, v in weights.items()}
    model.load_state_dict(sd)
    return model(in_indices)

Running uv run pytest -k test_transformer_lm shows that all tests pass.

11. Problem (transformer_accounting): Transformer LM resource accounting (5 points)

Resource accounting.

It is very helpful to understand how much compute and memory each part of a Transformer consumes. Below we work through a basic FLOPs (floating-point operations) accounting in a few steps.

Since the overwhelming majority of a Transformer's FLOPs come from matrix multiplications, the core idea is simple:

  1. List all the matrix multiplications in the Transformer's forward pass
  2. Convert each matrix multiplication into its FLOPs cost

For the second step, the following fact is very useful:

Rule: Given matrices $A \in \mathbb{R}^{m \times n}$ and $B \in \mathbb{R}^{n \times p}$, the matrix product $AB$ requires $2mnp$ FLOPs.

This is because each entry $(AB)[i, j] = A[i, :] \cdot B[:, j]$ is a dot product costing $n$ additions and $n$ multiplications, i.e. $2n$ FLOPs, and $AB$ has $m \times p$ entries, so the total is $(2n)(mp) = 2mnp$ FLOPs.
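As a quick sanity check of the $2mnp$ rule, here is a toy sketch (illustrative only, not part of the assignment; the function name is made up) that counts the operations in a naive triple-loop matmul:

```python
# Count the floating-point operations in a naive matmul and compare to 2*m*n*p.
def matmul_flops(m: int, n: int, p: int) -> int:
    flops = 0
    for _ in range(m):        # each row of A
        for _ in range(p):    # each column of B
            # one dot product of length n: n multiplications + n additions
            flops += 2 * n
    return flops

print(matmul_flops(4, 5, 6))  # 240
print(2 * 4 * 5 * 6)          # 240
```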

Before moving on to the next problems, it is worth going through every component of your Transformer block and Transformer language model (Transformer LM) implementations, listing each matrix multiplication involved and its corresponding FLOPs cost.

(a) Consider GPT-2 XL, with the following configuration:

  • vocab_size: 50,257
  • context_length: 1,024
  • num_layers: 48
  • d_model: 1,600
  • num_heads: 25
  • d_ff: 6,400

Suppose we build the model with this configuration:

  • How many trainable parameters does the model have in total?
  • If each parameter is stored as a single-precision float (float32), how much memory is required just to load the model?

Parameter counts:

  • Token embedding: vocab * d_model
  • LM head: vocab * d_model
  • Per-layer attention (Q/K/V/O): 4 * d_model^2
  • Per-layer SwiGLU FFN (W1/W3 up-projections + W2 down-projection): 3 * d_model * d_ff
  • Per-layer RMSNorm (ln1/ln2): 2 * d_model
  • Final RMSNorm: d_model

Total parameters:

$N = 48 \cdot (4d^2 + 3d\,d_{ff} + 2d) + 2Vd + d$

Deliverable这个模型一共有约 2.13B 个可训练参数,如果每个参数都使用 float32 表示,仅加载该模型需要 8.51GB。

(b) Identify all the matrix multiplications required for one forward pass of a GPT-2 XL-scale model, assuming the input sequence length equals context_length. How many FLOPs do these matrix multiplications require in total?

Deliverable: A list of all the matrix multiplies (with brief explanations), and the total number of FLOPs required.

Per layer (48 layers in total), the matrix multiplications are:

1. Q/K/V/Output projections (4 matmuls)

  • (S,d) · (d,d) -> (S,d)
  • FLOPs per matmul: 2*S*d*d
  • Per-layer total: $F_{\text{proj}} = 4 \cdot 2Sd^2$

2. Attention scores QKᵀ (per head)

  • (S,d_k) · (d_k,S) -> (S,S); over all heads, d_k*h = d
  • $F_{QK^T} = 2S^2d$

3. Attention-weighted values AV (per head)

  • (S,S) · (S,d_v) -> (S,d_v); over all heads, d_v*h = d
  • $F_{AV} = 2S^2d$

So the two attention matmuls together cost $F_{\text{attn}} = 4S^2d$

4. SwiGLU FFN: three matmuls (W1, W3, W2)

  • (S,d) · (d,d_ff) twice + (S,d_ff) · (d_ff,d) once
  • $F_{\text{ffn}} = 6Sd\,d_{ff}$

Vocabulary output layer (applied once):

5. LM head

  • (S,d) · (d,V) -> (S,V)
  • $F_{\text{lm}} = 2SdV$

Plugging in the numbers (batch = 1):

  • Per layer:
    • F_proj = 20.97152e9
    • F_attn = 6.7108864e9
    • F_ffn = 62.91456e9
    • Per-layer total ≈ 90.5969664e9
  • 48 layers in total:
    • F_layers ≈ 4.3486543872e12
  • LM head:
    • F_lm ≈ 1.646821376e11

Total forward-pass FLOPs:

$F_{\text{total}} \approx 4.5133365248 \times 10^{12}\ \text{FLOPs} \approx 4.51\ \text{TFLOPs}$
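The totals above can be reproduced with a few lines of Python (variable names are my own):

```python
# GPT-2 XL forward-pass FLOPs accounting (batch = 1, S = context_length).
S, d, d_ff, V, n_layers = 1024, 1600, 6400, 50_257, 48

f_proj = 4 * 2 * S * d * d   # Q/K/V/O projections
f_attn = 4 * S * S * d       # QK^T and AV, summed over all heads
f_ffn  = 6 * S * d * d_ff    # SwiGLU: W1, W3, W2
f_lm   = 2 * S * d * V       # LM head, applied once

total = n_layers * (f_proj + f_attn + f_ffn) + f_lm
print(f"{total:.4e} FLOPs")  # ~4.5133e+12
```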

(c) Based on your analysis above, which parts of the model consume the most FLOPs?

Deliverable: In this setting, the FFN (SwiGLU) is the largest consumer (about 62.9B FLOPs per layer), followed by the Q/K/V/O projections (about 21.0B), then the two attention matmuls (QKᵀ + AV, about 6.7B); the lm_head accounts for a comparatively small share.

(d) Extend your analysis to the following model configurations:

  • GPT-2 small: 12 layers, d_model = 768, 12 attention heads
  • GPT-2 medium: 24 layers, d_model = 1024, 16 attention heads
  • GPT-2 large: 36 layers, d_model = 1280, 20 attention heads

As the model size increases:

  • Which components of the Transformer LM take up a larger share of the total FLOPs?
  • Which components take up a smaller share?

Deliverable: For each model, give a per-component FLOPs breakdown (as a proportion of the total forward-pass FLOPs), and one or two sentences on how scaling up the model changes each component's share.

GPT-2 small(L=12, d=768, h=12)

  • Projections:16.58%
  • Attention matmuls:11.06%
  • FFN:49.75%
  • LM head:22.61%

GPT-2 medium(L=24, d=1024, h=16)

  • Projections:19.96%
  • Attention matmuls:9.98%
  • FFN:59.87%
  • LM head:10.20%

GPT-2 large(L=36, d=1280, h=20)

  • Projections:21.40%
  • Attention matmuls:8.56%
  • FFN:64.20%
  • LM head:5.84%

As the model grows (deeper and wider), the shares of the FFN and the projections, which scale with d^2 and d*d_ff, increase; the share of the attention matmuls, which scales with S^2*d, decreases, and the lm_head share (~S*d*V) also drops markedly.
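The percentage breakdowns above can be reproduced with a short script. Note that it assumes d_ff = 4 * d_model (a common GPT-2 convention, and the assumption that matches the shares quoted above); the function name is my own:

```python
# Per-component share of forward-pass FLOPs (S = 1024, assuming d_ff = 4 * d_model).
S, V = 1024, 50_257

def flop_shares(n_layers: int, d: int) -> list[float]:
    d_ff = 4 * d
    proj = n_layers * 8 * S * d * d      # Q/K/V/O projections
    attn = n_layers * 4 * S * S * d      # QK^T + AV
    ffn  = n_layers * 6 * S * d * d_ff   # FFN matmuls
    lm   = 2 * S * d * V                 # LM head (applied once)
    total = proj + attn + ffn + lm
    # Shares in percent: [projections, attention matmuls, FFN, LM head]
    return [round(100 * x / total, 2) for x in (proj, attn, ffn, lm)]

print(flop_shares(12, 768))    # small:  [16.58, 11.06, 49.75, 22.61]
print(flop_shares(24, 1024))   # medium: [19.96, 9.98, 59.87, 10.2]
print(flop_shares(36, 1280))   # large:  [21.4, 8.56, 64.2, 5.84]
```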

(e) Take GPT-2 XL and increase context_length to 16,384.

  • How does the total FLOPs of a single forward pass change?
  • How do the relative FLOPs contributions of the model's components change?

Deliverablecontex_length增长 16× 时,线性项(投影、FFN、LM head)FLOPs 也增长 16×,而注意力 matmul (QKᵀ、AV)的 FLOPS 是平方增长,即增长 256×。总 FLOPs 将从 ≈ 4.51×10¹² 增加到 ≈ 1.495×10¹⁴ FLOPs(约 33.1× 增长),相对贡献也会变化,其中注意力 matmul 将变成主导项(约 55%),FFN 与投影占比下降(FFN 约 32%,投影约 11%,LM head 约 1.8%)。

OK, that wraps up the implementation for this Transformer Language Model Architecture assignment.

Conclusion

In this post we fully implemented the Transformer Language Model from CS336 Assignment 1: starting from the basic linear, embedding, and normalization modules, we built up a complete Transformer block with RoPE, causal multi-head self-attention, a SwiGLU feed-forward network, and a pre-norm structure, and finally stacked these blocks into a language model capable of end-to-end forward computation.

Unlike working on isolated modules, the key takeaway from this exercise is seeing how all the components cooperate inside a real model: how batch-like dimensions are handled, how the Q/K/V tensors are rearranged in attention, and how RoPE is applied across the head dimension. These details only reveal their full complexity and constraints once everything is wired together.

In addition, the parameter and FLOPs accounting for a GPT-2 XL-scale model makes it clear where a Transformer's main compute and memory bottlenecks lie, giving us a quantitative baseline for later discussions of training throughput, memory footprint, parallelization strategies, and model scaling.

With this section complete, we now have a structurally correct, numerically stable Transformer LM implementation with a clean interface. In the next assignments we will move into the training process itself, including the loss function, optimizer, data loading, and training efficiency. Stay tuned 🤗

Source code download link
