跳到主要内容NVIDIA RTX PC 开源 AI 工具升级:LLM 与扩散模型性能优化 | 极客日志PythonAI算法
NVIDIA RTX PC 开源 AI 工具升级:LLM 与扩散模型性能优化
综述由AI生成NVIDIA RTX PC 开源 AI 工具集迎来重大更新,涵盖 ComfyUI 扩散模型量化、llama.cpp 与 Ollama 的 LLM 加速、LTX-2 音视频生成及 Nemotron 智能体微调。通过 NVFP4/FP8 量化、GPU Token 采样、Flash Attention 等技术,显著降低显存占用并提升推理吞吐量,使开发者能在本地构建高性能 AI 应用。ComfyUI 支持 NVFP4 获得 3-4 倍性能提升,llama.cpp 实现 MoE 模型 35% 吞吐增长,LTX-2 支持 4K 50fps 本地生成,Docling 提供 4 倍 OCR 加速。这些优化降低了硬件门槛,推动 AI PC 生态发展。
林间仙子15 浏览 NVIDIA RTX PC 开源 AI 工具升级:LLM 与扩散模型性能优化

PC 端的 AI 开发活动正在经历显著增长。小型语言模型(SLMs)和扩散模型质量的提升,如 FLUX.2、GPT-OSS-20B 和 Nemotron 3 Nano 等,推动了这一趋势。与此同时,ComfyUI、llama.cpp、Ollama 和 Unsloth 等框架也在不断升级,开发者不再仅仅是在实验生成式 AI 工作流,而是在 NVIDIA GPU 上构建下一代软件栈。
在近期的技术更新中,NVIDIA 针对 AI PC 开发者生态系统发布了多项重要改进,涵盖从底层框架优化到高层应用工具的全方位升级。本文将深入探讨这些开源工具的最新性能提升,并提供详细的代码示例,帮助开发者充分利用 NVIDIA RTX 平台的算力。
ComfyUI 的持续性能改进:扩散模型加速的新里程碑
ComfyUI 作为扩散模型领域最受欢迎的开源框架之一,在 NVIDIA 的协作下实现了显著的性能突破。通过 PyTorch-CUDA 的深度优化,ComfyUI 现已支持 NVFP4 和 FP8 量化格式,这些量化格式分别实现了 60% 和 40% 的显存节省,同时大幅提升了推理性能。开发者使用 NVFP4 格式可获得平均 3 倍的性能提升,使用 NVFP8 格式则可获得 2 倍的性能提升。
ComfyUI 核心优化特性详解
ComfyUI 的最新更新包含了多项关键优化技术,每一项都针对扩散模型推理的特定瓶颈进行了精准优化。
NVFP4 支持:线性层可以使用 NVFP4 格式运行,配合优化的内核实现,相比 FP16 和 BF16 线性层可提供 3-4 倍的吞吐量提升。这一优化特别适合大规模扩散 Transformer 模型。
融合 FP8 量化/反量化内核:通过消除内存带宽受限的操作,这些融合内核显著提升了模型性能。对于没有第四代 Tensor Core 的 NVIDIA RTX GPU(Ada 架构之前),FP8 工作负载的性能得到了进一步改善。
权重流式传输:利用并发的系统内存和 CPU 计算流,权重流式传输技术可以隐藏内存延迟并提高吞吐量,特别适合 VRAM 有限的 GPU。
混合精度支持:模型可以在单个网络中组合多种数值格式,实现精细化调优以获得最佳的准确性和性能平衡。
RMS 和 RoPE 融合:扩散 Transformer 中常见的内存带宽受限算子被融合,减少了内存使用和延迟。这一优化惠及所有数据类型的 DiT 模型。
ComfyUI NVFP4 量化工作流代码示例
下面展示如何在 ComfyUI 中使用 NVFP4 量化格式加载和运行扩散模型。这里有个细节需要注意,配置量化参数时要确保回退数据类型设置合理,防止非量化层出现精度问题。
import torch
import comfy.model_management as mm
from comfy.sd import load_checkpoint_guess_config
import comfy.utils
quantization_config = {
'enable_nvfp4': True,
: ,
: torch.float16
}
checkpoint_path =
model, clip, vae, clip_vision = load_checkpoint_guess_config(
checkpoint_path,
output_vae=,
output_clip=,
embedding_directory=,
quantization=quantization_config
)
device = mm.get_torch_device()
model.to(device)
model.()
prompt_text =
tokens = clip.tokenize(prompt_text)
cond, pooled = clip.encode_from_tokens(tokens, return_pooled=)
sampling_params = {
: ,
: ,
: ,
: ,
:
}
batch_size =
latent_height =
latent_width =
latent_channels =
latent_image = torch.randn(
batch_size, latent_channels, latent_height, latent_width,
device=device, dtype=torch.float16
)
()
torch.inference_mode():
samples = comfy.sample.sample(
model, noise=latent_image, positive=cond, negative=,
cfg=sampling_params[],
steps=sampling_params[],
sampler_name=sampling_params[]
)
decoded_images = vae.decode(samples)
images = (decoded_images + ) /
images = torch.clamp(images, , )
images = (images * ).to(torch.uint8)
PIL Image
numpy np
i, img_tensor (images):
img_np = img_tensor.permute(, , ).cpu().numpy()
img_pil = Image.fromarray(img_np)
img_pil.save()
()
()
'nvfp4_linear_only'
True
'fallback_dtype'
"/path/to/flux2_nvfp4.safetensors"
True
True
None
eval
"A futuristic cityscape at sunset with flying cars"
True
'steps'
20
'cfg'
7.5
'sampler_name'
'euler'
'scheduler'
'normal'
'denoise'
1.0
1
64
64
4
print
"开始使用 NVFP4 量化模型生成图像..."
with
None
'cfg'
'steps'
'sampler_name'
1.0
2.0
0
1
255
from
import
import
as
for
in
enumerate
1
2
0
f"output_nvfp4_{i}.png"
print
f"图像已保存:output_nvfp4_{i}.png"
print
"NVFP4 量化推理完成!"
ComfyUI 混合精度配置示例
混合精度支持允许在同一模型中组合多种数值格式,实现精细化的性能和质量平衡。实际运行时,建议先在小规模数据上测试不同层精度的影响。
import torch
from comfy.model_patcher import ModelPatcher
import comfy.model_management as mm
mixed_precision_config = {
'attention_layers': {
'dtype': torch.float16,
'quantize': False
},
'linear_layers': {
'dtype': 'nvfp4',
'quantize': True,
'calibration': 'minmax'
},
'conv_layers': {
'dtype': 'fp8',
'quantize': True,
'calibration': 'histogram'
},
'norm_layers': {
'dtype': torch.float32,
'quantize': False
}
}
def apply_mixed_precision(model, config):
""" 应用混合精度配置到模型
参数:
model: ComfyUI 模型对象
config: 混合精度配置字典
返回:
patched_model: 应用混合精度后的模型
"""
patcher = ModelPatcher(model)
for name, module in model.named_modules():
if 'attn' in name.lower():
layer_config = config['attention_layers']
if not layer_config['quantize']:
module.to(dtype=layer_config['dtype'])
print(f"注意力层 {name} 使用 {layer_config['dtype']}")
elif isinstance(module, torch.nn.Linear):
layer_config = config['linear_layers']
if layer_config['quantize']:
if layer_config['dtype'] == 'nvfp4':
quantize_linear_nvfp4(module, layer_config['calibration'])
print(f"线性层 {name} 量化为 NVFP4")
elif isinstance(module, (torch.nn.Conv1d, torch.nn.Conv2d, torch.nn.Conv3d)):
layer_config = config['conv_layers']
if layer_config['quantize'] and layer_config['dtype'] == 'fp8':
quantize_conv_fp8(module, layer_config['calibration'])
print(f"卷积层 {name} 量化为 FP8")
elif isinstance(module, (torch.nn.LayerNorm, torch.nn.GroupNorm, torch.nn.BatchNorm2d)):
layer_config = config['norm_layers']
module.to(dtype=layer_config['dtype'])
print(f"归一化层 {name} 使用 {layer_config['dtype']}")
return patcher
def quantize_linear_nvfp4(linear_module, calibration='minmax'):
""" 将线性层量化为 NVFP4 格式
参数:
linear_module: torch.nn.Linear 模块
calibration: 校准方法 ('minmax' 或 'histogram')
"""
weight = linear_module.weight.data
if calibration == 'minmax':
w_min = weight.min()
w_max = weight.max()
scale = (w_max - w_min) / 15.0
zero_point = -w_min / scale
elif calibration == 'histogram':
w_min = torch.quantile(weight, 0.01)
w_max = torch.quantile(weight, 0.99)
scale = (w_max - w_min) / 15.0
zero_point = -w_min / scale
quantized = torch.round((weight - w_min) / scale).clamp(0, 15)
linear_module.weight_scale = scale
linear_module.weight_zero_point = zero_point
linear_module.weight_quantized = quantized.to(torch.uint8)
linear_module.quantization_type = 'nvfp4'
def quantize_conv_fp8(conv_module, calibration='histogram'):
""" 将卷积层量化为 FP8 格式
参数:
conv_module: torch.nn.Conv 模块
calibration: 校准方法
"""
weight = conv_module.weight.data
if calibration == 'histogram':
abs_max = torch.quantile(weight.abs(), 0.99)
else:
abs_max = weight.abs().max()
fp8_max = 448.0
scale = fp8_max / abs_max
quantized = (weight * scale).clamp(-fp8_max, fp8_max)
conv_module.weight_scale = scale
conv_module.weight_quantized = quantized
conv_module.quantization_type = 'fp8'
print("应用混合精度配置...")
patched_model = apply_mixed_precision(model, mixed_precision_config)
print("混合精度配置完成!")
ComfyUI 权重流式传输优化
对于 VRAM 有限的 GPU,权重流式传输技术可以显著提升性能。这相当于把模型分块加载,避免一次性吃满显存。
import torch
import threading
from queue import Queue
class WeightStreamer:
""" 权重流式传输管理器
利用并发的系统内存和 CPU 计算流隐藏内存延迟
"""
def __init__(self, model, device, stream_buffer_size=2):
""" 初始化权重流式传输器
参数:
model: 要流式传输的模型
device: 目标 GPU 设备
stream_buffer_size: 流式缓冲区大小(层数)
"""
self.model = model
self.device = device
self.stream_buffer_size = stream_buffer_size
self.compute_stream = torch.cuda.Stream(device=device)
self.transfer_stream = torch.cuda.Stream(device=device)
self.weight_queue = Queue(maxsize=stream_buffer_size)
self.cpu_weights = {}
for name, param in model.named_parameters():
self.cpu_weights[name] = param.data.cpu().pin_memory()
param.data = torch.empty(0)
print(f"权重流式传输器初始化完成,缓冲区大小:{stream_buffer_size}")
def prefetch_weights(self, layer_names):
""" 预取指定层的权重到 GPU
参数:
layer_names: 要预取的层名称列表
"""
def transfer_worker():
for name in layer_names:
if name in self.cpu_weights:
with torch.cuda.stream(self.transfer_stream):
gpu_weight = self.cpu_weights[name].to(
self.device, non_blocking=True
)
self.weight_queue.put((name, gpu_weight))
transfer_thread = threading.Thread(target=transfer_worker)
transfer_thread.start()
def get_weight(self, layer_name):
""" 获取层权重(如果尚未加载则等待)
参数:
layer_name: 层名称
返回:
weight: GPU 上的权重张量
"""
name, weight = self.weight_queue.get()
self.transfer_stream.synchronize()
return weight
def forward_with_streaming(self, x, layer_sequence):
""" 使用权重流式传输执行前向传播
参数:
x: 输入张量
layer_sequence: 层序列(按执行顺序)
返回:
output: 输出张量
"""
initial_layers = layer_sequence[:self.stream_buffer_size]
self.prefetch_weights(initial_layers)
output = x
for i, layer in enumerate(layer_sequence):
if i + self.stream_buffer_size < len(layer_sequence):
next_layer = layer_sequence[i + self.stream_buffer_size]
self.prefetch_weights([next_layer])
weight = self.get_weight(layer.name)
with torch.cuda.stream(self.compute_stream):
original_weight = layer.weight.data
layer.weight.data = weight
output = layer(output)
layer.weight.data = original_weight
self.compute_stream.synchronize()
return output
print("初始化权重流式传输...")
streamer = WeightStreamer(
model=model,
device=device,
stream_buffer_size=3
)
layer_sequence = [
model.input_blocks[0],
model.input_blocks[1],
model.middle_block,
]
print("使用权重流式传输执行推理...")
with torch.inference_mode():
output = streamer.forward_with_streaming(latent_image, layer_sequence)
print("流式推理完成!")
ComfyUI 的这些优化代码示例可以在 ComfyUI Kitchen 仓库 中找到。NVFP4 和 FP8 检查点也已在 HuggingFace 上发布,包括最新的 LTX-2、FLUX.2、FLUX.1-dev、FLUX.1-Kontext、Qwen-Image 和 Z-Image 模型。
llama.cpp 和 Ollama 的 RTX AI PC 加速
对于小型语言模型(SLMs),混合专家(MoE)模型的 token 生成吞吐量性能在 NVIDIA GPU 上的 llama.cpp 中提升了 35%,在 RTX PC 上的 Ollama 中提升了 30%。
llama.cpp 核心优化详解
llama.cpp 的最新更新包含了多项针对 NVIDIA RTX GPU 的关键优化:
GPU token 采样:将多种采样算法(TopK、TopP、Temperature、minK、minP 和多序列采样)卸载到 GPU,提高响应的质量、一致性和准确性,同时提升性能。
QKV 投影并发:支持运行并发 CUDA 流以加速模型推理。使用 GGML_CUDA_GRAPH_OPT=1 标志启用此功能。
MMVQ 内核优化:将数据预加载到寄存器并通过增加 GPU 在其他任务上的利用率来隐藏延迟,从而加速内核。
更快的模型加载时间:在 DGX Spark 上模型加载时间提升高达 65%,在 RTX GPU 上提升 15%。
Blackwell GPU 原生 MXFP4 支持:在 Blackwell GPU 上使用硬件级 NVFP4 第五代 Tensor Core,LLM 的提示处理速度提升高达 25%。
llama.cpp GPU Token 采样完整示例
下面展示如何在 llama.cpp 中启用 GPU token 采样以获得更好的性能和质量。注意编译时务必开启相关环境变量。
#include"llama.h"
#include"common.h"
#include<vector>
#include<string>
#include<iostream>
struct GPUSamplingConfig {
int top_k = 40;
float top_p = 0.95f;
float temperature = 0.8f;
int min_k = 1;
float min_p = 0.05f;
float repeat_penalty = 1.1f;
int repeat_last_n = 64;
bool use_gpu_sampling = true;
int n_batch = 512;
int n_parallel = 4;
};
class LlamaCppGPUSampler {
private:
llama_model* model;
llama_context* ctx;
GPUSamplingConfig config;
public:
LlamaCppGPUSampler(const std::string& model_path, const GPUSamplingConfig& cfg) : config(cfg) {
llama_model_params model_params = llama_model_default_params();
model_params.n_gpu_layers = 99;
model_params.use_mmap = true;
model_params.use_mlock = false;
std::cout << "加载模型:" << model_path << std::endl;
model = llama_load_model_from_file(model_path.c_str(), model_params);
if (!model) {
throw std::runtime_error("无法加载模型");
}
llama_context_params ctx_params = llama_context_default_params();
ctx_params.n_ctx = 4096;
ctx_params.n_batch = config.n_batch;
ctx_params.n_parallel = config.n_parallel;
ctx_params.n_threads = 8;
ctx_params.n_threads_batch = 8;
ctx_params.offload_kqv = true;
ctx_params.flash_attn = true;
std::cout << "创建上下文(启用 GPU 采样)..." << std::endl;
ctx = llama_new_context_with_model(model, ctx_params);
if (!ctx) {
llama_free_model(model);
throw std::runtime_error("无法创建上下文");
}
std::cout << "GPU 采样初始化完成!" << std::endl;
}
~LlamaCppGPUSampler() {
if (ctx) llama_free(ctx);
if (model) llama_free_model(model);
}
std::string generate(const std::string& prompt, int max_tokens = 512) {
std::vector<llama_token> tokens;
tokens.resize(prompt.size() + 1);
int n_tokens = llama_tokenize(
model, prompt.c_str(), prompt.size(), tokens.data(), tokens.size(),
true,
false
);
tokens.resize(n_tokens);
std::cout << "提示词 token 数:" << n_tokens << std::endl;
llama_batch batch = llama_batch_init(config.n_batch, 0, config.n_parallel);
for (size_t i = 0; i < tokens.size(); i++) {
llama_batch_add(batch, tokens[i], i, {0}, false);
}
batch.logits[batch.n_tokens - 1] = true;
if (llama_decode(ctx, batch) != 0) {
llama_batch_free(batch);
throw std::runtime_error("llama_decode 失败");
}
std::string generated_text;
int n_cur = batch.n_tokens;
int n_decode = 0;
std::cout << "开始生成(使用 GPU 采样)..." << std::endl;
while (n_decode < max_tokens) {
float* logits = llama_get_logits_ith(ctx, batch.n_tokens - 1);
int n_vocab = llama_n_vocab(model);
llama_token new_token = sample_token_gpu(logits, n_vocab);
if (new_token == llama_token_eos(model)) {
std::cout << "\n遇到 EOS 标记,生成结束" << std::endl;
break;
}
char buf[256];
int n = llama_token_to_piece(model, new_token, buf, sizeof(buf));
if (n > 0) {
generated_text.append(buf, n);
std::cout << std::string(buf, n) << std::flush;
}
llama_batch_clear(batch);
llama_batch_add(batch, new_token, n_cur, {0}, true);
n_cur++;
n_decode++;
if (llama_decode(ctx, batch) != 0) {
std::cout << "\nllama_decode 失败" << std::endl;
break;
}
}
llama_batch_free(batch);
std::cout << "\n生成完成!总共生成 " << n_decode << " 个 token" << std::endl;
return generated_text;
}
private:
llama_token sample_token_gpu(float* logits, int n_vocab) {
std::vector<llama_token_data> candidates;
candidates.reserve(n_vocab);
for (int i = 0; i < n_vocab; i++) {
candidates.push_back({i, logits[i], 0.0f});
}
llama_token_data_array candidates_p = {
candidates.data(), candidates.size(), false
};
if (config.repeat_penalty != 1.0f) {
std::vector<llama_token> last_tokens(config.repeat_last_n);
llama_sample_repetition_penalties(
ctx, &candidates_p, last_tokens.data(), last_tokens.size(),
config.repeat_penalty,
0.0f,
0.0f
);
}
if (config.temperature != 1.0f) {
llama_sample_temp(ctx, &candidates_p, config.temperature);
}
if (config.min_p > 0.0f) {
llama_sample_min_p(ctx, &candidates_p, config.min_p, config.min_k);
}
if (config.top_k > 0) {
llama_sample_top_k(ctx, &candidates_p, config.top_k, config.min_k);
}
if (config.top_p < 1.0f) {
llama_sample_top_p(ctx, &candidates_p, config.top_p, config.min_k);
}
llama_token token = llama_sample_token(ctx, &candidates_p);
return token;
}
};
int main() {
GPUSamplingConfig config;
config.top_k = 40;
config.top_p = 0.95f;
config.temperature = 0.8f;
config.repeat_penalty = 1.1f;
config.use_gpu_sampling = true;
try {
LlamaCppGPUSampler sampler("/path/to/nemotron-nano-v2-q4_k_m.gguf", config);
std::string prompt = "Explain quantum computing in simple terms:";
std::string generated = sampler.generate(prompt, 512);
std::cout << "\n\n=== 完整生成结果 ===" << std::endl;
std::cout << prompt << generated << std::endl;
} catch (const std::exception& e) {
std::cerr << "错误:" << e.what() << std::endl;
return 1;
}
return 0;
}
llama.cpp CUDA 图优化和 QKV 并发
使用 CUDA 图和 QKV 并发可以进一步提升性能。编译脚本如下:
#!/bin/bash
export GGML_CUDA_GRAPH_OPT=1
export GGML_CUDA_FA_ALL_QUANTS=1
export CUDA_LAUNCH_BLOCKING=0
cd /path/to/llama.cpp
mkdir -p build && cd build
cmake .. \
-DLLAMA_CUBLAS=ON \
-DLLAMA_CUDA_FA=ON \
-DLLAMA_CUDA_GRAPHS=ON \
-DLLAMA_CUDA_PEER_MAX_BATCH_SIZE=128 \
-DCMAKE_BUILD_TYPE=Release
make -j$(nproc)
echo "llama.cpp 编译完成(已启用 CUDA 图和 QKV 并发)"
./bin/main \
-m /path/to/model.gguf \
-p "Explain the theory of relativity:" \
-n 512 \
--n-gpu-layers 99 \
--batch-size 512 \
--ctx-size 4096 \
--threads 8 \
--backend-sampling \
--flash-attn \
--temp 0.8 \
--top-k 40 \
--top-p 0.95 \
--repeat-penalty 1.1
import os
import ctypes
from llama_cpp import Llama
os.environ['GGML_CUDA_GRAPH_OPT'] = '1'
os.environ['GGML_CUDA_FA_ALL_QUANTS'] = '1'
llm = Llama(
model_path="/path/to/nemotron-nano-v2-q4_k_m.gguf",
n_gpu_layers=99,
n_ctx=4096,
n_batch=512,
n_threads=8,
use_mmap=True,
use_mlock=False,
flash_attn=True,
offload_kqv=True,
logits_all=False,
vocab_only=False,
verbose=True
)
print("llama.cpp 初始化完成(已启用 CUDA 图和 QKV 并发)")
prompt = """You are a helpful AI assistant. Answer the following question: Question: What are the key differences between quantum computing and classical computing? Answer:"""
print("开始生成(使用 GPU 采样和 CUDA 图优化)...")
output = llm(
prompt,
max_tokens=512,
temperature=0.8,
top_k=40,
top_p=0.95,
repeat_penalty=1.1,
stop=["Question:", "\n\n"],
stream=True
)
full_response = ""
for chunk in output:
text = chunk['choices'][0]['text']
print(text, end='', flush=True)
full_response += text
print("\n\n生成完成!")
print(f"\n性能统计:")
print(f"- 提示词 token 数:{chunk['usage']['prompt_tokens']}")
print(f"- 生成 token 数:{chunk['usage']['completion_tokens']}")
print(f"- 总 token 数:{chunk['usage']['total_tokens']}")
Ollama 优化配置示例
Ollama 的最新更新包括 Flash Attention、改进的内存管理和 LogProbs API:
import requests
import json
OLLAMA_API = "http://localhost:11434/api"
def configure_ollama_optimizations():
""" 配置 Ollama 以启用所有性能优化 """
config = {
"flash_attention": {
"enabled": True,
"tile_size": 128,
"use_cuda_graphs": True
},
"memory": {
"gpu_memory_fraction": 0.9,
"enable_unified_memory": True,
"offload_layers": "auto"
},
"performance": {
"num_parallel": 4,
"num_thread": 8,
"batch_size": 512,
"use_mmap": True
},
"ggml": {
"cuda_graph_opt": True,
"fa_all_quants": True
}
}
print("Ollama 优化配置:")
print(json.dumps(config, indent=2))
return config
def generate_with_logprobs(prompt, model="nemotron-nano:latest"):
""" 使用 Ollama 的 LogProbs API 生成文本
LogProbs 可用于分类、困惑度计算和自我评估
参数:
prompt: 输入提示词
model: 模型名称
返回:
response: 包含生成文本和 logprobs 的响应
"""
url = f"{OLLAMA_API}/generate"
payload = {
"model": model,
"prompt": prompt,
"stream": False,
"options": {
"temperature": 0.8,
"top_k": 40,
"top_p": 0.95,
"repeat_penalty": 1.1,
"num_predict": 512,
"num_ctx": 4096,
"num_batch": 512,
"num_gpu": 99,
"num_thread": 8,
"flash_attn": True,
"logprobs": True,
"top_logprobs": 5
}
}
print(f"发送请求到 Ollama API...")
response = requests.post(url, json=payload)
if response.status_code == 200:
result = response.json()
return result
else:
raise Exception(f"API 请求失败:{response.status_code} - {response.text}")
def classify_with_logprobs(text, categories, model="nemotron-nano:latest"):
""" 使用 LogProbs 进行零样本文本分类
参数:
text: 要分类的文本
categories: 类别列表
model: 模型名称
返回:
classification: 分类结果和置信度
"""
prompt = f"""Classify the following text into one of these categories: {', '.join(categories)} Text: {text} Category:"""
result = generate_with_logprobs(prompt, model)
generated_text = result['response'].strip()
if 'logprobs' in result:
logprobs = result['logprobs']
category_probs = {}
for category in categories:
category_lower = category.lower()
if category_lower in generated_text.lower():
prob = 0.0
if 'token_logprobs' in logprobs:
token_logprobs = logprobs['token_logprobs']
if len(token_logprobs) > 0:
prob = sum(token_logprobs) / len(token_logprobs)
category_probs[category] = prob
if category_probs:
best_category = max(category_probs, key=category_probs.get)
confidence = category_probs[best_category]
else:
best_category = generated_text
confidence = 0.0
else:
best_category = generated_text
confidence = 0.0
return {'category': best_category, 'confidence': confidence, 'all_probabilities': category_probs if 'logprobs' in result else {}}
def calculate_perplexity(text, model="nemotron-nano:latest"):
""" 使用 LogProbs 计算文本的困惑度
困惑度是语言模型质量的重要指标
参数:
text: 输入文本
model: 模型名称
返回:
perplexity: 困惑度值
"""
import math
result = generate_with_logprobs(text, model)
if 'logprobs' in result and 'token_logprobs' in result['logprobs']:
token_logprobs = result['logprobs']['token_logprobs']
avg_neg_log_likelihood = -sum(token_logprobs) / len(token_logprobs)
perplexity = math.exp(avg_neg_log_likelihood)
return {'perplexity': perplexity, 'avg_log_likelihood': -avg_neg_log_likelihood, 'num_tokens': len(token_logprobs)}
else:
return {'perplexity': None, 'error': 'LogProbs not available'}
if __name__ == "__main__":
print("=== 配置 Ollama 优化 ===")
config = configure_ollama_optimizations()
print("\n=== 示例 1:基本文本生成 ===")
prompt = "Explain the concept of machine learning in simple terms:"
result = generate_with_logprobs(prompt)
print(f"生成结果:{result['response']}")
print("\n=== 示例 2:使用 LogProbs 进行文本分类 ===")
text_to_classify = "The stock market reached new highs today as investors reacted positively to economic data."
categories = ["Technology", "Finance", "Sports", "Politics", "Entertainment"]
classification = classify_with_logprobs(text_to_classify, categories)
print(f"分类结果:{classification['category']}")
print(f"置信度:{classification['confidence']:.4f}")
print(f"所有概率:{classification['all_probabilities']}")
print("\n=== 示例 3:计算文本困惑度 ===")
sample_text = "The quick brown fox jumps over the lazy dog."
perplexity_result = calculate_perplexity(sample_text)
print(f"困惑度:{perplexity_result['perplexity']:.2f}")
print(f"平均对数似然:{perplexity_result['avg_log_likelihood']:.4f}")
print(f"Token 数:{perplexity_result['num_tokens']}")
LTX-2 高级音视频模型:RTX AI PC 上的云级性能
NVIDIA 与 Lightricks 合作发布了 LTX-2 模型权重,这是一个先进的音视频模型,可以与云端模型竞争,并且可以在 RTX AI PC 或 DGX Spark 上运行。这是一个开放的、生产就绪的音视频基础模型,可生成长达 20 秒的同步 AV 内容,分辨率高达 4K,帧率可达 50fps,并为开发者、研究人员和工作室提供多模态控制以实现高扩展性。
模型权重提供 BF16 和 NVFP8 两种格式。量化检查点实现了 30% 的内存减少,使模型能够在 RTX GPU 和 DGX Spark 上高效运行。
LTX-2 音视频生成完整示例
下面展示如何使用 LTX-2 模型生成高质量的音视频内容。注意显存占用较大,建议优先使用 FP8 版本。
import torch
import torchaudio
import torchvision
from diffusers import DiffusionPipeline
from transformers import CLIPTextModel, CLIPTokenizer
import numpy as np
from PIL import Image
class LTX2AudioVideoGenerator:
""" LTX-2 音视频生成器
支持 4K 分辨率、50fps 帧率和多模态控制
"""
def __init__(self, model_path, use_fp8=True, device="cuda"):
""" 初始化 LTX-2 生成器
参数:
model_path: 模型路径(BF16 或 NVFP8 格式)
use_fp8: 是否使用 FP8 量化(节省 30% 内存)
device: 计算设备
"""
self.device = device
self.use_fp8 = use_fp8
print(f"加载 LTX-2 模型({'NVFP8' if use_fp8 else 'BF16'} 格式)...")
self.pipeline = DiffusionPipeline.from_pretrained(
model_path,
torch_dtype=torch.float8_e4m3fn if use_fp8 else torch.bfloat16,
variant="fp8" if use_fp8 else None,
use_safetensors=True
)
self.pipeline.enable_model_cpu_offload()
self.pipeline.enable_vae_slicing()
self.pipeline.enable_attention_slicing()
try:
self.pipeline.enable_xformers_memory_efficient_attention()
print("已启用 xFormers 内存高效注意力")
except:
print("xFormers 不可用,使用标准注意力")
self.pipeline = self.pipeline.to(device)
print("LTX-2 模型加载完成!")
def generate_video(
self, prompt, audio_prompt=None, duration=10.0, resolution=(3840, 2160),
fps=50, num_inference_steps=50, guidance_scale=7.5, seed=None
):
""" 生成音视频内容
参数:
prompt: 视频描述文本
audio_prompt: 音频描述文本(可选)
duration: 视频时长(秒),最长 20 秒
resolution: 视频分辨率(宽,高)
fps: 帧率,最高 50fps
num_inference_steps: 去噪步数
guidance_scale: 引导强度
seed: 随机种子
返回:
video: 视频张量 (T, C, H, W)
audio: 音频张量 (C, T)
"""
if duration > 20.0:
print("警告:时长超过 20 秒,将截断到 20 秒")
duration = 20.0
if fps > 50:
print("警告:帧率超过 50fps,将限制到 50fps")
fps = 50
num_frames = int(duration * fps)
if seed is not None:
torch.manual_seed(seed)
np.random.seed(seed)
print(f"生成参数:")
print(f" - 提示词:{prompt}")
print(f" - 音频提示词:{audio_prompt or '(无)'}")
print(f" - 时长:{duration}秒")
print(f" - 分辨率:{resolution[0]}x{resolution[1]}")
print(f" - 帧率:{fps}fps")
print(f" - 总帧数:{num_frames}")
print(f" - 去噪步数:{num_inference_steps}")
inputs = {
'prompt': prompt,
'num_frames': num_frames,
'height': resolution[1],
'width': resolution[0],
'num_inference_steps': num_inference_steps,
'guidance_scale': guidance_scale,
'output_type': 'pt'
}
if audio_prompt:
inputs['audio_prompt'] = audio_prompt
inputs['audio_guidance_scale'] = guidance_scale * 0.8
print("开始生成音视频内容...")
with torch.inference_mode():
output = self.pipeline(**inputs)
video = output.frames
audio = output.audio if hasattr(output, 'audio') else None
print(f"生成完成!")
print(f" - 视频形状:{video.shape}")
if audio is not None:
print(f" - 音频形状:{audio.shape}")
return video, audio
def generate_with_image_control(
self, prompt, control_image, audio_prompt=None, duration=10.0, fps=50,
controlnet_conditioning_scale=1.0
):
""" 使用图像控制生成音视频
LTX-2 支持多模态控制,可以使用参考图像引导生成
参数:
prompt: 文本提示词
control_image: 控制图像(PIL Image 或 tensor)
audio_prompt: 音频提示词
duration: 视频时长
fps: 帧率
controlnet_conditioning_scale: 控制强度
返回:
video: 生成的视频
audio: 生成的音频
"""
if isinstance(control_image, Image.Image):
control_image = torchvision.transforms.ToTensor()(control_image)
control_image = control_image.to(self.device)
print(f"使用图像控制生成音视频...")
print(f" - 控制图像形状:{control_image.shape}")
print(f" - 控制强度:{controlnet_conditioning_scale}")
num_frames = int(duration * fps)
inputs = {
'prompt': prompt,
'image': control_image,
'num_frames': num_frames,
'num_inference_steps': 50,
'guidance_scale': 7.5,
'controlnet_conditioning_scale': controlnet_conditioning_scale,
'output_type': 'pt'
}
if audio_prompt:
inputs['audio_prompt'] = audio_prompt
with torch.inference_mode():
output = self.pipeline(**inputs)
return output.frames, output.audio if hasattr(output, 'audio') else None
def save_video(self, video, audio, output_path, fps=50):
""" 保存音视频到文件
参数:
video: 视频张量 (T, C, H, W)
audio: 音频张量 (C, T)
output_path: 输出文件路径
fps: 帧率
"""
print(f"保存音视频到:{output_path}")
video_np = video.permute(0, 2, 3, 1).cpu().numpy()
video_np = (video_np * 255).astype(np.uint8)
if audio is not None:
video_tensor = torch.from_numpy(video_np).permute(0, 3, 1, 2)
torchvision.io.write_video(
output_path, video_tensor, fps=fps, video_codec='h264',
options={'crf': '18'}
)
audio_path = output_path.replace('.mp4', '_audio.wav')
torchaudio.save(audio_path, audio.cpu(), sample_rate=48000)
print(f" - 视频已保存:{output_path}")
print(f" - 音频已保存:{audio_path}")
print(f" - 使用 ffmpeg 合并:ffmpeg -i {output_path} -i {audio_path} -c copy output_with_audio.mp4")
else:
video_tensor = torch.from_numpy(video_np).permute(0, 3, 1, 2)
torchvision.io.write_video(
output_path, video_tensor, fps=fps, video_codec='h264',
options={'crf': '18'}
)
print(f" - 视频已保存:{output_path}")
if __name__ == "__main__":
generator = LTX2AudioVideoGenerator(
model_path="/path/to/ltx2-fp8",
use_fp8=True,
device="cuda"
)
print("\n=== 示例 1:生成 4K 50fps 音视频 ===")
video, audio = generator.generate_video(
prompt="A serene mountain landscape at sunrise, with mist rolling over peaks, cinematic lighting, 4K quality",
audio_prompt="Gentle ambient music with nature sounds, peaceful morning atmosphere",
duration=10.0,
resolution=(3840, 2160),
fps=50,
num_inference_steps=50,
guidance_scale=7.5,
seed=42
)
generator.save_video(video, audio, "mountain_sunrise_4k50.mp4", fps=50)
print("\n=== 示例 2:使用参考图像控制生成 ===")
reference_image = Image.open("reference_frame.jpg")
video2, audio2 = generator.generate_with_image_control(
prompt="Continue this scene with dynamic camera movement, add flying birds",
control_image=reference_image,
audio_prompt="Add wind sounds and bird chirping",
duration=15.0,
fps=50,
controlnet_conditioning_scale=0.8
)
generator.save_video(video2, audio2, "controlled_generation_4k50.mp4", fps=50)
print("\n所有生成任务完成!")
LTX-2 批量生成和优化
对于需要生成大量音视频内容的场景,可以使用批量生成和内存优化:
import torch
from torch.utils.data import Dataset, DataLoader
import json
from pathlib import Path
class VideoPromptDataset(Dataset):
""" 视频提示词数据集
用于批量生成音视频内容
"""
def __init__(self, prompts_file):
""" 初始化数据集
参数:
prompts_file: JSON 格式的提示词文件
格式:[{"prompt": "...", "audio_prompt": "...", "duration": 10.0}, ...]
"""
with open(prompts_file, 'r', encoding='utf-8') as f:
self.prompts = json.load(f)
def __len__(self):
return len(self.prompts)
def __getitem__(self, idx):
return self.prompts[idx]
def batch_generate_videos(generator, prompts_file, output_dir, batch_size=1,
resolution=(1920, 1080),
fps=30):
""" 批量生成音视频内容
参数:
generator: LTX2AudioVideoGenerator 实例
prompts_file: 提示词文件路径
output_dir: 输出目录
batch_size: 批处理大小
resolution: 视频分辨率
fps: 帧率
"""
output_path = Path(output_dir)
output_path.mkdir(parents=True, exist_ok=True)
dataset = VideoPromptDataset(prompts_file)
dataloader = DataLoader(
dataset, batch_size=batch_size, shuffle=False, num_workers=0
)
print(f"开始批量生成,共{len(dataset)}个视频...")
for batch_idx, batch in enumerate(dataloader):
print(f"\n处理批次 {batch_idx + 1}/{len(dataloader)}")
for i, item in enumerate(batch):
idx = batch_idx * batch_size + i
prompt = item['prompt']
audio_prompt = item.get('audio_prompt', None)
duration = item.get('duration', 10.0)
print(f"\n生成视频 {idx + 1}/{len(dataset)}")
print(f" 提示词:{prompt[:50]}...")
try:
video, audio = generator.generate_video(
prompt=prompt,
audio_prompt=audio_prompt,
duration=duration,
resolution=resolution,
fps=fps,
num_inference_steps=40,
guidance_scale=7.5,
seed=42 + idx
)
output_file = output_path / f"video_{idx:04d}.mp4"
generator.save_video(video, audio, str(output_file), fps=fps)
print(f" ✓ 已保存:{output_file}")
del video, audio
torch.cuda.empty_cache()
except Exception as e:
print(f" ✗ 生成失败:{e}")
continue
print(f"\n批量生成完成!所有视频已保存到:{output_dir}")
if __name__ == "__main__":
prompts = [
{"prompt": "A futuristic cityscape at night with neon lights and flying cars", "audio_prompt": "Cyberpunk ambient music with electronic beats", "duration": 10.0},
{"prompt": "Underwater coral reef with colorful fish swimming, sunlight filtering through water", "audio_prompt": "Underwater ambience with gentle water sounds", "duration": 15.0},
{"prompt": "Time-lapse of a flower blooming, macro photography, beautiful colors", "audio_prompt": "Gentle classical music, peaceful atmosphere", "duration": 8.0}
]
with open('video_prompts.json', 'w', encoding='utf-8') as f:
json.dump(prompts, f, indent=2, ensure_ascii=False)
generator = LTX2AudioVideoGenerator(
model_path="/path/to/ltx2-fp8",
use_fp8=True,
device="cuda"
)
batch_generate_videos(
generator=generator,
prompts_file='video_prompts.json',
output_dir='generated_videos',
batch_size=1,
resolution=(1920, 1080),
fps=30
)
本地 AI 智能体工具包:Nemotron 3 Nano 和 Docling
私有本地智能体的用例是无穷无尽的,但构建可靠、可重复和高质量的私有智能体仍然是一个挑战。当你蒸馏和量化模型以适应 PC 上有限的 VRAM 预算时,LLM 质量会下降。随着智能体工作流需要在与其他工具或操作交互时提供可靠和可重复的答案,对准确性的需求也在增加。
为了解决这个问题,开发者通常使用两种工具来提高准确性:微调和检索增强生成(RAG)。NVIDIA 发布了更新以加速构建智能体 AI 的整个工作流。
Nemotron 3 Nano 微调示例
Nemotron 3 Nano 是一个 32B 参数的 MoE 模型,针对智能体 AI 和微调进行了优化。拥有 3.6B 活跃参数和 1M 上下文窗口,它在编码、指令遵循、长上下文推理和 STEM 任务等多个基准测试中名列前茅。
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer
from peft import LoraConfig, get_peft_model, prepare_model_for_kbit_training
from datasets import load_dataset
from trl import SFTTrainer, DataCollatorForCompletionOnlyLM
import bitsandbytes as bnb
class NemotronNanoFineTuner:
""" Nemotron 3 Nano 微调器
使用 LoRA 进行参数高效微调
"""
def __init__(self, model_name="nvidia/nemotron-3-nano-32b", use_4bit=True, device="cuda"):
""" 初始化微调器
参数:
model_name: 模型名称或路径
use_4bit: 是否使用 4 位量化(节省内存)
device: 计算设备
"""
self.device = device
self.use_4bit = use_4bit
print(f"加载 Nemotron 3 Nano 模型...")
print(f" - 模型:{model_name}")
print(f" - 4 位量化:{use_4bit}")
if use_4bit:
from transformers import BitsAndBytesConfig
bnb_config = BitsAndBytesConfig(
load_in_4bit=True,
bnb_4bit_quant_type="nf4",
bnb_4bit_compute_dtype=torch.bfloat16,
bnb_4bit_use_double_quant=True
)
else:
bnb_config = None
self.model = AutoModelForCausalLM.from_pretrained(
model_name,
quantization_config=bnb_config,
device_map="auto",
trust_remote_code=True,
torch_dtype=torch.bfloat16,
attn_implementation="flash_attention_2"
)
self.tokenizer = AutoTokenizer.from_pretrained(
model_name, trust_remote_code=True
)
if self.tokenizer.pad_token is None:
self.tokenizer.pad_token = self.tokenizer.eos_token
if use_4bit:
self.model = prepare_model_for_kbit_training(self.model)
print("模型加载完成!")
print(f" - 参数总数:{sum(p.numel() for p in self.model.parameters()):,}")
print(f" - 可训练参数:{sum(p.numel() for p in self.model.parameters() if p.requires_grad):,}")
def setup_lora(self, r=16, lora_alpha=32, lora_dropout=0.05, target_modules=None):
""" 配置 LoRA(低秩适应)
参数:
r: LoRA 秩,控制可训练参数数量
lora_alpha: LoRA 缩放因子
lora_dropout: LoRA dropout 率
target_modules: 要应用 LoRA 的目标模块
返回:
model: 应用 LoRA 后的模型
"""
print("配置 LoRA...")
if target_modules is None:
target_modules = ["q_proj",
"k_proj",
"v_proj",
"o_proj",
"gate_proj",
"up_proj",
"down_proj"]
lora_config = LoraConfig(
r=r,
lora_alpha=lora_alpha,
target_modules=target_modules,
lora_dropout=lora_dropout,
bias="none",
task_type="CAUSAL_LM"
)
self.model = get_peft_model(self.model, lora_config)
trainable_params = sum(p.numel() for p in self.model.parameters() if p.requires_grad)
all_params = sum(p.numel() for p in self.model.parameters())
trainable_percent = 100 * trainable_params / all_params
print("LoRA 配置完成!")
print(f" - LoRA 秩:{r}")
print(f" - LoRA alpha: {lora_alpha}")
print(f" - 目标模块:{target_modules}")
print(f" - 可训练参数:{trainable_params:,} ({trainable_percent:.2f}%)")
return self.model
def prepare_dataset(self, dataset_name, text_column="text", max_length=2048, num_samples=None):
""" 准备训练数据集
参数:
dataset_name: 数据集名称或路径
text_column: 文本列名
max_length: 最大序列长度
num_samples: 使用的样本数(None 表示全部)
返回:
dataset: 处理后的数据集
"""
print(f"加载数据集:{dataset_name}")
dataset = load_dataset(dataset_name, split="train")
if num_samples is not None:
dataset = dataset.shuffle(seed=42).select(range(min(num_samples, len(dataset))))
print(f"数据集大小:{len(dataset)}")
def format_instruction(example):
""" 格式化指令数据为 Nemotron 格式 """
if "instruction" in example and "response" in example:
text = f"<|user|>\n{example['instruction']}\n<|assistant|>\n{example['response']}"
elif text_column in example:
text = example[text_column]
else:
text = str(example)
return {"text": text}
dataset = dataset.map(format_instruction, remove_columns=dataset.column_names)
print("数据集准备完成!")
return dataset
def train(self, train_dataset, output_dir="./nemotron_nano_finetuned", num_epochs=3, batch_size=4,
gradient_accumulation_steps=4, learning_rate=2e-4, max_seq_length=2048,
logging_steps=10, save_steps=100):
""" 执行微调训练
参数:
train_dataset: 训练数据集
output_dir: 输出目录
num_epochs: 训练轮数
batch_size: 批处理大小
gradient_accumulation_steps: 梯度累积步数
learning_rate: 学习率
max_seq_length: 最大序列长度
logging_steps: 日志记录步数
save_steps: 模型保存步数
"""
print("开始训练...")
from transformers import TrainingArguments
training_args = TrainingArguments(
output_dir=output_dir,
num_train_epochs=num_epochs,
per_device_train_batch_size=batch_size,
gradient_accumulation_steps=gradient_accumulation_steps,
learning_rate=learning_rate,
fp16=False,
bf16=True,
logging_steps=logging_steps,
save_steps=save_steps,
save_total_limit=3,
optim="paged_adamw_8bit",
lr_scheduler_type="cosine",
warmup_ratio=0.05,
max_grad_norm=1.0,
report_to="tensorboard",
load_best_model_at_end=True,
metric_for_best_model="loss",
greater_is_better=False,
ddp_find_unused_parameters=False
)
response_template = "<|assistant|>\n"
collator = DataCollatorForCompletionOnlyLM(
response_template=response_template,
tokenizer=self.tokenizer
)
trainer = SFTTrainer(
model=self.model,
args=training_args,
train_dataset=train_dataset,
tokenizer=self.tokenizer,
data_collator=collator,
max_seq_length=max_seq_length,
packing=False
)
print(f"训练配置:")
print(f" - 轮数:{num_epochs}")
print(f" - 批大小:{batch_size}")
print(f" - 梯度累积:{gradient_accumulation_steps}")
print(f" - 有效批大小:{batch_size * gradient_accumulation_steps}")
print(f" - 学习率:{learning_rate}")
print(f" - 最大序列长度:{max_seq_length}")
trainer.train()
print(f"保存模型到:{output_dir}")
trainer.save_model(output_dir)
self.tokenizer.save_pretrained(output_dir)
print("训练完成!")
if __name__ == "__main__":
finetuner = NemotronNanoFineTuner(
model_name="nvidia/nemotron-3-nano-32b",
use_4bit=True,
device="cuda"
)
finetuner.setup_lora(
r=16,
lora_alpha=32,
lora_dropout=0.05,
target_modules=None
)
train_dataset = finetuner.prepare_dataset(
dataset_name="timdettmers/openassistant-guanaco",
max_length=2048,
num_samples=1000
)
finetuner.train(
train_dataset=train_dataset,
output_dir="./nemotron_nano_agent_finetuned",
num_epochs=3,
batch_size=2,
gradient_accumulation_steps=8,
learning_rate=2e-4,
max_seq_length=2048,
logging_steps=10,
save_steps=100
)
print("\n微调完成!模型已保存。")
print("可以使用 Ollama 或 llama.cpp 加载微调后的模型进行推理。")
Docling RAG 管道示例
Docling 是一个用于摄取、分析和处理文档的包,将其转换为机器可理解的语言用于 RAG 管道。Docling 针对 RTX PC 和 DGX Spark 进行了优化,相比 CPU 提供 4 倍的性能提升。
import torch
from docling.document_converter import DocumentConverter
from docling.datamodel.base_models import InputFormat
from docling.pipeline.standard_pdf_pipeline import StandardPdfPipeline
from docling.backend.pypdfium2_backend import PyPdfiumDocumentBackend
from transformers import AutoTokenizer, AutoModel
import faiss
import numpy as np
from pathlib import Path
class DoclingRAGPipeline:
""" 基于 Docling 的 RAG 管道
支持 GPU 加速的文档处理和向量检索
"""
def __init__(self, embedding_model="nvidia/nv-embed-v1", device="cuda", use_gpu_ocr=True):
""" 初始化 RAG 管道
参数:
embedding_model: 嵌入模型名称
device: 计算设备
use_gpu_ocr: 是否使用 GPU 加速 OCR
"""
self.device = device
self.use_gpu_ocr = use_gpu_ocr
print("初始化 Docling RAG 管道...")
self.doc_converter = DocumentConverter(
allowed_formats=[
InputFormat.PDF,
InputFormat.DOCX,
InputFormat.PPTX,
InputFormat.HTML,
InputFormat.IMAGE
]
)
if use_gpu_ocr:
pdf_pipeline = StandardPdfPipeline(
backend=PyPdfiumDocumentBackend,
ocr_enabled=True,
ocr_engine="easyocr",
device=device,
table_structure_enabled=True,
figure_extraction_enabled=True
)
self.doc_converter.set_pipeline(InputFormat.PDF, pdf_pipeline)
print(" - 已启用 GPU 加速 OCR 和布局分析")
print(f"加载嵌入模型:{embedding_model}")
self.tokenizer = AutoTokenizer.from_pretrained(embedding_model)
self.embedding_model = AutoModel.from_pretrained(embedding_model).to(device)
self.embedding_model.eval()
self.index = None
self.documents = []
self.metadata = []
print("Docling RAG 管道初始化完成!")
def process_document(self, file_path):
""" 处理文档并提取内容
使用 GPU 加速的 OCR 和布局分析
参数:
file_path: 文档文件路径
返回:
result: 处理结果,包含文本、表格、图像等
"""
print(f"处理文档:{file_path}")
result = self.doc_converter.convert(file_path)
document_data = {
'text': result.document.export_to_markdown(),
'tables': [],
'figures': [],
'metadata': result.document.metadata
}
for table in result.document.tables:
table_data = {
'content': table.export_to_dataframe(),
'caption': table.caption,
'page': table.page_no
}
document_data['tables'].append(table_data)
for figure in result.document.figures:
figure_data = {
'image': figure.image,
'caption': figure.caption,
'page': figure.page_no
}
document_data['figures'].append(figure_data)
print(f" - 提取文本:{len(document_data['text'])} 字符")
print(f" - 提取表格:{len(document_data['tables'])} 个")
print(f" - 提取图形:{len(document_data['figures'])} 个")
return document_data
def chunk_text(self, text, chunk_size=512, overlap=50):
""" 将文本分块用于嵌入
参数:
text: 输入文本
chunk_size: 块大小(token 数)
overlap: 重叠大小(token 数)
返回:
chunks: 文本块列表
"""
tokens = self.tokenizer.encode(text, add_special_tokens=False)
chunks = []
start = 0
while start < len(tokens):
end = start + chunk_size
chunk_tokens = tokens[start:end]
chunk_text = self.tokenizer.decode(chunk_tokens)
chunks.append(chunk_text)
start += chunk_size - overlap
return chunks
def embed_texts(self, texts, batch_size=32):
""" 将文本转换为嵌入向量
使用 GPU 加速批处理
参数:
texts: 文本列表
batch_size: 批处理大小
返回:
embeddings: 嵌入向量数组 (N, D)
"""
print(f"生成嵌入向量(共{len(texts)}个文本)...")
all_embeddings = []
for i in range(0, len(texts), batch_size):
batch_texts = texts[i:i+batch_size]
inputs = self.tokenizer(
batch_texts, padding=True, truncation=True, max_length=512,
return_tensors="pt").to(self.device)
with torch.no_grad():
outputs = self.embedding_model(**inputs)
embeddings = outputs.last_hidden_state[:, 0, :].cpu().numpy()
all_embeddings.append(embeddings)
all_embeddings = np.vstack(all_embeddings)
print(f" - 嵌入维度:{all_embeddings.shape}")
return all_embeddings
def build_index(self, documents_dir, use_gpu_index=True):
""" 构建向量索引
参数:
documents_dir: 文档目录
use_gpu_index: 是否使用 GPU 加速的 FAISS 索引
"""
print(f"构建向量索引(文档目录:{documents_dir})...")
doc_path = Path(documents_dir)
all_chunks = []
all_metadata = []
for file_path in doc_path.glob("**/*"):
if file_path.is_file() and file_path.suffix.lower() in ['.pdf', '.docx', '.pptx', '.html']:
print(f"\n处理:{file_path.name}")
try:
doc_data = self.process_document(str(file_path))
chunks = self.chunk_text(doc_data['text'])
for i, chunk in enumerate(chunks):
all_chunks.append(chunk)
all_metadata.append({'file': str(file_path), 'chunk_id': i, 'total_chunks': len(chunks)})
print(f" - 生成 {len(chunks)} 个文本块")
except Exception as e:
print(f" - 处理失败:{e}")
continue
print(f"\n总共处理 {len(all_chunks)} 个文本块")
embeddings = self.embed_texts(all_chunks)
dimension = embeddings.shape[1]
if use_gpu_index and torch.cuda.is_available():
print("创建 GPU 加速的 FAISS 索引...")
cpu_index = faiss.IndexFlatL2(dimension)
gpu_resources = faiss.StandardGpuResources()
self.index = faiss.index_cpu_to_gpu(gpu_resources, 0, cpu_index)
else:
print("创建 CPU FAISS 索引...")
self.index = faiss.IndexFlatL2(dimension)
self.index.add(embeddings.astype('float32'))
self.documents = all_chunks
self.metadata = all_metadata
print(f"索引构建完成!")
print(f" - 索引大小:{self.index.ntotal}")
print(f" - 向量维度:{dimension}")
def search(self, query, top_k=5):
""" 搜索相关文档
参数:
query: 查询文本
top_k: 返回前 K 个结果
返回:
results: 搜索结果列表
"""
if self.index is None:
raise ValueError("索引未构建,请先调用 build_index()")
print(f"搜索:{query}")
query_embedding = self.embed_texts([query])
distances, indices = self.index.search(query_embedding.astype('float32'), top_k)
results = []
for i, (dist, idx) in enumerate(zip(distances[0], indices[0])):
result = {
'rank': i + 1,
'score': float(dist),
'text': self.documents[idx],
'metadata': self.metadata[idx]
}
results.append(result)
print(f"找到 {len(results)} 个结果")
return results
def generate_answer(self, query, llm_model, top_k=3):
""" 使用 RAG 生成答案
参数:
query: 用户查询
llm_model: 语言模型(如 Nemotron Nano)
top_k: 检索的文档数量
返回:
answer: 生成的答案
"""
results = self.search(query, top_k=top_k)
context = "\n\n".join([f"文档 {r['rank']}:\n{r['text']}" for r in results])
prompt = f"""基于以下文档回答问题。如果文档中没有相关信息,请说明无法回答。
文档:{context}
问题:{query}
答案:"""
answer = llm_model(prompt)
return {'answer': answer, 'sources': [r['metadata'] for r in results]}
if __name__ == "__main__":
rag = DoclingRAGPipeline(
embedding_model="nvidia/nv-embed-v1",
device="cuda",
use_gpu_ocr=True
)
rag.build_index(
documents_dir="/path/to/documents",
use_gpu_index=True
)
query = "What are the key features of NVIDIA RTX GPUs?"
results = rag.search(query, top_k=5)
print("\n搜索结果:")
for result in results:
print(f"\n排名 {result['rank']} (得分:{result['score']:.4f})")
print(f"文件:{result['metadata']['file']}")
print(f"内容:{result['text'][:200]}...")
音视频效果 SDK:AI 增强的多媒体处理
NVIDIA 视频和音频效果 SDK 使开发者能够在多媒体管道上应用 AI 效果,使用背景噪声去除、虚拟背景或眼神接触等功能来增强质量。
CES 2026 的最新更新增强了视频重新照明功能,可在不同环境中产生更自然和稳定的结果,同时性能提升 3 倍(将运行所需的最低 GPU 降低到 NVIDIA GeForce RTX 3060 或更高版本),模型大小减少高达 6 倍。
视频效果 SDK 集成示例
import cv2
import numpy as np
import torch
from nvidia_vfx import VideoEffects, Effect
class NVIDIAVideoEffectsProcessor:
""" NVIDIA 视频效果处理器
支持 AI 驱动的背景替换、重新照明、眼神接触等效果
"""
def __init__(self, device="cuda"):
""" 初始化视频效果处理器
参数:
device: 计算设备
"""
self.device = device
print("初始化 NVIDIA 视频效果 SDK...")
self.vfx = VideoEffects(device=device)
self.effects = {
'background_blur': Effect.BACKGROUND_BLUR,
'background_replace': Effect.BACKGROUND_REPLACE,
'virtual_background': Effect.VIRTUAL_BACKGROUND,
'video_relighting': Effect.VIDEO_RELIGHTING,
'eye_contact': Effect.EYE_CONTACT,
'face_tracking': Effect.FACE_TRACKING,
'super_resolution': Effect.SUPER_RESOLUTION
}
print("视频效果 SDK 初始化完成!")
print(f"可用效果:{list(self.effects.keys())}")
def setup_background_replacement(self, background_image_path=None, blur_strength=0.8):
""" 配置背景替换效果
参数:
background_image_path: 背景图像路径(None 表示使用模糊)
blur_strength: 模糊强度(0-1)
"""
print("配置背景替换效果...")
if background_image_path:
background = cv2.imread(background_image_path)
background = cv2.cvtColor(background, cv2.COLOR_BGR2RGB)
self.vfx.set_effect(self.effects['virtual_background'], background_image=background)
print(f" - 使用自定义背景:{background_image_path}")
else:
self.vfx.set_effect(self.effects['background_blur'], blur_strength=blur_strength)
print(f" - 使用背景模糊(强度:{blur_strength})")
def setup_video_relighting(self, light_direction=(0.0, -1.0, 0.0), light_intensity=1.0, ambient_intensity=0.3):
""" 配置视频重新照明效果(CES 2026 增强版)
性能提升 3 倍,模型大小减少 6 倍
参数:
light_direction: 光源方向(x, y, z)
light_intensity: 光源强度
ambient_intensity: 环境光强度
"""
print("配置视频重新照明效果(增强版)...")
self.vfx.set_effect(
self.effects['video_relighting'],
light_direction=light_direction,
light_intensity=light_intensity,
ambient_intensity=ambient_intensity,
use_enhanced_model=True,
min_gpu="RTX_3060"
)
print(f" - 光源方向:{light_direction}")
print(f" - 光源强度:{light_intensity}")
print(f" - 环境光强度:{ambient_intensity}")
print(f" - 性能提升:3x")
print(f" - 模型大小:减少 6x")
def setup_eye_contact(self, strength=0.9):
""" 配置眼神接触效果
自动调整眼睛方向使其看向摄像头
参数:
strength: 效果强度(0-1)
"""
print("配置眼神接触效果...")
self.vfx.set_effect(self.effects['eye_contact'], strength=strength)
print(f" - 效果强度:{strength}")
def process_frame(self, frame):
""" 处理单个视频帧
应用所有已配置的效果
参数:
frame: 输入帧(BGR 格式)
返回:
processed_frame: 处理后的帧
"""
frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
frame_tensor = torch.from_numpy(frame_rgb).permute(2, 0, 1).unsqueeze(0)
frame_tensor = frame_tensor.float() / 255.0
frame_tensor = frame_tensor.to(self.device)
with torch.no_grad():
processed_tensor = self.vfx.process(frame_tensor)
processed_frame = processed_tensor.squeeze(0).permute(1, 2, 0).cpu().numpy()
processed_frame = (processed_frame * 255).astype(np.uint8)
processed_frame = cv2.cvtColor(processed_frame, cv2.COLOR_RGB2BGR)
return processed_frame
def process_video(self, input_video_path, output_video_path, show_preview=True):
""" 处理整个视频
参数:
input_video_path: 输入视频路径
output_video_path: 输出视频路径
show_preview: 是否显示预览
"""
print(f"处理视频:{input_video_path}")
cap = cv2.VideoCapture(input_video_path)
fps = int(cap.get(cv2.CAP_PROP_FPS))
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
print(f"视频属性:")
print(f" - 分辨率:{width}x{height}")
print(f" - 帧率:{fps} fps")
print(f" - 总帧数:{total_frames}")
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter(output_video_path, fourcc, fps, (width, height))
frame_count = 0
while True:
ret, frame = cap.read()
if not ret:
break
processed_frame = self.process_frame(frame)
out.write(processed_frame)
if show_preview:
comparison = np.hstack([frame, processed_frame])
cv2.imshow('Original | Processed', comparison)
if cv2.waitKey(1) & 0xFF == ord('q'):
break
frame_count += 1
if frame_count % 30 == 0:
progress = (frame_count / total_frames) * 100
print(f"处理进度:{progress:.1f}% ({frame_count}/{total_frames})")
cap.release()
out.release()
cv2.destroyAllWindows()
print(f"视频处理完成!")
print(f"输出保存到:{output_video_path}")
def process_webcam(self, camera_id=0):
""" 实时处理摄像头输入
参数:
camera_id: 摄像头 ID
"""
print(f"启动实时处理(摄像头 {camera_id})...")
print("按 'q' 退出")
cap = cv2.VideoCapture(camera_id)
cap.set(cv2.CAP_PROP_FRAME_WIDTH, 1280)
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 720)
while True:
ret, frame = cap.read()
if not ret:
break
processed_frame = self.process_frame(frame)
comparison = np.hstack([frame, processed_frame])
cv2.imshow('Webcam: Original | Processed', comparison)
if cv2.waitKey(1) & 0xFF == ord('q'):
break
cap.release()
cv2.destroyAllWindows()
if __name__ == "__main__":
processor = NVIDIAVideoEffectsProcessor(device="cuda")
print("\n=== 示例 1:背景替换 ===")
processor.setup_background_replacement(background_image_path="/path/to/background.jpg")
print("\n=== 示例 2:视频重新照明 ===")
processor.setup_video_relighting(
light_direction=(0.5, -1.0, 0.3),
light_intensity=1.2,
ambient_intensity=0.4
)
print("\n=== 示例 3:眼神接触 ===")
processor.setup_eye_contact(strength=0.9)
processor.process_video(
input_video_path="input_video.mp4",
output_video_path="output_video_enhanced.mp4",
show_preview=True
)
总结与展望
NVIDIA 与开源社区的深度合作正在推动 AI PC 工具生态系统的快速发展。从 ComfyUI 的扩散模型优化、llama.cpp 和 Ollama 的 LLM 加速,到 LTX-2 的高质量音视频生成,再到 Nemotron 3 Nano 和 Docling 的智能体 AI 工具包,这些更新为开发者提供了强大的工具集,使他们能够在 RTX PC 和 DGX Spark 上构建下一代 AI 应用。
- ComfyUI:NVFP4 格式实现 3-4 倍性能提升,FP8 格式实现 2 倍提升,分别节省 60% 和 40% 显存
- llama.cpp:MoE 模型 token 生成性能提升 35%,模型加载时间在 DGX Spark 上提升 65%,RTX GPU 上提升 15%
- Ollama:MoE 模型性能提升 30%,Flash Attention 默认启用,新增 LogProbs API
- LTX-2:支持 4K 50fps 音视频生成,NVFP8 量化节省 30% 内存
- Docling:相比 CPU 实现 4 倍性能提升,支持 GPU 加速的 OCR 和布局分析
- 视频效果 SDK:重新照明功能性能提升 3 倍,模型大小减少 6 倍,最低 GPU 要求降至 RTX 3060
这些优化不仅提升了性能,还降低了硬件门槛,使更多开发者能够在本地 PC 上运行高质量的 AI 模型。随着开源社区的持续创新和 NVIDIA 的技术支持,我们可以期待 AI PC 生态系统在 2026 年及以后继续蓬勃发展。
相关资源
相关免费在线工具
- 加密/解密文本
使用加密算法(如AES、TripleDES、Rabbit或RC4)加密和解密文本明文。 在线工具,加密/解密文本在线工具,online
- RSA密钥对生成器
生成新的随机RSA私钥和公钥pem证书。 在线工具,RSA密钥对生成器在线工具,online
- Mermaid 预览与可视化编辑
基于 Mermaid.js 实时预览流程图、时序图等图表,支持源码编辑与即时渲染。 在线工具,Mermaid 预览与可视化编辑在线工具,online
- 随机西班牙地址生成器
随机生成西班牙地址(支持马德里、加泰罗尼亚、安达卢西亚、瓦伦西亚筛选),支持数量快捷选择、显示全部与下载。 在线工具,随机西班牙地址生成器在线工具,online
- Gemini 图片去水印
基于开源反向 Alpha 混合算法去除 Gemini/Nano Banana 图片水印,支持批量处理与下载。 在线工具,Gemini 图片去水印在线工具,online
- curl 转代码
解析常见 curl 参数并生成 fetch、axios、PHP curl 或 Python requests 示例代码。 在线工具,curl 转代码在线工具,online