experiment.py. scientist

experiment.py. scientist

导入 PyTorch 的神经网络模块,并重命名为 nn,用于构建神经网络。

from torch.nn import functional as F

从 torch.nn 模块导入 functional,并重命名为 F,用于使用各种神经网络功能,如激活函数。

import argparse

导入 argparse 模块,用于解析命令行参数。

class LayerNorm(nn.Module):

定义一个名为 LayerNorm 的类,继承自 nn.Module,用于实现层归一化。

def __init__(self, ndim, bias):

定义构造函数,接受两个参数:ndim(维度)和 bias(是否使用偏置)。

super().__init__()

调用父类的构造函数。

self.weight = nn.Parameter(torch.ones(ndim))

初始化权重参数,设置为全 1 的张量。

self.bias = nn.Parameter(torch.zeros(ndim)) if bias else None

如果 bias 为真,则初始化偏置参数为全 0 的张量;否则设置为 None。

def forward(self, input):

定义前向传播方法,接受输入张量。

return F.layer_norm(input, self.weight.shape, self.weight, self.bias, 1e-5)

返回层归一化的结果,使用 PyTorch 的 layer_norm 函数。

class CausalSelfAttention(nn.Module):

定义一个名为 CausalSelfAttention 的类,继承自 nn.Module,用于实现因果自注意力机制。

def __init__(self, config):

定义构造函数,接受一个配置对象 config。

super().__init__()

调用父类的构造函数。

assert config.n_embd % config.n_head == 0

确保嵌入维度 n_embd 可以被头数 n_head 整除。

self.c_attn = nn.Linear(config.n_embd, 3 * config.n_embd, bias=config.bias)

初始化注意力的线性变换,输出维度为 3 * n_embd。

self.c_proj = nn.Linear(config.n_embd, config.n_embd, bias=config.bias)

初始化输出的线性变换,输入和输出维度均为 n_embd。

self.attn_dropout = nn.Dropout(config.dropout)

初始化注意力的 dropout 层。

self.resid_dropout = nn.Dropout(config.dropout)

初始化残差连接的 dropout 层。

self.n_head = config.n_head

保存头数 n_head。

self.n_embd = config.n_embd

保存嵌入维度 n_embd。

self.dropout = config.dropout

保存 dropout 比例。

self.flash = hasattr(torch.nn.functional, "scaled_dot_product_attention")

检查 PyTorch 是否支持快速注意力机制。

if not self.flash:

如果不支持快速注意力机制,执行以下代码。

print("WARNING: using slow attention. Flash Attention requires PyTorch >= 2.0")

打印警告信息,提示使用慢速注意力。

self.register_buffer(

"bias",

torch.tril(torch.ones(config.block_size, config.block_size)).view(

1, 1, config.block_size, config.block_size

),

)

注册一个缓冲区 bias,用于因果掩码。

def forward(self, x):

定义前向传播方法,接受输入张量 x。

B, T, C = x.size()

获取输入的批量大小 B、序列长度 T 和嵌入维度 C。

q, k, v = self.c_attn(x).split(self.n_embd, dim=2)

计算查询、键和值,并按嵌入维度分割。

k = k.view(B, T, self.n_head, C // self.n_head).transpose(1, 2)

调整键的形状并转置。

q = q.view(B, T, self.n_head, C // self.n_head).transpose(1, 2)

调整查询的形状并转置。

v = v.view(B, T, self.n_head, C // self.n_head).transpose(1, 2)

调整值的形状并转置。

if self.flash:

如果支持快速注意力机制,执行以下代码。

y = torch.nn.functional.scaled_dot_product_attention(

q,

k,

v,

attn_mask=None,

dropout_p=self.dropout if self.training else 0,

is_causal=True,

)

使用快速注意力机制计算输出。

else:

如果不支持快速注意力机制,执行以下代码。

att = (q @ k.transpose(-2, -1)) * (1.0 / math.sqrt(k.size(-1)))

计算注意力得分。

att = att.masked_fill(self.bias[:, :, :T, :T] == 0, float("-inf"))

应用因果掩码。

att = F.softmax(att, dim=-1)

对注意力得分应用 softmax。

att = self.attn_dropout(att)

应用 dropout。

y = att @ v

计算输出。

y = y.transpose(1, 2).contiguous().view(B, T, C)

调整输出的形状。

y = self.resid_dropout(self.c_proj(y))

应用输出的线性变换和 dropout。

return y

返回输出。

class MLP(nn.Module):

定义一个名为 MLP 的类,继承自 nn.Module,用于实现多层感知机。

def __init__(self, config):

定义构造函数,接受一个配置对象 config。

super().__init__()

调用父类的构造函数。

self.c_fc = nn.Linear(config.n_embd, 4 * config.n_embd, bias=config.bias)

初始化第一个线性层,输出维度为 4 * n_embd。

self.gelu = nn.GELU()

初始化 GELU 激活函数。

self.c_proj = nn.Linear(4 * config.n_embd, config.n_embd, bias=config.bias)

初始化第二个线性层,输出维度为 n_embd。

self.dropout = nn.Dropout(config.dropout)

初始化 dropout 层。

def forward(self, x):

定义前向传播方法,接受输入张量 x。

x = self.c_fc(x)

通过第一个线性层。

x = self.gelu(x)

应用 GELU 激活函数。

x = self.c_proj(x)

通过第二个线性层。

x = self.dropout(x)

应用 dropout。

return x

返回输出。

class Block(nn.Module):

定义一个名为 Block 的类,继承自 nn.Module,用于实现 Transformer 的基本块。

def __init__(self, config):

定义构造函数,接受一个配置对象 config。

super().__init__()

调用父类的构造函数。

self.ln_1 = LayerNorm(config.n_embd, bias=config.bias)

初始化第一个层归一化。

self.attn = CausalSelfAttention(config)

初始化因果自注意力层。

self.ln_2 = LayerNorm(config.n_embd, bias=config.bias)

初始化第二个层归一化。

self.mlp = MLP(config)

初始化多层感知机。

def forward(self, x):

定义前向传播方法,接受输入张量 x。

x = x + self.attn(self.ln_1(x))

通过自注意力层并添加残差连接。

x = x + self.mlp(self.ln_2(x))

通过多层感知机并添加残差连接。

return x

返回输出。

@dataclass

class GPTConfig:

定义一个数据类 GPTConfig,用于存储 GPT 模型的配置。

block_size: int = 1024

定义块大小,默认为 1024。

vocab_size: int = 50304

定义词汇表大小,默认为 50304。

n_layer: int = 12

定义层数,默认为 12。

n_head: int = 12

定义头数,默认为 12。

n_embd: int = 768

定义嵌入维度,默认为 768。

dropout: float = 0.0

定义 dropout 比例,默认为 0.0。

bias: bool = True

定义是否使用偏置,默认为 True。

class GPT(nn.Module):

定义一个名为 GPT 的类,继承自 nn.Module,用于实现 GPT 模型。

def __init__(self, config):

定义构造函数,接受一个配置对象 config。

super().__init__()

调用父类的构造函数。

assert config.vocab_size is not None

确保词汇表大小不为 None。

assert config.block_size is not None

确保块大小不为 None。

self.config = config

保存配置对象。

self.transformer = nn.ModuleDict(

初始化一个模块字典,用于存储模型的各个部分。

dict(

wte=nn.Embedding(config.vocab_size, config.n_embd),

添加词嵌入层。

wpe=nn.Embedding(config.block_size, config.n_embd),

添加位置嵌入层。

drop=nn.Dropout(config.dropout),

添加 dropout 层。

h=nn.ModuleList([Block(config) for _ in range(config.n_layer)]),

添加多个 Transformer 块。

ln_f=LayerNorm(config.n_embd, bias=config.bias),

添加最终的层归一化。

)

)

结束模块字典的定义。

self.lm_head = nn.Linear(config.n_embd, config.vocab_size, bias=False)

初始化语言模型头,输出维度为词汇表大小。

self.transformer.wte.weight = (

self.lm_head.weight

)

共享嵌入权重和语言模型头的权重。

self.apply(self._init_weights)

应用权重初始化。

for pn, p in self.named_parameters():

遍历模型的所有参数。

if pn.endswith("c_proj.weight"):

检查参数名称是否以 c_proj.weight 结尾。

torch.nn.init.normal_(

p, mean=0.0, std=0.02 / math.sqrt(2 * config.n_layer)

)

对残差投影权重应用特殊的初始化。

print("number of parameters: %.2fM" % (self.get_num_params() / 1e6,))

打印模型的参数数量。

def get_num_params(self, non_embedding=True):

定义一个方法,返回模型的参数数量。

n_params = sum(p.numel() for p in self.parameters())

计算所有参数的数量。

if non_embedding:

如果不计算嵌入参数,执行以下代码。

n_params -= self.transformer.wpe.weight.numel()

从总参数中减去位置嵌入的参数数量。

return n_params

返回参数数量。

def _init_weights(self, module):

定义一个方法,用于初始化权重。

if isinstance(module, nn.Linear):

检查模块是否为线性层。

torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)

对线性层的权重应用正态分布初始化。

if module.bias is not None:

如果线性层有偏置,执行以下代码。

torch.nn.init.zeros_(module.bias)

将偏置初始化为零。

elif isinstance(module, nn.Embedding):

检查模块是否为嵌入层。

torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)

对嵌入层的权重应用正态分布初始化。

def forward(self, idx, targets=None):

定义前向传播方法,接受输入索引 idx 和目标 targets。

device = idx.device

获取输入的设备信息。

b, t = idx.size()

获取批量大小 b 和序列长度 t。

assert (

t <= self.config.block_size

), f"Cannot forward sequence of length {t}, block size is only {self.config.block_size}"

确保序列长度不超过块大小。

pos = torch.arange(0, t, dtype=torch.long, device=device)  # shape (t)

创建位置索引。

tok_emb = self.transformer.wte(idx)  # token embeddings of shape (b, t, n_embd)

获取输入的词嵌入。

pos_emb = self.transformer.wpe(pos)  # position embeddings of shape (t, n_embd)

获取位置嵌入。

x = self.transformer.drop(tok_emb + pos_emb)

将词嵌入和位置嵌入相加并应用 dropout。

for block in self.transformer.h:

遍历所有 Transformer 块。

x = block(x)

通过当前块进行前向传播。

x = self.transformer.ln_f(x)

通过最终的层归一化。

if targets is not None:

如果提供了目标,执行以下代码。

logits = self.lm_head(x)

通过语言模型头计算 logits。

loss = F.cross_entropy(

logits.view(-1, logits.size(-1)), targets.view(-1), ignore_index=-1

)

计算交叉熵损失。

else:

如果没有提供目标,执行以下代码。

logits = self.lm_head(

x[:, [-1], :]

)  # note: using list [-1] to preserve the time dim

仅在最后一个时间步计算 logits。

loss = None

将损失设置为 None。

return logits, loss

返回 logits 和损失。

def crop_block_size(self, block_size):

定义一个方法,用于裁剪块大小。

assert block_size <= self.config.block_size

确保新的块大小不超过原始块大小。

self.config.block_size = block_size

更新块大小。

self.transformer.wpe.weight = nn.Parameter(

self.transformer.wpe.weight[:block_size]

)

裁剪位置嵌入的权重。

for block in self.transformer.h:

遍历所有 Transformer 块。

if hasattr(block.attn, "bias"):

检查当前块的注意力层是否有偏置。

block.attn.bias = block.attn.bias[:, :, :block_size, :block_size]

裁剪注意力层的偏置。

def configure_optimizers(self, weight_decay, learning_rate, betas, device_type):

定义一个方法,用于配置优化器。

param_dict = {pn: p for pn, p in self.named_parameters()}

创建一个参数字典,包含所有参数。

param_dict = {pn: p for pn, p in param_dict.items() if p.requires_grad}

过滤出需要梯度的参数。

decay_params = [p for n, p in param_dict.items() if p.dim() >= 2]

获取需要权重衰减的参数。

nodecay_params = [p for n, p in param_dict.items() if p.dim() < 2]

获取不需要权重衰减的参数。

optim_groups = [

{"params": decay_params, "weight_decay": weight_decay},

{"params": nodecay_params, "weight_decay": 0.0},

]

创建优化器组,分别设置权重衰减。

num_decay_params = sum(p.numel() for p in decay_params)

计算需要衰减的参数数量。

num_nodecay_params = sum(p.numel() for p in nodecay_params)

计算不需要衰减的参数数量。

print(

f"num decayed parameter tensors: {len(decay_params)}, with {num_decay_params:,} parameters"

)

打印需要衰减的参数数量信息。

print(

f"num non-decayed parameter tensors: {len(nodecay_params)}, with {num_nodecay_params:,} parameters"

)

打印不需要衰减的参数数量信息。

fused_available = "fused" in inspect.signature(torch.optim.AdamW).parameters

检查是否可以使用融合版本的 AdamW 优化器。

use_fused = fused_available and device_type == "cuda"

确定是否使用融合版本。

extra_args = dict(fused=True) if use_fused else dict()

设置额外参数。

optimizer = torch.optim.AdamW(

optim_groups, lr=learning_rate, betas=betas, **extra_args

)

创建 AdamW 优化器。

print(f"using fused AdamW: {use_fused}")

打印是否使用融合版本的信息。

return optimizer

返回优化器。

@torch.no_grad()

def generate(self, idx, max_new_tokens, temperature=1.0, top_k=None):

定义一个生成方法,接受输入索引、最大生成的令牌数、温度和 top_k 参数。

for _ in range(max_new_tokens):

循环生成指定数量的新令牌。

idx_cond = (

idx

if idx.size(1) <= self.config.block_size

else idx[:, -self.config.block_size :]

)

裁剪输入序列以适应块大小。

logits, _ = self(idx_cond)

通过模型前向传播获取 logits。

logits = logits[:, -1, :] / temperature

获取最后一个时间步的 logits,并按温度缩放。

if top_k is not None:

如果指定了 top_k,执行以下代码。

v, _ = torch.topk(logits, min(top_k, logits.size(-1)))

获取 logits 中的 top_k 值。

logits[logits < v[:, [-1]]] = -float("Inf")

将低于 top_k 的 logits 设置为负无穷。

probs = F.softmax(logits, dim=-1)

对 logits 应用 softmax 转换为概率。

idx_next = torch.multinomial(probs, num_samples=1)

从概率分布中采样下一个索引。

idx = torch.cat((idx, idx_next), dim=1)

将采样的索引添加到输入序列中。

return idx

返回生成的索引序列。

def train(dataset="shakespeare_char", out_dir="run_0", seed_offset=0):

定义一个训练函数,接受数据集名称、输出目录和种子偏移量。

gradient_accumulation_steps = 1

设置梯度累积步数。

batch_size = 64 if dataset == "shakespeare_char" else 32

根据数据集设置批量大小。

block_size = 256

设置块大小。

eval_interval = 250 if dataset == "shakespeare_char" else 1000

设置评估间隔。

log_interval = 10 if dataset == "shakespeare_char" else 100

设置日志间隔。

eval_iters = 200

设置评估迭代次数。

eval_only = False

设置是否仅进行评估。

always_save_checkpoint = False

设置是否始终保存检查点。

never_save_checkpoint = True

设置是否从不保存检查点。

n_layer = 6

设置层数。

n_head = 6

设置头数。

n_embd = 384

设置嵌入维度。

dropout = 0.2

设置 dropout 比例。

bias = False

设置是否使用偏置。

learning_rate = 1e-3 if dataset == "shakespeare_char" else 5e-4

根据数据集设置学习率。

max_iters = 5000 if dataset == "shakespeare_char" else 100000

根据数据集设置最大迭代次数。

weight_decay = 1e-1

设置权重衰减。

beta1 = 0.9

设置 Adam 优化器的 beta1。

beta2 = 0.99

设置 Adam 优化器的 beta2。

grad_clip = 1.0

设置梯度裁剪值。

decay_lr = True

设置是否衰减学习率。

warmup_iters = 100 if dataset == "shakespeare_char" else 200

设置预热迭代次数。

lr_decay_iters = max_iters

设置学习率衰减迭代次数。

min_lr = 1e-4 if dataset == "shakespeare_char" else 5e-5

设置最小学习率。

backend = "nccl"

设置分布式后端。

device = "cuda"

设置设备为 CUDA。

dtype = (

"bfloat16"

if torch.cuda.is_available() and torch.cuda.is_bf16_supported()

else "float16"

)

根据设备支持情况设置数据类型。

compile = True

设置是否编译模型。

master_process = True

设置主进程标志。

tokens_per_iter = gradient_accumulation_steps * batch_size * block_size

计算每次迭代的令牌数量。

print(f"tokens per iteration will be: {tokens_per_iter:,}")

打印每次迭代的令牌数量。

if master_process:

os.makedirs(out_dir, exist_ok=True)

如果是主进程,则创建输出目录。

torch.manual_seed(1337 + seed_offset)

设置随机种子。

torch.backends.cuda.matmul.allow_tf32 = True

允许在 CUDA 上使用 TensorFloat-32。

torch.backends.cudnn.allow_tf32 = True

允许在 cuDNN 上使用 TensorFloat-32。

device_type = (

"cuda" if "cuda" in device else "cpu"

)

确定设备类型。

ptdtype = {

"float32": torch.float32,

"bfloat16": torch.bfloat16,

"float16": torch.float16,

}[dtype]

根据数据类型设置 PyTorch 数据类型。

ctx = (

nullcontext()

if device_type == "cpu"

else torch.amp.autocast(device_type=device_type, dtype=ptdtype)

)

根据设备类型设置上下文管理器。

if out_dir == "run_0":

data_dir = os.path.join("../../data", dataset)

else:

data_dir = os.path.join("../../../data", dataset)

根据输出目录设置数据目录。

def get_batch(split):

定义一个获取批次的函数,接受数据集划分参数。

if split == "train":

检查是否为训练集。

data = np.memmap(

os.path.join(data_dir, "train.bin"), dtype=np.uint16, mode="r"

)

使用内存映射读取训练数据。

else:

data = np.memmap(

os.path.join(data_dir, "val.bin"), dtype=np.uint16, mode="r"

)

使用内存映射读取验证数据。

ix = torch.randint(len(data) - block_size, (batch_size,))

随机选择批次索引。

x = torch.stack(

[torch.from_numpy((data[i : i + block_size]).astype(np.int64)) for i in ix]

)

创建输入张量。

y = torch.stack(

[

torch.from_numpy((data[i + 1 : i + 1 + block_size]).astype(np.int64))

for i in ix

]

)

创建目标张量。

if device_type == "cuda":

检查设备类型是否为 CUDA。

x, y = x.pin_memory().to(device, non_blocking=True), y.pin_memory().to(

device, non_blocking=True

)

将张量移动到 GPU。

else:

x, y = x.to(device), y.to(device)

将张量移动到 CPU。

return x, y

返回输入和目标张量。

iter_num = 0

初始化迭代计数器。

best_val_loss = 1e9

初始化最佳验证损失。

meta_path = os.path.join(data_dir, "meta.pkl")

设置元数据文件路径。

meta_vocab_size = None

初始化元数据词汇表大小。

if os.path.exists(meta_path):

检查元数据文件是否存在。

with open(meta_path, "rb") as f:

以二进制模式打开元数据文件。

meta = pickle.load(f)

加载元数据。

meta_vocab_size = meta["vocab_size"]

提取词汇表大小。

print(f"found vocab_size = {meta_vocab_size} (inside {meta_path})")

打印找到的词汇表大小。

model_args = dict(

创建模型参数字典。

n_layer=n_layer,

n_head=n_head,

n_embd=n_embd,

block_size=block_size,

bias=bias,

vocab_size=None,

dropout=dropout,

)

设置模型参数。

print("Initializing a new model from scratch")

打印初始化新模型的信息。

if meta_vocab_size is None:

检查元数据词汇表大小是否为 None。

print(

"defaulting to vocab_size of GPT-2 to 50304 (50257 rounded up for efficiency)"

)

打印默认词汇表大小的信息。

model_args["vocab_size"] = meta_vocab_size if meta_vocab_size is not None else 50304

设置词汇表大小。

gptconf = GPTConfig(**model_args)

创建 GPT 配置对象。

model = GPT(gptconf)

初始化 GPT 模型。

if block_size < model.config.block_size:

检查块大小是否小于模型的块大小。

model.crop_block_size(block_size)

裁剪模型的块大小。

model_args["block_size"] = (

block_size

)

更新模型参数中的块大小。

model.to(device)

将模型移动到指定设备。

scaler = torch.cuda.amp.GradScaler(enabled=(dtype == "float16"))

初始化梯度缩放器。

optimizer = model.configure_optimizers(

weight_decay, learning_rate, (beta1, beta2), device_type

)

配置优化器。

checkpoint = None

初始化检查点。

if compile:

检查是否需要编译模型。

print("compiling the model... (takes a ~minute)")

打印编译模型的信息。

unoptimized_model = model

保存未优化的模型。

model = torch.compile(model)

编译模型。

@torch.no_grad()

def estimate_loss():

定义一个估计损失的方法,使用无梯度上下文。

out = {}

初始化输出字典。

model.eval()

将模型设置为评估模式。

for split in ["train", "val"]:

遍历训练和验证集。

losses = torch.zeros(eval_iters)

初始化损失张量。

for k in range(eval_iters):

进行评估迭代。

X, Y = get_batch(split)

获取当前批次的数据。

with ctx:

使用上下文管理器。

logits, loss = model(X, Y)

通过模型前向传播获取 logits 和损失。

losses[k] = loss.item()

记录当前损失。

out[split] = losses.mean()

计算并保存平均损失。

model.train()

将模型设置为训练模式。

return out

返回损失输出。

def get_lr(it):

定义一个获取学习率的方法。

if it < warmup_iters:

检查当前迭代是否在预热阶段。

return learning_rate * it / warmup_iters

返回线性预热的学习率。

if it > lr_decay_iters:

检查当前迭代是否超过学习率衰减迭代次数。

return min_lr

返回最小学习率。

decay_ratio = (it - warmup_iters) / (lr_decay_iters - warmup_iters)

计算衰减比例。

coeff = 0.5 * (1.0 + math.cos(math.pi * decay_ratio))

计算余弦衰减系数。

return min_lr + coeff * (learning_rate - min_lr)

返回衰减后的学习率。

val_log_info = []

初始化验证日志信息列表。

train_log_info = []

初始化训练日志信息列表。

X, Y = get_batch("train")

获取初始训练批次。

og_t0 = time.time()

记录原始开始时间。

t0 = time.time()

记录当前时间。

local_iter_num = 0

初始化本地迭代计数器。

raw_model = model

保存原始模型。

while True:

进入无限循环。

lr = get_lr(iter_num) if decay_lr else learning_rate

获取当前学习率。

for param_group in optimizer.param_groups:

遍历优化器的参数组。

param_group["lr"] = lr

设置当前学习率。

if iter_num % eval_interval == 0 and master_process:

检查是否需要评估。

losses = estimate_loss()

估计损失。

print(

f"step {iter_num}: train loss {losses['train']:.4f}, val loss {losses['val']:.4f}"

)

打印当前步骤的训练和验证损失。

val_log_info.append(

{

"iter": iter_num,

"train/loss": losses["train"].item(),

"val/loss": losses["val"].item(),

"lr": lr,

}

)

记录验证日志信息。

if losses["val"] < best_val_loss or always_save_checkpoint:

检查是否需要保存检查点。

best_val_loss = losses["val"]

更新最佳验证损失。

if iter_num > 0 and not never_save_checkpoint:

检查是否需要保存检查点。

checkpoint = {

"model": raw_model.state_dict(),

"optimizer": optimizer.state_dict(),

"model_args": model_args,

"iter_num": iter_num,

"best_val_loss": best_val_loss,

}

创建检查点字典。

print(f"saving checkpoint to {out_dir}")

打印保存检查点的信息。

torch.save(checkpoint, os.path.join(out_dir, "ckpt.pt"))

保存检查点。

if iter_num == 0 and eval_only:

break

如果是第一次迭代且仅进行评估,则退出循环。

for micro_step in range(gradient_accumulation_steps):

进行梯度累积。

with ctx:

使用上下文管理器。

logits, loss = model(X, Y)

通过模型前向传播获取 logits 和损失。

loss = (

loss / gradient_accumulation_steps

)

缩放损失以适应梯度累积。

X, Y = get_batch("train")

获取下一个训练批次。

scaler.scale(loss).backward()

缩放损失并进行反向传播。

if grad_clip != 0.0:

检查是否需要梯度裁剪。

scaler.unscale_(optimizer)

取消缩放优化器的梯度。

torch.nn.utils.clip_grad_norm_(model.parameters(), grad_clip)

裁剪梯度。

scaler.step(optimizer)

更新优化器。

scaler.update()

更新缩放器。

optimizer.zero_grad(set_to_none=True)

清零梯度。

t1 = time.time()

记录当前时间。

dt = t1 - t0

计算时间差。

t0 = t1

更新时间。

if iter_num % log_interval == 0 and master_process:

检查是否需要记录日志。

lossf = loss.item() * gradient_accumulation_steps

计算实际损失。

print(f"iter {iter_num}: loss {lossf:.4f}, time {dt*1000:.2f}ms")

打印当前迭代的损失和时间。

train_log_info.append(

{

"iter": iter_num,

"loss": lossf,

"time": dt * 1000,

}

)

记录训练日志信息。

iter_num += 1

增加迭代计数器。

local_iter_num += 1

增加本地迭代计数器。

if iter_num > max_iters:

break

检查是否超过最大迭代次数。

print("training done")

打印训练完成的信息。

print(f"Best validation loss: {best_val_loss}")

打印最佳验证损失。

print(f"Total train time: {(time.time() - og_t0) / 60:.2f} mins")

打印总训练时间。

final_info = {

"final_train_loss": lossf,

"best_val_loss": best_val_loss.item(),

"total_train_time": time.time() - og_t0,

}

创建最终信息字典。

start = " "

初始化生成的起始文本。

num_samples = 10

设置生成样本的数量。

max_new_tokens = 500

设置每个样本生成的最大令牌数。

temperature = 0.8

设置生成的温度。

top_k = 200

设置保留的 top_k 选项。

assert os.path.exists(meta_path), "meta.pkl not found, please run training script first"

检查元数据文件是否存在。

print(f"Loading meta from {meta_path}...")

打印加载元数据的信息。

with open(meta_path, "rb") as f:

以二进制模式打开元数据文件。

meta = pickle.load(f)

加载元数据。

stoi, itos = meta["stoi"], meta["itos"]

提取字符到索引和索引到字符的映射。

encode = lambda s: [stoi[c] for c in s]

定义编码函数,将字符转换为索引。

decode = lambda l: "".join([itos[i] for i in l])

定义解码函数,将索引转换为字符。

if start.startswith("FILE:"):

检查起始文本是否以 "FILE:" 开头。

with open(start[5:], "r", encoding="utf-8") as f:

打开指定文件以读取起始文本。

start = f.read()

读取文件内容。

start_ids = encode(start)

将起始文本编码为索引。

x = torch.tensor(start_ids, dtype=torch.long, device=device)[None, ...]

将起始索引转换为张量。

model.eval()

将模型设置为评估模式。

results = []

初始化结果列表。

with torch.no_grad():

使用无梯度上下文。

with ctx:

使用上下文管理器。

for k in range(num_samples):

循环生成指定数量的样本。

start_time = time.time()

记录开始时间。

y = model.generate(

x, max_new_tokens, temperature=temperature, top_k=top_k

)

通过模型生成新令牌。

end_time = time.time()

记录结束时间。

generated_text = decode(y[0].tolist())

解码生成的索引为文本。

inference_time = end_time - start_time

计算推理时间。

tokens_per_second = max_new_tokens / inference_time

计算每秒生成的令牌数。

print(f"Sample {k+1}:")

打印样本编号。

print(generated_text)

打印生成的文本。

print(f"Inference time: {inference_time:.2f} seconds")

打印推理时间。

print(f"Tokens per second: {tokens_per_second:.2f}")

打印每秒生成的令牌数。

print("---------------")

打印分隔线。

results.append(

{

"sample_id": k + 1,

"generated_text": generated_text,

"inference_time": inference_time,

"tokens_per_second": tokens_per_second,

}

)

将生成结果添加到结果列表。

avg_tokens_per_second = sum(r["tokens_per_second"] for r in results) / len(results)

计算平均每秒生成的令牌数。

print(f"Average tokens per second: {avg_tokens_per_second:.2f}")

打印平均每秒生成的令牌数。

final_info["avg_inference_tokens_per_second"] = avg_tokens_per_second

将平均推理速度添加到最终信息中。

with open(

os.path.join(out_dir, f"final_info_{dataset}_{seed_offset}.json"), "w"

) as f:

打开最终信息文件以写入。

json.dump(final_info, f)

将最终信息写入文件。

return final_info, train_log_info, val_log_info

返回最终信息、训练日志和验证日志。

parser = argparse.ArgumentParser(description="Run experiment")

创建命令行参数解析器。

parser.add_argument("--out_dir", type=str, default="run_0", help="Output directory")

添加输出目录参数。

args = parser.parse_args()

解析命令行参数。

if __name__ == "__main__":

检查是否为主模块运行。

num_seeds = {

"shakespeare_char": 3,

"enwik8": 1,

"text8": 1,

}

定义每个数据集的种子数量。

out_dir = args.out_dir

获取输出目录。

all_results = {}

初始化所有结果字典。

final_infos = {}

初始化最终信息字典。

for dataset in ["shakespeare_char", "enwik8", "text8"]:

遍历数据集。

final_info_list = []

初始化最终信息列表。

for seed_offset in range(num_seeds[dataset]):

遍历种子偏移量。

final_info, train_info, val_info = train(dataset, out_dir, seed_offset)

调用训练函数。

all_results[f"{dataset}_{seed_offset}_final_info"] = final_info

保存最终信息。

all_results[f"{dataset}_{seed_offset}_train_info"] = train_info

保存训练信息。

all_results[f"{dataset}_{seed_offset}_val_info"] = val_info

保存验证信息。

final_info_list.append(final_info)

将最终信息添加到列表。

final_info_dict = {

k: [d[k] for d in final_info_list] for k in final_info_list[0].keys()

}

创建最终信息字典。

means = {f"{k}_mean": np.mean(v) for k, v in final_info_dict.items()}

计算均值。

stderrs = {

f"{k}_stderr": np.std(v) / len(v) for k, v in final_info_dict.items()

}

计算标准误差。

final_infos[dataset] = {

"means": means,

"stderrs": stderrs,

"final_info_dict": final_info_dict,

}

保存最终信息。

with open(os.path.join(out_dir, "final_info.json"), "w") as f:

打开最终信息文件以写入。

json.dump(final_infos, f)

将最终信息写入文件。

with open(os.path.join(out_dir, "all_results.npy"), "wb") as f:

打开所有结果文件以写入。

np.save(f, all_results)

将所有结果保存为 NumPy 文件。

以上是对代码文件的逐行解释。如果你有任何具体的疑问或需要更详细的解释,请告诉我!

Read more

超棒的雅思资源!

超棒的雅思资源!

雅思真题材料地址: https://github.com/zeeklog/IETLS 感谢所有人。材料来自:@shah0150 & @kbtxwer * 超棒的雅思资源 * 雅思简介 * 听力 * 阅读 * 写作 * 口语 * 词汇 * 其他 * YouTube 频道 * [播客] (#podcasts) 雅思简介 * 什么是雅思 - 了解什么是雅思 听力 * 高级听力 * 雅思官方网站 * 考试英语 * 英国广播公司节目 * 乔治梅森大学口音学习网站 - 学习不同的口音 * 英国广播公司播客 * 英国文化协会听力练习 阅读 * 雅思提升阅读 写作 * 雅思提升写作 * 雅思从 6 分到 9 分 * 迷你雅思 口语 * Verbling 提供在线英语家教服务

By Ne0inhk
🚀Zeek.ai一款基于 Electron 和 Vite 打造的跨平台(支持 Windows、macOS 和 Linux) AI 浏览器

🚀Zeek.ai一款基于 Electron 和 Vite 打造的跨平台(支持 Windows、macOS 和 Linux) AI 浏览器

是一款基于 Electron 和 Vite 打造的跨平台(支持 Windows、macOS 和 Linux) AI 浏览器。 集成了 SearXNG AI 搜索、开发工具集合、 市面上最流行的 AI 工具门户,以及代码编写和桌面快捷工具等功能, 通过模块化的 Monorepo 架构,提供轻量级、可扩展且高效的桌面体验, 助力 AI 驱动的日常工作流程。

By Ne0inhk