使用 Python 和向量数据库构建实时患者信息管理系统
介绍如何使用 Python 结合 Chroma 向量数据库和 sentence-transformers 模型构建实时患者信息管理系统。涵盖向量数据库原理、数据建模、嵌入生成、增删改查、语义搜索及混合过滤查询。实现了实时更新功能,支持流式数据处理(如 Redis Stream),并讨论了索引优化、安全隐私保护及合规性建议。适用于医疗信息化场景下的辅助诊断、临床研究和实时监控。

介绍如何使用 Python 结合 Chroma 向量数据库和 sentence-transformers 模型构建实时患者信息管理系统。涵盖向量数据库原理、数据建模、嵌入生成、增删改查、语义搜索及混合过滤查询。实现了实时更新功能,支持流式数据处理(如 Redis Stream),并讨论了索引优化、安全隐私保护及合规性建议。适用于医疗信息化场景下的辅助诊断、临床研究和实时监控。

在现代医疗信息化建设中,患者数据的管理和利用正变得越来越复杂。传统的数据库(如关系型数据库)擅长存储结构化数据,但对于非结构化的文本信息(如病历描述、症状记录、影像报告等),却难以实现高效的语义检索。当医生希望根据一段症状描述快速找到历史上相似病例时,传统的关键词搜索往往因为用词差异而漏掉关键信息。
与此同时,随着物联网设备的普及,患者的生命体征数据(如心率、血压、血糖)正在以流式的方式实时产生,如何及时更新这些数据,并在海量历史数据中快速找到相似模式,成为临床决策支持系统的重要挑战。
向量数据库(Vector Database)的兴起为解决这些问题提供了新的思路。通过将文本、图像甚至结构化数据转换为高维向量(Embeddings),向量数据库能够基于向量之间的相似度进行快速检索,实现语义级别的搜索。结合实时更新能力,向量数据库可以成为实时患者信息管理系统的核心组件。
本文将详细介绍如何使用 Python 编程语言,结合 Chroma 向量数据库和 sentence-transformers 嵌入模型,构建一个能够实时获取和更新患者信息的原型系统。我们将从基础概念讲起,逐步深入到代码实现、性能优化、安全隐私以及扩展应用,力求覆盖一个完整项目的方方面面。
目标读者:具备一定 Python 基础,对数据库和机器学习有基本了解的开发者、医疗信息化从业者或研究者。阅读完本文后,您将能够:
向量数据库是一种专门用于存储、索引和查询向量数据的数据库系统。这里的'向量'通常指通过机器学习模型(如神经网络)将原始数据(文本、图像、音频等)转换成的固定长度的数值数组。例如,一个句子'患者发烧咳嗽'可能被转换为一个 384 维的向量。
向量数据库的核心能力是相似性搜索(Similarity Search):给定一个查询向量,它能快速返回数据库中与之最相似的若干向量及其对应的原始数据。这种搜索基于向量之间的距离度量,如余弦相似度(Cosine Similarity)、欧氏距离(Euclidean Distance)或点积(Dot Product)。
与传统数据库相比,向量数据库的优势在于:
向量嵌入是将非结构化数据映射到向量空间的过程。一个优秀的嵌入模型应该使得语义相似的输入在向量空间中距离较近,而语义不同的输入距离较远。
例如,使用预训练的句子嵌入模型 all-MiniLM-L6-v2,将句子'患者持续高烧'和'病人体温升高'转换为向量后,它们的余弦相似度会较高,而和'患者血压正常'的相似度则较低。
常见的嵌入模型包括:
在本文中,我们将使用开源的 sentence-transformers/all-MiniLM-L6-v2,它在保证较好效果的同时模型小巧(约 80 MB),适合本地部署。
为了在大量向量中快速找到最近邻,向量数据库通常采用近似最近邻(ANN)算法,而非精确的暴力扫描。常见的 ANN 算法包括:
Chroma 底层支持多种 ANN 库(如 HNSW),并提供了简洁的 API,我们无需手动选择算法,直接使用即可。
在动手编码之前,我们需要明确系统的功能需求和性能要求。
假设我们要管理每位患者的信息,包括:
一个示例患者数据如下:
{
"patient_id": "P001234",
"name": "张三",
"age": 58,
"gender": "男",
"symptoms": "发热、咳嗽、乏力,伴胸闷气短",
"lab_results": {
"blood_pressure": "145/90",
"heart_rate": 102,
"blood_sugar": 7.8,
"temperature": 38.5
},
"timestamp": "2025-04-07T14:30:00Z"
}
患者数据属于高度敏感信息,必须考虑:
目前主流的 Python 向量数据库方案有:
| 数据库 | 类型 | 特点 | 适用场景 |
|---|---|---|---|
| FAISS | 本地库 | 仅向量索引,不支持元数据存储和过滤,需自行管理 | 纯向量检索,高吞吐 |
| Chroma | 本地/嵌入式 | 轻量级,支持元数据存储和过滤,API 简洁,集成方便 | 中小规模,快速原型 |
| Milvus | 分布式 | 企业级,支持高可用、水平扩展,功能全面 | 大规模生产环境 |
| Weaviate | 云/本地 | 支持 GraphQL 查询,内置向量化和模块化 | 需要灵活查询和混合搜索 |
| Pinecone | 云服务 | 全托管,无需运维,但可能产生费用 | 快速上线,不想自建基础设施 |
考虑到本文的重点是演示实现原理,且希望本地可运行,我们选择 Chroma。Chroma 是一个开源的嵌入式向量数据库,具有以下优点:
pip install chromadb)。我们选用 sentence-transformers/all-MiniLM-L6-v2,它是一个轻量级的预训练模型,将句子转换为 384 维向量,兼顾速度和效果。该模型在多语言任务上表现良好,支持中文。
若需更好的中文支持,可考虑 paraphrase-multilingual-MiniLM-L12-v2 或调用商业 API(如 OpenAI)。但为了本地部署和隐私安全,我们优先使用开源模型。
在本文中,我们将聚焦核心的向量数据库操作,不强制使用 Web 框架,但会在扩展部分讨论如何对接实时流。
下图展示了系统的核心组件和数据流向:
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ 数据源 │ ──> │ 嵌入服务 │ ──> │ 向量数据库 │
│ (手动/流式) │ │(sentence- │ │ (Chroma) │
└─────────────┘ │ transformers)│ └─────────────┘
└─────────────┘
▲
│
│
└───────────┬───────────┘
│
┌───────────┴───────────┐
│ 查询接口 │
│ (Python 函数) │
└───────────────────────┘
我们将系统划分为以下模块:
在接下来的章节中,我们将依次实现这些模块。
首先,创建一个新的 Python 虚拟环境并安装必要的包:
python -m venv venv
source venv/bin/activate # Windows: venv\Scripts\activate
pip install chromadb sentence-transformers numpy pandas
如果需要构建 API,可以安装 fastapi 和 uvicorn。
Chroma 客户端可以指定持久化目录,以便重启后数据不丢失。
import chromadb
from chromadb.config import Settings
# 持久化目录
PERSIST_DIRECTORY = "./chroma_patient_db"
# 初始化客户端
client = chromadb.Client(Settings(
chroma_db_impl="duckdb+parquet",
persist_directory=PERSIST_DIRECTORY,
anonymized_telemetry=False # 关闭匿名数据收集
))
# 创建或获取集合
COLLECTION_NAME = "patients"
existing_collections = client.list_collections()
if COLLECTION_NAME not in [col.name for col in existing_collections]:
collection = client.create_collection(name=COLLECTION_NAME)
else:
collection = client.get_collection(COLLECTION_NAME)
为了代码清晰,我们使用 Python 的 dataclass 定义患者信息结构,并包含一个方法生成用于嵌入的文本。
from dataclasses import dataclass, asdict
from typing import Dict, Any
@dataclass
class Patient:
patient_id: str
name: str
age: int
gender: str
symptoms: str
lab_results: Dict[str, float] # 如 {"blood_pressure_sys": 145, ...}
timestamp: str # ISO 格式
def to_embedding_text(self) -> str:
"""生成用于嵌入的文本,包含关键信息"""
lab_text = ", ".join([f"{k}:{v}" for k, v in self.lab_results.items()])
return f"患者姓名:{self.name},性别:{self.gender},年龄:{self.age},症状:{self.symptoms},检查结果:{lab_text},时间:{self.timestamp}"
def to_metadata(self) -> Dict[str, Any]:
"""生成存储在 Chroma 中的元数据字段(用于过滤)"""
meta = {
"patient_id": self.patient_id,
"age": self.age,
"gender": .gender,
: .timestamp,
}
k, v .lab_results.items():
meta[k] = v
meta
注意:血压通常表示为两个数值,我们可以拆分为 bp_sys 和 bp_dia 两个字段,或者存储为字符串(但字符串无法用于数值过滤)。为了便于过滤,建议拆分为数值字段。
我们封装一个嵌入器类,负责加载模型并将文本转为向量列表。
from sentence_transformers import SentenceTransformer
from typing import List
class Embedder:
def __init__(self, model_name: str='all-MiniLM-L6-v2'):
self.model = SentenceTransformer(model_name)
def encode(self, texts: List[str]) -> List[List[float]]:
"""将文本列表转换为向量列表"""
embeddings = self.model.encode(texts, convert_to_numpy=True)
return embeddings.tolist()
def encode_one(self, text: str) -> List[float]:
return self.encode([text])[0]
# 全局嵌入器实例(可单例)
embedder = Embedder()
我们将对 Chroma 集合的操作封装成一个类,方便业务层调用。
class PatientVectorDB:
def __init__(self, collection, embedder):
self.collection = collection
self.embedder = embedder
def upsert_patient(self, patient: Patient):
"""插入或更新患者信息"""
# 生成嵌入文本并向量化
text = patient.to_embedding_text()
embedding = self.embedder.encode_one(text)
# 准备元数据
metadata = patient.to_metadata()
# 使用 upsert:如果 patient_id 已存在,则更新;否则插入
self.collection.upsert(
ids=[patient.patient_id],
embeddings=[embedding],
metadatas=[metadata],
documents=[text] # 可选,存储原始文本便于查看
)
print(f"Upserted patient {patient.patient_id}")
def delete_patient(self, patient_id: str):
"""根据 patient_id 删除患者记录"""
self.collection.delete(ids=[patient_id])
print(f"Deleted patient {patient_id}")
def get_patient(self, patient_id: str) -> Dict[str, Any]:
"""根据 patient_id 查询患者详细信息(返回元数据)"""
result = self.collection.get(ids=[patient_id])
if result and result['metadatas']:
return result['metadatas'][0]
:
() -> []:
query_emb = .embedder.encode_one(query_text)
results = .collection.query(
query_embeddings=[query_emb],
n_results=n_results,
where=where
)
metadatas = results[][]
distances = results[][]
output = []
meta, dist (metadatas, distances):
output.append({
: meta.get(),
: meta,
: - dist
})
output
现在我们编写一些测试数据来验证功能。
# 创建几个患者实例
patient1 = Patient(
patient_id="P001", name="张三", age=45, gender="男",
symptoms="发热、咳嗽、乏力,体温 38.5 度",
lab_results={"bp_sys": 130, "bp_dia": 85, "heart_rate": 90, "blood_sugar": 6.2, "temperature": 38.5},
timestamp="2025-04-07T10:00:00Z"
)
patient2 = Patient(
patient_id="P002", name="李四", age=62, gender="男",
symptoms="胸闷、气短、心悸,偶有胸痛",
lab_results={"bp_sys": 150, "bp_dia": 95, "heart_rate": 110, "blood_sugar": 7.0, "temperature": 36.8},
timestamp="2025-04-07T11:20:00Z"
)
patient3 = Patient(
patient_id="P003", name="王芳", age=34, gender="女",
symptoms="头痛、恶心、视力模糊",
lab_results={"bp_sys": 120, "bp_dia": 80, "heart_rate": 75, "blood_sugar": 5.1, "temperature": 36.5},
timestamp="2025-04-06T09:15:00Z"
)
# 初始化数据库操作对象
db = PatientVectorDB(collection, embedder)
db.upsert_patient(patient1)
db.upsert_patient(patient2)
db.upsert_patient(patient3)
query =
results = db.search_similar(query, n_results=)
()
r results:
()
输出示例:
相似病例搜索结果:
患者 ID: P001, 相似度:0.923, 症状:发热、咳嗽、乏力,体温 38.5 度
患者 ID: P002, 相似度:0.456, 症状:胸闷、气短、心悸,偶有胸痛
可以看到,与咳嗽发热最相似的病例是 P001,相似度远高于 P002。
实时更新是指当患者的新数据到达时,立即更新数据库中的记录。Chroma 的 upsert 操作正好满足这一需求:如果 ID 已存在,则覆盖原有记录。
假设患者张三(P001)进行了复诊,症状变为'发热已退,仍有咳嗽',同时新的血压和心率如下:
# 更新患者 P001
patient1_updated = Patient(
patient_id="P001", name="张三", age=45, gender="男",
symptoms="咳嗽,少量白痰,无发热",
lab_results={"bp_sys": 125, "bp_dia": 80, "heart_rate": 80, "blood_sugar": 6.0, "temperature": 36.6},
timestamp="2025-04-08T09:00:00Z"
)
db.upsert_patient(patient1_updated)
# 再次搜索'咳嗽'
results = db.search_similar("咳嗽", n_results=3)
print("更新后相似病例:")
for r in results:
print(f"患者 ID: {r['patient_id']}, 症状:{r['metadata']['symptoms']}")
此时,P001 的症状已更新为'咳嗽,少量白痰,无发热',搜索引擎会基于新的文本向量进行匹配。
有时我们需要在语义搜索的基础上加上结构化字段的筛选。例如,查找症状类似且年龄大于 60 的患者。
# 查询文本:胸闷
query = "胸闷"
# 过滤条件:年龄 > 60
where = {"age": {"$gt": 60}}
results = db.search_similar(query, n_results=5, where=where)
print("年龄>60 且症状类似胸闷的病例:")
for r in results:
print(f"患者 ID: {r['patient_id']}, 年龄:{r['metadata']['age']}, 症状:{r['metadata']['symptoms']}")
Chroma 支持丰富的过滤语法,例如:
{"gender": "男"}{"age": {"$gt": 50}}, {"bp_sys": {"$gte": 140}}{"$and": [{"age": {"$gt": 50}}, {"gender": "男"}]}更多过滤语法请参考 Chroma 文档。
根据患者 ID 删除记录:
db.delete_patient("P003")
在许多医疗场景中,数据不是一次性录入的,而是以流的方式持续到达。例如,ICU 中的监护仪每秒上传一次心率、血压数据,或者可穿戴设备定期上传健康指标。如何处理这种高频、实时的数据流,并及时更新向量数据库?
实时流处理通常涉及以下组件:
如果数据量不大,也可以直接使用异步处理(如 asyncio)接收 HTTP 请求并立即更新,但需要确保高并发下的稳定性。
这里我们用一个简化的例子,使用 Redis 的 Stream 数据结构作为消息队列,通过 Python 的 redis-py 和 asyncio 实现消费者。
首先安装依赖:
pip install redis aioredis
生产者(模拟设备)不断发送患者更新:
import redis
import json
import time
import random
r = redis.Redis(host='localhost', port=6379, decode_responses=True)
while True:
# 模拟患者 ID 和数据
patient_id = random.choice(["P001", "P002", "P003"])
data = {
"patient_id": patient_id,
"timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
"lab_results": {
"heart_rate": random.randint(60, 120),
"bp_sys": random.randint(110, 160),
"bp_dia": random.randint(70, 100)
}
}
# 发送到 Redis Stream "patient_updates"
r.xadd("patient_updates", {"data": json.dumps(data)})
time.sleep(0.5) # 每 0.5 秒发送一次
消费者:持续监听 Stream,处理每条消息。
import asyncio
import aioredis
import json
from your_implementation import db, embedder, Patient # 假设已导入之前的模块
async def process_message(data_json: str):
"""处理单条消息,更新对应患者"""
data = json.loads(data_json)
patient_id = data["patient_id"]
# 从数据库获取该患者的现有信息(假设我们存储了部分静态信息,如 name, age, gender)
# 为了简化,我们假设通过 get_patient 能获取最近一次存储的完整信息
existing = db.get_patient(patient_id)
if not existing:
print(f"患者 {patient_id} 不存在,无法更新")
return
# 更新 lab_results 和时间戳
# 注意:existing 中包含之前的所有元数据,但可能缺少某些字段,我们需要重构一个 Patient 对象
# 在实际中,可能需要额外的存储来保留完整信息,此处仅作示例
updated_lab = existing.copy()
# 更新数值字段
for k, v in data["lab_results"].items():
updated_lab[k] = v
# 创建新的 Patient 对象
patient = Patient(
patient_id=patient_id,
name=existing.get("name", ""),
age=existing.get("age", 0),
gender=existing.get("gender", ""),
symptoms=existing.get("symptoms", ""), # 症状可能不变
lab_results={k: v for k, v in updated_lab.items() if k in ["heart_rate", "bp_sys", "bp_dia", , ]},
timestamp=data[]
)
db.upsert_patient(patient)
()
():
redis_conn = aioredis.from_url()
stream =
last_id =
:
messages = redis_conn.xread({stream: last_id}, block=)
msg_stream, msg_list messages:
msg_id, msg_data msg_list:
data_json = msg_data[].decode()
process_message(data_json)
last_id = msg_id
__name__ == :
asyncio.run(consumer())
这个例子展示了如何将流式数据转换为向量数据库的更新操作。在实际应用中,还需要考虑消息确认、失败重试、幂等性等问题。
对于高频流式更新,每次更新都重新计算整个文本的嵌入可能成本较高。如果只是数值字段变化(如心率),文本描述可能不变,那么向量的变化微乎其微,但数值字段的更新会影响过滤查询。我们可以优化:
upsert 需要提供整个向量,但我们可以先获取原向量再更新元数据,或者使用 update 方法(Chroma 提供 update 但必须提供 embedding 或 document)。更好的做法是分离存储:将静态文本和动态数值分开,但这样查询时需要联合,复杂度增加。Chroma 底层使用 HNSW 索引,该索引有几个可调参数,影响检索速度和召回率:
hnsw:space:距离度量,可选 'cosine'(默认)、'l2'、'ip'(内积)。对于文本嵌入,通常使用余弦相似度。hnsw:construction_ef:构建索引时的动态列表大小,越大索引质量越高,但构建时间越长。默认 100。hnsw:M:每个节点的最大连接数,越大索引更精确,但内存占用高。默认 16。可以在创建集合时指定这些参数:
collection = client.create_collection(
name="patients",
metadata={"hnsw:space": "cosine", "hnsw:construction_ef": 200, "hnsw:M": 32}
)
当需要插入大量历史数据时,逐条插入非常慢。Chroma 支持批量操作:
# 假设有 patients 列表
ids = [p.patient_id for p in patients]
embeddings = [embedder.encode_one(p.to_embedding_text()) for p in patients]
metadatas = [p.to_metadata() for p in patients]
documents = [p.to_embedding_text() for p in patients]
collection.add(
ids=ids,
embeddings=embeddings,
metadatas=metadatas,
documents=documents
)
批量操作大幅减少网络往返和事务开销。
对于频繁查询的热门患者数据,可以在应用层加缓存(如 Redis),减少对向量数据库的访问。但需要注意缓存的失效策略,确保实时性。
当数据量达到百万级以上,单机 Chroma 可能无法满足性能要求。此时可以考虑:
选择哪种方案取决于团队的运维能力和数据规模。
记录所有对患者数据的操作,包括谁、什么时间、做了什么操作(查询、修改、删除)。Chroma 本身不提供审计功能,需要在应用层实现。
在使用向量数据库时,需要注意嵌入模型可能无意中记忆训练数据中的敏感信息,但使用通用预训练模型的风险较小。另外,向量本身无法直接还原为原始文本,但通过相似性搜索仍可能推断出敏感信息,因此对查询结果也应进行权限控制。
医生在接诊新患者时,输入患者的症状描述,系统返回历史上症状相似的患者及其诊断结果、治疗方案,为医生提供参考。
研究人员希望筛选符合特定条件的患者群体,例如'年龄 50-70 岁、有高血压史、最近一次心电图显示 ST 段抬高'。通过混合查询,可以快速定位符合条件的患者,并提取其匿名化数据进行统计分析。
对于 ICU 患者,实时监测生命体征,当发现某个患者的体征序列与历史上发生心脏骤停的病例高度相似时,系统自动向医护人员发出预警。
本文详细介绍了如何使用 Python 和 Chroma 向量数据库构建一个能够实时管理患者信息、支持语义搜索和结构化过滤的系统。我们实现了从数据模型设计、嵌入生成、增删改查到实时流式更新的完整流程,并讨论了性能优化和安全隐私方面的考量。
向量数据库为医疗信息管理带来了新的可能性,它突破了传统关键词搜索的局限,让计算机能够理解临床描述的语义,从而更智能地辅助决策。未来,随着多模态大模型的发展,我们可以将影像、基因序列等也转换为向量,实现跨模态的相似性检索,为精准医疗提供更强有力的工具。
当然,本文的示例只是一个原型,实际生产环境还需要考虑高可用、容灾、监控、版本升级等工程问题。但希望这篇文章能为您的项目起步提供坚实的基础。
为了方便读者直接运行,这里提供所有代码的整合,分为几个文件:
models.py:数据模型定义embedder.py:嵌入器封装db.py:数据库操作封装main.py:示例主程序stream_consumer.py:流式消费者示例(篇幅所限,此处不重复粘贴,但建议读者按章节组织代码。)
参考资料:

微信公众号「极客日志」,在微信中扫描左侧二维码关注。展示文案:极客日志 zeeklog
使用加密算法(如AES、TripleDES、Rabbit或RC4)加密和解密文本明文。 在线工具,加密/解密文本在线工具,online
生成新的随机RSA私钥和公钥pem证书。 在线工具,RSA密钥对生成器在线工具,online
基于 Mermaid.js 实时预览流程图、时序图等图表,支持源码编辑与即时渲染。 在线工具,Mermaid 预览与可视化编辑在线工具,online
解析常见 curl 参数并生成 fetch、axios、PHP curl 或 Python requests 示例代码。 在线工具,curl 转代码在线工具,online
将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online
将字符串、文件或图像转换为其 Base64 表示形式。 在线工具,Base64 文件转换器在线工具,online