Python CSV 模块完整教程
本教程全面覆盖 Python 标准库 csv 模块的所有知识点,代码逐行注释,包含生产环境实战案例。
环境要求
- Python 3.12+
- 虚拟环境目录:.venv
本文详细讲解了 Python 标准库 csv 模块的使用方法,包括基础读写、参数配置、Dialect 定制、字典读写器及生产环境优化技巧。内容涵盖代码示例、错误处理、大数据量处理方案及数据库交互案例,旨在帮助开发者高效处理 CSV 数据。

本教程全面覆盖 Python 标准库 csv 模块的所有知识点,代码逐行注释,包含生产环境实战案例。
# 激活虚拟环境
.venv\Scripts\activate
# 使用虚拟环境的 Python 运行示例
.venv\Scripts\python.exe chapter01_intro\01_what_is_csv.py
CSV(Comma-Separated Values,逗号分隔值)是一种通用的、简单的数据存储格式,被广泛应用于数据交换、数据存储和数据处理场景。
CSV 格式的特点:
基本结构示例:
姓名,年龄,城市
张三,25,北京
李四,30,上海
王五,28,广州
import csv
# QUOTE_ALL: 所有字段都加引号
print(f"csv.QUOTE_ALL = {csv.QUOTE_ALL}")
# 输出:csv.QUOTE_ALL = 1
# QUOTE_MINIMAL: 只有包含特殊字符的字段才加引号(默认)
print(f"csv.QUOTE_MINIMAL = {csv.QUOTE_MINIMAL}")
# 输出:csv.QUOTE_MINIMAL = 0
# QUOTE_NONNUMERIC: 非数字字段加引号
print(f"csv.QUOTE_NONNUMERIC = {csv.QUOTE_NONNUMERIC}")
# 输出:csv.QUOTE_NONNUMERIC = 2
# QUOTE_NONE: 不加引号
print(f"csv.QUOTE_NONE = {csv.QUOTE_NONE}")
# 输出:csv.QUOTE_NONE = 3
# Python 3.12+ 新增常量
# QUOTE_NOTNULL: 给非 None 字段添加引号
print(f"csv.QUOTE_NOTNULL = {csv.QUOTE_NOTNULL}")
# 输出:csv.QUOTE_NOTNULL = 5
# QUOTE_STRINGS: 给字符串字段添加引号
print(f"csv.QUOTE_STRINGS = {csv.QUOTE_STRINGS}")
# 输出:csv.QUOTE_STRINGS = 4
import csv
# 定义 CSV 文件路径
output_file = 'sample_output.csv'
# 准备要写入的数据
header = ['姓名', '年龄', '城市', '职业'] # 表头行
data = [
['张三', '25', '北京', '工程师'],
['李四', '30', '上海', '设计师'],
['王五', '28', '广州', '教师'],
]
# 使用 with 语句打开文件,确保文件在使用后正确关闭
# 'w' 模式表示写入(write),如果文件存在会被覆盖
# newline='' 是 csv 模块的推荐设置,防止空行问题
with open(output_file, 'w', newline='', encoding='utf-8') as f:
# csv.writer() 创建一个写入器对象
writer = csv.writer(f)
# writerow() 写入单行数据
writer.writerow(header)
# writerows() 写入多行数据
writer.writerows(data)
print(f"✓ CSV 文件已创建:{output_file}")
import csv
# 使用 with 语句打开文件
with open('sample_output.csv', 'r', newline='', encoding='utf-8') as f:
# csv.reader() 创建一个读取器对象
reader = csv.reader(f)
# 使用 enumerate 获取行号和行数据
for row_num, row in enumerate(reader, start=1):
print(f"第{row_num}行:{row}")
# 输出:
# 第 1 行:['姓名', '年龄', '城市', '职业']
# 第 2 行:['张三', '25', '北京', '工程师']
# 第 3 行:['李四', '30', '上海', '设计师']
# 第 4 行:['王五', '28', '广州', '教师']
import csv
from io import StringIO

# CSV text held in a plain string. The backslash right after the opening
# quotes suppresses the leading newline; without it csv.reader would yield an
# empty row ([]) before the header.
csv_data = """\
姓名,年龄,城市
张三,25,北京
李四,30,上海
王五,28,广州
"""

# StringIO wraps the string in a file-like object that csv.reader can consume.
string_io = StringIO(csv_data)
reader = csv.reader(string_io)
for row in reader:
    print(row)
# Output:
# ['姓名', '年龄', '城市']
# ['张三', '25', '北京']
# ['李四', '30', '上海']
# ['王五', '28', '广州']
import csv
with open('data.csv', 'r', newline='', encoding='utf-8') as csvfile:
# csv.reader() 创建一个 reader 对象
reader = csv.reader(csvfile)
# reader 对象是可迭代的,可以使用 for 循环逐行读取
for row in reader:
# 每一行被解析为一个列表
print(f"读取到:{row}")
import csv
with open('data.csv', 'r', newline='', encoding='utf-8') as csvfile:
reader = csv.reader(csvfile)
# 使用 enumerate 获取行号
# start=1 表示行号从 1 开始(而不是默认的 0)
for line_num, row in enumerate(reader, start=1):
print(f"第{line_num}行:{row}")
import csv
with open('data.csv', 'r', newline='', encoding='utf-8') as csvfile:
reader = csv.reader(csvfile)
# next() 函数获取迭代器的下一个元素
# 第一行通常是表头
headers = next(reader)
print(f"表头:{headers}")
# 剩余的行是数据
for row_num, row in enumerate(reader, start=1):
print(f"数据行{row_num}: {row}")
import csv
with open('data.csv', 'r', newline='', encoding='utf-8') as csvfile:
reader = csv.reader(csvfile)
# 使用 list() 将 reader 转换为列表
# 注意:这会一次性将所有数据加载到内存
all_rows = list(reader)
print(f"总行数:{len(all_rows)}")
print(f"第一行:{all_rows[0]}")
import csv
from io import StringIO

# Semicolon-separated CSV (common in European locales). The backslash after
# the opening quotes keeps the literal from starting with a blank line, which
# csv.reader would otherwise report as an empty row.
csv_semicolon = """\
姓名;年龄;城市
张三;25;北京
李四;30;上海
"""

string_io = StringIO(csv_semicolon)
# delimiter=';' tells the reader to split fields on semicolons.
reader = csv.reader(string_io, delimiter=';')
for row in reader:
    print(row)
# Output:
# ['姓名', '年龄', '城市']
# ['张三', '25', '北京']
# ['李四', '30', '上海']
import csv
from io import StringIO

# CSV whose fields are quoted with single quotes. The backslash after the
# opening quotes avoids a leading blank line, which csv.reader would return
# as an empty row.
csv_single_quote = """\
姓名,年龄,描述
张三,25,'喜欢编程,热爱 Python'
李四,30,'设计师,擅长 UI/UX'
"""

string_io = StringIO(csv_single_quote)
# quotechar="'" makes the comma inside the quoted description part of the
# field instead of a separator.
reader = csv.reader(string_io, quotechar="'")
for row in reader:
    print(row)
# Output:
# ['姓名', '年龄', '描述']
# ['张三', '25', '喜欢编程,热爱 Python']
# ['李四', '30', '设计师,擅长 UI/UX']
import csv
from io import StringIO
# 包含引号的字段
csv_data = '''姓名,描述
张三,"他说:""你好"""
李四,"擅长""Python""编程"'''
string_io = StringIO(csv_data)
# doublequote=True(默认)表示使用双写引号转义
reader = csv.reader(string_io, doublequote=True)
for row in reader:
print(row)
# 输出:['姓名', '描述']
# ['张三', '他说:"你好"']
# ['李四', '擅长"Python"编程']
import csv
from io import StringIO

# Backslash-escaped CSV data. Raw string literals are required: in a normal
# literal Python itself consumes \" (the backslash never reaches the csv
# module) and \, is an invalid escape sequence.
csv_data = "\n".join([
    r'姓名,描述',
    r'张三,喜欢\,编程',
    r'李四,擅长\"Python\"',
])

string_io = StringIO(csv_data)
# escapechar='\\' makes the reader take the character after a backslash
# literally, so the escaped comma and quotes stay inside their field.
reader = csv.reader(string_io, escapechar='\\')
for row in reader:
    print(row)
# Output:
# ['姓名', '描述']
# ['张三', '喜欢,编程']
# ['李四', '擅长"Python"']
import csv
with open('sales_data.csv', 'r', newline='', encoding='utf-8') as f:
reader = csv.reader(f)
headers = next(reader) # 读取表头
# 初始化统计变量
total_sales = 0
total_quantity = 0
row_count = 0
for row in reader:
total_sales += int(row[4]) # 销售额
total_quantity += int(row[5]) # 数量
row_count += 1
print(f"统计结果:")
print(f" 总记录数:{row_count}")
print(f" 总销售额:¥{total_sales:,}")
print(f" 总数量:{total_quantity}")
print(f" 平均单价:¥{total_sales / total_quantity:.2f}")
import csv
from collections import defaultdict
with open('sales_data.csv', 'r', newline='', encoding='utf-8') as f:
reader = csv.reader(f)
next(reader) # 跳过表头
# 使用 defaultdict 自动创建默认值为 0 的字典
category_stats = defaultdict(lambda: {'sales': 0, 'quantity': 0})
for row in reader:
category = row[2] # 类别
sales = int(row[4])
quantity = int(row[5])
category_stats[category]['sales'] += sales
category_stats[category]['quantity'] += quantity
print("按类别统计:")
for category, stats in sorted(category_stats.items()):
print(f" {category}: 销售额¥{stats['sales']:,}, 数量{stats['quantity']}")
import csv
with open('sales_data.csv', 'r', newline='', encoding='utf-8') as f:
reader = csv.reader(f)
headers = next(reader)
# 筛选条件:销售额大于 5000
filtered_rows = []
for row in reader:
sales = int(row[4])
if sales > 5000:
filtered_rows.append(row)
print(f"筛选结果(销售额>5000): {len(filtered_rows)} 条")
for row in filtered_rows:
print(f" {row[0]} - {row[1]}: ¥{row[4]}")
import csv
output_file = 'output_basic.csv'
with open(output_file, 'w', newline='', encoding='utf-8') as csvfile:
# csv.writer() 创建一个 writer 对象
writer = csv.writer(csvfile)
# writerow() 写入单行数据
writer.writerow(['姓名', '年龄', '城市'])
writer.writerow(['张三', '25', '北京'])
writer.writerow(['李四', '30', '上海'])
print(f"✓ 文件已创建:{output_file}")
import csv
output_file = 'output_batch.csv'
# 准备数据
header = ['产品', '价格', '库存']
data = [
['iPhone', '5999', '100'],
['iPad', '3999', '50'],
['MacBook', '9999', '30'],
]
with open(output_file, 'w', newline='', encoding='utf-8') as csvfile:
writer = csv.writer(csvfile)
# 写入表头
writer.writerow(header)
# writerows() 写入多行数据
writer.writerows(data)
import csv
from io import StringIO
string_io = StringIO()
# 使用分号作为分隔符(欧洲常用格式)
writer = csv.writer(string_io, delimiter=';')
writer.writerow(['姓名', '年龄', '城市'])
writer.writerow(['张三', '25', '北京'])
print(string_io.getvalue())
# 输出:姓名;年龄;城市
# 张三;25;北京
import csv
from io import StringIO
data = [
['纯文本', '100'],
['包含,逗号', '200'],
['包含"引号', '300'],
]
# QUOTE_MINIMAL(默认):只在必要时添加引号
string_io = StringIO()
writer = csv.writer(string_io, quoting=csv.QUOTE_MINIMAL)
writer.writerows(data)
print("QUOTE_MINIMAL:", string_io.getvalue())
# QUOTE_ALL:给所有字段添加引号
string_io = StringIO()
writer = csv.writer(string_io, quoting=csv.QUOTE_ALL)
writer.writerows(data)
print("QUOTE_ALL:", string_io.getvalue())
# QUOTE_NONNUMERIC:给非数字字段添加引号
string_io = StringIO()
writer = csv.writer(string_io, quoting=csv.QUOTE_NONNUMERIC)
writer.writerows(data)
print("QUOTE_NONNUMERIC:", string_io.getvalue())
import csv
from io import StringIO
string_io = StringIO()
# 使用 Unix 风格的换行符\n
writer = csv.writer(string_io, lineterminator='\n')
writer.writerow(['A', 'B'])
writer.writerow(['1', '2'])
print(repr(string_io.getvalue()))
# 输出:'A,B\n1,2\n'
import csv
# csv.list_dialects() 返回所有已注册的 dialect 名称
dialects = csv.list_dialects()
print(f"已注册的 Dialect: {dialects}")
# 输出:已注册的 Dialect: ['excel', 'excel-tab', 'unix']
# 查看每个 Dialect 的详细配置
for dialect_name in dialects:
dialect = csv.get_dialect(dialect_name)
print(f"\n{dialect_name} dialect 配置:")
print(f" delimiter: '{dialect.delimiter}'")
print(f" quotechar: '{dialect.quotechar}'")
print(f" doublequote: {dialect.doublequote}")
print(f" skipinitialspace: {dialect.skipinitialspace}")
print(f" lineterminator: {repr(dialect.lineterminator)}")
print(f" quoting: {dialect.quoting}")
print(f" escapechar: {dialect.escapechar}")
import csv
from io import StringIO
string_io = StringIO()
# 使用 excel-tab dialect(制表符分隔)
writer = csv.writer(string_io, dialect='excel-tab')
writer.writerow(['姓名', '年龄', '城市'])
writer.writerow(['张三', '25', '北京'])
print(string_io.getvalue())
# 输出使用制表符分隔
import csv
from io import StringIO
# 注册自定义 Dialect
csv.register_dialect(
'myexcel',
delimiter=';', # 使用分号分隔
quotechar="'", # 使用单引号
quoting=csv.QUOTE_ALL # 所有字段加引号
)
string_io = StringIO()
writer = csv.writer(string_io, dialect='myexcel')
writer.writerow(['姓名', '年龄'])
writer.writerow(['张三', '25'])
print(string_io.getvalue())
# 输出:'姓名';'年龄'
# '张三';'25'
# 注销自定义 Dialect
csv.unregister_dialect('myexcel')
import csv
from io import StringIO
# 未知格式的 CSV 数据
sample = "姓名;年龄;城市\n张三;25;北京"
# 使用 Sniffer 检测 Dialect
sniffer = csv.Sniffer()
dialect = sniffer.sniff(sample)
print(f"检测到的分隔符:'{dialect.delimiter}'")
print(f"检测到的引号字符:'{dialect.quotechar}'")
# 使用检测到的 Dialect 读取
string_io = StringIO(sample)
reader = csv.reader(string_io, dialect=dialect)
for row in reader:
print(row)
import csv
with open('employees.csv', 'r', newline='', encoding='utf-8') as f:
# csv.DictReader 自动使用第一行作为字段名
reader = csv.DictReader(f)
for row in reader:
# 通过字段名访问数据
print(f"{row['姓名']} 在 {row['部门']} 担任 {row['职位']}")
import csv
# 使用普通 reader
with open('data.csv', 'r', newline='', encoding='utf-8') as f:
reader = csv.reader(f)
headers = next(reader)
for row in reader:
# 通过索引访问,可读性较差
print(f"{row[0]} 在 {row[1]} 担任 {row[2]}")
# 使用 DictReader
with open('data.csv', 'r', newline='', encoding='utf-8') as f:
reader = csv.DictReader(f)
for row in reader:
# 通过字段名访问,可读性更好
print(f"{row['姓名']} 在 {row['部门']} 担任 {row['职位']}")
import csv
from io import StringIO
# 没有表头的 CSV 数据
no_header_data = """
张三,技术部,软件工程师,15000
李四,设计部,UI 设计师,12000
"""
string_io = StringIO(no_header_data)
# 通过 fieldnames 参数指定字段名
reader = csv.DictReader(string_io, fieldnames=['姓名', '部门', '职位', '薪资'])
for row in reader:
print(f"{row['姓名']}: {row['职位']}, 薪资{row['薪资']}")
import csv
output_file = 'output_dict.csv'
# 定义字段名
fieldnames = ['姓名', '年龄', '城市', '职业']
# 准备数据(字典列表)
data = [
{'姓名': '张三', '年龄': '25', '城市': '北京', '职业': '工程师'},
{'姓名': '李四', '年龄': '30', '城市': '上海', '职业': '设计师'},
]
with open(output_file, 'w', newline='', encoding='utf-8') as f:
# 创建 DictWriter,必须指定 fieldnames
writer = csv.DictWriter(f, fieldnames=fieldnames)
# 写入表头
writer.writeheader()
# 写入单行数据
writer.writerow({'姓名': '王五', '年龄': '28', '城市': '广州', '职业': '教师'})
# 写入多行数据
writer.writerows(data)
import csv
from io import StringIO
fieldnames = ['姓名', '年龄', '城市', '职业', '备注']
# 数据缺少某些字段
data = [
{'姓名': '张三', '年龄': '25', '城市': '北京'}, # 缺少职业和备注
{'姓名': '李四', '年龄': '30', '城市': '上海', '职业': '设计师'}, # 缺少备注
]
string_io = StringIO()
# 使用 restval 指定缺失字段的默认值
writer = csv.DictWriter(string_io, fieldnames=fieldnames, restval='N/A')
writer.writeheader()
writer.writerows(data)
print(string_io.getvalue())
# 输出:
# 姓名,年龄,城市,职业,备注
# 张三,25,北京,N/A,N/A
# 李四,30,上海,设计师,N/A
import csv
from io import StringIO
# 处理编码错误
csv_data = "姓名,年龄\n张三,25\n李四,30"
string_io = StringIO(csv_data)
# 使用 errors 参数处理编码错误
with open('data.csv', 'r', newline='', encoding='utf-8', errors='replace') as f:
reader = csv.reader(f)
for row in reader:
print(row)
# 常见的 errors 参数值:
# 'strict' - 默认,遇到错误抛出 UnicodeDecodeError
# 'ignore' - 忽略错误字符
# 'replace' - 用 U+FFFD(�)替换错误字符
# 'backslashreplace' - 用\xNN 替换错误字符
import csv
# 方法 1: 使用生成器逐行处理(推荐)
def process_large_file(filepath):
    """Yield the data rows of a CSV file one at a time.

    The file is read lazily, so memory use stays flat regardless of file
    size. The first row is treated as a header and skipped.

    Args:
        filepath: Path of a UTF-8 encoded CSV file.

    Yields:
        Each data row as a list of strings.
    """
    with open(filepath, 'r', newline='', encoding='utf-8') as f:
        reader = csv.reader(f)
        # The None default matters: inside a generator a bare next() on an
        # empty file would raise StopIteration, which Python converts into
        # RuntimeError (PEP 479) instead of simply ending the iteration.
        next(reader, None)
        yield from reader
# 使用方法
for row in process_large_file('large_file.csv'):
# 处理每一行
pass
# 方法 2: 批量读取(平衡内存和速度)
def process_in_batches(filepath, batch_size=1000):
    """Read a CSV file and hand its data rows to process_batch in chunks.

    The first row is treated as a header and skipped. Rows are collected into
    lists of up to ``batch_size`` rows; each full list, and the final partial
    one, is passed to ``process_batch``.

    Args:
        filepath: Path of a UTF-8 encoded CSV file.
        batch_size: Number of rows that triggers a call to ``process_batch``.
    """
    # NOTE(review): process_batch is not defined in this snippet; it is
    # assumed to be supplied by the surrounding module.
    with open(filepath, 'r', newline='', encoding='utf-8') as f:
        reader = csv.reader(f)
        # The None default keeps an empty file from leaking StopIteration
        # to the caller.
        next(reader, None)
        batch = []
        for row in reader:
            batch.append(row)
            if len(batch) >= batch_size:
                process_batch(batch)
                batch = []
        # Flush the rows left over after the last full batch.
        if batch:
            process_batch(batch)
# ✓ 正确做法
with open('file.csv', 'w', newline='', encoding='utf-8') as f:
writer = csv.writer(f)
# ✗ 错误做法(Windows 上会产生空行)
with open('file.csv', 'w', encoding='utf-8') as f:
writer = csv.writer(f)
# ✓ 正确做法
with open('file.csv', 'w', newline='', encoding='utf-8') as f:
writer = csv.writer(f)
# 写入中文时建议使用 utf-8-sig,Excel 可以正确识别
with open('file.csv', 'w', newline='', encoding='utf-8-sig') as f:
writer = csv.writer(f)
# ✓ 正确做法 - 自动关闭文件
with open('file.csv', 'r', newline='', encoding='utf-8') as f:
reader = csv.reader(f)
for row in reader:
print(row)
# ✗ 错误做法 - 可能忘记关闭文件
f = open('file.csv', 'r', newline='', encoding='utf-8')
reader = csv.reader(f)
# ... 处理数据
f.close() # 容易忘记
# ✓ 推荐做法 - 代码可读性更好
with open('data.csv', 'r', newline='', encoding='utf-8') as f:
reader = csv.DictReader(f)
for row in reader:
print(row['name']) # 通过字段名访问
# 普通 reader - 需要记住索引位置
with open('data.csv', 'r', newline='', encoding='utf-8') as f:
reader = csv.reader(f)
next(reader) # 跳过表头
for row in reader:
print(row[0]) # 通过索引访问,可读性差
import csv
import sys
# 检查 Python 版本
print(f"当前 Python 版本:{sys.version}")
# QUOTE_STRINGS - 给字符串字段添加引号
string_io = StringIO()
writer = csv.writer(string_io, quoting=csv.QUOTE_STRINGS)
writer.writerow(['用户 ID', '用户名', '年龄', '余额'])
writer.writerow(['U001', '张三', 25, 1500.50])
print(string_io.getvalue())
# 输出:"用户 ID","用户名",年龄,余额
# "U001","张三",25,1500.5
# 特点:只有字符串被引号包裹,数字保持原样
# QUOTE_NOTNULL - 给非 None 字段添加引号
data = [
['订单号', '客户名', '折扣', '备注'],
['ORD001', '张三', None, 'VIP 客户'],
['ORD002', '李四', 0.15, None],
]
string_io = StringIO()
writer = csv.writer(string_io, quoting=csv.QUOTE_NOTNULL)
writer.writerows(data)
print(string_io.getvalue())
# 输出:"订单号","客户名","折扣","备注"
# "ORD001","张三",,"VIP 客户"
# "ORD002","李四","0.15",
# 特点:None 值保持为空(无引号),其他值都有引号
import csv
import sqlite3
def export_table_to_csv(conn, table_name, output_file, where_clause=None):
    """Export a database table to a CSV file and return the row count.

    The header row is taken from the executed query's own result description,
    so it always lines up with the exported columns. Rows are streamed from
    the cursor to the file instead of being loaded into memory first.

    Args:
        conn: An open sqlite3 connection.
        table_name: Name of the table to export.
        output_file: Destination path; written as UTF-8 with a BOM
            (``utf-8-sig``).
        where_clause: Optional SQL condition appended after ``WHERE``.

    Returns:
        Number of data rows written (the header is not counted).

    Note:
        ``table_name`` and ``where_clause`` are spliced into the SQL text
        verbatim, so they must come from trusted code, never from user input.
    """
    query = f"SELECT * FROM {table_name}"
    if where_clause:
        query += f" WHERE {where_clause}"

    cursor = conn.cursor()
    cursor.execute(query)
    # description holds one 7-tuple per result column; item 0 is the name.
    columns = [col[0] for col in cursor.description]

    row_count = 0
    with open(output_file, 'w', newline='', encoding='utf-8-sig') as f:
        writer = csv.writer(f)
        writer.writerow(columns)
        # Iterating the cursor fetches rows incrementally.
        for row in cursor:
            writer.writerow(row)
            row_count += 1
    return row_count
# 使用示例
conn = sqlite3.connect('production.db')
count = export_table_to_csv(conn, 'employees', 'export_employees.csv')
print(f"导出 {count} 条记录")
conn.close()
import csv
import sqlite3
def import_csv_to_table(conn, csv_file, table_name):
    """Import a CSV file into a database table, one INSERT per row.

    The CSV header row supplies the column names. The file is decoded as
    ``utf-8-sig`` so a leading byte-order mark (as written by Excel or by a
    ``utf-8-sig`` export) does not end up inside the first column name; plain
    UTF-8 files are read the same way.

    Args:
        conn: An open sqlite3 connection.
        csv_file: Path of the CSV file to import.
        table_name: Target table. Interpolated into the SQL text verbatim, so
            it must come from trusted code.

    Returns:
        Number of rows inserted.

    Raises:
        ValueError: If a data row has more fields than the header.
    """
    cursor = conn.cursor()
    success_count = 0
    with open(csv_file, 'r', newline='', encoding='utf-8-sig') as f:
        reader = csv.DictReader(f)
        # dict.fromkeys drops duplicate header names while keeping order,
        # matching the keys of the dicts DictReader produces.
        columns = list(dict.fromkeys(reader.fieldnames or []))
        # Header names come from the file, so quote them as identifiers
        # rather than splicing them into the SQL text raw.
        column_names = ', '.join(
            '"' + name.strip().replace('"', '""') + '"' for name in columns
        )
        placeholders = ', '.join('?' for _ in columns)
        # Built once: the statement is identical for every row.
        query = f"INSERT INTO {table_name} ({column_names}) VALUES ({placeholders})"
        for row in reader:
            # DictReader stores surplus fields under the key None.
            if None in row:
                raise ValueError(
                    f"{csv_file}: line {reader.line_num} has more fields than the header"
                )
            cursor.execute(query, [row[name] for name in columns])
            success_count += 1
    conn.commit()
    return success_count
# 使用示例
conn = sqlite3.connect('production.db')
count = import_csv_to_table(conn, 'import_data.csv', 'employees')
print(f"导入 {count} 条记录")
conn.close()
import csv
import sqlite3
def import_batch(conn, csv_file, table_name, batch_size=1000):
    """Import a CSV file into a database table using batched inserts.

    Rows are buffered and written with ``executemany`` in groups of
    ``batch_size``. The CSV header row supplies the column names. The file is
    decoded as ``utf-8-sig`` so a leading byte-order mark does not corrupt
    the first column name.

    Args:
        conn: An open sqlite3 connection.
        csv_file: Path of the CSV file to import.
        table_name: Target table. Interpolated into the SQL text verbatim, so
            it must come from trusted code.
        batch_size: Number of buffered rows that triggers an insert.

    Returns:
        Number of rows inserted.

    Raises:
        ValueError: If a data row has more fields than the header.
    """
    cursor = conn.cursor()
    total_count = 0
    batch = []
    with open(csv_file, 'r', newline='', encoding='utf-8-sig') as f:
        reader = csv.DictReader(f)
        # dict.fromkeys drops duplicate header names while keeping order,
        # matching the keys of the dicts DictReader produces.
        columns = list(dict.fromkeys(reader.fieldnames or []))
        # Header names come from the file, so quote them as identifiers.
        column_names = ', '.join(
            '"' + name.strip().replace('"', '""') + '"' for name in columns
        )
        placeholders = ', '.join('?' for _ in columns)
        # Build the statement before the loop so the final partial batch can
        # use it even when no full batch was ever flushed.
        query = f"INSERT INTO {table_name} ({column_names}) VALUES ({placeholders})"
        for row in reader:
            # DictReader stores surplus fields under the key None.
            if None in row:
                raise ValueError(
                    f"{csv_file}: line {reader.line_num} has more fields than the header"
                )
            batch.append([row[name] for name in columns])
            if len(batch) >= batch_size:
                cursor.executemany(query, batch)
                total_count += len(batch)
                batch = []
        # Insert whatever is left after the last full batch.
        if batch:
            cursor.executemany(query, batch)
            total_count += len(batch)
    conn.commit()
    return total_count
import csv
import re
from collections import defaultdict
class FieldValidator:
    """Base class for field validators.

    ``validate`` handles the empty-value rules shared by every validator and
    delegates non-empty values to ``_validate_value``, which subclasses
    override. Both return an ``(is_valid, error_message)`` tuple, where the
    message is ``None`` on success.
    """

    def __init__(self, name, required=True, allow_empty=False):
        self.name = name
        self.required = required
        self.allow_empty = allow_empty

    def validate(self, value):
        """Validate *value* and return ``(is_valid, error_message)``."""
        if value is None or value == '':
            # An empty value is an error only for a required field that
            # does not explicitly allow emptiness.
            if self.required and not self.allow_empty:
                return False, f"{self.name}: 必填字段不能为空"
            return True, None
        return self._validate_value(value)

    def _validate_value(self, value):
        """Validate a non-empty value; the base class accepts anything."""
        return True, None


class StringValidator(FieldValidator):
    """Validate string length bounds and an optional regex pattern."""

    def __init__(self, name, min_length=None, max_length=None, pattern=None, **kwargs):
        super().__init__(name, **kwargs)
        self.min_length = min_length
        self.max_length = max_length
        # Compile once here so validation does not recompile per value.
        self.pattern = re.compile(pattern) if pattern else None

    def _validate_value(self, value):
        # Compare against None, not truthiness, so a bound of 0 is enforced.
        if self.min_length is not None and len(value) < self.min_length:
            return False, f"{self.name}: 长度不能少于 {self.min_length}"
        if self.max_length is not None and len(value) > self.max_length:
            return False, f"{self.name}: 长度不能超过 {self.max_length}"
        # match() anchors at the start only; patterns needing a full match
        # must carry their own end anchor.
        if self.pattern and not self.pattern.match(value):
            return False, f"{self.name}: 格式不匹配"
        return True, None


class IntegerValidator(FieldValidator):
    """Validate that a value parses as an int within optional bounds."""

    def __init__(self, name, min_value=None, max_value=None, **kwargs):
        super().__init__(name, **kwargs)
        self.min_value = min_value
        self.max_value = max_value

    def _validate_value(self, value):
        try:
            num = int(value)
        except (ValueError, TypeError):
            return False, f"{self.name}: 必须是整数"
        # Compare against None, not truthiness: a bound of 0 is a real bound
        # (min_value=0 must reject negative numbers).
        if self.min_value is not None and num < self.min_value:
            return False, f"{self.name}: 不能小于 {self.min_value}"
        if self.max_value is not None and num > self.max_value:
            return False, f"{self.name}: 不能大于 {self.max_value}"
        return True, None


class EmailValidator(StringValidator):
    """Validate an e-mail address against a fixed regex pattern."""

    def __init__(self, name, **kwargs):
        pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
        super().__init__(name, pattern=pattern, **kwargs)
# Usage example: one validator per field, keyed by the field's name.
validators = [
    StringValidator('用户名', min_length=3, max_length=20),
    IntegerValidator('年龄', min_value=0, max_value=150),
    EmailValidator('邮箱'),
]
data = {
    '用户名': '张三',
    '年龄': '25',
    # A real address: the published listing had this literal replaced by an
    # e-mail obfuscation placeholder, which can never pass validation.
    '邮箱': 'zhangsan@example.com'
}
# Look each field up by the validator's name; a missing key yields None and
# is handled by the validator's required/allow_empty rules.
for validator in validators:
    is_valid, error = validator.validate(data.get(validator.name))
    if is_valid:
        print(f"✓ {validator.name}: 有效")
    else:
        print(f"✗ {validator.name}: {error}")
import csv
def process_with_generator(filepath):
    """Lazily yield the data rows of a CSV file, skipping the header row.

    Only one row is held in memory at a time, which keeps memory use low for
    large files.

    Args:
        filepath: Path of a UTF-8 encoded CSV file.

    Yields:
        Each data row as a list of strings.
    """
    with open(filepath, 'r', newline='', encoding='utf-8') as f:
        reader = csv.reader(f)
        # Skip the header. The None default avoids StopIteration on an empty
        # file, which inside a generator would surface as RuntimeError
        # (PEP 479).
        next(reader, None)
        yield from reader
# 使用
# for row in process_with_generator('large_file.csv'):
# process(row)
from io import StringIO
import csv
# ✗ 慢 - 使用 + 拼接
result = ""
for i in range(10000):
result += f"row{i},data{i}\n"
# ✓ 快 - 使用 join
lines = []
for i in range(10000):
lines.append(f"row{i},data{i}")
result = '\n'.join(lines)
# ✓ 更快 - 使用 StringIO(推荐)
output = StringIO()
writer = csv.writer(output)
for i in range(10000):
writer.writerow([f'row{i}', f'data{i}'])
result = output.getvalue()
import csv
# ✗ 慢 - 逐行写入
with open('output.csv', 'w', newline='', encoding='utf-8') as f:
writer = csv.writer(f)
for row in data:
writer.writerow(row)
# ✓ 快 - 批量写入
with open('output.csv', 'w', newline='', encoding='utf-8') as f:
writer = csv.writer(f)
writer.writerows(data) # 一次性写入所有数据
# ✗ 普通类 - 内存占用大
class Employee:
def __init__(self, id, name, age):
self.id = id
self.name = name
self.age = age
# ✓ 使用__slots__ - 内存占用小
class EmployeeOptimized:
__slots__ = ['id', 'name', 'age']
def __init__(self, id, name, age):
self.id = id
self.name = name
self.age = age
import csv
with open('file.csv', 'r', newline='', encoding='utf-8') as f:
reader = csv.reader(f)
for row in reader:
print(row)
import csv
with open('file.csv', 'w', newline='', encoding='utf-8') as f:
writer = csv.writer(f)
writer.writerow(['A', 'B', 'C'])
writer.writerows([[1, 2, 3], [4, 5, 6]])
import csv
with open('file.csv', 'r', newline='', encoding='utf-8') as f:
reader = csv.DictReader(f)
for row in reader:
print(row['column_name'])
import csv
with open('file.csv', 'w', newline='', encoding='utf-8') as f:
fieldnames = ['name', 'age']
writer = csv.DictWriter(f, fieldnames=fieldnames)
writer.writeheader()
writer.writerow({'name': '张三', 'age': 25})
- newline='' - 防止在 Windows 上产生空行
- encoding='utf-8' - 正确处理中文字符
- with 语句 - 确保文件正确关闭

本教程仅供学习使用。

微信公众号「极客日志」,在微信中扫描左侧二维码关注。展示文案:极客日志 zeeklog
解析常见 curl 参数并生成 fetch、axios、PHP curl 或 Python requests 示例代码。 在线工具,curl 转代码在线工具,online
将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online
将字符串、文件或图像转换为其 Base64 表示形式。 在线工具,Base64 文件转换器在线工具,online
将 Markdown(GFM)转为 HTML 片段,浏览器内 marked 解析;与 HTML 转 Markdown 互为补充。 在线工具,Markdown 转 HTML在线工具,online
将 HTML 片段转为 GitHub Flavored Markdown,支持标题、列表、链接、代码块与表格等;浏览器内处理,可链接预填。 在线工具,HTML 转 Markdown在线工具,online
通过删除不必要的空白来缩小和压缩JSON。 在线工具,JSON 压缩在线工具,online