跳到主要内容
注意力机制与 Transformer 模型实战详解 | 极客日志
Python AI 算法
注意力机制与 Transformer 模型实战详解 综述由AI生成 注意力机制通过 QKV 框架解决长序列依赖问题,Transformer 模型利用多头注意力实现高度并行化计算。本文详细解析了自注意力与多头注意力的计算逻辑,结合 TensorFlow 展示了从位置编码、编码器解码器搭建到机器翻译任务训练的完整实战流程,涵盖模型编译、训练及优化技巧。
接口猎人 发布于 2026/3/28 更新于 2026/4/26 2 浏览注意力机制与 Transformer 模型实战详解
学习目标与重点
掌握注意力机制的核心原理、经典算法,以及 Transformer 模型的架构设计与实战应用。重点理解自注意力与多头注意力的计算逻辑,学会使用 TensorFlow 搭建 Transformer 模型,完成机器翻译任务。
注意力机制的核心思想
为什么需要注意力机制
传统的 RNN 和 LSTM 在处理长序列时,存在长距离依赖捕捉能力不足和并行计算效率低的问题。注意力机制的出现,解决了这两个核心痛点。
注意力机制的本质是让模型学会'聚焦'——在处理序列数据时,自动分配不同的权重给输入序列中的各个元素,重点关注与当前任务相关的信息,弱化无关信息的干扰。比如在机器翻译任务中,翻译'我爱中国'时,模型会给'我''爱''中国'分配不同的注意力权重,从而更精准地生成对应的英文翻译。
注意力机制的基本框架
注意力机制的计算通常包含查询(Query)、键(Key)、值(Value)三个核心要素,简称 QKV 框架。其计算流程可以总结为三步:
计算 Query 和所有 Key 的相似度,得到注意力分数
对注意力分数进行归一化处理(常用 Softmax 函数),得到注意力权重
用归一化后的权重对 Value 进行加权求和,得到最终的注意力输出
基础注意力计算公式如下:
$$ \text{Attention}(Q,K,V) = \text{softmax}\left(\frac{QK^T}{\sqrt{d_k}}\right)V $$
其中 $d_k$ 是 Key 的维度,除以 $\sqrt{d_k}$ 是为了防止内积结果过大,导致 Softmax 函数饱和。
import tensorflow as tf
import numpy as np
def scaled_dot_product_attention (q, k, v, mask=None ):
matmul_qk = tf.matmul(q, k, transpose_b=True )
dk = tf.cast(tf.shape(k)[-1 ], tf.float32)
scaled_attention_logits = matmul_qk / tf.math.sqrt(dk)
if mask is not None :
scaled_attention_logits += (mask * -1e9 )
attention_weights = tf.nn.softmax(scaled_attention_logits, axis=-1 )
output = tf.matmul(attention_weights, v)
return output, attention_weights
q = tf.random.normal(( , , ))
k = tf.random.normal(( , , ))
v = tf.random.normal(( , , ))
output, attn_weights = scaled_dot_product_attention(q, k, v)
( , output.shape)
( , attn_weights.shape)
2
3
4
2
3
4
2
3
4
print
"注意力输出形状:"
print
"注意力权重形状:"
注意:掩码(Mask)分为两种,一种是填充掩码,用于屏蔽输入中的无效填充部分;另一种是前瞻掩码,用于在自回归任务中防止模型看到未来的信息。
自注意力与多头注意力
自注意力机制 自注意力(Self-Attention)是注意力机制的一种特殊形式。在自注意力中,Query、Key、Value 三个矩阵都来自同一个输入序列。自注意力可以捕捉序列内部元素之间的依赖关系,比如在句子'他喜欢打篮球,因为它很有趣'中,模型可以通过自注意力机制,将'它'和'打篮球'关联起来。
对输入序列的每个元素,分别通过三个不同的线性变换,生成 Q、K、V 矩阵
按照基础注意力公式计算注意力输出
将注意力输出作为当前层的特征,传递给下一层
多头注意力机制 多头注意力(Multi-Head Attention)是 Transformer 模型的核心创新点之一。它通过多个并行的注意力头,从不同的角度捕捉序列的特征。
将输入序列的特征维度拆分为 h 个独立的子空间(h 为注意力头的数量)
对每个子空间分别计算自注意力,得到 h 个不同的注意力输出
将 h 个注意力输出拼接起来,再通过一个线性变换,得到最终的多头注意力输出
class MultiHeadAttention (tf.keras.layers.Layer):
def __init__ (self, d_model, num_heads ):
super (MultiHeadAttention, self ).__init__()
self .num_heads = num_heads
self .d_model = d_model
assert d_model % self .num_heads == 0
self .depth = d_model // self .num_heads
self .wq = tf.keras.layers.Dense(d_model)
self .wk = tf.keras.layers.Dense(d_model)
self .wv = tf.keras.layers.Dense(d_model)
self .dense = tf.keras.layers.Dense(d_model)
def split_heads (self, x, batch_size ):
x = tf.reshape(x, (batch_size, -1 , self .num_heads, self .depth))
return tf.transpose(x, perm=[0 , 2 , 1 , 3 ])
def call (self, v, k, q, mask ):
batch_size = tf.shape(q)[0 ]
q = self .wq(q)
k = self .wk(k)
v = self .wv(v)
q = self .split_heads(q, batch_size)
k = self .split_heads(k, batch_size)
v = self .split_heads(v, batch_size)
scaled_attention, attention_weights = scaled_dot_product_attention(q, k, v, mask)
scaled_attention = tf.transpose(scaled_attention, perm=[0 , 2 , 1 , 3 ])
concat_attention = tf.reshape(scaled_attention, (batch_size, -1 , self .d_model))
output = self .dense(concat_attention)
return output, attention_weights
mha = MultiHeadAttention(d_model=128 , num_heads=8 )
x = tf.random.normal((2 , 5 , 128 ))
output, attn_weights = mha(x, x, x, mask=None )
print ("多头注意力输出形状:" , output.shape)
print ("多头注意力权重形状:" , attn_weights.shape)
Transformer 模型架构详解 Transformer 模型由 Google 团队在 2017 年的论文《Attention Is All You Need》中提出。它完全基于注意力机制,摒弃了 RNN 和 CNN 的序列式结构,实现了高度并行化计算,极大提升了模型的训练效率。Transformer 的整体架构分为编码器(Encoder)和解码器(Decoder)两大部分。
编码器结构 编码器由 N 个相同的编码层堆叠而成,每个编码层包含两个子层:
多头自注意力层:捕捉输入序列内部的依赖关系
前馈神经网络层:对注意力输出进行非线性变换
每个子层都配备了残差连接和层归一化,公式为:LayerNorm(x + Sublayer(x))
解码器结构 解码器同样由 N 个相同的解码层堆叠而成,每个解码层包含三个子层:
掩码多头自注意力层:防止模型看到未来的信息
编码器 - 解码器注意力层:捕捉输入序列和输出序列之间的依赖关系
前馈神经网络层:对注意力输出进行非线性变换
每个子层同样配备残差连接和层归一化。
位置编码 Transformer 模型没有循环结构,无法感知输入序列的顺序信息。因此需要通过位置编码(Positional Encoding),将序列的位置信息注入到输入特征中。位置编码的计算方式有多种,常用的是正弦余弦位置编码:
$$ PE_{(pos,2i)} = \sin(pos / 10000^{2i/d_{model}}) $$
$$ PE_{(pos,2i+1)} = \cos(pos / 10000^{2i/d_{model}}) $$
其中 pos 是元素在序列中的位置,i 是特征维度的索引。
def positional_encoding (position, d_model ):
angle_rads = get_angles(np.arange(position)[:, np.newaxis], np.arange(d_model)[np.newaxis,:], d_model)
angle_rads[:,0 ::2 ]= np.sin(angle_rads[:,0 ::2 ])
angle_rads[:,1 ::2 ]= np.cos(angle_rads[:,1 ::2 ])
pos_encoding = angle_rads[np.newaxis,...]
return tf.cast(pos_encoding, dtype=tf.float32)
def get_angles (pos, i, d_model ):
angles = 1 / np.power(10000 ,(2 *(i //2 ))/ np.float32(d_model))
return pos * angles
pos_enc = positional_encoding(100 , 128 )
print ("位置编码形状:" , pos_enc.shape)
实战:基于 Transformer 的机器翻译任务
任务介绍与数据集准备 本次实战任务是英 - 法机器翻译。我们将使用一个小型的英法双语数据集,目标是搭建 Transformer 模型,实现英文句子到法文句子的自动翻译。
加载英法双语数据集,对文本进行预处理:分词、建立词汇表、转换为整数索引序列
统一序列长度,对过长的序列进行截断,过短的序列进行填充
划分训练集和测试集,设置批次大小进行数据加载
import tensorflow_datasets as tfds
dataset, info = tfds.load('ted_hrlr_translate/fr_to_en' , with_info=True , as_supervised=True )
train_examples, val_examples = dataset['train' ], dataset['validation' ]
tokenizer_en = tfds.deprecated.text.SubwordTextEncoder.build_from_corpus((en.numpy()for en, fr in train_examples), target_vocab_size=2 **13 )
tokenizer_fr = tfds.deprecated.text.SubwordTextEncoder.build_from_corpus((fr.numpy()for en, fr in train_examples), target_vocab_size=2 **13 )
def encode (lang1, lang2 ):
lang1 = [tokenizer_en.vocab_size]+ tokenizer_en.encode(lang1.numpy())+[tokenizer_en.vocab_size +1 ]
lang2 = [tokenizer_fr.vocab_size]+ tokenizer_fr.encode(lang2.numpy())+[tokenizer_fr.vocab_size +1 ]
return lang1, lang2
def tf_encode (en, fr ):
return tf.py_function(encode,[en, fr],[tf.int64, tf.int64])
MAX_LENGTH = 40
def filter_max_length (x, y, max_length=MAX_LENGTH ):
return tf.logical_and(tf.size(x)<= max_length, tf.size(y)<= max_length)
train_dataset = train_examples.map (tf_encode)
train_dataset = train_dataset.filter (filter_max_length)
train_dataset = train_dataset.cache()
train_dataset = train_dataset.shuffle(10000 )
train_dataset = train_dataset.padded_batch(64 , padded_shapes=([-1 ],[-1 ]))
train_dataset = train_dataset.prefetch(tf.data.AUTOTUNE)
val_dataset = val_examples.map (tf_encode)
val_dataset = val_dataset.filter (filter_max_length).padded_batch(64 , padded_shapes=([-1 ],[-1 ]))
搭建完整的 Transformer 模型
def point_wise_feed_forward_network (d_model, dff ):
return tf.keras.Sequential([
tf.keras.layers.Dense(dff, activation='relu' ),
tf.keras.layers.Dense(d_model)
])
class EncoderLayer (tf.keras.layers.Layer):
def __init__ (self, d_model, num_heads, dff, rate=0.1 ):
super (EncoderLayer, self ).__init__()
self .mha = MultiHeadAttention(d_model, num_heads)
self .ffn = point_wise_feed_forward_network(d_model, dff)
self .layernorm1 = tf.keras.layers.LayerNormalization(epsilon=1e-6 )
self .layernorm2 = tf.keras.layers.LayerNormalization(epsilon=1e-6 )
self .dropout1 = tf.keras.layers.Dropout(rate)
self .dropout2 = tf.keras.layers.Dropout(rate)
def call (self, x, training, mask ):
attn_output, _ = self .mha(x, x, x, mask)
attn_output = self .dropout1(attn_output, training=training)
out1 = self .layernorm1(x + attn_output)
ffn_output = self .ffn(out1)
ffn_output = self .dropout2(ffn_output, training=training)
out2 = self .layernorm2(out1 + ffn_output)
return out2
class DecoderLayer (tf.keras.layers.Layer):
def __init__ (self, d_model, num_heads, dff, rate=0.1 ):
super (DecoderLayer, self ).__init__()
self .mha1 = MultiHeadAttention(d_model, num_heads)
self .mha2 = MultiHeadAttention(d_model, num_heads)
self .ffn = point_wise_feed_forward_network(d_model, dff)
self .layernorm1 = tf.keras.layers.LayerNormalization(epsilon=1e-6 )
self .layernorm2 = tf.keras.layers.LayerNormalization(epsilon=1e-6 )
self .layernorm3 = tf.keras.layers.LayerNormalization(epsilon=1e-6 )
self .dropout1 = tf.keras.layers.Dropout(rate)
self .dropout2 = tf.keras.layers.Dropout(rate)
self .dropout3 = tf.keras.layers.Dropout(rate)
def call (self, x, enc_output, training, look_ahead_mask, padding_mask ):
attn1, attn_weights_block1 = self .mha1(x, x, x, look_ahead_mask)
attn1 = self .dropout1(attn1, training=training)
out1 = self .layernorm1(attn1 + x)
attn2, attn_weights_block2 = self .mha2(enc_output, enc_output, out1, padding_mask)
attn2 = self .dropout2(attn2, training=training)
out2 = self .layernorm2(attn2 + out1)
ffn_output = self .ffn(out2)
ffn_output = self .dropout3(ffn_output, training=training)
out3 = self .layernorm3(ffn_output + out2)
return out3, attn_weights_block1, attn_weights_block2
class Encoder (tf.keras.layers.Layer):
def __init__ (self, num_layers, d_model, num_heads, dff, input_vocab_size, maximum_position_encoding, rate=0.1 ):
super (Encoder, self ).__init__()
self .d_model = d_model
self .num_layers = num_layers
self .embedding = tf.keras.layers.Embedding(input_vocab_size, d_model)
self .pos_encoding = positional_encoding(maximum_position_encoding, self .d_model)
self .enc_layers = [EncoderLayer(d_model, num_heads, dff, rate) for _ in range (num_layers)]
self .dropout = tf.keras.layers.Dropout(rate)
def call (self, x, training, mask ):
seq_len = tf.shape(x)[1 ]
x = self .embedding(x)
x *= tf.math.sqrt(tf.cast(self .d_model, tf.float32))
x += self .pos_encoding[:,:seq_len,:]
x = self .dropout(x, training=training)
for i in range (self .num_layers):
x = self .enc_layers[i](x, training, mask)
return x
class Decoder (tf.keras.layers.Layer):
def __init__ (self, num_layers, d_model, num_heads, dff, target_vocab_size, maximum_position_encoding, rate=0.1 ):
super (Decoder, self ).__init__()
self .d_model = d_model
self .num_layers = num_layers
self .embedding = tf.keras.layers.Embedding(target_vocab_size, d_model)
self .pos_encoding = positional_encoding(maximum_position_encoding, d_model)
self .dec_layers = [DecoderLayer(d_model, num_heads, dff, rate) for _ in range (num_layers)]
self .dropout = tf.keras.layers.Dropout(rate)
def call (self, x, enc_output, training, look_ahead_mask, padding_mask ):
seq_len = tf.shape(x)[1 ]
attention_weights = {}
x = self .embedding(x)
x *= tf.math.sqrt(tf.cast(self .d_model, tf.float32))
x += self .pos_encoding[:,:seq_len,:]
x = self .dropout(x, training=training)
for i in range (self .num_layers):
x, block1, block2 = self .dec_layers[i](x, enc_output, training, look_ahead_mask, padding_mask)
attention_weights['decoder_layer{}_block1' .format (i+1 )] = block1
attention_weights['decoder_layer{}_block2' .format (i+1 )] = block2
return x, attention_weights
class Transformer (tf.keras.Model):
def __init__ (self, num_layers, d_model, num_heads, dff, input_vocab_size, target_vocab_size, pe_input, pe_target, rate=0.1 ):
super (Transformer, self ).__init__()
self .encoder = Encoder(num_layers, d_model, num_heads, dff, input_vocab_size, pe_input, rate)
self .decoder = Decoder(num_layers, d_model, num_heads, dff, target_vocab_size, pe_target, rate)
self .final_layer = tf.keras.layers.Dense(target_vocab_size)
def call (self, inp, tar, training, enc_padding_mask, look_ahead_mask, dec_padding_mask ):
enc_output = self .encoder(inp, training, enc_padding_mask)
dec_output, attention_weights = self .decoder(tar, enc_output, training, look_ahead_mask, dec_padding_mask)
final_output = self .final_layer(dec_output)
return final_output, attention_weights
num_layers = 4
d_model = 128
dff = 512
num_heads = 8
input_vocab_size = tokenizer_en.vocab_size + 2
target_vocab_size = tokenizer_fr.vocab_size + 2
dropout_rate = 0.1
transformer = Transformer(num_layers, d_model, num_heads, dff, input_vocab_size, target_vocab_size, pe_input=1000 , pe_target=1000 , rate=dropout_rate)
模型编译与训练 机器翻译任务属于序列生成任务,我们使用稀疏交叉熵损失函数,优化器选择 Adam,并设置学习率衰减策略。
class CustomSchedule (tf.keras.optimizers.schedules.LearningRateSchedule):
def __init__ (self, d_model, warmup_steps=4000 ):
super (CustomSchedule, self ).__init__()
self .d_model = d_model
self .d_model = tf.cast(self .d_model, tf.float32)
self .warmup_steps = warmup_steps
def __call__ (self, step ):
arg1 = tf.math.rsqrt(step)
arg2 = step * (self .warmup_steps ** -1.5 )
return tf.math.rsqrt(self .d_model) * tf.math.minimum(arg1, arg2)
learning_rate = CustomSchedule(d_model)
optimizer = tf.keras.optimizers.Adam(learning_rate, beta_1=0.9 , beta_2=0.98 , epsilon=1e-9 )
loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True , reduction='none' )
def loss_function (real, pred ):
mask = tf.math.logical_not(tf.math.equal(real, 0 ))
loss_ = loss_object(real, pred)
mask = tf.cast(mask, dtype=loss_.dtype)
loss_ *= mask
return tf.reduce_sum(loss_) / tf.reduce_sum(mask)
train_loss = tf.keras.metrics.Mean(name='train_loss' )
train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name='train_accuracy' )
@tf.function
def train_step (inp, tar ):
tar_inp = tar[:,:-1 ]
tar_real = tar[:,1 :]
enc_padding_mask, combined_mask, dec_padding_mask = create_masks(inp, tar_inp)
with tf.GradientTape() as tape:
predictions, _ = transformer(inp, tar_inp, True , enc_padding_mask, combined_mask, dec_padding_mask)
loss = loss_function(tar_real, predictions)
gradients = tape.gradient(loss, transformer.trainable_variables)
optimizer.apply_gradients(zip (gradients, transformer.trainable_variables))
train_loss(loss)
train_accuracy(tar_real, predictions)
def create_masks (inp, tar ):
enc_padding_mask = create_padding_mask(inp)
dec_padding_mask = create_padding_mask(inp)
look_ahead_mask = create_look_ahead_mask(tf.shape(tar)[1 ])
dec_target_padding_mask = create_padding_mask(tar)
combined_mask = tf.maximum(dec_target_padding_mask, look_ahead_mask)
return enc_padding_mask, combined_mask, dec_padding_mask
def create_padding_mask (seq ):
seq = tf.cast(tf.math.equal(seq, 0 ), tf.float32)
return seq[:, tf.newaxis, tf.newaxis, :]
def create_look_ahead_mask (size ):
mask = 1 - tf.linalg.band_part(tf.ones((size, size)), -1 , 0 )
return mask
EPOCHS = 20
for epoch in range (EPOCHS):
train_loss.reset_states()
train_accuracy.reset_states()
for (batch,(inp, tar)) in enumerate (train_dataset):
train_step(inp, tar)
if batch % 50 == 0 :
print (f'Epoch {epoch+1 } Batch {batch} Loss {train_loss.result():.4 f} Accuracy {train_accuracy.result():.4 f} ' )
print (f'Epoch {epoch+1 } Loss {train_loss.result():.4 f} Accuracy {train_accuracy.result():.4 f} ' )
模型优化技巧
使用标签平滑(Label Smoothing)技术,缓解模型过拟合,提升泛化能力。
采用波束搜索(Beam Search)替代贪心搜索,生成更流畅、更准确的翻译结果。
使用预训练的词向量初始化嵌入层,提升模型的特征表示能力。
本章总结 注意力机制通过 QKV 框架,让模型学会聚焦序列中的关键信息,解决了长序列依赖问题。多头注意力从多个角度捕捉序列特征,是 Transformer 模型的核心组件。Transformer 模型完全基于注意力机制,实现了高度并行化计算,在机器翻译、文本生成等任务中表现优异。
相关免费在线工具 加密/解密文本 使用加密算法(如AES、TripleDES、Rabbit或RC4)加密和解密文本明文。 在线工具,加密/解密文本在线工具,online
RSA密钥对生成器 生成新的随机RSA私钥和公钥pem证书。 在线工具,RSA密钥对生成器在线工具,online
Mermaid 预览与可视化编辑 基于 Mermaid.js 实时预览流程图、时序图等图表,支持源码编辑与即时渲染。 在线工具,Mermaid 预览与可视化编辑在线工具,online
随机西班牙地址生成器 随机生成西班牙地址(支持马德里、加泰罗尼亚、安达卢西亚、瓦伦西亚筛选),支持数量快捷选择、显示全部与下载。 在线工具,随机西班牙地址生成器在线工具,online
Gemini 图片去水印 基于开源反向 Alpha 混合算法去除 Gemini/Nano Banana 图片水印,支持批量处理与下载。 在线工具,Gemini 图片去水印在线工具,online
curl 转代码 解析常见 curl 参数并生成 fetch、axios、PHP curl 或 Python requests 示例代码。 在线工具,curl 转代码在线工具,online