AI 大模型实践项目:医学影像分类器(肺结节检测)
本项目利用深度学习技术开发肺结节检测分类器,基于 CT 影像区分良性和恶性结节,聚焦 **卷积神经网络(CNN)**、**视觉变换器(Vision Transformer, ViT)** 以及受 **Med-PaLM** 启发的多模态方法。使用 **LUNA16** 数据集,整合 **Transformer** 原理(自注意力、位置编码),增强代码支持 3D 处理和分割任务,新增高级可视化和隐私保护技术(如联邦学习)。文章结构如下:
- 数据预处理:LUNA16 DICOM 读取、2D/3D 数据增强。
- 模型实现:2D/3D ResNet-50、ViT(LoRA 微调)、多模态(ViT+BERT)、UNETR 分割。
- 评估与可视化:混淆矩阵、ROC 曲线、Dice 分数、注意力与 Grad-CAM 热图。
- 应用与展望:多模态融合、实时诊断、联邦学习。
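在进入数据与模型代码之前,先用一段最小的 PyTorch 草图示意上文提到的两个 Transformer 核心机制:缩放点积自注意力与位置编码。这只是说明性示例:ViT 实际使用可学习的位置嵌入,这里以正弦编码代替;张量形状(196 个 patch、768 维)对应 224×224 输入、16×16 patch 的设定。

import math
import torch
import torch.nn.functional as F

def scaled_dot_product_attention(q, k, v):
    # 注意力权重 = softmax(QK^T / sqrt(d_k)),再对 V 加权求和
    d_k = q.size(-1)
    scores = q @ k.transpose(-2, -1) / math.sqrt(d_k)
    weights = F.softmax(scores, dim=-1)
    return weights @ v, weights

def sinusoidal_positional_encoding(seq_len, dim):
    # 正弦/余弦位置编码,为无序的 patch 序列注入位置信息(ViT 实际使用可学习嵌入,此处仅示意)
    position = torch.arange(seq_len).unsqueeze(1)
    div_term = torch.exp(torch.arange(0, dim, 2) * (-math.log(10000.0) / dim))
    pe = torch.zeros(seq_len, dim)
    pe[:, 0::2] = torch.sin(position * div_term)
    pe[:, 1::2] = torch.cos(position * div_term)
    return pe

# 示例:一幅 224x224、patch 大小 16 的 CT 切片会被切成 14x14=196 个 patch token
tokens = torch.randn(1, 196, 768) + sinusoidal_positional_encoding(196, 768)
out, attn = scaled_dot_product_attention(tokens, tokens, tokens)
print(out.shape, attn.shape)  # torch.Size([1, 196, 768]) torch.Size([1, 196, 196])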


以下为 LUNA16 数据预处理代码,支持 2D 和 3D 数据:
import os
import numpy as np
import pandas as pd
import pydicom
import torch
from torch.utils.data import Dataset
import albumentations as A
from albumentations.pytorch import ToTensorV2
from monai.transforms import Compose, Resize, RandRotate, RandFlip, ToTensor

# 自定义数据集
class LUNA16Dataset(Dataset):
    def __init__(self, dicom_dir, annotations_file, mode='2d', transform=None):
        """LUNA16 数据集
        :param dicom_dir: DICOM 文件目录
        :param annotations_file: 标注 CSV 文件
        :param mode: '2d' 或 '3d'(切片或体块)
        :param transform: 数据增强
        """
        self.dicom_dir = dicom_dir
        self.annotations = pd.read_csv(annotations_file)
        self.mode = mode
        self.transform = transform

    def __len__(self):
        return len(self.annotations)

    def __getitem__(self, idx):
        # 读取 DICOM
        dicom_id = self.annotations.iloc[idx]['dicom_id']
        dicom_path = os.path.join(self.dicom_dir, dicom_id)
        ds = pydicom.dcmread(dicom_path)
        image = ds.pixel_array.astype(np.float32)  # [H, W] 或 [D, H, W]
        # 归一化到 [0, 1]
        image = (image - np.min(image)) / (np.max(image) - np.min(image) + 1e-6)
        # 提取结节区域(标注坐标转为整数索引)
        if self.mode == '2d':
            x, y, w, h, z = self.annotations.iloc[idx][['x', 'y', 'width', 'height', 'z']].astype(int).values
            image = image[z, y:y+h, x:x+w]  # 2D 切片
        else:
            x, y, z, w, h, d = self.annotations.iloc[idx][['x', 'y', 'z', 'width', 'height', 'depth']].astype(int).values
            image = image[z:z+d, y:y+h, x:x+w]  # 3D 体块
        # 数据增强
        if self.transform:
            if self.mode == '2d':
                augmented = self.transform(image=image)
                image = augmented['image']
            else:
                image = self.transform(image[np.newaxis, ...])  # 添加并保留通道维度 [1, D, H, W]
        label = self.annotations.iloc[idx]['label']  # 0: 良性,1: 恶性
        return {'image': image, 'label': torch.tensor(label, dtype=torch.long)}

# 数据增强
transform_2d = A.Compose([
    A.Resize(224, 224),
    A.Rotate(limit=30, p=0.5),
    A.HorizontalFlip(p=0.5),
    A.RandomBrightnessContrast(p=0.3),
    A.Normalize(mean=[0.5], std=[0.5], max_pixel_value=1.0),  # 输入已归一化到 [0, 1]
    ToTensorV2()
])
transform_3d = Compose([
    Resize(spatial_size=(32, 32, 32)),
    RandRotate(range_x=np.pi / 6, prob=0.5),  # MONAI 旋转角度以弧度为单位
    RandFlip(spatial_axis=0, prob=0.5),
    ToTensor()
])

# 加载数据集
dataset_2d = LUNA16Dataset(dicom_dir='path/to/luna16', annotations_file='annotations.csv', mode='2d', transform=transform_2d)
dataset_3d = LUNA16Dataset(dicom_dir='path/to/luna16', annotations_file='annotations.csv', mode='3d', transform=transform_3d)
代码注释:dicom_dir 和 annotations_file 需替换为实际路径;2D 模式返回单张切片并经 albumentations 增强,3D 模式返回体块并经 MONAI 增强。
以下模型基于 ResNet-50,支持 2D 和 3D CT 影像分类:
import torch
import torch.nn as nn
from torchvision.models import resnet50
from monai.networks.nets import resnet50 as monai_resnet50
from torch.utils.data import DataLoader
from sklearn.metrics import accuracy_score, confusion_matrix

# 3D ResNet-50(MONAI 实现)
class ResNet3D(nn.Module):
    def __init__(self, num_classes=2):
        super().__init__()
        self.resnet = monai_resnet50(spatial_dims=3, n_input_channels=1, num_classes=num_classes)

    def forward(self, x):
        return self.resnet(x)

# 2D ResNet-50(ImageNet 预训练,首层改为单通道输入)
class ResNet2D(nn.Module):
    def __init__(self, num_classes=2):
        super().__init__()
        self.resnet = resnet50(pretrained=True)
        self.resnet.conv1 = nn.Conv2d(1, 64, kernel_size=7, stride=2, padding=3, bias=False)
        self.resnet.fc = nn.Linear(self.resnet.fc.in_features, num_classes)

    def forward(self, x):
        return self.resnet(x)

# 训练函数
def train_model(model, dataloader, criterion, optimizer, num_epochs=10, device='cuda'):
    model = model.to(device)
    train_losses = []
    for epoch in range(num_epochs):
        model.train()
        running_loss = 0.0
        for batch in dataloader:
            images = batch['image'].to(device)
            labels = batch['label'].to(device)
            optimizer.zero_grad()
            outputs = model(images)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            running_loss += loss.item()
        avg_loss = running_loss / len(dataloader)
        train_losses.append(avg_loss)
        print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {avg_loss:.4f}')
    return train_losses

# 数据加载器
dataloader_2d = DataLoader(dataset_2d, batch_size=16, shuffle=True)
dataloader_3d = DataLoader(dataset_3d, batch_size=8, shuffle=True)

# 初始化模型
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model_2d = ResNet2D(num_classes=2)
model_3d = ResNet3D(num_classes=2)
criterion = nn.CrossEntropyLoss(weight=torch.tensor([0.3, 0.7]).to(device))  # 加权处理类别不平衡(恶性样本较少)
optimizer_2d = torch.optim.Adam(model_2d.parameters(), lr=1e-4, weight_decay=1e-5)
optimizer_3d = torch.optim.Adam(model_3d.parameters(), lr=1e-4, weight_decay=1e-5)

# 训练
train_losses_2d = train_model(model_2d, dataloader_2d, criterion, optimizer_2d, device=device)
train_losses_3d = train_model(model_3d, dataloader_3d, criterion, optimizer_3d, device=device)

# 推理
def evaluate_model(model, dataloader, device='cuda'):
    model.eval()
    predictions, true_labels = [], []
    with torch.no_grad():
        for batch in dataloader:
            images = batch['image'].to(device)
            labels = batch['label'].to(device)
            outputs = model(images)
            preds = torch.argmax(outputs, dim=1)
            predictions.extend(preds.cpu().numpy())
            true_labels.extend(labels.cpu().numpy())
    return predictions, true_labels

predictions_2d, true_labels_2d = evaluate_model(model_2d, dataloader_2d, device=device)
predictions_3d, true_labels_3d = evaluate_model(model_3d, dataloader_3d, device=device)
print("2D ResNet 准确率:", accuracy_score(true_labels_2d, predictions_2d))
print("3D ResNet 准确率:", accuracy_score(true_labels_3d, predictions_3d))
代码注释:2D 模型复用 ImageNet 预训练权重并将首层卷积改为单通道输入;3D 模型使用 MONAI 的 resnet50;交叉熵损失加权([0.3, 0.7])以缓解类别不平衡。
以下基于 ViT 并结合 LoRA 微调,支持注意力可视化:
import numpy as np
import torch
import torch.nn as nn
import matplotlib.pyplot as plt
from transformers import ViTImageProcessor, ViTForImageClassification
from peft import LoraConfig, get_peft_model
from torch.utils.data import DataLoader
from sklearn.metrics import accuracy_score

# 加载 ViT(分类头改为 2 类,需忽略预训练头的尺寸不匹配)
processor = ViTImageProcessor.from_pretrained('google/vit-base-patch16-224')
model = ViTForImageClassification.from_pretrained(
    'google/vit-base-patch16-224', num_labels=2, ignore_mismatched_sizes=True)

# LoRA 微调:仅训练注意力中 query/value 的低秩增量
lora_config = LoraConfig(r=8, lora_alpha=16, target_modules=["query", "value"])
model = get_peft_model(model, lora_config)

# 训练设置
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = model.to(device)
criterion = nn.CrossEntropyLoss(weight=torch.tensor([0.3, 0.7]).to(device))
optimizer = torch.optim.Adam(model.parameters(), lr=2e-5)

# 训练
dataloader_vit = DataLoader(dataset_2d, batch_size=16, shuffle=True)
train_losses = []
for epoch in range(10):
    model.train()
    running_loss = 0.0
    for batch in dataloader_vit:
        images = batch['image'].repeat(1, 3, 1, 1)  # 灰度单通道复制为三通道,匹配 ViT 预训练输入
        labels = batch['label'].to(device)
        inputs = processor(images, return_tensors='pt', do_rescale=False).to(device)
        outputs = model(**inputs).logits
        loss = criterion(outputs, labels)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        running_loss += loss.item()
    avg_loss = running_loss / len(dataloader_vit)
    train_losses.append(avg_loss)
    print(f'Epoch [{epoch+1}/10], Loss: {avg_loss:.4f}')

# 注意力可视化
def visualize_attention(model, image, processor, device='cuda'):
    model.eval()
    image_3c = image.repeat(3, 1, 1)  # [1, H, W] -> [3, H, W]
    inputs = processor(image_3c, return_tensors='pt', do_rescale=False).to(device)
    with torch.no_grad():
        outputs = model(**inputs, output_attentions=True)
    attentions = outputs.attentions[-1].mean(dim=1).squeeze(0)  # 最后一层注意力,对多头取平均 [seq, seq]
    # 取 [CLS] 对各 patch 的注意力,还原为 14x14 网格(224/16=14),再插值到原图大小
    h, w = image.shape[-2:]
    attn_map = attentions[0, 1:].reshape(14, 14)
    attn_map = torch.nn.functional.interpolate(
        attn_map[None, None], size=(h, w), mode='bilinear', align_corners=False)[0, 0].cpu().numpy()
    plt.imshow(image.squeeze(0).cpu(), cmap='gray')
    plt.imshow(attn_map, cmap='jet', alpha=0.5)
    plt.title('ViT 注意力热图')
    plt.show()

# 推理与评估
model.eval()
predictions, true_labels = [], []
with torch.no_grad():
    for batch in dataloader_vit:
        images = batch['image'].repeat(1, 3, 1, 1)
        labels = batch['label'].to(device)
        inputs = processor(images, return_tensors='pt', do_rescale=False).to(device)
        outputs = model(**inputs).logits
        preds = torch.argmax(outputs, dim=1)
        predictions.extend(preds.cpu().numpy())
        true_labels.extend(labels.cpu().numpy())
print("ViT 准确率:", accuracy_score(true_labels, predictions))

# 可视化示例
sample_image = dataset_2d[0]['image']
visualize_attention(model, sample_image, processor, device=device)
代码注释:LoRA 仅在注意力的 query 与 value 投影上插入低秩增量,大幅减少可训练参数;注意力热图取最后一层并对多头取平均。
以下受 Med-PaLM 启发,结合 CT 影像和临床文本(如病史)实现多模态分类:
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from transformers import ViTModel, BertTokenizer, BertModel

# 多模态模型:ViT 提取影像特征,BERT 提取文本特征,拼接后分类
class MultiModalLungNoduleClassifier(nn.Module):
    def __init__(self, num_labels=2):
        super().__init__()
        self.vit = ViTModel.from_pretrained('google/vit-base-patch16-224')
        self.bert = BertModel.from_pretrained('bert-base-uncased')
        self.fusion = nn.Linear(768 + 768, 512)
        self.classifier = nn.Linear(512, num_labels)
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(0.1)

    def forward(self, image_inputs, text_inputs):
        vit_outputs = self.vit(**image_inputs).pooler_output   # [batch, 768]
        bert_outputs = self.bert(**text_inputs).pooler_output  # [batch, 768]
        combined = torch.cat((vit_outputs, bert_outputs), dim=-1)
        combined = self.relu(self.fusion(combined))
        combined = self.dropout(combined)
        logits = self.classifier(combined)
        return logits

# 数据集(扩展支持文本)
class LUNA16MultiModalDataset(Dataset):
    def __init__(self, dicom_dir, annotations_file, texts, transform=None):
        self.dataset = LUNA16Dataset(dicom_dir, annotations_file, mode='2d', transform=transform)
        self.texts = texts
        self.tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')

    def __getitem__(self, idx):
        item = self.dataset[idx]
        text = self.texts[idx]
        text_inputs = self.tokenizer(text, max_length=128, padding='max_length', truncation=True, return_tensors='pt')
        item['text_inputs'] = {k: v.squeeze(0) for k, v in text_inputs.items()}
        return item

    def __len__(self):
        return len(self.dataset)

# 数据准备(模拟临床文本)
texts = ["Patient with cough and fever, suspected malignancy."] * len(dataset_2d)
multimodal_dataset = LUNA16MultiModalDataset('path/to/luna16', 'annotations.csv', texts, transform=transform_2d)
dataloader_mm = DataLoader(multimodal_dataset, batch_size=16, shuffle=True)

# 训练
model_mm = MultiModalLungNoduleClassifier(num_labels=2).to(device)
criterion = nn.CrossEntropyLoss(weight=torch.tensor([0.3, 0.7]).to(device))
optimizer = torch.optim.Adam(model_mm.parameters(), lr=2e-5)
for epoch in range(10):
    model_mm.train()
    running_loss = 0.0
    for batch in dataloader_mm:
        images = batch['image'].repeat(1, 3, 1, 1)  # 单通道转三通道
        labels = batch['label'].to(device)
        image_inputs = processor(images, return_tensors='pt', do_rescale=False).to(device)
        text_inputs = {k: v.to(device) for k, v in batch['text_inputs'].items()}
        outputs = model_mm(image_inputs, text_inputs)
        loss = criterion(outputs, labels)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        running_loss += loss.item()
    print(f'Epoch [{epoch+1}/10], Loss: {running_loss/len(dataloader_mm):.4f}')
代码注释:ViT 与 BERT 特征各为 768 维,拼接后经全连接层融合;示例中的临床文本为模拟数据,实际使用时应替换为真实病史。
以下为肺结节分割任务,基于 MONAI 的 UNETR(U-Net + ViT):
import torch
import torch.nn as nn
from monai.networks.nets import UNETR
from monai.data import DataLoader as MonaiDataLoader, Dataset as MonaiDataset
from monai.transforms import Compose, LoadImageD, EnsureChannelFirstD, ResizeD, ToTensorD

# 分割数据集:image 与 mask 均需加载并统一尺寸(mask 用最近邻插值以保持标签值)
transform_seg = Compose([
    LoadImageD(keys=['image', 'mask']),
    EnsureChannelFirstD(keys=['image', 'mask']),
    ResizeD(keys=['image', 'mask'], spatial_size=(32, 32, 32), mode=('area', 'nearest')),
    ToTensorD(keys=['image', 'mask'])
])

# 假设分割标注(mask)
seg_data = [{'image': f'path/to/luna16/{i}.dcm', 'mask': f'path/to/mask/{i}.nii'} for i in range(100)]
seg_dataset = MonaiDataset(seg_data, transform=transform_seg)
seg_dataloader = MonaiDataLoader(seg_dataset, batch_size=4, shuffle=True)

# UNETR 模型:ViT 编码器 + U-Net 风格解码器
seg_model = UNETR(in_channels=1, out_channels=2, img_size=(32, 32, 32), feature_size=16).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(seg_model.parameters(), lr=1e-4)

# 训练
for epoch in range(10):
    seg_model.train()
    running_loss = 0.0
    for batch in seg_dataloader:
        images = batch['image'].to(device)
        masks = batch['mask'].squeeze(1).long().to(device)  # [B, D, H, W],交叉熵要求类别索引
        outputs = seg_model(images)
        loss = criterion(outputs, masks)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        running_loss += loss.item()
    print(f'Epoch [{epoch+1}/10], Loss: {running_loss/len(seg_dataloader):.4f}')
代码注释:UNETR 以 ViT 作为编码器、U-Net 风格解码器输出体素级分类;mask 路径为示例,需替换为实际分割标注。
以下为分类和分割任务的评估代码:
import torch
import seaborn as sns
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix, roc_curve, auc, classification_report
from monai.metrics import DiceMetric
from monai.networks.utils import one_hot

# 分类评估
def evaluate_classification(model, dataloader, processor=None, device='cuda'):
    model.eval()
    predictions, true_labels, probs = [], [], []
    with torch.no_grad():
        for batch in dataloader:
            labels = batch['label'].to(device)
            if processor:
                # ViT:单通道转三通道后经 processor 预处理
                images = batch['image'].repeat(1, 3, 1, 1)
                inputs = processor(images, return_tensors='pt', do_rescale=False).to(device)
                outputs = model(**inputs).logits
            else:
                # CNN:直接输入张量
                images = batch['image'].to(device)
                outputs = model(images)
            preds = torch.argmax(outputs, dim=1)
            predictions.extend(preds.cpu().numpy())
            true_labels.extend(labels.cpu().numpy())
            probs.extend(torch.softmax(outputs, dim=1)[:, 1].cpu().numpy())
    # 混淆矩阵
    cm = confusion_matrix(true_labels, predictions)
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=['良性', '恶性'], yticklabels=['良性', '恶性'])
    plt.xlabel('预测')
    plt.ylabel('真实')
    plt.title('混淆矩阵')
    plt.show()
    # 分类报告
    print(classification_report(true_labels, predictions, target_names=['良性', '恶性']))
    # ROC 曲线
    fpr, tpr, _ = roc_curve(true_labels, probs)
    roc_auc = auc(fpr, tpr)
    plt.plot(fpr, tpr, label=f'ROC 曲线 (AUC = {roc_auc:.2f})')
    plt.plot([0, 1], [0, 1], 'k--')
    plt.xlabel('假阳性率')
    plt.ylabel('真阳性率')
    plt.title('ROC 曲线')
    plt.legend()
    plt.show()

# 分割评估
def evaluate_segmentation(model, dataloader, device='cuda'):
    dice_metric = DiceMetric(include_background=False, reduction='mean')
    model.eval()
    with torch.no_grad():
        for batch in dataloader:
            images = batch['image'].to(device)
            masks = batch['mask'].to(device)
            outputs = model(images)
            preds = torch.argmax(outputs, dim=1, keepdim=True)
            # DiceMetric 需要 one-hot 格式 [B, C, D, H, W]
            dice_metric(one_hot(preds, num_classes=2), one_hot(masks, num_classes=2))
    dice_score = dice_metric.aggregate().item()
    dice_metric.reset()
    print(f"Dice 分数:{dice_score:.4f}")

# 评估示例
evaluate_classification(model_2d, dataloader_2d, device=device)           # 2D ResNet
evaluate_classification(model, dataloader_vit, processor, device=device)  # ViT(LoRA)
evaluate_segmentation(seg_model, seg_dataloader, device=device)           # UNETR
代码注释:分类评估输出混淆矩阵、分类报告和 ROC 曲线;分割评估使用 Dice 分数。
以下为优化后的医学影像分类和分割工作流(Mermaid 流程图):
graph TD
A[输入数据] --> B{预处理模式}
B -->|2D| C[2D 预处理]
B -->|3D| D[3D 预处理]
C --> E[模型选择]
D --> E
E --> F{模型类型}
F -->|CNN| G[ResNet]
F -->|ViT| H[ViT]
F -->|多模态| I[ViT+BERT]
G --> J[训练]
H --> J
I --> J
J --> K[评估]
K --> L{收敛?}
L -->|否| M[优化参数]
M --> J
L -->|是| N[输出结果]
流程图说明:数据按 2D/3D 两种模式预处理后,可在 ResNet、ViT 或多模态模型中选择并训练;评估后若未收敛则调整参数继续训练,收敛后输出结果。
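针对流程中“收敛判断与优化参数”这一循环,下面给出一个最小示意(非固定实现):假设从 dataset_2d 划出 20% 作为验证集,用 ReduceLROnPlateau 在验证损失停滞时降低学习率,并在连续若干个 epoch 无改善时提前停止;best_model.pt 为示例文件名,train_model 与 criterion 复用前文定义。

from torch.utils.data import random_split, DataLoader

# 划分训练/验证集(示意比例 8:2)
train_size = int(0.8 * len(dataset_2d))
train_set, val_set = random_split(dataset_2d, [train_size, len(dataset_2d) - train_size])
train_loader = DataLoader(train_set, batch_size=16, shuffle=True)
val_loader = DataLoader(val_set, batch_size=16)

model_es = ResNet2D(num_classes=2).to(device)
optimizer = torch.optim.Adam(model_es.parameters(), lr=1e-4)
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=2)

best_val_loss, patience, wait = float('inf'), 5, 0
for epoch in range(50):
    train_model(model_es, train_loader, criterion, optimizer, num_epochs=1, device=device)
    # 计算验证损失
    model_es.eval()
    val_loss = 0.0
    with torch.no_grad():
        for batch in val_loader:
            outputs = model_es(batch['image'].to(device))
            val_loss += criterion(outputs, batch['label'].to(device)).item()
    val_loss /= len(val_loader)
    scheduler.step(val_loss)  # 验证损失不再下降时降低学习率
    if val_loss < best_val_loss:
        best_val_loss, wait = val_loss, 0
        torch.save(model_es.state_dict(), 'best_model.pt')  # 保存当前最优权重
    else:
        wait += 1
        if wait >= patience:
            print(f'验证损失连续 {patience} 个 epoch 未改善,提前停止')
            break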
以下为 CNN 与 ViT 在肺结节分类上的性能对比折线图(假设数据)。

{
"type": "line",
"data": {
"labels": ["2 折", "3 折", "5 折", "10 折"],
"datasets": [
{
"label": "ResNet 召回率",
"data": [0.88, 0.90, 0.91, 0.90],
"borderColor": "#FF6384",
"fill": false
},
{
"label": "ViT 召回率",
"data": [0.90, 0.92, 0.93, 0.92],
"borderColor": "#36A2EB",
"fill": false
}
]
},
"options": {
"title": {
"display": true,
"text": "CNN 与 ViT 召回率对比(肺结节分类)"
},
"scales": {
"xAxes": [{
"scaleLabel": {
"display": true,
"labelString": "交叉验证折数"
}
}],
"yAxes": [{
"scaleLabel": {
"display": true,
"labelString": "召回率"
},
"ticks": {
"min": 0.8,
"max": 1.0
}
}]
}
}
}
说明:折线图为假设数据,展示不同交叉验证折数下两种模型的召回率,ViT 整体略高于 ResNet。
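上图的折数对比可以通过 K 折交叉验证得到。以下为一个最小示意(假设做法,折数与 epoch 数均为示例取值):用 sklearn 的 KFold 划分 dataset_2d,每折重新初始化 2D ResNet,并复用前文的 train_model 与 evaluate_model 计算召回率。

import numpy as np
from sklearn.model_selection import KFold
from sklearn.metrics import recall_score
from torch.utils.data import Subset, DataLoader

def cross_validate(dataset, n_splits=5, num_epochs=5):
    kf = KFold(n_splits=n_splits, shuffle=True, random_state=42)
    recalls = []
    indices = list(range(len(dataset)))
    for fold, (train_idx, val_idx) in enumerate(kf.split(indices)):
        # 每折划分训练/验证子集
        train_loader = DataLoader(Subset(dataset, train_idx.tolist()), batch_size=16, shuffle=True)
        val_loader = DataLoader(Subset(dataset, val_idx.tolist()), batch_size=16)
        # 每折重新初始化模型与优化器
        fold_model = ResNet2D(num_classes=2)
        optimizer = torch.optim.Adam(fold_model.parameters(), lr=1e-4)
        train_model(fold_model, train_loader, criterion, optimizer, num_epochs=num_epochs, device=device)
        preds, labels = evaluate_model(fold_model, val_loader, device=device)
        recalls.append(recall_score(labels, preds))
        print(f'Fold {fold+1}: 召回率 = {recalls[-1]:.4f}')
    print(f'{n_splits} 折平均召回率: {np.mean(recalls):.4f}')

# 示例:5 折交叉验证
cross_validate(dataset_2d, n_splits=5)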
以下为 ResNet-50(2D/3D)、ViT 和多模态模型在召回率和精确率上的对比(假设数据):

{
"type": "bar",
"data": {
"labels": ["2D ResNet-50", "3D ResNet-50", "ViT", "多模态"],
"datasets": [
{
"label": "召回率",
"data": [0.88, 0.90, 0.92, 0.94],
"backgroundColor": ["#FF6384", "#36A2EB", "#FFCE56", "#4BC0C0"],
"borderColor": ["#FF6384", "#36A2EB", "#FFCE56", "#4BC0C0"],
"borderWidth": 1
},
{
"label": "精确率",
"data": [0.85, 0.87, 0.89, 0.91],
"backgroundColor": ["#FF6384", "#36A2EB", "#FFCE56", "#4BC0C0"],
"borderColor": ["#FF6384", "#36A2EB", "#FFCE56", "#4BC0C0"],
"borderWidth": 1
}
]
},
"options": {
"scales": {
"y": {
"beginAtZero": true,
"title": {
"display": true,
"text": "性能指标"
}
},
"x": {
"title": {
"display": true,
"text": "模型"
}
}
},
"plugins": {
"title": {
"display": true,
"text": "模型性能对比(肺结节分类)"
}
}
}
}
说明:柱状图为假设数据,融合临床文本的多模态模型在召回率和精确率上均最高,3D ResNet 略优于 2D ResNet。
以下为模型训练时间对比(假设数据,单位:小时):

{
"type": "bar",
"data": {
"labels": ["2D ResNet-50", "3D ResNet-50", "ViT", "多模态", "UNETR"],
"datasets": [{
"label": "训练时间(小时)",
"data": [2.0, 5.0, 3.0, 6.0, 8.0],
"backgroundColor": ["#FF6384", "#36A2EB", "#FFCE56", "#4BC0C0", "#9966FF"],
"borderColor": ["#FF6384", "#36A2EB", "#FFCE56", "#4BC0C0", "#9966FF"],
"borderWidth": 1
}]
},
"options": {
"scales": {
"y": {
"beginAtZero": true,
"title": {
"display": true,
"text": "训练时间(小时)"
}
},
"x": {
"title": {
"display": true,
"text": "模型"
}
}
},
"plugins": {
"title": {
"display": true,
"text": "模型训练时间对比"
}
}
}
}
说明:训练时间为假设数据,实际耗时取决于硬件、数据规模和训练轮数。以下为若干扩展方向的示例代码:
SHAP 可解释性:解释单个样本的预测依据(background_data 为背景样本,此处以前若干训练样本示意):
import shap
background_data = torch.stack([dataset_2d[i]['image'] for i in range(8)]).to(device)  # 背景样本
explainer = shap.DeepExplainer(model_2d, background_data)
sample = dataset_2d[0]['image'].unsqueeze(0).to(device)
shap_values = explainer.shap_values(sample)
shap.image_plot(shap_values, sample.cpu().numpy())  # 注意:image_plot 期望通道在最后,必要时需转置
联邦学习(Flower):多家医院在不共享原始影像的情况下联合训练,以下为服务器端示例:
import flwr as fl
strategy = fl.server.strategy.FedAvg()
fl.server.start_server(server_address="0.0.0.0:8080", strategy=strategy, config=fl.server.ServerConfig(num_rounds=3))
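与服务器端对应,每家医院还需运行一个客户端。以下为基于 flwr 的 NumPyClient 客户端草图(示意实现,HospitalClient 为本文假设的类名):本地用自己的 dataloader 训练一轮后仅上传模型参数,评估时以 1-准确率 作为简化的损失返回。

import flwr as fl
import torch

class HospitalClient(fl.client.NumPyClient):
    def __init__(self, model, dataloader):
        self.model = model
        self.dataloader = dataloader

    def get_parameters(self, config):
        # 仅上传模型参数,不上传任何原始影像
        return [p.detach().cpu().numpy() for p in self.model.state_dict().values()]

    def set_parameters(self, parameters):
        state_dict = self.model.state_dict()
        for key, value in zip(state_dict.keys(), parameters):
            state_dict[key] = torch.tensor(value)
        self.model.load_state_dict(state_dict)

    def fit(self, parameters, config):
        self.set_parameters(parameters)
        optimizer = torch.optim.Adam(self.model.parameters(), lr=1e-4)
        train_model(self.model, self.dataloader, criterion, optimizer, num_epochs=1, device=device)
        return self.get_parameters(config), len(self.dataloader.dataset), {}

    def evaluate(self, parameters, config):
        self.set_parameters(parameters)
        preds, labels = evaluate_model(self.model, self.dataloader, device=device)
        acc = float(accuracy_score(labels, preds))
        return 1.0 - acc, len(self.dataloader.dataset), {"accuracy": acc}  # 以 1-准确率作为简化损失

# 客户端启动(地址需与服务器一致,此处仅示意故注释)
# fl.client.start_numpy_client(server_address="127.0.0.1:8080", client=HospitalClient(ResNet2D(), dataloader_2d))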
Grad-CAM 可视化:定位 ResNet 在影像中关注的区域:
from torchcam.methods import GradCAM
cam_extractor = GradCAM(model_2d.resnet, target_layer='layer4')
input_tensor = dataset_2d[0]['image'].unsqueeze(0).to(device)
scores = model_2d(input_tensor)  # 先前向传播,再按预测类别提取 CAM
heatmap = cam_extractor(scores.squeeze(0).argmax().item(), scores)
plt.imshow(input_tensor.squeeze().cpu(), cmap='gray')
plt.imshow(heatmap[0].squeeze(0).cpu().numpy(), cmap='jet', alpha=0.5)
plt.title('ResNet Grad-CAM 热图')
plt.show()
MRI 应用(如脑肿瘤分割):可基于 BraTS 数据集复用上述分割流程:
from monai.data import CacheDataset
data = [{'image': f'brats/{i}.nii', 'mask': f'brats/mask/{i}.nii'} for i in range(100)]
dataset = CacheDataset(data, transform=transform_seg)
RSNA 数据集:可扩展代码支持 RSNA CTA 影像:
df = pd.read_csv('rsna/train.csv')
images = df['image_path'].tolist()
labels = df['label'].tolist()
# 复用 LUNA16Dataset,只需提供字段一致的 RSNA 标注 CSV
dataset = LUNA16Dataset('path/to/rsna', 'rsna_annotations.csv', mode='2d', transform=transform_2d)
代码注释:以上扩展示例中的路径、dicom_dir 和 annotations_file 均需替换为实际数据。环境准备:
pip install torch torchvision transformers peft monai pydicom albumentations scikit-learn seaborn matplotlib shap torchcam flwr
本文完善并扩展了肺结节检测分类器项目,整合 Transformer、Med-PaLM 和 CheXNet 原理,提供了全面的理论、代码和可视化。
