import math

import torch
import torch.nn as nn
import torch.nn.functional as F


class MultiHeadAttention(nn.Module):
    """Multi-head scaled dot-product self-attention.

    Args:
        d_model: Width of the input and output features.
        num_heads: Number of attention heads; must divide ``d_model`` evenly.

    Raises:
        ValueError: If ``d_model`` is not divisible by ``num_heads``.
    """

    def __init__(self, d_model, num_heads):
        super().__init__()
        # Fail at construction instead of with an opaque reshape error
        # during the first forward pass.
        if d_model % num_heads != 0:
            raise ValueError(
                f"d_model ({d_model}) must be divisible by num_heads ({num_heads})"
            )
        self.d_model = d_model
        self.num_heads = num_heads
        self.d_k = d_model // num_heads
        self.wq = nn.Linear(d_model, d_model)
        self.wk = nn.Linear(d_model, d_model)
        self.wv = nn.Linear(d_model, d_model)
        self.w_o = nn.Linear(d_model, d_model)

    def split_heads(self, x, batch_size):
        """Reshape ``(batch, seq, d_model)`` into ``(batch, heads, seq, d_k)``."""
        x = x.view(batch_size, -1, self.num_heads, self.d_k)
        return x.transpose(1, 2)

    def forward(self, x, mask=None):
        """Apply self-attention to ``x``.

        Args:
            x: Tensor of shape ``(batch, seq, d_model)``.
            mask: Optional tensor broadcastable to ``(batch, heads, seq, seq)``;
                positions where it equals 0 are excluded from attention.

        Returns:
            Tensor of shape ``(batch, seq, d_model)``.
        """
        batch_size = x.size(0)

        # Project to Q, K, V and split into heads.
        q = self.split_heads(self.wq(x), batch_size)
        k = self.split_heads(self.wk(x), batch_size)
        v = self.split_heads(self.wv(x), batch_size)

        # Scaled attention scores. A plain Python float keeps the scale on
        # whatever device/dtype the scores already use.
        scores = torch.matmul(q, k.transpose(-2, -1)) / math.sqrt(self.d_k)
        if mask is not None:
            # Use the dtype's own minimum rather than a literal such as -1e9,
            # which is not representable in float16.
            scores = scores.masked_fill(mask == 0, torch.finfo(scores.dtype).min)

        # Attention weights.
        attn_weights = F.softmax(scores, dim=-1)

        # Weighted sum of values, then merge the heads back together.
        attn_output = torch.matmul(attn_weights, v)
        attn_output = (
            attn_output.transpose(1, 2)
            .contiguous()
            .view(batch_size, -1, self.d_model)
        )
        return self.w_o(attn_output)


class FeedForward(nn.Module):
    """Position-wise feed-forward network: Linear -> ReLU -> Linear.

    Args:
        d_model: Input and output feature width.
        d_ff: Hidden width of the inner layer.
    """

    def __init__(self, d_model, d_ff):
        super().__init__()
        self.linear1 = nn.Linear(d_model, d_ff)
        self.linear2 = nn.Linear(d_ff, d_model)
        self.relu = nn.ReLU()

    def forward(self, x):
        """Return ``linear2(relu(linear1(x)))``."""
        return self.linear2(self.relu(self.linear1(x)))


class DecoderLayer(nn.Module):
    """One decoder block: masked self-attention and feed-forward sub-layers.

    Each sub-layer is followed by dropout, a residual connection and layer
    normalisation (post-norm).

    Args:
        d_model: Feature width.
        num_heads: Number of attention heads.
        d_ff: Hidden width of the feed-forward sub-layer.
        dropout: Dropout probability applied to each sub-layer output.
    """

    def __init__(self, d_model, num_heads, d_ff, dropout=0.1):
        super().__init__()
        self.self_attn = MultiHeadAttention(d_model, num_heads)
        self.feed_forward = FeedForward(d_model, d_ff)
        self.layernorm1 = nn.LayerNorm(d_model)
        self.layernorm2 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, mask):
        """Run the block on ``x`` using attention mask ``mask``."""
        # Masked self-attention + residual connection + layer norm.
        attn_output = self.self_attn(x, mask)
        x = self.layernorm1(x + self.dropout(attn_output))

        # Feed-forward network + residual connection + layer norm.
        ff_output = self.feed_forward(x)
        x = self.layernorm2(x + self.dropout(ff_output))
        return x


class DecoderOnlyLLM(nn.Module):
    """Minimal decoder-only Transformer language model.

    Args:
        vocab_size: Number of token ids.
        d_model: Embedding / hidden width.
        num_heads: Attention heads per layer.
        num_layers: Number of stacked decoder layers.
        d_ff: Hidden width of each feed-forward sub-layer.
        max_seq_len: Size of the learned positional-embedding table, i.e. the
            longest sequence ``forward`` accepts.
        dropout: Dropout probability used inside every decoder layer.
    """

    def __init__(self, vocab_size, d_model, num_heads, num_layers, d_ff,
                 max_seq_len=1024, dropout=0.1):
        super().__init__()
        self.max_seq_len = max_seq_len
        self.embedding = nn.Embedding(vocab_size, d_model)
        # Learned positional embeddings, one row per position.
        self.pos_encoding = nn.Embedding(max_seq_len, d_model)
        self.decoder_layers = nn.ModuleList(
            [DecoderLayer(d_model, num_heads, d_ff, dropout) for _ in range(num_layers)]
        )
        self.fc = nn.Linear(d_model, vocab_size)

    def generate_look_ahead_mask(self, seq_len, device=None):
        """Return a ``(seq_len, seq_len)`` lower-triangular mask of ones.

        Entry ``[i, j]`` is 1 when position ``i`` may attend to position ``j``
        (``j <= i``) and 0 otherwise, which hides future tokens.

        Args:
            seq_len: Sequence length.
            device: Optional device on which to allocate the mask.
        """
        return torch.tril(torch.ones((seq_len, seq_len), device=device))

    def forward(self, x):
        """Compute next-token logits.

        Args:
            x: Integer token ids of shape ``(batch, seq_len)``.

        Returns:
            Logits of shape ``(batch, seq_len, vocab_size)``.

        Raises:
            IndexError: If ``seq_len`` exceeds ``max_seq_len``. The positional
                lookup would fail anyway; checking first gives a readable
                message before any embedding lookup runs.
        """
        _, seq_len = x.size()
        if seq_len > self.max_seq_len:
            raise IndexError(
                f"sequence length {seq_len} exceeds max_seq_len {self.max_seq_len}"
            )

        # Token embedding + positional embedding. Positions are created on the
        # input's device and broadcast over the batch dimension.
        positions = torch.arange(seq_len, device=x.device)
        x = self.embedding(x) + self.pos_encoding(positions)

        # Causal mask, built directly on the right device.
        mask = self.generate_look_ahead_mask(seq_len, device=x.device)

        # Decode layer by layer.
        for layer in self.decoder_layers:
            x = layer(x, mask)

        # Project to vocab_size-dimensional logits.
        logits = self.fc(x)
        return logits


# Instantiate a small decoder-only LLM.
vocab_size = 10000
d_model = 512
num_heads = 8
num_layers = 6
d_ff = 2048
model = DecoderOnlyLLM(vocab_size, d_model, num_heads, num_layers, d_ff)
print(model)
from datasets import load_dataset
from transformers import AutoTokenizer

# Load the Chinese Alpaca dataset.
dataset = load_dataset("silk-road/alpaca-data-gpt4-chinese")

# Load the LLaMA-2 tokenizer.
tokenizer = AutoTokenizer.from_pretrained("meta-llama/Llama-2-7b-hf", use_fast=False)
tokenizer.pad_token = tokenizer.eos_token  # reuse EOS as the padding token


def format_prompt(sample):
    """Build the training text for one dataset record.

    Args:
        sample: Mapping with "instruction", "input" and "output" entries.

    Returns:
        ``{"prompt": text}``; the input section is left out when
        ``sample["input"]`` is empty.
    """
    instruction = sample["instruction"]
    input_text = sample["input"]
    output_text = sample["output"]
    if input_text:
        prompt = f"### 指令:\n{instruction}\n### 输入:\n{input_text}\n### 输出:\n{output_text}"
    else:
        prompt = f"### 指令:\n{instruction}\n### 输出:\n{output_text}"
    return {"prompt": prompt}


# Format the dataset.
dataset = dataset.map(format_prompt)


def tokenize_function(sample):
    """Tokenize prompts and build causal-LM labels.

    Args:
        sample: Mapping whose "prompt" entry is a single string or, when used
            with ``map(batched=True)``, a list of strings.

    Returns:
        The tokenizer output (PyTorch tensors padded/truncated to 512 tokens)
        with an added "labels" tensor: a copy of ``input_ids`` in which padded
        positions are set to -100.
    """
    prompts = sample["prompt"]
    eos = tokenizer.eos_token
    # Terminate every example with EOS so the model still learns where to
    # stop once padding is excluded from the loss below.
    if isinstance(prompts, str):
        texts = prompts + eos
    else:
        texts = [text + eos for text in prompts]

    tokenized = tokenizer(
        texts,
        max_length=512,
        truncation=True,
        padding="max_length",
        return_tensors="pt",
    )

    # Labels equal input_ids (autoregressive training), except on padding.
    # pad_token is the same id as eos_token, so padding is identified through
    # the attention mask rather than by token id; -100 is the label value the
    # causal-LM loss skips.
    labels = tokenized["input_ids"].clone()
    labels[tokenized["attention_mask"] == 0] = -100
    tokenized["labels"] = labels
    return tokenized


# Tokenize. The raw text columns are dropped so that only tensor-compatible
# fields (input_ids, attention_mask, labels) reach the data collator.
tokenized_dataset = dataset.map(
    tokenize_function,
    batched=True,
    remove_columns=dataset["train"].column_names,
)

# Split into training and validation sets.
tokenized_dataset = tokenized_dataset["train"].train_test_split(test_size=0.1)
from transformers import (
    AutoModelForCausalLM,
    BitsAndBytesConfig,
    Trainer,
    TrainingArguments,
)
from peft import (
    LoraConfig,
    TaskType,
    get_peft_model,
    prepare_model_for_kbit_training,
)

# LoRA hyper-parameters.
lora_config = LoraConfig(
    task_type=TaskType.CAUSAL_LM,
    r=8,  # rank of the low-rank update matrices
    lora_alpha=32,  # scaling factor
    lora_dropout=0.1,  # dropout probability
    target_modules=["q_proj", "v_proj"],  # adapt only the attention q/v projections
    bias="none",
    inference_mode=False,
)

# Load LLaMA-2-7B with 4-bit quantization to reduce GPU memory usage.
# The quantization settings go through BitsAndBytesConfig rather than the
# deprecated bare `load_in_4bit=True` keyword.
model = AutoModelForCausalLM.from_pretrained(
    "meta-llama/Llama-2-7b-hf",
    quantization_config=BitsAndBytesConfig(
        load_in_4bit=True,
        # Match the fp16 activations so 4-bit matmuls are not run in fp32.
        bnb_4bit_compute_dtype=torch.float16,
    ),
    device_map="auto",
    torch_dtype=torch.float16,
)

# Get the quantized model ready for training before adapters are attached
# (PEFT's documented step for k-bit fine-tuning).
model = prepare_model_for_kbit_training(model)

# Attach the LoRA adapters.
model = get_peft_model(model, lora_config)

# Report the number of trainable parameters.
# Expected output is roughly:
#   trainable params: ~4M || all params: ~7B || trainable%: 0.06
model.print_trainable_parameters()
# Training configuration.
# `remove_unused_columns` is left at its default so the Trainer drops any
# dataset column the model's forward() does not accept (e.g. raw text
# columns), which the padding collator cannot turn into tensors.
training_args = TrainingArguments(
    output_dir="./llama-2-7b-lora-chinese",
    per_device_train_batch_size=4,
    per_device_eval_batch_size=4,
    gradient_accumulation_steps=4,
    learning_rate=2e-4,
    num_train_epochs=3,
    logging_steps=10,
    # NOTE(review): newer transformers releases rename this argument to
    # `eval_strategy` -- confirm against the installed version.
    evaluation_strategy="epoch",
    save_strategy="epoch",
    fp16=True,
    report_to="none",
)

# Build the Trainer.
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=tokenized_dataset["train"],
    eval_dataset=tokenized_dataset["test"],
    # NOTE(review): newer transformers releases prefer `processing_class=`
    # over `tokenizer=` -- confirm against the installed version.
    tokenizer=tokenizer,
)

# Start training.
trainer.train()

# Save the LoRA adapter.
model.save_pretrained("./llama-2-7b-lora-chinese-adapter")
from peft import PeftModel
from transformers import BitsAndBytesConfig

# Load the base model (4-bit quantized; settings passed via BitsAndBytesConfig
# instead of the deprecated bare `load_in_4bit=True` keyword).
base_model = AutoModelForCausalLM.from_pretrained(
    "meta-llama/Llama-2-7b-hf",
    quantization_config=BitsAndBytesConfig(
        load_in_4bit=True,
        bnb_4bit_compute_dtype=torch.float16,
    ),
    device_map="auto",
    torch_dtype=torch.float16,
)

# Load the LoRA adapter on top of the base model.
peft_model = PeftModel.from_pretrained(base_model, "./llama-2-7b-lora-chinese-adapter")
peft_model.eval()


def generate_response(instruction, input_text=""):
    """Generate an answer for an instruction with the fine-tuned model.

    Args:
        instruction: The instruction text.
        input_text: Optional extra input; when empty the input section is
            left out of the prompt.

    Returns:
        The decoded text the model generated after the prompt (special tokens
        removed). Sampling is enabled, so results vary between calls.
    """
    if input_text:
        prompt = f"### 指令:\n{instruction}\n### 输入:\n{input_text}\n### 输出:\n"
    else:
        prompt = f"### 指令:\n{instruction}\n### 输出:\n"

    # Put the inputs on the model's device instead of assuming "cuda".
    inputs = tokenizer(prompt, return_tensors="pt").to(peft_model.device)

    # Generate the answer.
    outputs = peft_model.generate(
        **inputs,
        max_new_tokens=200,
        temperature=0.7,
        top_p=0.9,
        do_sample=True,
        repetition_penalty=1.1,
        pad_token_id=tokenizer.eos_token_id,
    )

    # Decode only the newly generated tokens. Slicing off the prompt by length
    # avoids searching the decoded text for the output marker, which raises
    # IndexError whenever the marker does not survive decoding unchanged.
    prompt_length = inputs["input_ids"].shape[1]
    return tokenizer.decode(outputs[0][prompt_length:], skip_special_tokens=True)


# Try the inference function.
instruction = "解释一下什么是大语言模型"
response = generate_response(instruction)
print(f"指令: {instruction}")
print(f"回答: {response}")