experiment.py. scientist
导入 PyTorch 的神经网络模块,并重命名为 nn,用于构建神经网络。
from torch.nn import functional as F
从 torch.nn 模块导入 functional,并重命名为 F,用于使用各种神经网络功能,如激活函数。
import argparse
导入 argparse 模块,用于解析命令行参数。
class LayerNorm(nn.Module):
定义一个名为 LayerNorm 的类,继承自 nn.Module,用于实现层归一化。
def __init__(self, ndim, bias):
定义构造函数,接受两个参数:ndim(维度)和 bias(是否使用偏置)。
super().__init__()
调用父类的构造函数。
self.weight = nn.Parameter(torch.ones(ndim))
初始化权重参数,设置为全 1 的张量。
self.bias = nn.Parameter(torch.zeros(ndim)) if bias else None
如果 bias 为真,则初始化偏置参数为全 0 的张量;否则设置为 None。
def forward(self, input):
定义前向传播方法,接受输入张量。
return F.layer_norm(input, self.weight.shape, self.weight, self.bias, 1e-5)
返回层归一化的结果,使用 PyTorch 的 layer_norm 函数。
class CausalSelfAttention(nn.Module):
定义一个名为 CausalSelfAttention 的类,继承自 nn.Module,用于实现因果自注意力机制。
def __init__(self, config):
定义构造函数,接受一个配置对象 config。
super().__init__()
调用父类的构造函数。
assert config.n_embd % config.n_head == 0
确保嵌入维度 n_embd 可以被头数 n_head 整除。
self.c_attn = nn.Linear(config.n_embd, 3 * config.n_embd, bias=config.bias)
初始化注意力的线性变换,输出维度为 3 * n_embd。
self.c_proj = nn.Linear(config.n_embd, config.n_embd, bias=config.bias)
初始化输出的线性变换,输入和输出维度均为 n_embd。
self.attn_dropout = nn.Dropout(config.dropout)
初始化注意力的 dropout 层。
self.resid_dropout = nn.Dropout(config.dropout)
初始化残差连接的 dropout 层。
self.n_head = config.n_head
保存头数 n_head。
self.n_embd = config.n_embd
保存嵌入维度 n_embd。
self.dropout = config.dropout
保存 dropout 比例。
self.flash = hasattr(torch.nn.functional, "scaled_dot_product_attention")
检查 PyTorch 是否支持快速注意力机制。
if not self.flash:
如果不支持快速注意力机制,执行以下代码。
print("WARNING: using slow attention. Flash Attention requires PyTorch >= 2.0")
打印警告信息,提示使用慢速注意力。
self.register_buffer(
"bias",
torch.tril(torch.ones(config.block_size, config.block_size)).view(
1, 1, config.block_size, config.block_size
),
)
注册一个缓冲区 bias,用于因果掩码。
def forward(self, x):
定义前向传播方法,接受输入张量 x。
B, T, C = x.size()
获取输入的批量大小 B、序列长度 T 和嵌入维度 C。
q, k, v = self.c_attn(x).split(self.n_embd, dim=2)
计算查询、键和值,并按嵌入维度分割。
k = k.view(B, T, self.n_head, C // self.n_head).transpose(1, 2)
调整键的形状并转置。
q = q.view(B, T, self.n_head, C // self.n_head).transpose(1, 2)
调整查询的形状并转置。
v = v.view(B, T, self.n_head, C // self.n_head).transpose(1, 2)
调整值的形状并转置。
if self.flash:
如果支持快速注意力机制,执行以下代码。
y = torch.nn.functional.scaled_dot_product_attention(
q,
k,
v,
attn_mask=None,
dropout_p=self.dropout if self.training else 0,
is_causal=True,
)
使用快速注意力机制计算输出。
else:
如果不支持快速注意力机制,执行以下代码。
att = (q @ k.transpose(-2, -1)) * (1.0 / math.sqrt(k.size(-1)))
计算注意力得分。
att = att.masked_fill(self.bias[:, :, :T, :T] == 0, float("-inf"))
应用因果掩码。
att = F.softmax(att, dim=-1)
对注意力得分应用 softmax。
att = self.attn_dropout(att)
应用 dropout。
y = att @ v
计算输出。
y = y.transpose(1, 2).contiguous().view(B, T, C)
调整输出的形状。
y = self.resid_dropout(self.c_proj(y))
应用输出的线性变换和 dropout。
return y
返回输出。
class MLP(nn.Module):
定义一个名为 MLP 的类,继承自 nn.Module,用于实现多层感知机。
def __init__(self, config):
定义构造函数,接受一个配置对象 config。
super().__init__()
调用父类的构造函数。
self.c_fc = nn.Linear(config.n_embd, 4 * config.n_embd, bias=config.bias)
初始化第一个线性层,输出维度为 4 * n_embd。
self.gelu = nn.GELU()
初始化 GELU 激活函数。
self.c_proj = nn.Linear(4 * config.n_embd, config.n_embd, bias=config.bias)
初始化第二个线性层,输出维度为 n_embd。
self.dropout = nn.Dropout(config.dropout)
初始化 dropout 层。
def forward(self, x):
定义前向传播方法,接受输入张量 x。
x = self.c_fc(x)
通过第一个线性层。
x = self.gelu(x)
应用 GELU 激活函数。
x = self.c_proj(x)
通过第二个线性层。
x = self.dropout(x)
应用 dropout。
return x
返回输出。
class Block(nn.Module):
定义一个名为 Block 的类,继承自 nn.Module,用于实现 Transformer 的基本块。
def __init__(self, config):
定义构造函数,接受一个配置对象 config。
super().__init__()
调用父类的构造函数。
self.ln_1 = LayerNorm(config.n_embd, bias=config.bias)
初始化第一个层归一化。
self.attn = CausalSelfAttention(config)
初始化因果自注意力层。
self.ln_2 = LayerNorm(config.n_embd, bias=config.bias)
初始化第二个层归一化。
self.mlp = MLP(config)
初始化多层感知机。
def forward(self, x):
定义前向传播方法,接受输入张量 x。
x = x + self.attn(self.ln_1(x))
通过自注意力层并添加残差连接。
x = x + self.mlp(self.ln_2(x))
通过多层感知机并添加残差连接。
return x
返回输出。
@dataclass
class GPTConfig:
定义一个数据类 GPTConfig,用于存储 GPT 模型的配置。
block_size: int = 1024
定义块大小,默认为 1024。
vocab_size: int = 50304
定义词汇表大小,默认为 50304。
n_layer: int = 12
定义层数,默认为 12。
n_head: int = 12
定义头数,默认为 12。
n_embd: int = 768
定义嵌入维度,默认为 768。
dropout: float = 0.0
定义 dropout 比例,默认为 0.0。
bias: bool = True
定义是否使用偏置,默认为 True。
class GPT(nn.Module):
定义一个名为 GPT 的类,继承自 nn.Module,用于实现 GPT 模型。
def __init__(self, config):
定义构造函数,接受一个配置对象 config。
super().__init__()
调用父类的构造函数。
assert config.vocab_size is not None
确保词汇表大小不为 None。
assert config.block_size is not None
确保块大小不为 None。
self.config = config
保存配置对象。
self.transformer = nn.ModuleDict(
初始化一个模块字典,用于存储模型的各个部分。
dict(
wte=nn.Embedding(config.vocab_size, config.n_embd),
添加词嵌入层。
wpe=nn.Embedding(config.block_size, config.n_embd),
添加位置嵌入层。
drop=nn.Dropout(config.dropout),
添加 dropout 层。
h=nn.ModuleList([Block(config) for _ in range(config.n_layer)]),
添加多个 Transformer 块。
ln_f=LayerNorm(config.n_embd, bias=config.bias),
添加最终的层归一化。
)
)
结束模块字典的定义。
self.lm_head = nn.Linear(config.n_embd, config.vocab_size, bias=False)
初始化语言模型头,输出维度为词汇表大小。
self.transformer.wte.weight = (
self.lm_head.weight
)
共享嵌入权重和语言模型头的权重。
self.apply(self._init_weights)
应用权重初始化。
for pn, p in self.named_parameters():
遍历模型的所有参数。
if pn.endswith("c_proj.weight"):
检查参数名称是否以 c_proj.weight 结尾。
torch.nn.init.normal_(
p, mean=0.0, std=0.02 / math.sqrt(2 * config.n_layer)
)
对残差投影权重应用特殊的初始化。
print("number of parameters: %.2fM" % (self.get_num_params() / 1e6,))
打印模型的参数数量。
def get_num_params(self, non_embedding=True):
定义一个方法,返回模型的参数数量。
n_params = sum(p.numel() for p in self.parameters())
计算所有参数的数量。
if non_embedding:
如果不计算嵌入参数,执行以下代码。
n_params -= self.transformer.wpe.weight.numel()
从总参数中减去位置嵌入的参数数量。
return n_params
返回参数数量。
def _init_weights(self, module):
定义一个方法,用于初始化权重。
if isinstance(module, nn.Linear):
检查模块是否为线性层。
torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)
对线性层的权重应用正态分布初始化。
if module.bias is not None:
如果线性层有偏置,执行以下代码。
torch.nn.init.zeros_(module.bias)
将偏置初始化为零。
elif isinstance(module, nn.Embedding):
检查模块是否为嵌入层。
torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)
对嵌入层的权重应用正态分布初始化。
def forward(self, idx, targets=None):
定义前向传播方法,接受输入索引 idx 和目标 targets。
device = idx.device
获取输入的设备信息。
b, t = idx.size()
获取批量大小 b 和序列长度 t。
assert (
t <= self.config.block_size
), f"Cannot forward sequence of length {t}, block size is only {self.config.block_size}"
确保序列长度不超过块大小。
pos = torch.arange(0, t, dtype=torch.long, device=device) # shape (t)
创建位置索引。
tok_emb = self.transformer.wte(idx) # token embeddings of shape (b, t, n_embd)
获取输入的词嵌入。
pos_emb = self.transformer.wpe(pos) # position embeddings of shape (t, n_embd)
获取位置嵌入。
x = self.transformer.drop(tok_emb + pos_emb)
将词嵌入和位置嵌入相加并应用 dropout。
for block in self.transformer.h:
遍历所有 Transformer 块。
x = block(x)
通过当前块进行前向传播。
x = self.transformer.ln_f(x)
通过最终的层归一化。
if targets is not None:
如果提供了目标,执行以下代码。
logits = self.lm_head(x)
通过语言模型头计算 logits。
loss = F.cross_entropy(
logits.view(-1, logits.size(-1)), targets.view(-1), ignore_index=-1
)
计算交叉熵损失。
else:
如果没有提供目标,执行以下代码。
logits = self.lm_head(
x[:, [-1], :]
) # note: using list [-1] to preserve the time dim
仅在最后一个时间步计算 logits。
loss = None
将损失设置为 None。
return logits, loss
返回 logits 和损失。
def crop_block_size(self, block_size):
定义一个方法,用于裁剪块大小。
assert block_size <= self.config.block_size
确保新的块大小不超过原始块大小。
self.config.block_size = block_size
更新块大小。
self.transformer.wpe.weight = nn.Parameter(
self.transformer.wpe.weight[:block_size]
)
裁剪位置嵌入的权重。
for block in self.transformer.h:
遍历所有 Transformer 块。
if hasattr(block.attn, "bias"):
检查当前块的注意力层是否有偏置。
block.attn.bias = block.attn.bias[:, :, :block_size, :block_size]
裁剪注意力层的偏置。
def configure_optimizers(self, weight_decay, learning_rate, betas, device_type):
定义一个方法,用于配置优化器。
param_dict = {pn: p for pn, p in self.named_parameters()}
创建一个参数字典,包含所有参数。
param_dict = {pn: p for pn, p in param_dict.items() if p.requires_grad}
过滤出需要梯度的参数。
decay_params = [p for n, p in param_dict.items() if p.dim() >= 2]
获取需要权重衰减的参数。
nodecay_params = [p for n, p in param_dict.items() if p.dim() < 2]
获取不需要权重衰减的参数。
optim_groups = [
{"params": decay_params, "weight_decay": weight_decay},
{"params": nodecay_params, "weight_decay": 0.0},
]
创建优化器组,分别设置权重衰减。
num_decay_params = sum(p.numel() for p in decay_params)
计算需要衰减的参数数量。
num_nodecay_params = sum(p.numel() for p in nodecay_params)
计算不需要衰减的参数数量。
print(
f"num decayed parameter tensors: {len(decay_params)}, with {num_decay_params:,} parameters"
)
打印需要衰减的参数数量信息。
print(
f"num non-decayed parameter tensors: {len(nodecay_params)}, with {num_nodecay_params:,} parameters"
)
打印不需要衰减的参数数量信息。
fused_available = "fused" in inspect.signature(torch.optim.AdamW).parameters
检查是否可以使用融合版本的 AdamW 优化器。
use_fused = fused_available and device_type == "cuda"
确定是否使用融合版本。
extra_args = dict(fused=True) if use_fused else dict()
设置额外参数。
optimizer = torch.optim.AdamW(
optim_groups, lr=learning_rate, betas=betas, **extra_args
)
创建 AdamW 优化器。
print(f"using fused AdamW: {use_fused}")
打印是否使用融合版本的信息。
return optimizer
返回优化器。
@torch.no_grad()
def generate(self, idx, max_new_tokens, temperature=1.0, top_k=None):
定义一个生成方法,接受输入索引、最大生成的令牌数、温度和 top_k 参数。
for _ in range(max_new_tokens):
循环生成指定数量的新令牌。
idx_cond = (
idx
if idx.size(1) <= self.config.block_size
else idx[:, -self.config.block_size :]
)
裁剪输入序列以适应块大小。
logits, _ = self(idx_cond)
通过模型前向传播获取 logits。
logits = logits[:, -1, :] / temperature
获取最后一个时间步的 logits,并按温度缩放。
if top_k is not None:
如果指定了 top_k,执行以下代码。
v, _ = torch.topk(logits, min(top_k, logits.size(-1)))
获取 logits 中的 top_k 值。
logits[logits < v[:, [-1]]] = -float("Inf")
将低于 top_k 的 logits 设置为负无穷。
probs = F.softmax(logits, dim=-1)
对 logits 应用 softmax 转换为概率。
idx_next = torch.multinomial(probs, num_samples=1)
从概率分布中采样下一个索引。
idx = torch.cat((idx, idx_next), dim=1)
将采样的索引添加到输入序列中。
return idx
返回生成的索引序列。
def train(dataset="shakespeare_char", out_dir="run_0", seed_offset=0):
定义一个训练函数,接受数据集名称、输出目录和种子偏移量。
gradient_accumulation_steps = 1
设置梯度累积步数。
batch_size = 64 if dataset == "shakespeare_char" else 32
根据数据集设置批量大小。
block_size = 256
设置块大小。
eval_interval = 250 if dataset == "shakespeare_char" else 1000
设置评估间隔。
log_interval = 10 if dataset == "shakespeare_char" else 100
设置日志间隔。
eval_iters = 200
设置评估迭代次数。
eval_only = False
设置是否仅进行评估。
always_save_checkpoint = False
设置是否始终保存检查点。
never_save_checkpoint = True
设置是否从不保存检查点。
n_layer = 6
设置层数。
n_head = 6
设置头数。
n_embd = 384
设置嵌入维度。
dropout = 0.2
设置 dropout 比例。
bias = False
设置是否使用偏置。
learning_rate = 1e-3 if dataset == "shakespeare_char" else 5e-4
根据数据集设置学习率。
max_iters = 5000 if dataset == "shakespeare_char" else 100000
根据数据集设置最大迭代次数。
weight_decay = 1e-1
设置权重衰减。
beta1 = 0.9
设置 Adam 优化器的 beta1。
beta2 = 0.99
设置 Adam 优化器的 beta2。
grad_clip = 1.0
设置梯度裁剪值。
decay_lr = True
设置是否衰减学习率。
warmup_iters = 100 if dataset == "shakespeare_char" else 200
设置预热迭代次数。
lr_decay_iters = max_iters
设置学习率衰减迭代次数。
min_lr = 1e-4 if dataset == "shakespeare_char" else 5e-5
设置最小学习率。
backend = "nccl"
设置分布式后端。
device = "cuda"
设置设备为 CUDA。
dtype = (
"bfloat16"
if torch.cuda.is_available() and torch.cuda.is_bf16_supported()
else "float16"
)
根据设备支持情况设置数据类型。
compile = True
设置是否编译模型。
master_process = True
设置主进程标志。
tokens_per_iter = gradient_accumulation_steps * batch_size * block_size
计算每次迭代的令牌数量。
print(f"tokens per iteration will be: {tokens_per_iter:,}")
打印每次迭代的令牌数量。
if master_process:
os.makedirs(out_dir, exist_ok=True)
如果是主进程,则创建输出目录。
torch.manual_seed(1337 + seed_offset)
设置随机种子。
torch.backends.cuda.matmul.allow_tf32 = True
允许在 CUDA 上使用 TensorFloat-32。
torch.backends.cudnn.allow_tf32 = True
允许在 cuDNN 上使用 TensorFloat-32。
device_type = (
"cuda" if "cuda" in device else "cpu"
)
确定设备类型。
ptdtype = {
"float32": torch.float32,
"bfloat16": torch.bfloat16,
"float16": torch.float16,
}[dtype]
根据数据类型设置 PyTorch 数据类型。
ctx = (
nullcontext()
if device_type == "cpu"
else torch.amp.autocast(device_type=device_type, dtype=ptdtype)
)
根据设备类型设置上下文管理器。
if out_dir == "run_0":
data_dir = os.path.join("../../data", dataset)
else:
data_dir = os.path.join("../../../data", dataset)
根据输出目录设置数据目录。
def get_batch(split):
定义一个获取批次的函数,接受数据集划分参数。
if split == "train":
检查是否为训练集。
data = np.memmap(
os.path.join(data_dir, "train.bin"), dtype=np.uint16, mode="r"
)
使用内存映射读取训练数据。
else:
data = np.memmap(
os.path.join(data_dir, "val.bin"), dtype=np.uint16, mode="r"
)
使用内存映射读取验证数据。
ix = torch.randint(len(data) - block_size, (batch_size,))
随机选择批次索引。
x = torch.stack(
[torch.from_numpy((data[i : i + block_size]).astype(np.int64)) for i in ix]
)
创建输入张量。
y = torch.stack(
[
torch.from_numpy((data[i + 1 : i + 1 + block_size]).astype(np.int64))
for i in ix
]
)
创建目标张量。
if device_type == "cuda":
检查设备类型是否为 CUDA。
x, y = x.pin_memory().to(device, non_blocking=True), y.pin_memory().to(
device, non_blocking=True
)
将张量移动到 GPU。
else:
x, y = x.to(device), y.to(device)
将张量移动到 CPU。
return x, y
返回输入和目标张量。
iter_num = 0
初始化迭代计数器。
best_val_loss = 1e9
初始化最佳验证损失。
meta_path = os.path.join(data_dir, "meta.pkl")
设置元数据文件路径。
meta_vocab_size = None
初始化元数据词汇表大小。
if os.path.exists(meta_path):
检查元数据文件是否存在。
with open(meta_path, "rb") as f:
以二进制模式打开元数据文件。
meta = pickle.load(f)
加载元数据。
meta_vocab_size = meta["vocab_size"]
提取词汇表大小。
print(f"found vocab_size = {meta_vocab_size} (inside {meta_path})")
打印找到的词汇表大小。
model_args = dict(
创建模型参数字典。
n_layer=n_layer,
n_head=n_head,
n_embd=n_embd,
block_size=block_size,
bias=bias,
vocab_size=None,
dropout=dropout,
)
设置模型参数。
print("Initializing a new model from scratch")
打印初始化新模型的信息。
if meta_vocab_size is None:
检查元数据词汇表大小是否为 None。
print(
"defaulting to vocab_size of GPT-2 to 50304 (50257 rounded up for efficiency)"
)
打印默认词汇表大小的信息。
model_args["vocab_size"] = meta_vocab_size if meta_vocab_size is not None else 50304
设置词汇表大小。
gptconf = GPTConfig(**model_args)
创建 GPT 配置对象。
model = GPT(gptconf)
初始化 GPT 模型。
if block_size < model.config.block_size:
检查块大小是否小于模型的块大小。
model.crop_block_size(block_size)
裁剪模型的块大小。
model_args["block_size"] = (
block_size
)
更新模型参数中的块大小。
model.to(device)
将模型移动到指定设备。
scaler = torch.cuda.amp.GradScaler(enabled=(dtype == "float16"))
初始化梯度缩放器。
optimizer = model.configure_optimizers(
weight_decay, learning_rate, (beta1, beta2), device_type
)
配置优化器。
checkpoint = None
初始化检查点。
if compile:
检查是否需要编译模型。
print("compiling the model... (takes a ~minute)")
打印编译模型的信息。
unoptimized_model = model
保存未优化的模型。
model = torch.compile(model)
编译模型。
@torch.no_grad()
def estimate_loss():
定义一个估计损失的方法,使用无梯度上下文。
out = {}
初始化输出字典。
model.eval()
将模型设置为评估模式。
for split in ["train", "val"]:
遍历训练和验证集。
losses = torch.zeros(eval_iters)
初始化损失张量。
for k in range(eval_iters):
进行评估迭代。
X, Y = get_batch(split)
获取当前批次的数据。
with ctx:
使用上下文管理器。
logits, loss = model(X, Y)
通过模型前向传播获取 logits 和损失。
losses[k] = loss.item()
记录当前损失。
out[split] = losses.mean()
计算并保存平均损失。
model.train()
将模型设置为训练模式。
return out
返回损失输出。
def get_lr(it):
定义一个获取学习率的方法。
if it < warmup_iters:
检查当前迭代是否在预热阶段。
return learning_rate * it / warmup_iters
返回线性预热的学习率。
if it > lr_decay_iters:
检查当前迭代是否超过学习率衰减迭代次数。
return min_lr
返回最小学习率。
decay_ratio = (it - warmup_iters) / (lr_decay_iters - warmup_iters)
计算衰减比例。
coeff = 0.5 * (1.0 + math.cos(math.pi * decay_ratio))
计算余弦衰减系数。
return min_lr + coeff * (learning_rate - min_lr)
返回衰减后的学习率。
val_log_info = []
初始化验证日志信息列表。
train_log_info = []
初始化训练日志信息列表。
X, Y = get_batch("train")
获取初始训练批次。
og_t0 = time.time()
记录原始开始时间。
t0 = time.time()
记录当前时间。
local_iter_num = 0
初始化本地迭代计数器。
raw_model = model
保存原始模型。
while True:
进入无限循环。
lr = get_lr(iter_num) if decay_lr else learning_rate
获取当前学习率。
for param_group in optimizer.param_groups:
遍历优化器的参数组。
param_group["lr"] = lr
设置当前学习率。
if iter_num % eval_interval == 0 and master_process:
检查是否需要评估。
losses = estimate_loss()
估计损失。
print(
f"step {iter_num}: train loss {losses['train']:.4f}, val loss {losses['val']:.4f}"
)
打印当前步骤的训练和验证损失。
val_log_info.append(
{
"iter": iter_num,
"train/loss": losses["train"].item(),
"val/loss": losses["val"].item(),
"lr": lr,
}
)
记录验证日志信息。
if losses["val"] < best_val_loss or always_save_checkpoint:
检查是否需要保存检查点。
best_val_loss = losses["val"]
更新最佳验证损失。
if iter_num > 0 and not never_save_checkpoint:
检查是否需要保存检查点。
checkpoint = {
"model": raw_model.state_dict(),
"optimizer": optimizer.state_dict(),
"model_args": model_args,
"iter_num": iter_num,
"best_val_loss": best_val_loss,
}
创建检查点字典。
print(f"saving checkpoint to {out_dir}")
打印保存检查点的信息。
torch.save(checkpoint, os.path.join(out_dir, "ckpt.pt"))
保存检查点。
if iter_num == 0 and eval_only:
break
如果是第一次迭代且仅进行评估,则退出循环。
for micro_step in range(gradient_accumulation_steps):
进行梯度累积。
with ctx:
使用上下文管理器。
logits, loss = model(X, Y)
通过模型前向传播获取 logits 和损失。
loss = (
loss / gradient_accumulation_steps
)
缩放损失以适应梯度累积。
X, Y = get_batch("train")
获取下一个训练批次。
scaler.scale(loss).backward()
缩放损失并进行反向传播。
if grad_clip != 0.0:
检查是否需要梯度裁剪。
scaler.unscale_(optimizer)
取消缩放优化器的梯度。
torch.nn.utils.clip_grad_norm_(model.parameters(), grad_clip)
裁剪梯度。
scaler.step(optimizer)
更新优化器。
scaler.update()
更新缩放器。
optimizer.zero_grad(set_to_none=True)
清零梯度。
t1 = time.time()
记录当前时间。
dt = t1 - t0
计算时间差。
t0 = t1
更新时间。
if iter_num % log_interval == 0 and master_process:
检查是否需要记录日志。
lossf = loss.item() * gradient_accumulation_steps
计算实际损失。
print(f"iter {iter_num}: loss {lossf:.4f}, time {dt*1000:.2f}ms")
打印当前迭代的损失和时间。
train_log_info.append(
{
"iter": iter_num,
"loss": lossf,
"time": dt * 1000,
}
)
记录训练日志信息。
iter_num += 1
增加迭代计数器。
local_iter_num += 1
增加本地迭代计数器。
if iter_num > max_iters:
break
检查是否超过最大迭代次数。
print("training done")
打印训练完成的信息。
print(f"Best validation loss: {best_val_loss}")
打印最佳验证损失。
print(f"Total train time: {(time.time() - og_t0) / 60:.2f} mins")
打印总训练时间。
final_info = {
"final_train_loss": lossf,
"best_val_loss": best_val_loss.item(),
"total_train_time": time.time() - og_t0,
}
创建最终信息字典。
start = " "
初始化生成的起始文本。
num_samples = 10
设置生成样本的数量。
max_new_tokens = 500
设置每个样本生成的最大令牌数。
temperature = 0.8
设置生成的温度。
top_k = 200
设置保留的 top_k 选项。
assert os.path.exists(meta_path), "meta.pkl not found, please run training script first"
检查元数据文件是否存在。
print(f"Loading meta from {meta_path}...")
打印加载元数据的信息。
with open(meta_path, "rb") as f:
以二进制模式打开元数据文件。
meta = pickle.load(f)
加载元数据。
stoi, itos = meta["stoi"], meta["itos"]
提取字符到索引和索引到字符的映射。
encode = lambda s: [stoi[c] for c in s]
定义编码函数,将字符转换为索引。
decode = lambda l: "".join([itos[i] for i in l])
定义解码函数,将索引转换为字符。
if start.startswith("FILE:"):
检查起始文本是否以 "FILE:" 开头。
with open(start[5:], "r", encoding="utf-8") as f:
打开指定文件以读取起始文本。
start = f.read()
读取文件内容。
start_ids = encode(start)
将起始文本编码为索引。
x = torch.tensor(start_ids, dtype=torch.long, device=device)[None, ...]
将起始索引转换为张量。
model.eval()
将模型设置为评估模式。
results = []
初始化结果列表。
with torch.no_grad():
使用无梯度上下文。
with ctx:
使用上下文管理器。
for k in range(num_samples):
循环生成指定数量的样本。
start_time = time.time()
记录开始时间。
y = model.generate(
x, max_new_tokens, temperature=temperature, top_k=top_k
)
通过模型生成新令牌。
end_time = time.time()
记录结束时间。
generated_text = decode(y[0].tolist())
解码生成的索引为文本。
inference_time = end_time - start_time
计算推理时间。
tokens_per_second = max_new_tokens / inference_time
计算每秒生成的令牌数。
print(f"Sample {k+1}:")
打印样本编号。
print(generated_text)
打印生成的文本。
print(f"Inference time: {inference_time:.2f} seconds")
打印推理时间。
print(f"Tokens per second: {tokens_per_second:.2f}")
打印每秒生成的令牌数。
print("---------------")
打印分隔线。
results.append(
{
"sample_id": k + 1,
"generated_text": generated_text,
"inference_time": inference_time,
"tokens_per_second": tokens_per_second,
}
)
将生成结果添加到结果列表。
avg_tokens_per_second = sum(r["tokens_per_second"] for r in results) / len(results)
计算平均每秒生成的令牌数。
print(f"Average tokens per second: {avg_tokens_per_second:.2f}")
打印平均每秒生成的令牌数。
final_info["avg_inference_tokens_per_second"] = avg_tokens_per_second
将平均推理速度添加到最终信息中。
with open(
os.path.join(out_dir, f"final_info_{dataset}_{seed_offset}.json"), "w"
) as f:
打开最终信息文件以写入。
json.dump(final_info, f)
将最终信息写入文件。
return final_info, train_log_info, val_log_info
返回最终信息、训练日志和验证日志。
parser = argparse.ArgumentParser(description="Run experiment")
创建命令行参数解析器。
parser.add_argument("--out_dir", type=str, default="run_0", help="Output directory")
添加输出目录参数。
args = parser.parse_args()
解析命令行参数。
if __name__ == "__main__":
检查是否为主模块运行。
num_seeds = {
"shakespeare_char": 3,
"enwik8": 1,
"text8": 1,
}
定义每个数据集的种子数量。
out_dir = args.out_dir
获取输出目录。
all_results = {}
初始化所有结果字典。
final_infos = {}
初始化最终信息字典。
for dataset in ["shakespeare_char", "enwik8", "text8"]:
遍历数据集。
final_info_list = []
初始化最终信息列表。
for seed_offset in range(num_seeds[dataset]):
遍历种子偏移量。
final_info, train_info, val_info = train(dataset, out_dir, seed_offset)
调用训练函数。
all_results[f"{dataset}_{seed_offset}_final_info"] = final_info
保存最终信息。
all_results[f"{dataset}_{seed_offset}_train_info"] = train_info
保存训练信息。
all_results[f"{dataset}_{seed_offset}_val_info"] = val_info
保存验证信息。
final_info_list.append(final_info)
将最终信息添加到列表。
final_info_dict = {
k: [d[k] for d in final_info_list] for k in final_info_list[0].keys()
}
创建最终信息字典。
means = {f"{k}_mean": np.mean(v) for k, v in final_info_dict.items()}
计算均值。
stderrs = {
f"{k}_stderr": np.std(v) / len(v) for k, v in final_info_dict.items()
}
计算标准误差。
final_infos[dataset] = {
"means": means,
"stderrs": stderrs,
"final_info_dict": final_info_dict,
}
保存最终信息。
with open(os.path.join(out_dir, "final_info.json"), "w") as f:
打开最终信息文件以写入。
json.dump(final_infos, f)
将最终信息写入文件。
with open(os.path.join(out_dir, "all_results.npy"), "wb") as f:
打开所有结果文件以写入。
np.save(f, all_results)
将所有结果保存为 NumPy 文件。
以上是对代码文件的逐行解释。如果你有任何具体的疑问或需要更详细的解释,请告诉我!