注意力机制与 Transformer 模型实战
注意力机制核心原理及 QKV 框架,阐述自注意力与多头注意力计算逻辑。介绍 Transformer 模型编码器、解码器架构及位置编码方法。通过 TensorFlow 实战英法机器翻译任务,展示数据预处理、模型搭建、编译训练及优化技巧全过程。解决长序列依赖捕捉与并行计算效率问题。

注意力机制核心原理及 QKV 框架,阐述自注意力与多头注意力计算逻辑。介绍 Transformer 模型编码器、解码器架构及位置编码方法。通过 TensorFlow 实战英法机器翻译任务,展示数据预处理、模型搭建、编译训练及优化技巧全过程。解决长序列依赖捕捉与并行计算效率问题。

💡 学习目标:掌握注意力机制的核心原理、经典注意力算法,以及 Transformer 模型的架构设计与实战应用。 💡 学习重点:理解自注意力与多头注意力的计算逻辑,学会使用 TensorFlow 搭建 Transformer 模型,完成机器翻译任务。
💡 传统的 RNN 和 LSTM 在处理长序列时,存在长距离依赖捕捉能力不足和并行计算效率低的问题。注意力机制的出现,解决了这两个核心痛点。 注意力机制的本质是让模型学会'聚焦'——在处理序列数据时,自动分配不同的权重给输入序列中的各个元素,重点关注与当前任务相关的信息,弱化无关信息的干扰。 比如在机器翻译任务中,翻译'我爱中国'时,模型会给'我''爱''中国'分配不同的注意力权重,从而更精准地生成对应的英文翻译。
💡 注意力机制的计算通常包含**查询(Query)、键(Key)、值(Value)**三个核心要素,简称 QKV 框架。 其计算流程可以总结为三步: ① 计算 Query 和所有 Key 的相似度,得到注意力分数 ② 对注意力分数进行归一化处理(常用 Softmax 函数),得到注意力权重 ③ 用归一化后的权重对 Value 进行加权求和,得到最终的注意力输出
基础注意力计算公式: Attention(Q,K,V) = softmax(QK^T / sqrt(d_k)) * V 其中 d_k 是 Key 的维度,除以 sqrt(d_k) 是为了防止内积结果过大,导致 Softmax 函数饱和。
import tensorflow as tf
import numpy as np
# 实现基础注意力计算
def scaled_dot_product_attention(q, k, v, mask=None):
# 计算 Q 和 K 的点积
matmul_qk = tf.matmul(q, k, transpose_b=True)
# 获取 k 的维度
dk = tf.cast(tf.shape(k)[-1], tf.float32)
# 缩放点积
scaled_attention_logits = matmul_qk / tf.math.sqrt(dk)
# 应用掩码(可选)
if mask is not None:
scaled_attention_logits += (mask * -1e9)
# 计算注意力权重
attention_weights = tf.nn.softmax(scaled_attention_logits, axis=-1)
# 计算最终输出
output = tf.matmul(attention_weights, v)
return output, attention_weights
# 模拟输入:批次大小=2,序列长度=3,特征维度=4
q = tf.random.normal((2, 3, 4))
k = tf.random.normal((2, 3, 4))
v = tf.random.normal((2, 3, 4))
output, attn_weights = scaled_dot_product_attention(q, k, v)
print("注意力输出形状:", output.shape)
print("注意力权重形状:", attn_weights.shape)
⚠️ 注意:掩码(Mask)分为两种,一种是填充掩码,用于屏蔽输入中的无效填充部分;另一种是前瞻掩码,用于在自回归任务中防止模型看到未来的信息。
💡 自注意力(Self-Attention)是注意力机制的一种特殊形式。在自注意力中,Query、Key、Value 三个矩阵都来自同一个输入序列。 自注意力可以捕捉序列内部元素之间的依赖关系,比如在句子'他喜欢打篮球,因为它很有趣'中,模型可以通过自注意力机制,将'它'和'打篮球'关联起来。
自注意力的计算步骤: ① 对输入序列的每个元素,分别通过三个不同的线性变换,生成 Q、K、V 矩阵 ② 按照基础注意力公式计算注意力输出 ③ 将注意力输出作为当前层的特征,传递给下一层
💡 多头注意力(Multi-Head Attention)是 Transformer 模型的核心创新点之一。它通过多个并行的注意力头,从不同的角度捕捉序列的特征。 多头注意力的计算流程: ① 将输入序列的特征维度拆分为 h 个独立的子空间(h 为注意力头的数量) ② 对每个子空间分别计算自注意力,得到 h 个不同的注意力输出 ③ 将 h 个注意力输出拼接起来,再通过一个线性变换,得到最终的多头注意力输出
class MultiHeadAttention(tf.keras.layers.Layer):
def __init__(self, d_model, num_heads):
super().__init__()
self.num_heads = num_heads
self.d_model = d_model
# 确保 d_model 可以被 num_heads 整除
assert d_model % self.num_heads == 0
# 每个头的维度
self.depth = d_model // self.num_heads
# 定义 Q、K、V 和输出的线性变换层
self.wq = tf.keras.layers.Dense(d_model)
self.wk = tf.keras.layers.Dense(d_model)
self.wv = tf.keras.layers.Dense(d_model)
self.dense = tf.keras.layers.Dense(d_model)
def split_heads(self, x, batch_size):
# 将特征维度拆分为多个头
x = tf.reshape(x, (batch_size, -1, self.num_heads, self.depth))
return tf.transpose(x, perm=[0, 2, 1, 3])
def call(self, v, k, q, mask):
batch_size = tf.shape(q)[0]
# 生成 Q、K、V 矩阵
q = self.wq(q)
k = self.wk(k)
v = self.wv(v)
# 拆分多头
q = self.split_heads(q, batch_size)
k = self.split_heads(k, batch_size)
v = self.split_heads(v, batch_size)
# 计算缩放点积注意力
scaled_attention, attention_weights = scaled_dot_product_attention(q, k, v, mask)
# 拼接多头输出
scaled_attention = tf.transpose(scaled_attention, perm=[0, 2, 1, 3])
concat_attention = tf.reshape(scaled_attention, (batch_size, -1, self.d_model))
# 线性变换输出
output = self.dense(concat_attention)
return output, attention_weights
# 测试多头注意力层
mha = MultiHeadAttention(d_model=128, num_heads=8)
# 模拟输入:批次大小=2,序列长度=5,特征维度=128
x = tf.random.normal((2, 5, 128))
output, attn_weights = mha(x, x, x, mask=None)
print("多头注意力输出形状:", output.shape)
print("多头注意力权重形状:", attn_weights.shape)
💡 Transformer 模型由 Google 团队在 2017 年的论文《Attention Is All You Need》中提出。它完全基于注意力机制,摒弃了 RNN 和 CNN 的序列式结构,实现了高度并行化计算,极大提升了模型的训练效率。 Transformer 的整体架构分为**编码器(Encoder)和解码器(Decoder)**两大部分。
编码器由 N 个相同的编码层堆叠而成,每个编码层包含两个子层:
解码器同样由 N 个相同的解码层堆叠而成,每个解码层包含三个子层:
💡 Transformer 模型没有循环结构,无法感知输入序列的顺序信息。因此需要通过位置编码(Positional Encoding),将序列的位置信息注入到输入特征中。 位置编码的计算方式有多种,常用的是正弦余弦位置编码: PE(pos, 2i) = sin(pos / 10000^(2i/d_model)) PE(pos, 2i+1) = cos(pos / 10000^(2i/d_model)) 其中 pos 是元素在序列中的位置,i 是特征维度的索引。
def positional_encoding(position, d_model):
angle_rads = get_angles(np.arange(position)[:, np.newaxis], np.arange(d_model)[np.newaxis, :], d_model)
# 对偶数索引使用 sin
angle_rads[:, 0::2] = np.sin(angle_rads[:, 0::2])
# 对奇数索引使用 cos
angle_rads[:, 1::2] = np.cos(angle_rads[:, 1::2])
pos_encoding = angle_rads[np.newaxis, ...]
return tf.cast(pos_encoding, dtype=tf.float32)
def get_angles(pos, i, d_model):
angles = 1 / np.power(10000, (2 * (i // 2)) / np.float32(d_model))
return pos * angles
# 生成位置编码:序列长度=100,特征维度=128
pos_enc = positional_encoding(100, 128)
print("位置编码形状:", pos_enc.shape)
💡 本次实战任务是英 - 法机器翻译。我们将使用一个小型的英法双语数据集,目标是搭建 Transformer 模型,实现英文句子到法文句子的自动翻译。
① 加载英法双语数据集,对文本进行预处理:分词、建立词汇表、转换为整数索引序列 ② 统一序列长度,对过长的序列进行截断,过短的序列进行填充 ③ 划分训练集和测试集,设置批次大小进行数据加载
import tensorflow_datasets as tfds
# 加载英法翻译数据集(示例)
dataset, info = tfds.load('ted_hrlr_translate/fr_to_en', with_info=True, as_supervised=True)
train_examples, val_examples = dataset['train'], dataset['validation']
# 构建分词器
tokenizer_en = tfds.deprecated.text.SubwordTextEncoder.build_from_corpus(
(en.numpy() for en, fr in train_examples), target_vocab_size=2**13)
tokenizer_fr = tfds.deprecated.text.SubwordTextEncoder.build_from_corpus(
(fr.numpy() for en, fr in train_examples), target_vocab_size=2**13)
# 文本处理函数
def encode(lang1, lang2):
# 添加开始和结束标记
lang1 = [tokenizer_en.vocab_size] + tokenizer_en.encode(lang1.numpy()) + [tokenizer_en.vocab_size + 1]
lang2 = [tokenizer_fr.vocab_size] + tokenizer_fr.encode(lang2.numpy()) + [tokenizer_fr.vocab_size + 1]
return lang1, lang2
def tf_encode(en, fr):
return tf.py_function(encode, [en, fr], [tf.int64, tf.int64])
# 设置序列最大长度
MAX_LENGTH = 40
def filter_max_length(x, y, max_length=MAX_LENGTH):
return tf.logical_and(tf.size(x) <= max_length, tf.size(y) <= max_length)
# 处理训练集和验证集
train_dataset = train_examples.map(tf_encode)
train_dataset = train_dataset.filter(filter_max_length)
train_dataset = train_dataset.cache()
train_dataset = train_dataset.shuffle(10000)
train_dataset = train_dataset.padded_batch(64, padded_shapes=([-1], [-1]))
train_dataset = train_dataset.prefetch(tf.data.AUTOTUNE)
val_dataset = val_examples.map(tf_encode)
val_dataset = val_dataset.filter(filter_max_length).padded_batch(64, padded_shapes=([-1], [-1]))
# 定义前馈神经网络
def point_wise_feed_forward_network(d_model, dff):
return tf.keras.Sequential([
tf.keras.layers.Dense(dff, activation='relu'),
tf.keras.layers.Dense(d_model)
])
# 定义编码层
class EncoderLayer(tf.keras.layers.Layer):
def __init__(self, d_model, num_heads, dff, rate=0.1):
super().__init__()
self.mha = MultiHeadAttention(d_model, num_heads)
self.ffn = point_wise_feed_forward_network(d_model, dff)
self.layernorm1 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
self.layernorm2 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
self.dropout1 = tf.keras.layers.Dropout(rate)
self.dropout2 = tf.keras.layers.Dropout(rate)
def call(self, x, training, mask):
attn_output, _ = self.mha(x, x, x, mask)
attn_output = self.dropout1(attn_output, training=training)
out1 = self.layernorm1(x + attn_output)
ffn_output = self.ffn(out1)
ffn_output = self.dropout2(ffn_output, training=training)
out2 = self.layernorm2(out1 + ffn_output)
return out2
# 定义解码器层
class DecoderLayer(tf.keras.layers.Layer):
def __init__(self, d_model, num_heads, dff, rate=0.1):
super().__init__()
self.mha1 = MultiHeadAttention(d_model, num_heads)
self.mha2 = MultiHeadAttention(d_model, num_heads)
self.ffn = point_wise_feed_forward_network(d_model, dff)
self.layernorm1 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
self.layernorm2 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
self.layernorm3 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
self.dropout1 = tf.keras.layers.Dropout(rate)
self.dropout2 = tf.keras.layers.Dropout(rate)
self.dropout3 = tf.keras.layers.Dropout(rate)
def call(self, x, enc_output, training, look_ahead_mask, padding_mask):
attn1, attn_weights_block1 = self.mha1(x, x, x, look_ahead_mask)
attn1 = self.dropout1(attn1, training=training)
out1 = self.layernorm1(attn1 + x)
attn2, attn_weights_block2 = self.mha2(enc_output, enc_output, out1, padding_mask)
attn2 = self.dropout2(attn2, training=training)
out2 = self.layernorm2(attn2 + out1)
ffn_output = self.ffn(out2)
ffn_output = self.dropout3(ffn_output, training=training)
out3 = self.layernorm3(ffn_output + out2)
return out3, attn_weights_block1, attn_weights_block2
# 定义完整编码器
class Encoder(tf.keras.layers.Layer):
def __init__(self, num_layers, d_model, num_heads, dff, input_vocab_size, maximum_position_encoding, rate=0.1):
super().__init__()
self.d_model = d_model
self.num_layers = num_layers
self.embedding = tf.keras.layers.Embedding(input_vocab_size, d_model)
self.pos_encoding = positional_encoding(maximum_position_encoding, self.d_model)
self.enc_layers = [EncoderLayer(d_model, num_heads, dff, rate) for _ in range(num_layers)]
self.dropout = tf.keras.layers.Dropout(rate)
def call(self, x, training, mask):
seq_len = tf.shape(x)[1]
x = self.embedding(x)
x *= tf.math.sqrt(tf.cast(self.d_model, tf.float32))
x += self.pos_encoding[:, :seq_len, :]
x = self.dropout(x, training=training)
for i in range(self.num_layers):
x = self.enc_layers[i](x, training, mask)
return x
# 定义完整解码器
class Decoder(tf.keras.layers.Layer):
def __init__(self, num_layers, d_model, num_heads, dff, target_vocab_size, maximum_position_encoding, rate=0.1):
super().__init__()
self.d_model = d_model
self.num_layers = num_layers
self.embedding = tf.keras.layers.Embedding(target_vocab_size, d_model)
self.pos_encoding = positional_encoding(maximum_position_encoding, d_model)
self.dec_layers = [DecoderLayer(d_model, num_heads, dff, rate) for _ in range(num_layers)]
self.dropout = tf.keras.layers.Dropout(rate)
def call(self, x, enc_output, training, look_ahead_mask, padding_mask):
seq_len = tf.shape(x)[1]
attention_weights = {}
x = self.embedding(x)
x *= tf.math.sqrt(tf.cast(self.d_model, tf.float32))
x += self.pos_encoding[:, :seq_len, :]
x = self.dropout(x, training=training)
for i in range(self.num_layers):
x, block1, block2 = self.dec_layers[i](x, enc_output, training, look_ahead_mask, padding_mask)
attention_weights['decoder_layer{}_block1'.format(i+1)] = block1
attention_weights['decoder_layer{}_block2'.format(i+1)] = block2
return x, attention_weights
# 定义完整 Transformer 模型
class Transformer(tf.keras.Model):
def __init__(self, num_layers, d_model, num_heads, dff, input_vocab_size, target_vocab_size, pe_input, pe_target, rate=0.1):
super().__init__()
self.encoder = Encoder(num_layers, d_model, num_heads, dff, input_vocab_size, pe_input, rate)
self.decoder = Decoder(num_layers, d_model, num_heads, dff, target_vocab_size, pe_target, rate)
self.final_layer = tf.keras.layers.Dense(target_vocab_size)
def call(self, inp, tar, training, enc_padding_mask, look_ahead_mask, dec_padding_mask):
enc_output = self.encoder(inp, training, enc_padding_mask)
dec_output, attention_weights = self.decoder(tar, enc_output, training, look_ahead_mask, dec_padding_mask)
final_output = self.final_layer(dec_output)
return final_output, attention_weights
# 设置模型参数
num_layers = 4
d_model = 128
dff = 512
num_heads = 8
input_vocab_size = tokenizer_en.vocab_size + 2
target_vocab_size = tokenizer_fr.vocab_size + 2
dropout_rate = 0.1
# 初始化模型
transformer = Transformer(num_layers, d_model, num_heads, dff, input_vocab_size, target_vocab_size, pe_input=1000, pe_target=1000, rate=dropout_rate)
💡 机器翻译任务属于序列生成任务,我们使用稀疏交叉熵损失函数,优化器选择 Adam,并设置学习率衰减策略。
# 定义学习率调度器
class CustomSchedule(tf.keras.optimizers.schedules.LearningRateSchedule):
def __init__(self, d_model, warmup_steps=4000):
super().__init__()
self.d_model = d_model
self.d_model = tf.cast(self.d_model, tf.float32)
self.warmup_steps = warmup_steps
def __call__(self, step):
arg1 = tf.math.rsqrt(step)
arg2 = step * (self.warmup_steps ** -1.5)
return tf.math.rsqrt(self.d_model) * tf.math.minimum(arg1, arg2)
# 初始化学习率和优化器
learning_rate = CustomSchedule(d_model)
optimizer = tf.keras.optimizers.Adam(learning_rate, beta_1=0.9, beta_2=0.98, epsilon=1e-9)
# 定义损失函数和评估指标
loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True, reduction='none')
def loss_function(real, pred):
mask = tf.math.logical_not(tf.math.equal(real, 0))
loss_ = loss_object(real, pred)
mask = tf.cast(mask, dtype=loss_.dtype)
loss_ *= mask
return tf.reduce_sum(loss_) / tf.reduce_sum(mask)
train_loss = tf.keras.metrics.Mean(name='train_loss')
train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name='train_accuracy')
# 定义训练步骤
@tf.function
def train_step(inp, tar):
tar_inp = tar[:, :-1]
tar_real = tar[:, 1:]
enc_padding_mask, combined_mask, dec_padding_mask = create_masks(inp, tar_inp)
with tf.GradientTape() as tape:
predictions, _ = transformer(inp, tar_inp, True, enc_padding_mask, combined_mask, dec_padding_mask)
loss = loss_function(tar_real, predictions)
gradients = tape.gradient(loss, transformer.trainable_variables)
optimizer.apply_gradients(zip(gradients, transformer.trainable_variables))
train_loss(loss)
train_accuracy(tar_real, predictions)
# 定义掩码生成函数
def create_masks(inp, tar):
enc_padding_mask = create_padding_mask(inp)
dec_padding_mask = create_padding_mask(inp)
look_ahead_mask = create_look_ahead_mask(tf.shape(tar)[1])
dec_target_padding_mask = create_padding_mask(tar)
combined_mask = tf.maximum(dec_target_padding_mask, look_ahead_mask)
return enc_padding_mask, combined_mask, dec_padding_mask
def create_padding_mask(seq):
seq = tf.cast(tf.math.equal(seq, 0), tf.float32)
return seq[:, tf.newaxis, tf.newaxis, :]
def create_look_ahead_mask(size):
mask = 1 - tf.linalg.band_part(tf.ones((size, size)), -1, 0)
return mask
# 开始训练
EPOCHS = 20
for epoch in range(EPOCHS):
train_loss.reset_states()
train_accuracy.reset_states()
for (batch, (inp, tar)) in enumerate(train_dataset):
train_step(inp, tar)
if batch % 50 == 0:
print(f'Epoch {epoch+1} Batch {batch} Loss {train_loss.result():.4f} Accuracy {train_accuracy.result():.4f}')
print(f'Epoch {epoch+1} Loss {train_loss.result():.4f} Accuracy {train_accuracy.result():.4f}')
💡 技巧 1:使用标签平滑(Label Smoothing)技术,缓解模型过拟合,提升泛化能力。 💡 技巧 2:采用波束搜索(Beam Search)替代贪心搜索,生成更流畅、更准确的翻译结果。 💡 技巧 3:使用预训练的词向量初始化嵌入层,提升模型的特征表示能力。
✅ 注意力机制通过 QKV 框架,让模型学会聚焦序列中的关键信息,解决了长序列依赖问题。 ✅ 多头注意力从多个角度捕捉序列特征,是 Transformer 模型的核心组件。 ✅ Transformer 模型完全基于注意力机制,实现了高度并行化计算,在机器翻译、文本生成等任务中表现优异。

微信公众号「极客日志」,在微信中扫描左侧二维码关注。展示文案:极客日志 zeeklog
使用加密算法(如AES、TripleDES、Rabbit或RC4)加密和解密文本明文。 在线工具,加密/解密文本在线工具,online
生成新的随机RSA私钥和公钥pem证书。 在线工具,RSA密钥对生成器在线工具,online
基于 Mermaid.js 实时预览流程图、时序图等图表,支持源码编辑与即时渲染。 在线工具,Mermaid 预览与可视化编辑在线工具,online
解析常见 curl 参数并生成 fetch、axios、PHP curl 或 Python requests 示例代码。 在线工具,curl 转代码在线工具,online
将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online
将字符串、文件或图像转换为其 Base64 表示形式。 在线工具,Base64 文件转换器在线工具,online