跳到主要内容注意力机制与 Transformer 模型实战 | 极客日志PythonAI算法
注意力机制与 Transformer 模型实战
注意力机制通过 QKV 框架解决长序列依赖问题,Transformer 基于此实现并行计算。自注意力与多头注意力原理,展示 TensorFlow 搭建编码器解码器架构,涵盖位置编码、英法翻译实战及训练优化技巧,提供完整代码示例与关键步骤解析。
协议工匠1 浏览 注意力机制与 Transformer 模型实战

注意力机制的核心思想
传统的 RNN 和 LSTM 在处理长序列时,存在长距离依赖捕捉能力不足和并行计算效率低的问题。注意力机制的出现,解决了这两个核心痛点。
注意力机制的本质是让模型学会'聚焦'——在处理序列数据时,自动分配不同的权重给输入序列中的各个元素,重点关注与当前任务相关的信息,弱化无关信息的干扰。比如在机器翻译任务中,翻译'我爱中国'时,模型会给'我''爱''中国'分配不同的注意力权重,从而更精准地生成对应的英文翻译。
基础框架与计算流程
注意力机制的计算通常包含查询(Query)、键(Key)、值(Value)三个核心要素,简称 QKV 框架。其计算流程可以总结为三步:
- 计算 Query 和所有 Key 的相似度,得到注意力分数
- 对注意力分数进行归一化处理(常用 Softmax 函数),得到注意力权重
- 用归一化后的权重对 Value 进行加权求和,得到最终的注意力输出
基础注意力计算公式如下:
$$Attention(Q,K,V)=softmax(\frac{QK^T}{\sqrt{d_k}})V$$
其中 $d_k$ 是 Key 的维度,除以 $\sqrt{d_k}$ 是为了防止内积结果过大,导致 Softmax 函数饱和。
import tensorflow as tf
import numpy as np
def scaled_dot_product_attention(q, k, v, mask=None):
matmul_qk = tf.matmul(q, k, transpose_b=True)
dk = tf.cast(tf.shape(k)[-1], tf.float32)
scaled_attention_logits = matmul_qk / tf.math.sqrt(dk)
if mask is not None:
scaled_attention_logits += (mask * -1e9)
attention_weights = tf.nn.softmax(scaled_attention_logits, axis=-1)
output = tf.matmul(attention_weights, v)
return output, attention_weights
q = tf.random.normal((, , ))
k = tf.random.normal((, , ))
v = tf.random.normal((, , ))
output, attn_weights = scaled_dot_product_attention(q, k, v)
(, output.shape)
(, attn_weights.shape)
2
3
4
2
3
4
2
3
4
print
"注意力输出形状:"
print
"注意力权重形状:"
注意:掩码(Mask)分为两种,一种是填充掩码,用于屏蔽输入中的无效填充部分;另一种是前瞻掩码,用于在自回归任务中防止模型看到未来的信息。
自注意力与多头注意力
自注意力机制
自注意力(Self-Attention)是注意力机制的一种特殊形式。在自注意力中,Query、Key、Value 三个矩阵都来自同一个输入序列。自注意力可以捕捉序列内部元素之间的依赖关系,比如在句子'他喜欢打篮球,因为它很有趣'中,模型可以通过自注意力机制,将'它'和'打篮球'关联起来。
- 对输入序列的每个元素,分别通过三个不同的线性变换,生成 Q、K、V 矩阵
- 按照基础注意力公式计算注意力输出
- 将注意力输出作为当前层的特征,传递给下一层
多头注意力机制
多头注意力(Multi-Head Attention)是 Transformer 模型的核心创新点之一。它通过多个并行的注意力头,从不同的角度捕捉序列的特征。
- 将输入序列的特征维度拆分为 h 个独立的子空间(h 为注意力头的数量)
- 对每个子空间分别计算自注意力,得到 h 个不同的注意力输出
- 将 h 个注意力输出拼接起来,再通过一个线性变换,得到最终的多头注意力输出
class MultiHeadAttention(tf.keras.layers.Layer):
def __init__(self, d_model, num_heads):
super(MultiHeadAttention, self).__init__()
self.num_heads = num_heads
self.d_model = d_model
assert d_model % self.num_heads == 0
self.depth = d_model // self.num_heads
self.wq = tf.keras.layers.Dense(d_model)
self.wk = tf.keras.layers.Dense(d_model)
self.wv = tf.keras.layers.Dense(d_model)
self.dense = tf.keras.layers.Dense(d_model)
def split_heads(self, x, batch_size):
x = tf.reshape(x, (batch_size, -1, self.num_heads, self.depth))
return tf.transpose(x, perm=[0, 2, 1, 3])
def call(self, v, k, q, mask):
batch_size = tf.shape(q)[0]
q = self.wq(q)
k = self.wk(k)
v = self.wv(v)
q = self.split_heads(q, batch_size)
k = self.split_heads(k, batch_size)
v = self.split_heads(v, batch_size)
scaled_attention, attention_weights = scaled_dot_product_attention(q, k, v, mask)
scaled_attention = tf.transpose(scaled_attention, perm=[0, 2, 1, 3])
concat_attention = tf.reshape(scaled_attention, (batch_size, -1, self.d_model))
output = self.dense(concat_attention)
return output, attention_weights
mha = MultiHeadAttention(d_model=128, num_heads=8)
x = tf.random.normal((2, 5, 128))
output, attn_weights = mha(x, x, x, mask=None)
print("多头注意力输出形状:", output.shape)
print("多头注意力权重形状:", attn_weights.shape)
Transformer 模型架构详解
Transformer 模型由 Google 团队在 2017 年的论文《Attention Is All You Need》中提出。它完全基于注意力机制,摒弃了 RNN 和 CNN 的序列式结构,实现了高度并行化计算,极大提升了模型的训练效率。
Transformer 的整体架构分为编码器(Encoder)和解码器(Decoder)两大部分。
编码器结构
编码器由 N 个相同的编码层堆叠而成,每个编码层包含两个子层:
- 多头自注意力层:捕捉输入序列内部的依赖关系
- 前馈神经网络层:对注意力输出进行非线性变换
每个子层都配备了残差连接和层归一化,公式为:LayerNorm(x + Sublayer(x))
解码器结构
解码器同样由 N 个相同的解码层堆叠而成,每个解码层包含三个子层:
- 掩码多头自注意力层:防止模型看到未来的信息
- 编码器 - 解码器注意力层:捕捉输入序列和输出序列之间的依赖关系
- 前馈神经网络层:对注意力输出进行非线性变换
位置编码
Transformer 模型没有循环结构,无法感知输入序列的顺序信息。因此需要通过位置编码(Positional Encoding),将序列的位置信息注入到输入特征中。
位置编码的计算方式有多种,常用的是正弦余弦位置编码:
$$PE_{(pos,2i)} = sin(pos / 10000^{2i/d_{model}})$$
$$PE_{(pos,2i+1)} = cos(pos / 10000^{2i/d_{model}})$$
其中 pos 是元素在序列中的位置,i 是特征维度的索引。
def positional_encoding(position, d_model):
angle_rads = get_angles(np.arange(position)[:, np.newaxis], np.arange(d_model)[np.newaxis, :], d_model)
angle_rads[:, 0::2] = np.sin(angle_rads[:, 0::2])
angle_rads[:, 1::2] = np.cos(angle_rads[:, 1::2])
pos_encoding = angle_rads[np.newaxis, ...]
return tf.cast(pos_encoding, dtype=tf.float32)
def get_angles(pos, i, d_model):
angles = 1 / np.power(10000, (2 * (i // 2)) / np.float32(d_model))
return pos * angles
pos_enc = positional_encoding(100, 128)
print("位置编码形状:", pos_enc.shape)
实战:基于 Transformer 的机器翻译任务
任务介绍与数据集准备
本次实战任务是英 - 法机器翻译。我们将使用一个小型的英法双语数据集,目标是搭建 Transformer 模型,实现英文句子到法文句子的自动翻译。
- 加载英法双语数据集,对文本进行预处理:分词、建立词汇表、转换为整数索引序列
- 统一序列长度,对过长的序列进行截断,过短的序列进行填充
- 划分训练集和测试集,设置批次大小进行数据加载
import tensorflow_datasets as tfds
dataset, info = tfds.load('ted_hrlr_translate/fr_to_en', with_info=True, as_supervised=True)
train_examples, val_examples = dataset['train'], dataset['validation']
tokenizer_en = tfds.deprecated.text.SubwordTextEncoder.build_from_corpus(
(en.numpy() for en, fr in train_examples), target_vocab_size=2**13)
tokenizer_fr = tfds.deprecated.text.SubwordTextEncoder.build_from_corpus(
(fr.numpy() for en, fr in train_examples), target_vocab_size=2**13)
def encode(lang1, lang2):
lang1 = [tokenizer_en.vocab_size] + tokenizer_en.encode(lang1.numpy()) + [tokenizer_en.vocab_size + 1]
lang2 = [tokenizer_fr.vocab_size] + tokenizer_fr.encode(lang2.numpy()) + [tokenizer_fr.vocab_size + 1]
return lang1, lang2
def tf_encode(en, fr):
return tf.py_function(encode, [en, fr], [tf.int64, tf.int64])
MAX_LENGTH = 40
def filter_max_length(x, y, max_length=MAX_LENGTH):
return tf.logical_and(tf.size(x) <= max_length, tf.size(y) <= max_length)
train_dataset = train_examples.map(tf_encode)
train_dataset = train_dataset.filter(filter_max_length)
train_dataset = train_dataset.cache()
train_dataset = train_dataset.shuffle(10000)
train_dataset = train_dataset.padded_batch(64, padded_shapes=([-1], [-1]))
train_dataset = train_dataset.prefetch(tf.data.AUTOTUNE)
val_dataset = val_examples.map(tf_encode)
val_dataset = val_dataset.filter(filter_max_length).padded_batch(64, padded_shapes=([-1], [-1]))
搭建完整的 Transformer 模型
def point_wise_feed_forward_network(d_model, dff):
return tf.keras.Sequential([
tf.keras.layers.Dense(dff, activation='relu'),
tf.keras.layers.Dense(d_model)
])
class EncoderLayer(tf.keras.layers.Layer):
def __init__(self, d_model, num_heads, dff, rate=0.1):
super(EncoderLayer, self).__init__()
self.mha = MultiHeadAttention(d_model, num_heads)
self.ffn = point_wise_feed_forward_network(d_model, dff)
self.layernorm1 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
self.layernorm2 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
self.dropout1 = tf.keras.layers.Dropout(rate)
self.dropout2 = tf.keras.layers.Dropout(rate)
def call(self, x, training, mask):
attn_output, _ = self.mha(x, x, x, mask)
attn_output = self.dropout1(attn_output, training=training)
out1 = self.layernorm1(x + attn_output)
ffn_output = self.ffn(out1)
ffn_output = self.dropout2(ffn_output, training=training)
out2 = self.layernorm2(out1 + ffn_output)
return out2
class DecoderLayer(tf.keras.layers.Layer):
def __init__(self, d_model, num_heads, dff, rate=0.1):
super(DecoderLayer, self).__init__()
self.mha1 = MultiHeadAttention(d_model, num_heads)
self.mha2 = MultiHeadAttention(d_model, num_heads)
self.ffn = point_wise_feed_forward_network(d_model, dff)
self.layernorm1 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
self.layernorm2 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
self.layernorm3 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
self.dropout1 = tf.keras.layers.Dropout(rate)
self.dropout2 = tf.keras.layers.Dropout(rate)
self.dropout3 = tf.keras.layers.Dropout(rate)
def call(self, x, enc_output, training, look_ahead_mask, padding_mask):
attn1, attn_weights_block1 = self.mha1(x, x, x, look_ahead_mask)
attn1 = self.dropout1(attn1, training=training)
out1 = self.layernorm1(attn1 + x)
attn2, attn_weights_block2 = self.mha2(enc_output, enc_output, out1, padding_mask)
attn2 = self.dropout2(attn2, training=training)
out2 = self.layernorm2(attn2 + out1)
ffn_output = self.ffn(out2)
ffn_output = self.dropout3(ffn_output, training=training)
out3 = self.layernorm3(ffn_output + out2)
return out3, attn_weights_block1, attn_weights_block2
class Encoder(tf.keras.layers.Layer):
def __init__(self, num_layers, d_model, num_heads, dff, input_vocab_size, maximum_position_encoding, rate=0.1):
super(Encoder, self).__init__()
self.d_model = d_model
self.num_layers = num_layers
self.embedding = tf.keras.layers.Embedding(input_vocab_size, d_model)
self.pos_encoding = positional_encoding(maximum_position_encoding, self.d_model)
self.enc_layers = [EncoderLayer(d_model, num_heads, dff, rate) for _ in range(num_layers)]
self.dropout = tf.keras.layers.Dropout(rate)
def call(self, x, training, mask):
seq_len = tf.shape(x)[1]
x = self.embedding(x)
x *= tf.math.sqrt(tf.cast(self.d_model, tf.float32))
x += self.pos_encoding[:, :seq_len, :]
x = self.dropout(x, training=training)
for i in range(self.num_layers):
x = self.enc_layers[i](x, training, mask)
return x
class Decoder(tf.keras.layers.Layer):
def __init__(self, num_layers, d_model, num_heads, dff, target_vocab_size, maximum_position_encoding, rate=0.1):
super(Decoder, self).__init__()
self.d_model = d_model
self.num_layers = num_layers
self.embedding = tf.keras.layers.Embedding(target_vocab_size, d_model)
self.pos_encoding = positional_encoding(maximum_position_encoding, d_model)
self.dec_layers = [DecoderLayer(d_model, num_heads, dff, rate) for _ in range(num_layers)]
self.dropout = tf.keras.layers.Dropout(rate)
def call(self, x, enc_output, training, look_ahead_mask, padding_mask):
seq_len = tf.shape(x)[1]
attention_weights = {}
x = self.embedding(x)
x *= tf.math.sqrt(tf.cast(self.d_model, tf.float32))
x += self.pos_encoding[:, :seq_len, :]
x = self.dropout(x, training=training)
for i in range(self.num_layers):
x, block1, block2 = self.dec_layers[i](x, enc_output, training, look_ahead_mask, padding_mask)
attention_weights['decoder_layer{}_block1'.format(i+1)] = block1
attention_weights['decoder_layer{}_block2'.format(i+1)] = block2
return x, attention_weights
class Transformer(tf.keras.Model):
def __init__(self, num_layers, d_model, num_heads, dff, input_vocab_size, target_vocab_size, pe_input, pe_target, rate=0.1):
super(Transformer, self).__init__()
self.encoder = Encoder(num_layers, d_model, num_heads, dff, input_vocab_size, pe_input, rate)
self.decoder = Decoder(num_layers, d_model, num_heads, dff, target_vocab_size, pe_target, rate)
self.final_layer = tf.keras.layers.Dense(target_vocab_size)
def call(self, inp, tar, training, enc_padding_mask, look_ahead_mask, dec_padding_mask):
enc_output = self.encoder(inp, training, enc_padding_mask)
dec_output, attention_weights = self.decoder(tar, enc_output, training, look_ahead_mask, dec_padding_mask)
final_output = self.final_layer(dec_output)
return final_output, attention_weights
num_layers = 4
d_model = 128
dff = 512
num_heads = 8
input_vocab_size = tokenizer_en.vocab_size + 2
target_vocab_size = tokenizer_fr.vocab_size + 2
dropout_rate = 0.1
transformer = Transformer(num_layers, d_model, num_heads, dff, input_vocab_size, target_vocab_size, pe_input=1000, pe_target=1000, rate=dropout_rate)
模型编译与训练
机器翻译任务属于序列生成任务,我们使用稀疏交叉熵损失函数,优化器选择 Adam,并设置学习率衰减策略。
class CustomSchedule(tf.keras.optimizers.schedules.LearningRateSchedule):
def __init__(self, d_model, warmup_steps=4000):
super(CustomSchedule, self).__init__()
self.d_model = d_model
self.d_model = tf.cast(self.d_model, tf.float32)
self.warmup_steps = warmup_steps
def __call__(self, step):
arg1 = tf.math.rsqrt(step)
arg2 = step * (self.warmup_steps ** -1.5)
return tf.math.rsqrt(self.d_model) * tf.math.minimum(arg1, arg2)
learning_rate = CustomSchedule(d_model)
optimizer = tf.keras.optimizers.Adam(learning_rate, beta_1=0.9, beta_2=0.98, epsilon=1e-9)
loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True, reduction='none')
def loss_function(real, pred):
mask = tf.math.logical_not(tf.math.equal(real, 0))
loss_ = loss_object(real, pred)
mask = tf.cast(mask, dtype=loss_.dtype)
loss_ *= mask
return tf.reduce_sum(loss_) / tf.reduce_sum(mask)
train_loss = tf.keras.metrics.Mean(name='train_loss')
train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name='train_accuracy')
@tf.function
def train_step(inp, tar):
tar_inp = tar[:, :-1]
tar_real = tar[:, 1:]
enc_padding_mask, combined_mask, dec_padding_mask = create_masks(inp, tar_inp)
with tf.GradientTape() as tape:
predictions, _ = transformer(inp, tar_inp, True, enc_padding_mask, combined_mask, dec_padding_mask)
loss = loss_function(tar_real, predictions)
gradients = tape.gradient(loss, transformer.trainable_variables)
optimizer.apply_gradients(zip(gradients, transformer.trainable_variables))
train_loss(loss)
train_accuracy(tar_real, predictions)
def create_masks(inp, tar):
enc_padding_mask = create_padding_mask(inp)
dec_padding_mask = create_padding_mask(inp)
look_ahead_mask = create_look_ahead_mask(tf.shape(tar)[1])
dec_target_padding_mask = create_padding_mask(tar)
combined_mask = tf.maximum(dec_target_padding_mask, look_ahead_mask)
return enc_padding_mask, combined_mask, dec_padding_mask
def create_padding_mask(seq):
seq = tf.cast(tf.math.equal(seq, 0), tf.float32)
return seq[:, tf.newaxis, tf.newaxis, :]
def create_look_ahead_mask(size):
mask = 1 - tf.linalg.band_part(tf.ones((size, size)), -1, 0)
return mask
EPOCHS = 20
for epoch in range(EPOCHS):
train_loss.reset_states()
train_accuracy.reset_states()
for (batch, (inp, tar)) in enumerate(train_dataset):
train_step(inp, tar)
if batch % 50 == 0:
print(f'Epoch {epoch+1} Batch {batch} Loss {train_loss.result():.4f} Accuracy {train_accuracy.result():.4f}')
print(f'Epoch {epoch+1} Loss {train_loss.result():.4f} Accuracy {train_accuracy.result():.4f}')
模型优化技巧
- 使用标签平滑(Label Smoothing)技术,缓解模型过拟合,提升泛化能力。
- 采用波束搜索(Beam Search)替代贪心搜索,生成更流畅、更准确的翻译结果。
- 使用预训练的词向量初始化嵌入层,提升模型的特征表示能力。
总结
注意力机制通过 QKV 框架,让模型学会聚焦序列中的关键信息,解决了长序列依赖问题。多头注意力从多个角度捕捉序列特征,是 Transformer 模型的核心组件。Transformer 模型完全基于注意力机制,实现了高度并行化计算,在机器翻译、文本生成等任务中表现优异。
相关免费在线工具
- 加密/解密文本
使用加密算法(如AES、TripleDES、Rabbit或RC4)加密和解密文本明文。 在线工具,加密/解密文本在线工具,online
- RSA密钥对生成器
生成新的随机RSA私钥和公钥pem证书。 在线工具,RSA密钥对生成器在线工具,online
- Mermaid 预览与可视化编辑
基于 Mermaid.js 实时预览流程图、时序图等图表,支持源码编辑与即时渲染。 在线工具,Mermaid 预览与可视化编辑在线工具,online
- 随机西班牙地址生成器
随机生成西班牙地址(支持马德里、加泰罗尼亚、安达卢西亚、瓦伦西亚筛选),支持数量快捷选择、显示全部与下载。 在线工具,随机西班牙地址生成器在线工具,online
- Gemini 图片去水印
基于开源反向 Alpha 混合算法去除 Gemini/Nano Banana 图片水印,支持批量处理与下载。 在线工具,Gemini 图片去水印在线工具,online
- curl 转代码
解析常见 curl 参数并生成 fetch、axios、PHP curl 或 Python requests 示例代码。 在线工具,curl 转代码在线工具,online