Qwen3-0.6B-FP8代码实例:Python requests调用Web API实现批处理问答
Qwen3-0.6B-FP8代码实例:Python requests调用Web API实现批处理问答
想用Qwen3-0.6B-FP8模型批量处理一堆问题,但不想在Web界面里一个个手动输入?今天我就来分享一个实用的方法:用Python的requests库直接调用模型的Web API,实现自动化批处理问答。
这个方法特别适合需要批量生成内容、处理大量文档或者做自动化测试的场景。你只需要准备一个问题列表,运行一段简单的Python脚本,就能一次性得到所有答案,效率提升不止一点点。
1. 准备工作:理解Web API调用原理
在开始写代码之前,我们先搞清楚一件事:怎么通过代码跟Qwen3-0.6B-FP8对话?
1.1 Web界面背后的秘密
当你访问 https://gpu-{实例ID}-7860.web.gpu.ZEEKLOG.net/ 这个Web界面时,实际上背后有一个API在默默工作。你每发送一条消息,前端就会通过这个API把消息传给模型,然后把模型生成的回复显示给你看。
我们要做的,就是绕过Web界面,直接用代码调用这个API。
1.2 找到API地址
通常,这类基于Gradio或类似框架搭建的Web服务,都会提供一个API端点。对于Qwen3-0.6B-FP8镜像,API地址一般是:
https://gpu-{实例ID}-7860.web.gpu.ZEEKLOG.net/api/predict 或者
https://gpu-{实例ID}-7860.web.gpu.ZEEKLOG.net/run/predict 具体是哪个,我们可以通过浏览器的开发者工具来确认。不过别担心,我会告诉你一个更简单的方法。
2. 单次问答:先试试水
在批量处理之前,我们先从最简单的单次问答开始。这样能确保我们的基本调用方法是正确的。
2.1 安装必要的库
首先确保你安装了requests库,如果没有的话,用pip安装一下:
pip install requests 2.2 编写第一个API调用脚本
创建一个Python文件,比如叫 single_qa.py:
import requests import json # 你的实例地址,把 {实例ID} 替换成实际的ID base_url = "https://gpu-你的实例ID-7860.web.gpu.ZEEKLOG.net" def ask_qwen_single(question, use_think_mode=False): """ 向Qwen3-0.6B-FP8模型提问(单次) 参数: question: 要问的问题 use_think_mode: 是否使用思考模式 返回: 模型的回答 """ try: # 方法1:尝试调用 /api/predict 端点 api_url = f"{base_url}/api/predict" # 构建请求数据 # 注意:这里的参数结构需要根据实际API调整 payload = { "data": [ question, # 用户输入 "", # 历史对话(第一次为空) use_think_mode, # 是否思考模式 0.7, # temperature 0.8, # top_p 512 # 最大生成长度 ] } headers = { "Content-Type": "application/json" } response = requests.post(api_url, json=payload, headers=headers, timeout=30) if response.status_code == 200: result = response.json() # 根据实际API返回结构提取回答 if "data" in result and len(result["data"]) > 0: return result["data"][0] else: return str(result) else: return f"请求失败,状态码:{response.status_code}" except Exception as e: return f"发生错误:{str(e)}" # 测试一下 if __name__ == "__main__": # 测试问题 test_question = "用Python写一个计算斐波那契数列的函数" print("问题:", test_question) print("\n正在获取回答...\n") # 使用非思考模式(快速响应) answer = ask_qwen_single(test_question, use_think_mode=False) print("回答:", answer) print("\n" + "="*50 + "\n") # 使用思考模式(显示推理过程) print("使用思考模式提问...\n") answer_with_think = ask_qwen_single(test_question, use_think_mode=True) print("回答(思考模式):", answer_with_think) 2.3 调试技巧:找到正确的API格式
如果上面的代码不工作,可能是因为API的参数格式不对。这时候我们可以用浏览器的开发者工具来查看实际的API调用。
- 打开Qwen3-0.6B-FP8的Web界面
- 按F12打开开发者工具
- 切换到"Network"(网络)标签
- 在Web界面里问一个问题
- 在开发者工具里找到对应的请求,查看它的请求地址和参数格式
通常你会看到一个向 /api/predict 或 /run/predict 的POST请求,点击它就能看到详细的请求参数。
3. 批量处理:真正的效率提升
现在我们来写批量处理的代码。假设你有一个问题列表,想要一次性得到所有答案。
3.1 基础批量处理脚本
创建一个新文件 batch_qa.py:
import requests import json import time from typing import List, Dict import csv class QwenBatchProcessor: """Qwen3-0.6B-FP8批量处理器""" def __init__(self, base_url: str): """ 初始化批量处理器 参数: base_url: 实例的基础URL """ self.base_url = base_url.rstrip('/') self.api_url = f"{self.base_url}/api/predict" self.session = requests.Session() def ask_single_question(self, question: str, use_think_mode: bool = False, temperature: float = 0.7, top_p: float = 0.8, max_length: int = 512) -> Dict: """ 提问单个问题 返回包含详细信息的字典 """ try: # 构建请求数据 payload = { "data": [ question, "", use_think_mode, temperature, top_p, max_length ] } headers = { "Content-Type": "application/json", "User-Agent": "Qwen-Batch-Processor/1.0" } start_time = time.time() response = self.session.post( self.api_url, json=payload, headers=headers, timeout=60 ) end_time = time.time() result = { "question": question, "success": False, "response_time": round(end_time - start_time, 2), "answer": "", "error": "" } if response.status_code == 200: response_data = response.json() if "data" in response_data and len(response_data["data"]) > 0: result["answer"] = response_data["data"][0] result["success"] = True else: result["error"] = "API返回格式异常" else: result["error"] = f"HTTP错误: {response.status_code}" return result except requests.exceptions.Timeout: return { "question": question, "success": False, "response_time": 60, "answer": "", "error": "请求超时" } except Exception as e: return { "question": question, "success": False, "response_time": 0, "answer": "", "error": str(e) } def process_batch(self, questions: List[str], use_think_mode: bool = False, delay: float = 1.0) -> List[Dict]: """ 批量处理问题列表 参数: questions: 问题列表 use_think_mode: 是否使用思考模式 delay: 问题之间的延迟(秒),避免请求过快 返回: 处理结果列表 """ results = [] print(f"开始批量处理 {len(questions)} 个问题...") print(f"思考模式: {'开启' if use_think_mode else '关闭'}") print("=" * 60) for i, question in enumerate(questions, 1): print(f"处理第 {i}/{len(questions)} 个问题...") result = self.ask_single_question(question, use_think_mode) results.append(result) if result["success"]: print(f"✓ 成功 - 耗时 {result['response_time']}秒") # 显示前100个字符的预览 preview = result["answer"][:100] + "..." if len(result["answer"]) > 100 else result["answer"] print(f" 回答预览: {preview}") else: print(f"✗ 失败 - 错误: {result['error']}") print("-" * 40) # 添加延迟,避免请求过快 if i < len(questions): time.sleep(delay) return results def save_results_to_csv(self, results: List[Dict], filename: str = "qwen_results.csv"): """将结果保存到CSV文件""" if not results: print("没有结果可保存") return try: with open(filename, 'w',, encoding='utf-8-sig') as f: writer = csv.DictWriter(f, fieldnames=["question", "answer", "response_time", "success", "error"]) writer.writeheader() for result in results: # 清理数据,确保CSV格式正确 row = { "question": result["question"], "answer": result["answer"].replace('\n', ' ').replace('\r', ''), "response_time": result["response_time"], "success": "是" if result["success"] else "否", "error": result["error"] } writer.writerow(row) print(f"结果已保存到 {filename}") except Exception as e: print(f"保存文件时出错: {str(e)}") def save_results_to_txt(self, results: List[Dict], filename: str = "qwen_results.txt"): """将结果保存到文本文件(更易读的格式)""" if not results: print("没有结果可保存") return try: with open(filename, 'w', encoding='utf-8') as f: f.write("=" * 60 + "\n") f.write("Qwen3-0.6B-FP8 批量问答结果\n") f.write("=" * 60 + "\n\n") for i, result in enumerate(results, 1): f.write(f"问题 {i}: {result['question']}\n") f.write(f"状态: {'成功' if result['success'] else '失败'}\n") f.write(f"耗时: {result['response_time']}秒\n") if result['error']: f.write(f"错误: {result['error']}\n") f.write(f"回答:\n{result['answer']}\n") f.write("-" * 60 + "\n\n") print(f"结果已保存到 {filename}") except Exception as e: print(f"保存文件时出错: {str(e)}") # 使用示例 if __name__ == "__main__": # 替换成你的实例地址 BASE_URL = "https://gpu-你的实例ID-7860.web.gpu.ZEEKLOG.net" # 创建处理器 processor = QwenBatchProcessor(BASE_URL) # 准备问题列表 questions = [ "Python中列表和元组有什么区别?", "写一个简单的HTTP服务器示例", "解释一下什么是闭包", "如何用Python读取CSV文件?", "写一个快速排序算法的实现" ] # 批量处理(使用非思考模式,响应更快) print("开始批量处理...\n") results = processor.process_batch( questions=questions, use_think_mode=False, # 非思考模式,响应更快 delay=2.0 # 每个问题间隔2秒 ) # 统计结果 success_count = sum(1 for r in results if r["success"]) total_time = sum(r["response_time"] for r in results) print("\n" + "=" * 60) print(f"批量处理完成!") print(f"成功: {success_count}/{len(questions)}") print(f"总耗时: {total_time:.2f}秒") print(f"平均每个问题: {total_time/len(questions):.2f}秒") # 保存结果 processor.save_results_to_csv(results, "batch_results.csv") processor.save_results_to_txt(results, "batch_results.txt") 3.2 高级功能:带重试机制的批量处理
有时候网络可能不稳定,或者API暂时不可用。我们可以添加重试机制来提高成功率:
import requests import time from typing import List, Dict, Optional class RobustQwenProcessor(QwenBatchProcessor): """带重试机制的Qwen处理器""" def ask_with_retry(self, question: str, max_retries: int = 3, retry_delay: float = 2.0, **kwargs) -> Optional[Dict]: """ 带重试机制的提问 参数: question: 问题 max_retries: 最大重试次数 retry_delay: 重试延迟(秒) **kwargs: 其他参数(use_think_mode等) 返回: 成功时返回结果字典,失败返回None """ for attempt in range(max_retries): try: if attempt > 0: print(f"第{attempt+1}次重试...") time.sleep(retry_delay * attempt) # 指数退避 result = self.ask_single_question(question, **kwargs) if result["success"]: return result else: print(f"尝试{attempt+1}失败: {result['error']}") except Exception as e: print(f"尝试{attempt+1}异常: {str(e)}") print(f"问题'{question[:50]}...' 重试{max_retries}次后仍失败") return None def robust_batch_process(self, questions: List[str], max_retries: int = 3, **kwargs) -> List[Dict]: """ 健壮的批量处理 参数: questions: 问题列表 max_retries: 每个问题的最大重试次数 **kwargs: 其他参数 返回: 处理结果列表 """ results = [] for i, question in enumerate(questions, 1): print(f"\n处理问题 {i}/{len(questions)}: {question[:50]}...") result = self.ask_with_retry( question=question, max_retries=max_retries, **kwargs ) if result: results.append(result) print(f"✓ 成功获取回答({result['response_time']}秒)") else: # 即使失败也记录 results.append({ "question": question, "success": False, "response_time": 0, "answer": "", "error": f"重试{max_retries}次后失败" }) print("✗ 获取回答失败") # 问题间的基础延迟 if i < len(questions): time.sleep(kwargs.get('delay', 1.0)) return results # 使用示例 if __name__ == "__main__": processor = RobustQwenProcessor("https://gpu-你的实例ID-7860.web.gpu.ZEEKLOG.net") # 从文件读取问题 def read_questions_from_file(filename: str) -> List[str]: """从文本文件读取问题列表""" questions = [] try: with open(filename, 'r', encoding='utf-8') as f: for line in f: line = line.strip() if line and not line.startswith('#'): # 跳过空行和注释 questions.append(line) return questions except FileNotFoundError: print(f"文件 {filename} 不存在") return [] # 从questions.txt读取问题 questions = read_questions_from_file("questions.txt") if questions: print(f"从文件读取到 {len(questions)} 个问题") # 批量处理 results = processor.robust_batch_process( questions=questions, use_think_mode=False, delay=1.5, max_retries=2 ) # 保存结果 processor.save_results_to_txt(results, "robust_results.txt") 4. 实际应用场景
掌握了批量处理的方法后,我们来看看它能用在哪些实际场景中。
4.1 场景一:批量内容生成
假设你是一个内容创作者,需要为一系列产品生成描述:
# 批量生成产品描述 products = [ "智能手表,功能包括心率监测、GPS定位、消息提醒", "无线蓝牙耳机,降噪功能,续航30小时", "便携式充电宝,20000mAh,支持快充", "机械键盘,青轴,RGB背光,87键" ] prompts = [] for product in products: prompt = f"请为以下产品写一段吸引人的电商描述(100字左右):{product}" prompts.append(prompt) # 使用批量处理器生成描述 processor = QwenBatchProcessor("你的实例地址") results = processor.process_batch(prompts, use_think_mode=False) for result in results: if result["success"]: print(f"产品: {result['question'].split(':')[1][:30]}...") print(f"描述: {result['answer'][:150]}...\n") 4.2 场景二:自动化测试问答
如果你在开发一个基于Qwen的应用,可以用这个方法来测试不同问题的回答质量:
class QwenTester: """Qwen模型测试器""" def __init__(self, processor): self.processor = processor self.test_cases = [ { "category": "代码生成", "questions": [ "写一个Python函数计算阶乘", "用JavaScript实现数组去重", "写一个SQL查询,找出销售额最高的产品" ] }, { "category": "知识问答", "questions": [ "什么是机器学习?", "解释一下HTTP和HTTPS的区别", "Python中的装饰器是什么?" ] }, { "category": "创意写作", "questions": [ "写一个关于人工智能的短故事开头", "为环保主题写一句广告语", "写一首关于春天的四行诗" ] } ] def run_tests(self): """运行所有测试用例""" all_results = [] for test_case in self.test_cases: category = test_case["category"] questions = test_case["questions"] print(f"\n测试类别: {category}") print(f"问题数量: {len(questions)}") print("-" * 40) results = self.processor.process_batch( questions=questions, use_think_mode=False, delay=1.0 ) # 记录结果 for i, (question, result) in enumerate(zip(questions, results)): all_results.append({ "category": category, "question": question, "success": result["success"], "response_time": result["response_time"], "answer_length": len(result["answer"]) if result["success"] else 0 }) # 打印本类别统计 success_rate = sum(1 for r in results if r["success"]) / len(results) * 100 avg_time = sum(r["response_time"] for r in results) / len(results) print(f"成功率: {success_rate:.1f}%") print(f"平均响应时间: {avg_time:.2f}秒") return all_results # 使用测试器 processor = QwenBatchProcessor("你的实例地址") tester = QwenTester(processor) test_results = tester.run_tests() # 分析测试结果 total_questions = len(test_results) success_count = sum(1 for r in test_results if r["success"]) avg_response_time = sum(r["response_time"] for r in test_results) / total_questions print(f"\n总体测试结果:") print(f"总问题数: {total_questions}") print(f"成功数: {success_count}") print(f"成功率: {success_count/total_questions*100:.1f}%") print(f"平均响应时间: {avg_response_time:.2f}秒") 4.3 场景三:批量文档处理
如果你有一批文档需要总结或分析:
import os class DocumentProcessor: """文档批量处理器""" def __init__(self, processor): self.processor = processor def process_documents(self, folder_path: str): """处理文件夹中的所有文档""" results = [] # 支持的文件类型 supported_extensions = ['.txt', '.md', '.pdf'] for filename in os.listdir(folder_path): filepath = os.path.join(folder_path, filename) # 检查文件类型 if any(filename.endswith(ext) for ext in supported_extensions): print(f"处理文件: {filename}") try: # 读取文件内容 with open(filepath, 'r', encoding='utf-8') as f: content = f.read() # 如果文件太大,只取前2000字符 if len(content) > 2000: content = content[:2000] + "..." # 构建问题 questions = [ f"请总结以下文档的主要内容:\n{content}", f"这个文档的关键词是什么?\n{content}", f"这个文档属于什么类型(技术文档、报告、文章等)?\n{content}" ] # 批量处理 file_results = self.processor.process_batch( questions=questions, use_think_mode=True, # 使用思考模式获得更详细的分析 delay=2.0 ) # 保存结果 result_file = f"result_{os.path.splitext(filename)[0]}.txt" with open(result_file, 'w', encoding='utf-8') as f: f.write(f"文档分析结果: {filename}\n") f.write("=" * 60 + "\n\n") for q, r in zip(questions, file_results): f.write(f"问题: {q.split(':')[0]}\n") if r["success"]: f.write(f"回答: {r['answer']}\n") else: f.write(f"错误: {r['error']}\n") f.write("-" * 40 + "\n") results.append({ "filename": filename, "success": all(r["success"] for r in file_results) }) print(f"✓ 完成处理: {filename}") except Exception as e: print(f"✗ 处理失败 {filename}: {str(e)}") results.append({ "filename": filename, "success": False, "error": str(e) }) return results # 使用文档处理器 processor = QwenBatchProcessor("你的实例地址") doc_processor = DocumentProcessor(processor) # 处理documents文件夹中的所有文档 results = doc_processor.process_documents("documents") print(f"\n文档处理完成!") print(f"处理文件数: {len(results)}") print(f"成功数: {sum(1 for r in results if r.get('success', False))}") 5. 总结
通过Python requests调用Qwen3-0.6B-FP8的Web API进行批处理,是一个既简单又实用的方法。我们来回顾一下关键点:
5.1 核心优势
- 效率大幅提升:不用手动一个个输入问题,一次性处理成百上千个问题
- 自动化程度高:可以集成到你的工作流中,定时运行或触发执行
- 结果易于管理:自动保存到文件,方便后续分析和使用
- 灵活可控:可以控制请求频率、重试机制、错误处理等
5.2 使用建议
- 合理设置延迟:根据你的实例性能,设置合适的请求间隔,避免给服务器太大压力
- 使用重试机制:网络不稳定时,重试机制能显著提高成功率
- 分批处理:如果问题很多,可以考虑分批处理,每批50-100个问题
- 监控资源使用:长时间运行批处理时,注意监控GPU显存使用情况
- 保存中间结果:定期保存处理结果,防止程序意外中断导致数据丢失
5.3 注意事项
- 确保你的实例正常运行,可以通过
supervisorctl status qwen3检查服务状态 - 如果API调用频繁失败,可能是实例资源不足,可以尝试重启服务
- 批量处理时建议使用非思考模式,响应速度更快
- 对于特别重要的问题,可以使用思考模式获得更详细的回答
5.4 扩展思路
掌握了基本的批处理方法后,你还可以进一步扩展:
- 多线程/异步处理:使用多线程或asyncio进一步提高处理速度
- 结果自动分析:对生成的回答进行自动评分或分类
- 集成到工作流:与你的其他工具或系统集成
- 定时任务:设置定时批处理任务,比如每天自动生成日报
批处理问答不仅能节省大量时间,还能确保处理过程的一致性和可重复性。无论是内容创作、数据分析还是系统测试,这个方法都能帮你更高效地利用Qwen3-0.6B-FP8的能力。
获取更多AI镜像
想探索更多AI镜像和应用场景?访问 ZEEKLOG星图镜像广场,提供丰富的预置镜像,覆盖大模型推理、图像生成、视频生成、模型微调等多个领域,支持一键部署。