设计五种算法精确的身份证号匹配
问题定义与数据准备
我们有两个 Excel 文件:
本文针对从大表中匹配小表身份证号的需求,设计了五种算法:暴力循环、Pandas isin、Pandas Merge、SQLite 数据库及分块处理。通过代码实现与性能对比,指出暴力法效率极低不推荐;Pandas isin 和 Merge 速度最快(<1 秒),为首选方案;SQLite 适合复杂逻辑或海量数据;分块处理用于解决内存不足问题。最终建议根据数据规模选择合适方法。

我们有两个 Excel 文件:
small.xlsx: 包含约 5,000 条记录。large.xlsx: 包含约 140,000 条记录。目标:快速、高效地从 large.xlsx 中找出所有其'身份证号'字段存在于 small.xlsx'身份证号'字段中的记录,并将这些匹配的记录保存到一个新的 Excel 文件 result.xlsx 中。
假设:身份证号字段名在两个表中都是 id_card。
首先,进行准备工作,安装必要的库并模拟一些数据用于测试和性能估算。
pip install pandas openpyxl
import pandas as pd
import time
import random
# 为演示和测试,我们可以创建一些模拟数据(实际中使用 pd.read_excel 读取你的文件)
def generate_id_card():
"""生成一个模拟的 18 位身份证号"""
region_code = random.choice(['110101', '310104', '440301']) # 随机地区码
birth_date = f"19{random.randint(50, 99):02d}{random.randint(1, 12):02d}{random.randint(1, 28):02d}"
sequence_code = f"{random.randint(0, 999):03d}" # 顺序码
check_code = random.choice(['X', '0', '1', '2', '3', '4', '5', '6', '7', '8', '9']) # 校验码
return region_code + birth_date + sequence_code + check_code
# 生成小表数据 (5000 条)
small_data = {'id_card': [generate_id_card() for _ in range(5000)]}
small_df = pd.DataFrame(small_data)
small_df.to_excel('small.xlsx', index=False)
# 生成大表数据 (140000 条),并确保其中包含一部分小表中的 ID
large_list = []
ids_from_small = small_df['id_card'].tolist()
overlap_ids = random.sample(ids_from_small, 3000)
for _ in range(140000):
if random.random() < 0.02 and overlap_ids:
id_to_use = random.choice(overlap_ids)
else:
id_to_use = generate_id_card()
large_list.append(id_to_use)
large_data = {'id_card': large_list, 'other_data': ['Some other info'] * 140000}
large_df = pd.DataFrame(large_data)
large_df.to_excel('large.xlsx', index=False)
print("模拟数据生成完成!")
print(f"小表尺寸:{small_df.shape}")
print(f"大表尺寸:{large_df.shape}")
现在,我们开始设计算法。
这是最直观、最基础的方法。
small_df 和 large_df。large_df 的每一行(140,000 次迭代)。small_df 的每一行(5,000 次迭代)。(large_id, small_id),比较它们是否相等。large_df 的当前行标记为匹配。def algorithm_1_brute_force(small_file, large_file, output_file):
""" 算法 1:暴力双重循环 """
print("算法 1:暴力双重循环 - 开始执行")
start_time = time.time()
# 1. 加载数据
small_df = pd.read_excel(small_file)
large_df = pd.read_excel(large_file)
# 确保 id_card 列是字符串类型,避免因数据类型导致的匹配失败
small_df['id_card'] = small_df['id_card'].astype(str)
large_df['id_card'] = large_df['id_card'].astype(str)
# 获取小表的身份证号列表
small_ids = small_df['id_card'].tolist()
# 2. 嵌套循环比对
matched_rows = [] # 存储匹配的行索引或行数据
large_ids = large_df['id_card'].tolist()
for i, large_id in enumerate(large_ids):
for small_id in small_ids:
if large_id == small_id:
matched_rows.append(i) # 记录匹配的行索引
break # 找到一个匹配就可以跳出内层循环,进入下一个大表 ID
# 3. 收集结果
result_df = large_df.iloc[matched_rows]
# 4. 保存结果
result_df.to_excel(output_file, index=False)
end_time = time.time()
execution_time = end_time - start_time
print(f"算法 1 完成。找到 {len(result_df)} 条匹配记录。耗时:{execution_time:.4f} 秒")
return execution_time
isin() 方法这是对暴力算法的极大优化,利用了 Pandas 内置的高效向量化操作。
small_df 的'身份证号'列转换为一个 Python 集合(Set)。集合是基于哈希表实现的,其 in 操作的查询时间复杂度是平均 O(1)。isin() 方法。该方法会接收一个集合或列表,并返回一个布尔序列(Series),指示 large_df 的'身份证号'列中的每个元素是否存在于给定的集合中。large_df 进行索引,快速筛选出所有匹配的行。def algorithm_2_pandas_isin(small_file, large_file, output_file):
""" 算法 2:利用 Pandas 的 isin 和集合 """
print("算法 2:Pandas isin() - 开始执行")
start_time = time.time()
# 1. 加载数据
small_df = pd.read_excel(small_file)
large_df = pd.read_excel(large_file)
small_df['id_card'] = small_df['id_card'].astype(str)
large_df['id_card'] = large_df['id_card'].astype(str)
# 2. 创建查询集合
target_set = set(small_df['id_card'])
# 3. 向量化筛选
mask = large_df['id_card'].isin(target_set)
# 4. 使用布尔索引获取结果
result_df = large_df[mask]
# 5. 保存结果
result_df.to_excel(output_file, index=False)
end_time = time.time()
execution_time = end_time - start_time
print(f"算法 2 完成。找到 {len(result_df)} 条匹配记录。耗时:{execution_time:.4f} 秒")
return execution_time
isin() 是 Pandas 内部用 C 优化过的向量化操作,背后通常也使用了哈希表机制。它的时间复杂度可以近似看作是 O(n)。利用数据库的 INNER JOIN 思想,使用 Pandas 的合并功能。
pd.merge() 函数,以'身份证号'作为连接键,对两个 DataFrame 进行内连接。内连接的特性是只会保留两个表中键值匹配的行。def algorithm_3_pandas_merge(small_file, large_file, output_file):
""" 算法 3:Pandas Merge (Inner Join) """
print("算法 3:Pandas Merge - 开始执行")
start_time = time.time()
# 1. 加载数据
small_df = pd.read_excel(small_file)
large_df = pd.read_excel(large_file)
small_df['id_card'] = small_df['id_card'].astype(str)
large_df['id_card'] = large_df['id_card'].astype(str)
# 2. 执行内连接
# on 参数指定连接的列名。how='inner'表示内连接。
result_df = pd.merge(large_df, small_df[['id_card']], on='id_card', how='inner')
# 3. 保存结果
result_df.to_excel(output_file, index=False)
end_time = time.time()
execution_time = end_time - start_time
print(f"算法 3 完成。找到 {len(result_df)} 条匹配记录。耗时:{execution_time:.4f} 秒")
return execution_time
merge 函数底层也经过了高度优化,通常基于哈希或排序 - 合并算法,效率很高。isin() 方法产生一些额外的中间开销,因为需要协调两个表的列。将数据加载到内存数据库(如 SQLite)中,使用 SQL 语言的 IN 或 JOIN 语句来让数据库引擎完成高效的查找工作。
sqlite3 模块在内存中创建一个临时数据库。import sqlite3
def algorithm_4_sqlite(small_file, large_file, output_file):
""" 算法 4:使用 SQLite 内存数据库 """
print("算法 4:SQLite 内存数据库 - 开始执行")
start_time = time.time()
# 1. 加载数据
small_df = pd.read_excel(small_file)
large_df = pd.read_excel(large_file)
small_df['id_card'] = small_df['id_card'].astype(str)
large_df['id_card'] = large_df['id_card'].astype(str)
# 2. 创建内存数据库连接
conn = sqlite3.connect(':memory:')
# 3. 导入数据到数据库
small_df.to_sql('small_table', conn, index=False)
large_df.to_sql('large_table', conn, index=False)
# 4. 编写并执行 SQL 查询
query = """ SELECT large_table.* FROM large_table WHERE large_table.id_card IN (SELECT id_card FROM small_table) """
result_df = pd.read_sql_query(query, conn)
# 5. 关闭连接
conn.close()
# 6. 保存结果
result_df.to_excel(output_file, index=False)
end_time = time.time()
execution_time = end_time - start_time
print(f"算法 4 完成。找到 {len(result_df)} 条匹配记录。耗时:{execution_time:.4f} 秒")
return execution_time
这个算法并非用于提升速度,而是用于解决内存不足的问题。当 large.xlsx 文件巨大(例如几个 GB),无法一次性读入内存时,就需要使用此方法。
small.xlsx 全部读入内存,并创建集合 S。read_excel() 的 chunksize 参数,分批读取 large.xlsx。isin(S) 方法筛选出匹配的行。def algorithm_5_chunking(small_file, large_file, output_file, chunksize=10000):
""" 算法 5:分块处理(用于内存不足的大文件场景) """
print("算法 5:分块处理 - 开始执行")
start_time = time.time()
# 1. 加载小数据并创建集合
small_df = pd.read_excel(small_file)
small_df['id_card'] = small_df['id_card'].astype(str)
target_set = set(small_df['id_card'])
# 2. 初始化一个列表来存储每个块的结果
chunks_result_list = []
# 3. 分块读取大数据
chunk_reader = pd.read_excel(large_file, chunksize=chunksize)
for chunk in chunk_reader:
chunk['id_card'] = chunk['id_card'].astype(str)
# 4. 处理当前块
mask = chunk['id_card'].isin(target_set)
filtered_chunk = chunk[mask]
chunks_result_list.append(filtered_chunk)
print(f"已处理一个数据块,该块找到 {len(filtered_chunk)} 条匹配记录。")
# 5. 合并结果并保存
if chunks_result_list:
final_result_df = pd.concat(chunks_result_list, ignore_index=True)
else:
final_result_df = pd.DataFrame()
final_result_df.to_excel(output_file, index=False)
end_time = time.time()
execution_time = end_time - start_time
print(f"算法 5 完成。找到 {len(final_result_df)} 条匹配记录。耗时:{execution_time:.4f} 秒")
return execution_time
我们将五种算法的优缺点和适用场景总结如下:
| 算法 | 优点 | 缺点 | 预计时间 | 推荐度 |
|---|---|---|---|---|
| 1. 暴力循环 | 实现简单 | 速度极慢,无法忍受 | ~30 分钟以上 | ⭐(绝不推荐) |
| 2. Pandas isin() | 实现简单,速度最快 | 需要内存容纳小表集合 | <1 秒 | ⭐⭐⭐⭐⭐(首选) |
| 3. Pandas Merge | 实现简单,速度最快 | 略有额外开销 | ~1 秒 | ⭐⭐⭐⭐⭐(首选) |
| 4. SQLite | 高效,支持复杂查询,海量数据优势 | 步骤稍多,数据迁移开销 | 1-3 秒 | ⭐⭐⭐⭐(备用方案) |
| 5. 分块处理 | 内存友好,可处理超大文件 | 速度较慢,实现稍复杂 | 2-5 秒 | ⭐⭐⭐(特殊场景) |
最终结论与建议:
isin())或算法三(merge())。它们是专门为这种表格数据操作设计的,代码简洁、效率最高。代码执行:你可以创建一个主函数来运行和比较这些算法(除了算法一)。
if __name__ == '__main__':
files = ('small.xlsx', 'large.xlsx')
times = {}
times['alg_2'] = algorithm_2_pandas_isin(*files, 'result_2.xlsx')
times['alg_3'] = algorithm_3_pandas_merge(*files, 'result_3.xlsx')
times['alg_4'] = algorithm_4_sqlite(*files, 'result_4.xlsx')
times['alg_5'] = algorithm_5_chunking(*files, 'result_5.xlsx', chunksize=50000)
print("\n=== 所有算法耗时对比 ===")
for alg, t in times.items():
print(f"{alg}: {t:.4f} 秒")
在实际运行中,你会看到算法二和三以绝对优势胜出。

微信公众号「极客日志」,在微信中扫描左侧二维码关注。展示文案:极客日志 zeeklog
使用加密算法(如AES、TripleDES、Rabbit或RC4)加密和解密文本明文。 在线工具,加密/解密文本在线工具,online
解析常见 curl 参数并生成 fetch、axios、PHP curl 或 Python requests 示例代码。 在线工具,curl 转代码在线工具,online
将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online
将字符串、文件或图像转换为其 Base64 表示形式。 在线工具,Base64 文件转换器在线工具,online
将 Markdown(GFM)转为 HTML 片段,浏览器内 marked 解析;与 HTML 转 Markdown 互为补充。 在线工具,Markdown 转 HTML在线工具,online
将 HTML 片段转为 GitHub Flavored Markdown,支持标题、列表、链接、代码块与表格等;浏览器内处理,可链接预填。 在线工具,HTML 转 Markdown在线工具,online