launch_scientist.py 源码解析
import argparse
import json
import multiprocessing
import openai
import os
import os.path as osp
import shutil
import sys
import time
import torch
from aider.coders import Coder
from aider.io import InputOutput
from aider.models import Model
from datetime import datetime
from ai_scientist.generate_ideas import generate_ideas, check_idea_novelty
from ai_scientist.llm import create_client, AVAILABLE_LLMS
from ai_scientist.perform_experiments import perform_experiments
from ai_scientist.perform_review import perform_review, load_paper, perform_improvement
from ai_scientist.perform_writeup import perform_writeup, generate_latex
NUM_REFLECTIONS = 3
def print_time():
print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
def parse_arguments():
parser = argparse.ArgumentParser(description="Run AI scientist experiments")
parser.add_argument(
"--skip-idea-generation",
action="store_true",
help="Skip idea generation and load existing ideas",
)
parser.add_argument(
"--skip-novelty-check",
action="store_true",
help="Skip novelty check and use existing ideas",
)
# add type of experiment (nanoGPT, Boston, etc.)
parser.add_argument(
"--experiment",
type=str,
default="nanoGPT",
help="Experiment to run AI Scientist on.",
)
parser.add_argument(
"--model",
type=str,
default="claude-3-5-sonnet-20240620",
choices=AVAILABLE_LLMS,
help="Model to use for AI Scientist.",
)
parser.add_argument(
"--writeup",
type=str,
default="latex",
choices=["latex"],
help="What format to use for writeup",
)
parser.add_argument(
"--parallel",
type=int,
default=0,
help="Number of parallel processes to run. 0 for sequential execution.",
)
parser.add_argument(
"--improvement",
action="store_true",
help="Improve based on reviews.",
)
parser.add_argument(
"--gpus",
type=str,
default=None,
help="Comma-separated list of GPU IDs to use (e.g., '0,1,2'). If not specified, all available GPUs will be used.",
)
parser.add_argument(
"--num-ideas",
type=int,
default=50,
help="Number of ideas to generate",
)
return parser.parse_args()
def get_available_gpus(gpu_ids=None):
if gpu_ids is not None:
return [int(gpu_id) for gpu_id in gpu_ids.split(",")]
return list(range(torch.cuda.device_count()))
def worker(
queue,
base_dir,
results_dir,
model,
client,
client_model,
writeup,
improvement,
gpu_id,
):
os.environ["CUDA_VISIBLE_DEVICES"] = str(gpu_id)
print(f"Worker {gpu_id} started.")
while True:
idea = queue.get()
if idea is None:
break
success = do_idea(
base_dir,
results_dir,
idea,
model,
client,
client_model,
writeup,
improvement,
log_file=True,
)
print(f"Completed idea: {idea['Name']}, Success: {success}")
print(f"Worker {gpu_id} finished.")
def do_idea(
base_dir,
results_dir,
idea,
model,
client,
client_model,
writeup,
improvement,
log_file=False,
):
## CREATE PROJECT FOLDER
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
idea_name = f"{timestamp}_{idea['Name']}"
folder_name = osp.join(results_dir, idea_name)
assert not osp.exists(folder_name), f"Folder {folder_name} already exists."
destination_dir = folder_name
shutil.copytree(base_dir, destination_dir, dirs_exist_ok=True)
with open(osp.join(base_dir, "run_0", "final_info.json"), "r") as f:
baseline_results = json.load(f)
baseline_results = {k: v["means"] for k, v in baseline_results.items()}
exp_file = osp.join(folder_name, "experiment.py")
vis_file = osp.join(folder_name, "plot.py")
notes = osp.join(folder_name, "notes.txt")
with open(notes, "w") as f:
f.write(f"# Title: {idea['Title']}\n")
f.write(f"# Experiment description: {idea['Experiment']}\n")
f.write(f"## Run 0: Baseline\n")
f.write(f"Results: {baseline_results}\n")
f.write(f"Description: Baseline results.\n")
if log_file:
original_stdout = sys.stdout
original_stderr = sys.stderr
log_path = osp.join(folder_name, "log.txt")
log = open(log_path, "a")
sys.stdout = log
sys.stderr = log
try:
print_time()
print(f"*Starting idea: {idea_name}*")
## PERFORM EXPERIMENTS
fnames = [exp_file, vis_file, notes]
io = InputOutput(
yes=True, chat_history_file=f"{folder_name}/{idea_name}_aider.txt"
)
if model == "deepseek-coder-v2-0724":
main_model = Model("deepseek/deepseek-coder")
elif model == "llama3.1-405b":
main_model = Model("openrouter/meta-llama/llama-3.1-405b-instruct")
else:
main_model = Model(model)
coder = Coder.create(
main_model=main_model,
fnames=fnames,
io=io,
stream=False,
use_git=False,
edit_format="diff",
)
print_time()
print(f"*Starting Experiments*")
try:
success = perform_experiments(idea, folder_name, coder, baseline_results)
except Exception as e:
print(f"Error during experiments: {e}")
print(f"Experiments failed for idea {idea_name}")
return False
if not success:
print(f"Experiments failed for idea {idea_name}")
return False
print_time()
print(f"*Starting Writeup*")
## PERFORM WRITEUP
if writeup == "latex":
writeup_file = osp.join(folder_name, "latex", "template.tex")
fnames = [exp_file, writeup_file, notes]
if model == "deepseek-coder-v2-0724":
main_model = Model("deepseek/deepseek-coder")
elif model == "llama3.1-405b":
main_model = Model("openrouter/meta-llama/llama-3.1-405b-instruct")
else:
main_model = Model(model)
coder = Coder.create(
main_model=main_model,
fnames=fnames,
io=io,
stream=False,
use_git=False,
edit_format="diff",
)
try:
perform_writeup(idea, folder_name, coder, client, client_model)
except Exception as e:
print(f"Failed to perform writeup: {e}")
return False
print("Done writeup")
else:
raise ValueError(f"Writeup format {writeup} not supported.")
print_time()
print(f"*Starting Review*")
## REVIEW PAPER
if writeup == "latex":
try:
paper_text = load_paper(f"{folder_name}/{idea['Name']}.pdf")
review = perform_review(
paper_text,
model="gpt-4o-2024-05-13",
client=openai.OpenAI(),
num_reflections=5,
num_fs_examples=1,
num_reviews_ensemble=5,
temperature=0.1,
)
# Store the review in separate review.txt file
with open(osp.join(folder_name, "review.txt"), "w") as f:
f.write(json.dumps(review, indent=4))
except Exception as e:
print(f"Failed to perform review: {e}")
return False
## IMPROVE WRITEUP
if writeup == "latex" and improvement:
print_time()
print(f"*Starting Improvement*")
try:
perform_improvement(review, coder)
generate_latex(
coder, folder_name, f"{folder_name}/{idea['Name']}_improved.pdf"
)
paper_text = load_paper(f"{folder_name}/{idea['Name']}_improved.pdf")
review = perform_review(
paper_text,
model="gpt-4o-2024-05-13",
client=openai.OpenAI(),
num_reflections=5,
num_fs_examples=1,
num_reviews_ensemble=5,
temperature=0.1,
)
# Store the review in separate review.txt file
with open(osp.join(folder_name, "review_improved.txt"), "w") as f:
f.write(json.dumps(review))
except Exception as e:
print(f"Failed to perform improvement: {e}")
return False
return True
except Exception as e:
print(f"Failed to evaluate idea {idea_name}: {str(e)}")
return False
finally:
print("FINISHED IDEA")
if log_file:
sys.stdout = original_stdout
sys.stderr = original_stderr
log.close()
if __name__ == "__main__":
args = parse_arguments()
# Check available GPUs and adjust parallel processes if necessary
available_gpus = get_available_gpus(args.gpus)
if args.parallel > len(available_gpus):
print(
f"Warning: Requested {args.parallel} parallel processes, but only {len(available_gpus)} GPUs available. Adjusting to {len(available_gpus)}."
)
args.parallel = len(available_gpus)
print(f"Using GPUs: {available_gpus}")
# Create client
client, client_model = create_client(args.model)
base_dir = osp.join("templates", args.experiment)
results_dir = osp.join("results", args.experiment)
ideas = generate_ideas(
base_dir,
client=client,
model=client_model,
skip_generation=args.skip_idea_generation,
max_num_generations=args.num_ideas,
num_reflections=NUM_REFLECTIONS,
)
ideas = check_idea_novelty(
ideas,
base_dir=base_dir,
client=client,
model=client_model,
)
with open(osp.join(base_dir, "ideas.json"), "w") as f:
json.dump(ideas, f, indent=4)
novel_ideas = [idea for idea in ideas if idea["novel"]]
# novel_ideas = list(reversed(novel_ideas))
if args.parallel > 0:
print(f"Running {args.parallel} parallel processes")
queue = multiprocessing.Queue()
for idea in novel_ideas:
queue.put(idea)
processes = []
for i in range(args.parallel):
gpu_id = available_gpus[i % len(available_gpus)]
p = multiprocessing.Process(
target=worker,
args=(
queue,
base_dir,
results_dir,
args.model,
client,
client_model,
args.writeup,
args.improvement,
gpu_id,
),
)
p.start()
time.sleep(150)
processes.append(p)
# Signal workers to exit
for _ in range(args.parallel):
queue.put(None)
for p in processes:
p.join()
print("All parallel processes completed.")
else:
for idea in novel_ideas:
print(f"Processing idea: {idea['Name']}")
try:
success = do_idea(
base_dir,
results_dir,
idea,
args.model,
client,
client_model,
args.writeup,
args.improvement,
)
print(f"Completed idea: {idea['Name']}, Success: {success}")
except Exception as e:
print(f"Failed to evaluate idea {idea['Name']}: {str(e)}")
print("All ideas evaluated.")
import multiprocessing
导入 multiprocessing 模块,用于创建和管理多个进程。
import torch
导入 PyTorch 库,用于深度学习模型的构建和训练。
import os
导入 os 模块,用于与操作系统交互,如文件和目录操作。
import time
导入 time 模块,用于时间相关的操作,如延迟。
import sys
导入 sys 模块,用于访问与 Python 解释器相关的变量和函数。
from aider.coders import Coder
从 aider.coders 模块导入 Coder 类,用于处理代码生成和编辑。
from aider.models import Model
从 aider.models 模块导入 Model 类,用于加载和使用深度学习模型。
from aider.io import InputOutput
从 aider.io 模块导入 InputOutput 类,用于处理输入输出操作。
from datetime import datetime
从 datetime 模块导入 datetime 类,用于处理日期和时间。
from ai_scientist.generate_ideas import generate_ideas, check_idea_novelty
从 ai_scientist.generate_ideas 模块导入 generate_ideas 和 check_idea_novelty 函数,用于生成和检查创意的独特性。
from ai_scientist.perform_experiments import perform_experiments
从 ai_scientist.perform_experiments 模块导入 perform_experiments 函数,用于执行实验。
from ai_scientist.perform_writeup import perform_writeup, generate_latex
从 ai_scientist.perform_writeup 模块导入 perform_writeup 和 generate_latex 函数,用于撰写和生成 LaTeX 文档。
from ai_scientist.perform_review import perform_review, load_paper, perform_improvement
从 ai_scientist.perform_review 模块导入 perform_review、load_paper 和 perform_improvement 函数,用于审查论文和改进。
NUM_REFLECTIONS = 3
定义常量 NUM_REFLECTIONS,表示反思的数量。
def print_time():
定义一个名为 print_time 的函数,用于打印当前时间。
print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
在 print_time 函数中,打印当前时间,格式为 "年-月-日 时:分:秒"。
def parse_arguments():
定义一个名为 parse_arguments 的函数,用于解析命令行参数。
parser = argparse.ArgumentParser(description="Run AI scientist experiments")
创建一个 ArgumentParser 对象,描述程序的功能。
parser.add_argument(
开始添加命令行参数。
"--skip-idea-generation",
action="store_true",
help="Skip idea generation and load existing ideas",
)
添加 --skip-idea-generation 参数,若指定则跳过创意生成。
parser.add_argument(
"--skip-novelty-check",
action="store_true",
help="Skip novelty check and use existing ideas",
)
添加 --skip-novelty-check 参数,若指定则跳过独特性检查。
parser.add_argument(
"--experiment",
type=str,
default="nanoGPT",
help="Experiment to run AI Scientist on.",
)
添加 --experiment 参数,指定要运行的实验类型,默认为 "nanoGPT"。
parser.add_argument(
"--model",
type=str,
default="claude-3-5-sonnet-20240620",
choices=[
"claude-3-5-sonnet-20240620",
"gpt-4o-2024-05-13",
"deepseek-coder-v2-0724",
"llama3.1-405b",
"bedrock/anthropic.claude-3-sonnet-20240229-v1:0",
"bedrock/anthropic.claude-3-5-sonnet-20240620-v1:0",
"bedrock/anthropic.claude-3-haiku-20240307-v1:0",
"bedrock/anthropic.claude-3-opus-20240229-v1:0",
"vertex_ai/claude-3-opus@20240229",
"vertex_ai/claude-3-5-sonnet@20240620",
"vertex_ai/claude-3-sonnet@20240229",
"vertex_ai/claude-3-haiku@20240307",
],
help="Model to use for AI Scientist.",
)
添加 --model 参数,指定要使用的模型,提供多个选项,默认为 "claude-3-5-sonnet-20240620"。
parser.add_argument(
"--writeup",
type=str,
default="latex",
choices=["latex"],
help="What format to use for writeup",
)
添加 --writeup 参数,指定撰写的格式,默认为 "latex"。
parser.add_argument(
"--parallel",
type=int,
default=0,
help="Number of parallel processes to run. 0 for sequential execution.",
)
添加 --parallel 参数,指定要运行的并行进程数量,默认为 0(顺序执行)。
parser.add_argument(
"--improvement",
action="store_true",
help="Improve based on reviews.",
)
添加 --improvement 参数,若指定则根据审查结果进行改进。
parser.add_argument(
"--gpus",
type=str,
default=None,
help="Comma-separated list of GPU IDs to use (e.g., '0,1,2'). If not specified, all available GPUs will be used.",
)
添加 --gpus 参数,指定要使用的 GPU ID 列表,默认为 None(使用所有可用 GPU)。
parser.add_argument(
"--num-ideas",
type=int,
default=50,
help="Number of ideas to generate",
)
添加 --num-ideas 参数,指定要生成的创意数量,默认为 50。
return parser.parse_args()
返回解析后的命令行参数。
def get_available_gpus(gpu_ids=None):
定义一个名为 get_available_gpus 的函数,用于获取可用的 GPU。
if gpu_ids is not None:
return [int(gpu_id) for gpu_id in gpu_ids.split(",")]
如果指定了 GPU ID,则将其转换为整数列表并返回。
return list(range(torch.cuda.device_count()))
如果未指定 GPU ID,则返回所有可用 GPU 的索引列表。
def worker(
queue,
base_dir,
results_dir,
model,
client,
client_model,
writeup,
improvement,
gpu_id,
):
定义一个名为 worker 的函数,用于处理任务的工作进程。
os.environ["CUDA_VISIBLE_DEVICES"] = str(gpu_id)
设置环境变量 CUDA_VISIBLE_DEVICES,指定当前工作进程使用的 GPU。
print(f"Worker {gpu_id} started.")
打印工作进程开始的信息。
while True:
进入一个无限循环,等待处理任务。
idea = queue.get()
从任务队列中获取一个创意。
if idea is None:
break
如果获取到的创意为 None,则退出循环。
success = do_idea(
base_dir,
results_dir,
idea,
model,
client,
client_model,
writeup,
improvement,
log_file=True,
)
调用 do_idea 函数处理当前创意,并记录成功与否。
print(f"Completed idea: {idea['Name']}, Success: {success}")
打印当前创意处理完成的信息。
print(f"Worker {gpu_id} finished.")
打印工作进程结束的信息。
def do_idea(
base_dir,
results_dir,
idea,
model,
client,
client_model,
writeup,
improvement,
log_file=False,
):
定义一个名为 do_idea 的函数,用于处理创意的具体逻辑。
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
获取当前时间戳,格式为 "年月日_时分秒"。
idea_name = f"{timestamp}_{idea['Name']}"
生成创意名称,包含时间戳和创意的名称。
folder_name = osp.join(results_dir, idea_name)
创建结果文件夹的路径。
assert not osp.exists(folder_name), f"Folder {folder_name} already exists."
检查结果文件夹是否已存在,如果存在则抛出异常。
destination_dir = folder_name
将目标目录设置为结果文件夹。
shutil.copytree(base_dir, destination_dir, dirs_exist_ok=True)
复制基础目录到目标目录,保留目录结构。
with open(osp.join(base_dir, "run_0", "final_info.json"), "r") as f:
打开基础目录中的 final_info.json 文件以读取。
baseline_results = json.load(f)
将 JSON 数据加载到 baseline_results 变量中。
baseline_results = {k: v["means"] for k, v in baseline_results.items()}
提取基线结果中的均值。
exp_file = osp.join(folder_name, "experiment.py")
设置实验文件的路径。
vis_file = osp.join(folder_name, "plot.py")
设置可视化文件的路径。
notes = osp.join(folder_name, "notes.txt")
设置笔记文件的路径。
with open(notes, "w") as f:
打开笔记文件以写入。
f.write(f"# Title: {idea['Title']}\n")
写入创意标题。
f.write(f"# Experiment description: {idea['Experiment']}\n")
写入实验描述。
f.write(f"## Run 0: Baseline\n")
写入基线运行的标题。
f.write(f"Results: {baseline_results}\n")
写入基线结果。
f.write(f"Description: Baseline results.\n")
写入基线结果的描述。
if log_file:
检查是否需要记录日志。
original_stdout = sys.stdout
original_stderr = sys.stderr
保存原始标准输出和标准错误输出。
log_path = osp.join(folder_name, "log.txt")
设置日志文件的路径。
log = open(log_path, "a")
以追加模式打开日志文件。
sys.stdout = log
sys.stderr = log
重定向标准输出和标准错误输出到日志文件。
try:
开始一个 try 块以捕获异常。
print_time()
打印当前时间。
print(f"*Starting idea: {idea_name}*")
打印开始处理创意的信息。
fnames = [exp_file, vis_file, notes]
将文件名列表存储在 fnames 变量中。
io = InputOutput(
yes=True, chat_history_file=f"{folder_name}/{idea_name}_aider.txt"
)
创建 InputOutput 对象,指定聊天历史文件。
if model == "deepseek-coder-v2-0724":
main_model = Model("deepseek/deepseek-coder")
根据模型类型加载相应的模型。
elif model == "llama3.1-405b":
main_model = Model("openrouter/meta-llama/llama-3.1-405b-instruct")
加载另一个模型。
else:
main_model = Model(model)
加载默认模型。
coder = Coder.create(
main_model=main_model,
fnames=fnames,
io=io,
stream=False,
use_git=False,
edit_format="diff",
)
创建 Coder 对象,用于代码生成和编辑。
print_time()
打印当前时间。
print(f"*Starting Experiments*")
打印开始实验的信息。
try:
success = perform_experiments(idea, folder_name, coder, baseline_results)
尝试执行实验,并记录成功与否。
except Exception as e:
print(f"Error during experiments: {e}")
print(f"Experiments failed for idea {idea_name}")
return False
捕获实验中的异常并打印错误信息。
if not success:
print(f"Experiments failed for idea {idea_name}")
return False
如果实验失败,打印失败信息并返回 False。
print_time()
打印当前时间。
print(f"*Starting Writeup*")
打印开始撰写的消息。
if writeup == "latex":
检查撰写格式是否为 LaTeX。
writeup_file = osp.join(folder_name, "latex", "template.tex")
设置 LaTeX 文件的路径。
fnames = [exp_file, writeup_file, notes]
更新文件名列表。
if model == "deepseek-coder-v2-0724":
main_model = Model("deepseek/deepseek-coder")
根据模型类型加载相应的模型。
elif model == "llama3.1-405b":
main_model = Model("openrouter/meta-llama/llama-3.1-405b-instruct")
加载另一个模型。
else:
main_model = Model(model)
加载默认模型。
coder = Coder.create(
main_model=main_model,
fnames=fnames,
io=io,
stream=False,
use_git=False,
edit_format="diff",
)
创建 Coder 对象,用于撰写。
try:
perform_writeup(idea, folder_name, coder, client, client_model)
尝试执行撰写操作。
except Exception as e:
print(f"Failed to perform writeup: {e}")
return False
捕获撰写中的异常并打印错误信息。
print("Done writeup")
打印撰写完成的信息。
else:
raise ValueError(f"Writeup format {writeup} not supported.")
如果撰写格式不支持,抛出异常。
print_time()
打印当前时间。
print(f"*Starting Review*")
打印开始审查的消息。
if writeup == "latex":
检查撰写格式是否为 LaTeX。
try:
paper_text = load_paper(f"{folder_name}/{idea['Name']}.pdf")
尝试加载生成的论文。
review = perform_review(
paper_text,
model="gpt-4o-2024-05-13",
client=openai.OpenAI(),
num_reflections=5,
num_fs_examples=1,
num_reviews_ensemble=5,
temperature=0.1,
)
执行审查操作,使用指定的模型和参数。
with open(osp.join(folder_name, "review.txt"), "w") as f:
打开审查结果文件以写入。
f.write(json.dumps(review, indent=4))
将审查结果写入文件。
except Exception as e:
print(f"Failed to perform review: {e}")
return False
捕获审查中的异常并打印错误信息。
if writeup == "latex" and improvement:
检查撰写格式是否为 LaTeX 且是否需要改进。
print_time()
打印当前时间。
print(f"*Starting Improvement*")
打印开始改进的消息。
try:
perform_improvement(review, coder)
尝试执行改进操作。
generate_latex(
coder, folder_name, f"{folder_name}/{idea['Name']}_improved.pdf"
)
生成改进后的 LaTeX 文档。
paper_text = load_paper(f"{folder_name}/{idea['Name']}_improved.pdf")
加载改进后的论文。
review = perform_review(
paper_text,
model="gpt-4o-2024-05-13",
client=openai.OpenAI(),
num_reflections=5,
num_fs_examples=1,
num_reviews_ensemble=5,
temperature=0.1,
)
对改进后的论文进行审查。
with open(osp.join(folder_name, "review_improved.txt"), "w") as f:
打开改进审查结果文件以写入。
f.write(json.dumps(review))
将改进审查结果写入文件。
except Exception as e:
print(f"Failed to perform improvement: {e}")
return False
捕获改进中的异常并打印错误信息。
return True
返回 True 表示成功完成创意处理。
except Exception as e:
print(f"Failed to evaluate idea {idea_name}: {str(e)}")
return False
捕获整个处理过程中的异常并打印错误信息。
finally:
print("FINISHED IDEA")
无论是否发生异常,打印处理完成的信息。
if log_file:
sys.stdout = original_stdout
sys.stderr = original_stderr
log.close()
如果记录日志,恢复标准输出和标准错误输出,并关闭日志文件。
if __name__ == "__main__":
检查是否为主模块运行。
args = parse_arguments()
解析命令行参数。
available_gpus = get_available_gpus(args.gpus)
获取可用的 GPU 列表。
if args.parallel > len(available_gpus):
检查请求的并行进程数量是否超过可用 GPU 数量。
print(
f"Warning: Requested {args.parallel} parallel processes, but only {len(available_gpus)} GPUs available. Adjusting to {len(available_gpus)}."
)
打印警告信息,告知用户调整并行进程数量。
args.parallel = len(available_gpus)
将并行进程数量调整为可用 GPU 数量。
print(f"Using GPUs: {available_gpus}")
打印正在使用的 GPU 列表。
if args.model == "claude-3-5-sonnet-20240620":
检查指定的模型是否为 "claude-3-5-sonnet-20240620"。
import anthropic
导入 anthropic 库,用于与 Anthropic API 交互。
print(f"Using Anthropic API with model {args.model}.")
打印正在使用的模型信息。
client_model = "claude-3-5-sonnet-20240620"
设置客户端模型为指定的模型。
client = anthropic.Anthropic()
创建 Anthropic API 客户端。
elif args.model.startswith("bedrock") and "claude" in args.model:
检查指定的模型是否为 Bedrock 的 Claude 模型。
import anthropic
导入 anthropic 库。
client_model = args.model.split("/")[-1]
提取模型名称。
print(f"Using Amazon Bedrock with model {client_model}.")
打印正在使用的 Bedrock 模型信息。
client = anthropic.AnthropicBedrock(
aws_access_key=os.getenv("AWS_ACCESS_KEY_ID"),
aws_secret_key=os.getenv("AWS_SECRET_ACCESS_KEY"),
aws_region=os.getenv("AWS_REGION_NAME"),
)
创建 Amazon Bedrock API 客户端,使用环境变量中的 AWS 凭证。
elif args.model.startswith("vertex_ai") and "claude" in args.model:
检查指定的模型是否为 Vertex AI 的 Claude 模型。
import anthropic
导入 anthropic 库。
client_model = args.model.split("/")[-1]
提取模型名称。
print(f"Using Vertex AI with model {client_model}.")
打印正在使用的 Vertex AI 模型信息。
client = anthropic.AnthropicVertex()
创建 Vertex AI API 客户端。
elif args.model == "gpt-4o-2024-05-13":
检查指定的模型是否为 "gpt-4o-2024-05-13"。
import openai
导入 OpenAI 库。
print(f"Using OpenAI API with model {args.model}.")
打印正在使用的 OpenAI 模型信息。
client_model = "gpt-4o-2024-05-13"
设置客户端模型为指定的模型。
client = openai.OpenAI()
创建 OpenAI API 客户端。
elif args.model == "deepseek-coder-v2-0724":
检查指定的模型是否为 "deepseek-coder-v2-0724"。
import openai
导入 OpenAI 库。
print(f"Using OpenAI API with {args.model}.")
打印正在使用的 OpenAI 模型信息。
client_model = "deepseek-coder-v2-0724"
设置客户端模型为指定的模型。
client = openai.OpenAI(
api_key=os.environ["DEEPSEEK_API_KEY"], base_url="https://api.deepseek.com"
)
创建 DeepSeek API 客户端,使用环境变量中的 API 密钥。
elif args.model == "llama3.1-405b":
检查指定的模型是否为 "llama3.1-405b"。
import openai
导入 OpenAI 库。
print(f"Using OpenAI API with {args.model}.")
打印正在使用的 OpenAI 模型信息。
client_model = "meta-llama/llama-3.1-405b-instruct"
设置客户端模型为指定的模型。
client = openai.OpenAI(
api_key=os.environ["OPENROUTER_API_KEY"],
base_url="https://openrouter.ai/api/v1",
)
创建 OpenRouter API 客户端,使用环境变量中的 API 密钥。
else:
raise ValueError(f"Model {args.model} not supported.")
如果指定的模型不受支持,抛出异常。
base_dir = osp.join("templates", args.experiment)
设置基础目录为模板目录。
results_dir = osp.join("results", args.experiment)
设置结果目录为实验结果目录。
ideas = generate_ideas(
base_dir,
client=client,
model=client_model,
skip_generation=args.skip_idea_generation,
max_num_generations=args.num_ideas,
num_reflections=NUM_REFLECTIONS,
)
调用 generate_ideas 函数生成创意。
ideas = check_idea_novelty(
ideas,
base_dir=base_dir,
client=client,
model=client_model,
)
调用 check_idea_novelty 函数检查创意的独特性。
with open(osp.join(base_dir, "ideas.json"), "w") as f:
打开 ideas.json 文件以写入。
json.dump(ideas, f, indent=4)
将创意写入 JSON 文件。
novel_ideas = [idea for idea in ideas if idea["novel"]]
筛选出独特的创意。
if args.parallel > 0:
检查是否需要并行处理。
print(f"Running {args.parallel} parallel processes")
打印正在运行的并行进程数量。
queue = multiprocessing.Queue()
创建一个进程间通信的队列。
for idea in novel_ideas:
queue.put(idea)
将独特的创意放入队列中。
processes = []
初始化一个空列表以存储进程。
for i in range(args.parallel):
根据请求的并行进程数量创建进程。
gpu_id = available_gpus[i % len(available_gpus)]
根据可用 GPU 的数量分配 GPU ID。
p = multiprocessing.Process(
target=worker,
args=(
queue,
base_dir,
results_dir,
args.model,
client,
client_model,
args.writeup,
args.improvement,
gpu_id,
),
)
创建一个新的进程,目标为 worker 函数,并传递必要的参数。
p.start()
启动进程。
time.sleep(150)
暂停 150 秒,以避免过快启动进程。
processes.append(p)
将进程添加到进程列表中。
for _ in range(args.parallel):
queue.put(None)
向队列中放入 None,以通知工作进程退出。
for p in processes:
p.join()
等待所有进程完成。
print("All parallel processes completed.")
打印所有并行进程完成的信息。
else:
如果不需要并行处理。
for idea in novel_ideas:
遍历所有独特的创意。
print(f"Processing idea: {idea['Name']}")
打印正在处理的创意名称。
try:
success = do_idea(
base_dir,
results_dir,
idea,
args.model,
client,
client_model,
args.writeup,
args.improvement,
)
尝试处理创意,并记录成功与否。
print(f"Completed idea: {idea['Name']}, Success: {success}")
打印创意处理完成的信息。
except Exception as e:
print(f"Failed to evaluate idea {idea['Name']}: {str(e)}")
捕获处理中的异常并打印错误信息。
print("All ideas evaluated.")
打印所有创意评估完成的信息。
以上是对代码文件的逐行解释。如果你有任何具体的疑问或需要更详细的解释,请告诉我!