跳到主要内容使用 PyTorch 从零构建 Transformer 模型:原理、代码与训练预测 | 极客日志PythonAI算法
使用 PyTorch 从零构建 Transformer 模型:原理、代码与训练预测
综述由AI生成基于 PyTorch 从零实现 Transformer 模型,涵盖位置编码、多头注意力机制、前馈网络等核心组件。通过构建完整的 Encoder-Decoder 架构,演示了数据集生成、模型定义、训练循环及预测流程。代码包含详细注释,帮助深入理解 Transformer 的工作原理与工程落地细节。
Pythonist17 浏览 使用 PyTorch 从零构建 Transformer 模型
本文基于 PyTorch 从零实现 Transformer 模型,涵盖位置编码、多头注意力机制、前馈网络等核心组件。通过构建完整的 Encoder-Decoder 架构,演示了数据集生成、模型定义、训练循环及预测流程。
步骤概述
- 安装依赖:确保环境中安装了必要的库。
- 准备数据集:创建一个简单的样例数据集。
- 定义基本组件:实现 Positional Encoding、Multi-Head Attention、Feed-Forward Network。
- 定义 Encoder 和 Decoder 层:基于基本组件构建层级结构。
- 定义完整模型:组合 Encoder 和 Decoder。
- 训练模型:编写训练代码。
- 预测:编写预测代码。
1. 安装依赖
首先,确保你已经安装了 PyTorch 和其他必要的库。
pip install torch torchvision matplotlib numpy pandas
2. 准备数据集
我们将创建一个简单的样例数据集,用于演示目的。这里我们使用一个非常简单的语言建模任务,即生成数字序列的下一个数字。
数据生成脚本 generate_data.py
import numpy as np
def generate_sequence(length, vocab_size):
return np.random.randint(vocab_size, size=length)
def generate_dataset(num_samples, seq_length, vocab_size):
X = []
y = []
for _ in range(num_samples):
sequence = generate_sequence(seq_length + 1, vocab_size)
X.append(sequence[:-1])
y.append(sequence[-1])
return np.array(X), np.array(y)
num_samples = 1000
seq_length = 10
vocab_size = 10
X_train, y_train = generate_dataset(num_samples, seq_length, vocab_size)
X_test, y_test = generate_dataset(num_samples // 10, seq_length, vocab_size)
np.save(, X_train)
np.save(, y_train)
np.save(, X_test)
np.save(, y_test)
'X_train.npy'
'y_train.npy'
'X_test.npy'
'y_test.npy'
3. 定义基本组件
我们将从头开始实现 Positional Encoding、Multi-Head Attention、Feed-Forward Network 等基本组件。
基本组件代码 components.py
import torch
import torch.nn as nn
import math
class PositionalEncoding(nn.Module):
def __init__(self, d_model, max_len=5000):
super(PositionalEncoding, self).__init__()
pe = torch.zeros(max_len, d_model)
position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
pe[:, 0::2] = torch.sin(position * div_term)
pe[:, 1::2] = torch.cos(position * div_term)
pe = pe.unsqueeze(0).transpose(0, 1)
self.register_buffer('pe', pe)
def forward(self, x):
x = x + self.pe[:x.size(0), :]
return x
class MultiHeadAttention(nn.Module):
def __init__(self, embed_size, heads):
super(MultiHeadAttention, self).__init__()
self.embed_size = embed_size
self.heads = heads
self.head_dim = embed_size // heads
assert (self.head_dim * heads == embed_size), "Embedding size needs to be divisible by heads"
self.values = nn.Linear(self.head_dim, self.head_dim, bias=False)
self.keys = nn.Linear(self.head_dim, self.head_dim, bias=False)
self.queries = nn.Linear(self.head_dim, self.head_dim, bias=False)
self.fc_out = nn.Linear(heads * self.head_dim, embed_size)
def forward(self, values, keys, query, mask):
N = query.shape[0]
value_len, key_len, query_len = values.shape[1], keys.shape[1], query.shape[1]
values = values.reshape(N, value_len, self.heads, self.head_dim)
keys = keys.reshape(N, key_len, self.heads, self.head_dim)
queries = query.reshape(N, query_len, self.heads, self.head_dim)
values = self.values(values)
keys = self.keys(keys)
queries = self.queries(queries)
energy = torch.einsum("nqhd,nkhd->nhqk", [queries, keys])
if mask is not None:
energy = energy.masked_fill(mask == 0, float("-1e20"))
attention = torch.softmax(energy / (self.embed_size ** (1/2)), dim=3)
out = torch.einsum("nhql,nlhd->nqhd", [attention, values]).reshape(
N, query_len, self.heads*self.head_dim
)
out = self.fc_out(out)
return out
class FeedForward(nn.Module):
def __init__(self, embed_size, expansion_factor=4):
super(FeedForward, self).__init__()
self.model = nn.Sequential(
nn.Linear(embed_size, embed_size*expansion_factor),
nn.ReLU(),
nn.Linear(embed_size*expansion_factor, embed_size),
)
def forward(self, x):
return self.model(x)
4. 定义 Encoder 和 Decoder 层
我们将基于基本组件构建 Encoder 和 Decoder 层。
Encoder 和 Decoder 层代码 encoder_decoder.py
import torch
import torch.nn as nn
from components import PositionalEncoding, MultiHeadAttention, FeedForward
class TransformerBlock(nn.Module):
def __init__(self, embed_size, heads, dropout, forward_expansion):
super(TransformerBlock, self).__init__()
self.attention = MultiHeadAttention(embed_size, heads)
self.norm1 = nn.LayerNorm(embed_size)
self.norm2 = nn.LayerNorm(embed_size)
self.feed_forward = FeedForward(embed_size, forward_expansion)
self.dropout = nn.Dropout(dropout)
def forward(self, value, key, query, mask):
attention = self.attention(value, key, query, mask)
x = self.dropout(self.norm1(attention + query))
forward = self.feed_forward(x)
out = self.dropout(self.norm2(forward + x))
return out
class Encoder(nn.Module):
def __init__(
self,
src_vocab_size,
embed_size,
num_layers,
heads,
device,
forward_expansion,
dropout,
max_length,
):
super(Encoder, self).__init__()
self.embed_size = embed_size
self.device = device
self.word_embedding = nn.Embedding(src_vocab_size, embed_size)
self.position_embedding = PositionalEncoding(embed_size, max_length)
self.layers = nn.ModuleList(
[
TransformerBlock(
embed_size,
heads,
dropout=dropout,
forward_expansion=forward_expansion,
)
for _ in range(num_layers)
]
)
self.dropout = nn.Dropout(dropout)
def forward(self, x, mask):
N, seq_length = x.shape
positions = torch.arange(0, seq_length).expand(N, seq_length).to(self.device)
out = self.dropout((self.word_embedding(x) + self.position_embedding(positions)))
for layer in self.layers:
out = layer(out, out, out, mask)
return out
class DecoderBlock(nn.Module):
def __init__(self, embed_size, heads, forward_expansion, dropout, device):
super(DecoderBlock, self).__init__()
self.attention = MultiHeadAttention(embed_size, heads)
self.norm = nn.LayerNorm(embed_size)
self.transformer_block = TransformerBlock(
embed_size, heads, dropout, forward_expansion
)
self.dropout = nn.Dropout(dropout)
def forward(self, x, value, key, src_mask, trg_mask):
attention = self.attention(x, x, x, trg_mask)
query = self.dropout(self.norm(attention + x))
out = self.transformer_block(value, key, query, src_mask)
return out
class Decoder(nn.Module):
def __init__(
self,
trg_vocab_size,
embed_size,
num_layers,
heads,
forward_expansion,
dropout,
device,
max_length,
):
super(Decoder, self).__init__()
self.device = device
self.word_embedding = nn.Embedding(trg_vocab_size, embed_size)
self.position_embedding = PositionalEncoding(embed_size, max_length)
self.layers = nn.ModuleList(
[
DecoderBlock(embed_size, heads, forward_expansion, dropout, device)
for _ in range(num_layers)
]
)
self.fc_out = nn.Linear(embed_size, trg_vocab_size)
self.dropout = nn.Dropout(dropout)
def forward(self, x, enc_out, src_mask, trg_mask):
N, seq_length = x.shape
positions = torch.arange(0, seq_length).expand(N, seq_length).to(self.device)
x = self.dropout((self.word_embedding(x) + self.position_embedding(positions)))
for layer in self.layers:
x = layer(x, enc_out, enc_out, src_mask, trg_mask)
out = self.fc_out(x)
return out
5. 定义完整的 Transformer 模型
我们将基于 Encoder 和 Decoder 层组合成完整的 Transformer 模型。
完整的 Transformer 模型代码 transformer_model.py
import torch
import torch.nn as nn
from encoder_decoder import Encoder, Decoder
class Transformer(nn.Module):
def __init__(
self,
src_vocab_size,
trg_vocab_size,
src_pad_idx,
trg_pad_idx,
embed_size=256,
num_layers=6,
forward_expansion=4,
heads=8,
dropout=0,
device="cpu",
max_length=100,
):
super(Transformer, self).__init__()
self.encoder = Encoder(
src_vocab_size,
embed_size,
num_layers,
heads,
device,
forward_expansion,
dropout,
max_length,
)
self.decoder = Decoder(
trg_vocab_size,
embed_size,
num_layers,
heads,
forward_expansion,
dropout,
device,
max_length,
)
self.src_pad_idx = src_pad_idx
self.trg_pad_idx = trg_pad_idx
self.device = device
def make_src_mask(self, src):
src_mask = (src != self.src_pad_idx).unsqueeze(1).unsqueeze(2)
return src_mask.to(self.device)
def make_trg_mask(self, trg):
N, trg_len = trg.shape
trg_mask = torch.tril(torch.ones((trg_len, trg_len))).expand(
N, 1, trg_len, trg_len
)
return trg_mask.to(self.device)
def forward(self, src, trg):
src_mask = self.make_src_mask(src)
trg_mask = self.make_trg_mask(trg)
enc_src = self.encoder(src, src_mask)
out = self.decoder(trg, enc_src, src_mask, trg_mask)
return out
6. 训练模型
我们将编写训练代码来训练 Transformer 模型。
训练代码 train_transformer.py
import torch
import torch.nn as nn
import torch.optim as optim
from transformer_model import Transformer
from components import PositionalEncoding, MultiHeadAttention, FeedForward
from encoder_decoder import TransformerBlock, Encoder, Decoder
import numpy as np
import math
X_train = np.load('X_train.npy')
y_train = np.load('y_train.npy')
X_test = np.load('X_test.npy')
y_test = np.load('y_test.npy')
X_train_tensor = torch.from_numpy(X_train).long()
y_train_tensor = torch.from_numpy(y_train).long()
X_test_tensor = torch.from_numpy(X_test).long()
y_test_tensor = torch.from_numpy(y_test).long()
src_vocab_size = 10
trg_vocab_size = 10
embed_size = 256
num_layers = 2
heads = 8
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
forward_expansion = 4
dropout = 0.1
max_length = 100
lr = 0.0001
batch_size = 16
epochs = 10
model = Transformer(
src_vocab_size,
trg_vocab_size,
src_pad_idx=0,
trg_pad_idx=0,
embed_size=embed_size,
num_layers=num_layers,
forward_expansion=forward_expansion,
heads=heads,
dropout=dropout,
device=device,
max_length=max_length,
).to(device)
criterion = nn.CrossEntropyLoss(ignore_index=0)
optimizer = optim.Adam(model.parameters(), lr=lr)
for epoch in range(epochs):
model.train()
total_loss = 0.
num_batches = len(X_train_tensor) // batch_size
for i in range(num_batches):
start_idx = i * batch_size
end_idx = (i + 1) * batch_size
src = X_train_tensor[start_idx:end_idx].permute(1, 0).to(device)
trg = y_train_tensor[start_idx:end_idx].unsqueeze(0).to(device)
optimizer.zero_grad()
output = model(src, trg)
output = output.squeeze(0)
loss = criterion(output, trg.squeeze(0))
loss.backward()
optimizer.step()
total_loss += loss.item()
avg_loss = total_loss / num_batches
print(f'Epoch {epoch+1}/{epochs}, Loss: {avg_loss:.4f}')
torch.save(model.state_dict(), 'model.pth')
7. 预测
我们将编写预测代码来测试 Transformer 模型的性能。
预测代码 predict_transformer.py
import torch
from transformer_model import Transformer
import numpy as np
src_vocab_size = 10
trg_vocab_size = 10
embed_size = 256
num_layers = 2
heads = 8
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
forward_expansion = 4
dropout = 0.1
max_length = 100
model = Transformer(
src_vocab_size,
trg_vocab_size,
src_pad_idx=0,
trg_pad_idx=0,
embed_size=embed_size,
num_layers=num_layers,
forward_expansion=forward_expansion,
heads=heads,
dropout=dropout,
device=device,
max_length=max_length,
).to(device)
model.load_state_dict(torch.load('model.pth'))
model.eval()
sample_input = np.array([3, 7, 2, 5, 8, 9, 1, 4, 6]).reshape((9, 1))
sample_input_tensor = torch.from_numpy(sample_input).long().to(device)
with torch.no_grad():
src_mask = model.make_src_mask(sample_input_tensor)
trg_mask = model.make_trg_mask(sample_input_tensor)
output = model(sample_input_tensor, sample_input_tensor)
_, predicted = torch.max(output, dim=-1)
next_token = predicted[-1].item()
print(f'Next token prediction: {next_token}')
总结
从头开始构建一个简单的 Transformer 模型,并使用简单的数字序列数据集进行训练和预测。以下是所有相关的代码文件:
- 数据生成脚本 (
generate_data.py)
- 基本组件代码 (
components.py)
- Encoder 和 Decoder 层代码 (
encoder_decoder.py)
- 完整的 Transformer 模型代码 (
transformer_model.py)
- 训练代码 (
train_transformer.py)
- 预测代码 (
predict_transformer.py)
通过以上步骤,你可以深入理解 Transformer 模型的内部机制,包括自注意力机制如何工作、位置编码如何注入序列信息以及 Encoder-Decoder 架构如何协同处理输入输出序列。
相关免费在线工具
- 加密/解密文本
使用加密算法(如AES、TripleDES、Rabbit或RC4)加密和解密文本明文。 在线工具,加密/解密文本在线工具,online
- RSA密钥对生成器
生成新的随机RSA私钥和公钥pem证书。 在线工具,RSA密钥对生成器在线工具,online
- Mermaid 预览与可视化编辑
基于 Mermaid.js 实时预览流程图、时序图等图表,支持源码编辑与即时渲染。 在线工具,Mermaid 预览与可视化编辑在线工具,online
- 随机西班牙地址生成器
随机生成西班牙地址(支持马德里、加泰罗尼亚、安达卢西亚、瓦伦西亚筛选),支持数量快捷选择、显示全部与下载。 在线工具,随机西班牙地址生成器在线工具,online
- Gemini 图片去水印
基于开源反向 Alpha 混合算法去除 Gemini/Nano Banana 图片水印,支持批量处理与下载。 在线工具,Gemini 图片去水印在线工具,online
- curl 转代码
解析常见 curl 参数并生成 fetch、axios、PHP curl 或 Python requests 示例代码。 在线工具,curl 转代码在线工具,online