Technical Guide: Deploying an AI-Enhanced SearXNG Search Engine Locally
Project Overview
What is SearXNG
SearXNG is an open-source metasearch engine with the following characteristics:
- Aggregates results from multiple search engines
- Protects user privacy (no search history is recorded)
- Customizable search engine sources
- Supports multiple search categories (web, images, videos, news, and more)

This document details how to deploy an AI-enhanced SearXNG search engine in a local environment. It covers system architecture design, both Docker-based and direct installation, AI model integration options such as Ollama and LocalAI, internet search configuration, and performance optimization and security strategies. By combining natural language processing with semantic search, the deployment provides intelligent query understanding, result re-ranking, and content summarization. The solution supports enterprise knowledge search, academic research, and multilingual use cases, emphasizes privacy protection and data ownership, and includes complete monitoring, backup, and maintenance guides, making it a good fit for organizations with strict requirements for search customization and privacy.

Integrating AI capabilities into SearXNG provides intelligent query understanding, semantic result ranking, and content summarization. The overall system is organized into the layers shown below:
┌────────────────────────────────────────────────────────────────────┐
│                        User Interface Layer                        │
│  ┌──────────────────┐  ┌──────────────────┐  ┌──────────────────┐  │
│  │   Web Frontend   │  │      Mobile      │  │       API        │  │
│  └──────────────────┘  └──────────────────┘  └──────────────────┘  │
└────────────────────────────────────────────────────────────────────┘
┌────────────────────────────────────────────────────────────────────┐
│                        AI Processing Layer                         │
│  ┌──────────────────┐  ┌──────────────────┐  ┌──────────────────┐  │
│  │  Query Analysis  │  │  Result Ranking  │  │  Summarization   │  │
│  │ LLM Integration  │  │ Personalization  │  │   Translation    │  │
│  └──────────────────┘  └──────────────────┘  └──────────────────┘  │
└────────────────────────────────────────────────────────────────────┘
┌────────────────────────────────────────────────────────────────────┐
│                         SearXNG Core Layer                         │
│  ┌──────────────────┐  ┌──────────────────┐  ┌──────────────────┐  │
│  │     Routing      │  │Engine Management │  │ Cache Management │  │
│  │ Request Dispatch │  │Result Aggregation│  │  Plugin System   │  │
│  └──────────────────┘  └──────────────────┘  └──────────────────┘  │
└────────────────────────────────────────────────────────────────────┘
┌────────────────────────────────────────────────────────────────────┐
│                         Data Source Layer                          │
│  ┌──────────────────┐  ┌──────────────────┐  ┌──────────────────┐  │
│  │      Google      │  │       Bing       │  │    DuckDuckGo    │  │
│  │    Wikipedia     │  │      GitHub      │  │  Other Engines   │  │
│  └──────────────────┘  └──────────────────┘  └──────────────────┘  │
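
To make the data flow between these layers concrete, here is a minimal sketch (an illustration for this guide, not production code) of a request passing through them. It assumes the Docker setup described next, with SearXNG on http://localhost:8080 and the "json" format enabled under search.formats in settings.yml; the re-ranking and summarization hooks refer to the modules defined later in this document.

# ai_search_flow.py — minimal sketch of the layered request flow above
import requests

SEARXNG_URL = "http://localhost:8080/search"  # SearXNG core layer endpoint

def ai_search(query: str):
    # AI processing layer (inbound): query understanding / expansion would
    # rewrite `query` here (see query_processor.py later in this guide).
    params = {"q": query, "format": "json"}  # requires "json" in search.formats
    resp = requests.get(SEARXNG_URL, params=params, timeout=10)
    resp.raise_for_status()
    results = resp.json().get("results", [])
    # AI processing layer (outbound): semantic re-ranking and summarization
    # would run here (see result_ranker.py and summarizer.py later on).
    return results

if __name__ == "__main__":
    for r in ai_search("open source metasearch")[:5]:
        print(r.get("title"), "-", r.get("url"))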
# Create the project directory
mkdir searxng-ai && cd searxng-ai

# Create docker-compose.yml
cat > docker-compose.yml <<'EOF'
version: '3.8'

services:
  # SearXNG service
  searxng:
    image: searxng/searxng:latest
    container_name: searxng
    ports:
      - "8080:8080"
    volumes:
      - ./searxng:/etc/searxng:rw
      - ./searxng-data:/var/log/searxng:rw
    environment:
      - SEARXNG_BASE_URL=https://${SEARXNG_HOSTNAME:-localhost}/
    restart: unless-stopped
    networks:
      - searxng-net

  # Redis cache (optional but recommended)
  redis:
    image: redis:alpine
    container_name: searxng-redis
    command: redis-server --appendonly yes
    volumes:
      - ./redis-data:/data
    restart: unless-stopped
    networks:
      - searxng-net

  # Reverse proxy (optional, for HTTPS)
  nginx:
    image: nginx:alpine
    container_name: searxng-nginx
    ports:
      - "80:80"
      - "443:443"
    volumes:
      - ./nginx/conf.d:/etc/nginx/conf.d:ro
      - ./nginx/ssl:/etc/nginx/ssl:ro
      - ./nginx/html:/usr/share/nginx/html:ro
    depends_on:
      - searxng
    restart: unless-stopped
    networks:
      - searxng-net

networks:
  searxng-net:
    driver: bridge
EOF

# Create the configuration directories
mkdir -p searxng nginx/{conf.d,ssl,html}

# Start the services
docker-compose up -d

# Follow the logs
docker-compose logs -f searxng
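
Once the containers are up, a quick smoke test can confirm the deployment (a convenience sketch for this guide; /healthz is SearXNG's built-in liveness endpoint):

# check_searxng.py — smoke test for the stack above
import requests

base = "http://localhost:8080"
# The homepage should answer with HTTP 200 once the container is ready
assert requests.get(base, timeout=10).status_code == 200
# /healthz is SearXNG's liveness endpoint
assert requests.get(f"{base}/healthz", timeout=10).status_code == 200
print("SearXNG is up")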
# Install system dependencies
sudo apt update
sudo apt install -y \
    git \
    build-essential \
    libxslt-dev \
    zlib1g-dev \
    libffi-dev \
    libssl-dev \
    python3-dev \
    python3-venv \
    python3-pip \
    uwsgi \
    uwsgi-plugin-python3

# Clone the SearXNG repository
git clone https://github.com/searxng/searxng.git searxng-ai
cd searxng-ai

# Create a Python virtual environment
python3 -m venv venv
source venv/bin/activate

# Install dependencies
pip install --upgrade pip
pip install -r requirements.txt
# Write the configuration
cat > searxng/settings.yml <<'EOF'
use_default_settings: true

server:
  secret_key: "ultrasecretkey"  # placeholder, replaced below
  base_url: "http://localhost:8080/"
  port: 8080
  bind_address: "0.0.0.0"

search:
  safe_search: 0
  autocomplete: google
  default_lang: zh

engines:
  - name: google
    engine: google
    shortcut: g
    disabled: false
  - name: bing
    engine: bing
    shortcut: b
    disabled: false
  - name: duckduckgo
    engine: duckduckgo
    shortcut: d
    disabled: false

redis:
  url: redis://localhost:6379/0
EOF

# Generate a secret key (run this after writing the file above,
# otherwise the heredoc would overwrite the generated key)
sed -i "s/ultrasecretkey/$(openssl rand -hex 32)/g" searxng/settings.yml

# Start SearXNG (development mode)
SEARXNG_SETTINGS_PATH="$(pwd)/searxng/settings.yml" python -m searx.webapp
# Install Ollama
curl -fsSL https://ollama.com/install.sh | sh

# Start the Ollama service
ollama serve

# Pull a model (llama2 as an example)
ollama pull llama2:7b

# Or pull a Chinese-optimized model
ollama pull qwen:7b

# Test the model
ollama run llama2:7b "你好,这是一个测试"
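
The AI services below can talk to Ollama over its local REST API instead of the CLI. A minimal sketch of such a call (the /api/generate endpoint and payload shape are part of Ollama's documented HTTP API; the prompt is just an example):

# ollama_client.py — call a locally served Ollama model over HTTP
import requests

def ollama_generate(prompt: str, model: str = "qwen:7b") -> str:
    resp = requests.post(
        "http://localhost:11434/api/generate",  # Ollama's default port
        json={"model": model, "prompt": prompt, "stream": False},
        timeout=60,
    )
    resp.raise_for_status()
    return resp.json()["response"]

print(ollama_generate("用一句话介绍 SearXNG"))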
# Deploy LocalAI with Docker
docker run -d \
  -p 8081:8080 \
  -v ./models:/models \
  --name localai \
  quay.io/go-skynet/local-ai:latest \
  --models-path /models \
  --context-size 700 \
  --threads 4

# Download a model into the mounted host directory
wget -O ./models/ggml-gpt4all-j.bin \
  https://gpt4all.io/models/ggml-gpt4all-j.bin
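
LocalAI exposes an OpenAI-compatible HTTP API, so the AI layer can call it the same way it would call a hosted OpenAI endpoint. A minimal sketch against the container above (the model name is an assumption: it should match the ggml-gpt4all-j file downloaded into ./models):

# localai_client.py — OpenAI-compatible completion call against LocalAI
import requests

resp = requests.post(
    "http://localhost:8081/v1/chat/completions",  # port 8081 mapped above
    json={
        "model": "ggml-gpt4all-j",
        "messages": [{"role": "user", "content": "Say hello"}],
        "temperature": 0.7,
    },
    timeout=120,
)
resp.raise_for_status()
print(resp.json()["choices"][0]["message"]["content"])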
# Install transformers
pip install transformers torch

# Create the AI service script ai_service.py
import flask
from transformers import AutoModelForCausalLM, AutoTokenizer

app = flask.Flask(__name__)

# Load the model (pick one that fits your hardware; generate() requires a
# causal LM, so a small Chinese GPT-2 is used here instead of a BERT encoder)
model_name = "uer/gpt2-chinese-cluecorpussmall"  # or a larger model
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForCausalLM.from_pretrained(model_name)

def analyze_intent(query):
    # Intent analysis logic (placeholder)
    return "information_search"

@app.route('/ai/process', methods=['POST'])
def process_query():
    data = flask.request.json
    query = data.get('query', '')

    # Simple query expansion
    inputs = tokenizer(query, return_tensors="pt")
    outputs = model.generate(**inputs, max_length=50)
    expanded_query = tokenizer.decode(outputs[0], skip_special_tokens=True)

    return flask.jsonify({
        'original': query,
        'expanded': expanded_query,
        'intent': analyze_intent(query)
    })

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000)
# query_processor.py
import requests

class AIQueryProcessor:
    def __init__(self, ai_service_url="http://localhost:5000"):
        self.ai_service_url = ai_service_url

    def process_query(self, query, context=None):
        """
        Process a user query, covering:
        1. Query intent recognition
        2. Query expansion
        3. Synonym generation
        4. Multilingual support
        """
        # Send the query to the AI service
        payload = {
            'query': query,
            'context': context,
            'language': 'zh-CN'
        }
        try:
            response = requests.post(f"{self.ai_service_url}/ai/process", json=payload, timeout=5)
            if response.status_code == 200:
                return response.json()
        except Exception as e:
            print(f"AI processing error: {e}")

        # Fallback: rule-based query expansion
        return self.rule_based_expansion(query)

    def rule_based_expansion(self, query):
        """Rule-based query expansion (Chinese keyword synonyms)."""
        expansions = {
            "怎么": ["如何", "怎样", "方法", "步骤"],
            "为什么": ["原因", "缘故", "为何"],
            "最好的": ["最佳", "最优", "顶级", "推荐"]
        }
        expanded = [query]
        for keyword, synonyms in expansions.items():
            if keyword in query:
                for synonym in synonyms:
                    expanded.append(query.replace(keyword, synonym))
        return {'original': query, 'expanded': expanded[:3], 'intent': 'general_search'}
# result_ranker.py
import numpy as np
from sentence_transformers import SentenceTransformer

class AIResultRanker:
    def __init__(self):
        # Load a lightweight multilingual embedding model
        self.model = SentenceTransformer('paraphrase-multilingual-MiniLM-L12-v2')

    def rerank_results(self, query, results, top_n=10):
        """Re-rank results by semantic similarity to the query."""
        if not results:
            return results

        # Extract the result texts
        result_texts = []
        for result in results:
            text = f"{result.get('title', '')} {result.get('content', '')}"
            result_texts.append(text)

        # Normalized embeddings, so the dot product equals cosine similarity
        query_embedding = self.model.encode([query], normalize_embeddings=True)
        result_embeddings = self.model.encode(result_texts, normalize_embeddings=True)

        # Cosine similarity
        similarities = np.dot(result_embeddings, query_embedding.T).flatten()

        # Sort by similarity, descending
        ranked_indices = np.argsort(similarities)[::-1]

        # Return the re-ranked results
        ranked_results = []
        for idx in ranked_indices[:top_n]:
            results[idx]['similarity_score'] = float(similarities[idx])
            ranked_results.append(results[idx])
        return ranked_results
# summarizer.py
from transformers import pipeline

class ContentSummarizer:
    def __init__(self):
        # Lightweight multilingual summarization model
        self.summarizer = pipeline(
            "summarization",
            model="csebuetnlp/mT5_multilingual_XLSum",
            device=-1  # -1 for CPU, 0 for GPU
        )

    def summarize(self, text, max_length=150, min_length=50):
        """Generate a content summary."""
        if len(text) < 100:
            return text
        try:
            summary = self.summarizer(
                text,
                max_length=max_length,
                min_length=min_length,
                do_sample=False
            )
            return summary[0]['summary_text']
        except Exception as e:
            print(f"Summarization error: {e}")
            # Fallback: return the first few sentences
            sentences = text.split('。')
            return '。'.join(sentences[:3]) + '。'
Edit the SearXNG configuration file searxng/settings.yml:
engines:
  # International search engines
  - name: google
    engine: google
    shortcut: g
    disabled: false
    use_mobile_ui: false
    raise_on_http_error: true

  - name: bing
    engine: bing
    shortcut: b
    disabled: false

  - name: duckduckgo
    engine: duckduckgo
    shortcut: ddg
    disabled: false

  # Chinese search engines
  - name: baidu
    engine: baidu
    shortcut: bd
    disabled: false
    language: zh-CN

  - name: sogou
    engine: sogou
    shortcut: sg
    disabled: false

  # Academic search
  - name: google scholar
    engine: google_scholar
    shortcut: gs
    disabled: false

  - name: semantic scholar
    engine: semantic_scholar
    shortcut: ss
    disabled: false

  # Multimedia search
  - name: youtube
    engine: youtube_noapi
    shortcut: yt
    disabled: false

  - name: vimeo
    engine: vimeo
    shortcut: vm
    disabled: false

  # Code search
  - name: github
    engine: github
    shortcut: gh
    disabled: false

  - name: gitlab
    engine: gitlab
    shortcut: gl
    disabled: false
If the upstream engines must be reached through a proxy:

# Add to settings.yml
# ... other settings ...
outgoing:
  proxies:
    - http://proxy-server:port
    - socks5://proxy-server:port

  # Use a dedicated proxy for specific engines
  per_engine_proxies:
    google:
      - http://google-proxy:port
    baidu:
      - http://baidu-proxy:port

  # Request settings
  request_timeout: 10.0
  max_request_timeout: 15.0
  enable_http: true
  verify: true  # verify SSL certificates

search:
  # Search limits
  max_page_number: 5    # maximum number of result pages
  max_results_page: 20  # results per page
  max_results: 100      # cap on total results

  # Rate limiting
  limiter: true
  limiter_times: 10     # requests allowed per window
  limiter_period: 60    # window length (seconds)

  # Cache settings
  cache: redis          # use the Redis cache
  cache_expire: 86400   # cache TTL (seconds)
# docker-compose.full.yml
version: '3.8'

services:
  # SearXNG main service
  searxng:
    image: searxng/searxng:latest
    container_name: searxng-ai
    ports:
      - "8080:8080"
    volumes:
      - ./searxng-config:/etc/searxng:rw
      - ./searxng-data:/var/log/searxng:rw
      - ./ai-plugins:/plugins:ro
    environment:
      - SEARXNG_BASE_URL=http://localhost:8080/
      - AI_SERVICE_URL=http://ai-processor:5000
    depends_on:
      - redis
      - ai-processor
    networks:
      - ai-search-net
    restart: unless-stopped

  # AI processing service
  ai-processor:
    build: ./ai-service
    container_name: ai-processor
    ports:
      - "5000:5000"
    volumes:
      - ./ai-models:/models:ro
      - ./ai-cache:/cache:rw
    environment:
      - MODEL_PATH=/models/ggml-model.bin
      - CACHE_ENABLED=true
      - MAX_TOKENS=2048
    networks:
      - ai-search-net
    deploy:
      resources:
        limits:
          memory: 4G
        reservations:
          memory: 2G
    restart: unless-stopped

  # Ollama LLM service
  ollama:
    image: ollama/ollama:latest
    container_name: ollama
    ports:
      - "11434:11434"
    volumes:
      - ./ollama-models:/root/.ollama
    networks:
      - ai-search-net
    restart: unless-stopped

  # Redis cache
  redis:
    image: redis:alpine
    container_name: searxng-redis
    command: redis-server --appendonly yes
    volumes:
      - ./redis-data:/data
    networks:
      - ai-search-net
    restart: unless-stopped

  # PostgreSQL database (optional, for history storage)
  postgres:
    image: postgres:15
    container_name: searxng-db
    environment:
      POSTGRES_DB: searxng
      POSTGRES_USER: searxng
      POSTGRES_PASSWORD: ${DB_PASSWORD}
    volumes:
      - ./postgres-data:/var/lib/postgresql/data
    networks:
      - ai-search-net
    restart: unless-stopped

  # Metrics collection
  prometheus:
    image: prom/prometheus:latest
    container_name: prometheus
    ports:
      - "9090:9090"
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml
      - ./prometheus-data:/prometheus
    networks:
      - ai-search-net
    restart: unless-stopped

  # Monitoring dashboards
  grafana:
    image: grafana/grafana:latest
    container_name: grafana
    ports:
      - "3000:3000"
    environment:
      GF_SECURITY_ADMIN_PASSWORD: ${GRAFANA_PASSWORD}
    volumes:
      - ./grafana-data:/var/lib/grafana
    networks:
      - ai-search-net
    restart: unless-stopped

# All services above use bind mounts (./...), so no named volume
# declarations are needed here.
networks:
  ai-search-net:
    driver: bridge
Create the AI plugin directory structure:
ai-plugins/
├── __init__.py
├── query_processor.py
├── result_ranker.py
├── summarizer.py
├── translator.py
└── config.yaml
Plugin configuration file config.yaml:
plugins:
  query_processor:
    enabled: true
    priority: 100
    config:
      model: "llama2:7b"
      max_tokens: 512
      temperature: 0.7

  result_ranker:
    enabled: true
    priority: 200
    config:
      method: "semantic"
      model: "paraphrase-multilingual-MiniLM-L12-v2"
      top_k: 10

  summarizer:
    enabled: true
    priority: 300
    config:
      model: "mT5_multilingual_XLSum"
      max_length: 200
      min_length: 50

  translator:
    enabled: true
    priority: 400
    config:
      source_lang: "auto"
      target_lang: "zh"
      provider: "local"  # local or external
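
A loader for this layout might look like the following sketch. It is hypothetical glue code for this guide, not a SearXNG API: it assumes the ai-plugins directory is on sys.path and each plugin module exposes a Plugin class with a process(data) method, and it runs the enabled plugins in ascending priority order:

# plugin_loader.py — hypothetical pipeline loader for the layout above
import importlib
import yaml

def load_pipeline(config_path="ai-plugins/config.yaml"):
    with open(config_path) as f:
        cfg = yaml.safe_load(f)["plugins"]
    pipeline = []
    # Lower priority value runs first (100 = query_processor, ... 400 = translator)
    for name, spec in sorted(cfg.items(), key=lambda kv: kv[1]["priority"]):
        if not spec.get("enabled"):
            continue
        module = importlib.import_module(name)            # e.g. query_processor
        plugin = module.Plugin(**spec.get("config", {}))  # assumed constructor
        pipeline.append(plugin)
    return pipeline

def run_pipeline(pipeline, data):
    # Each stage transforms the search payload and hands it to the next one
    for plugin in pipeline:
        data = plugin.process(data)
    return data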
# api_extensions.py
from datetime import datetime

from flask import Blueprint, request, jsonify

# process_with_ai, perform_search, enhance_results_with_ai, generate_summary,
# extract_relevant_content and generate_answer are assumed to be provided by
# the AI service modules above.

ai_bp = Blueprint('ai_extensions', __name__)

@ai_bp.route('/api/v1/ai/search', methods=['POST'])
def ai_enhanced_search():
    """AI-enhanced search API."""
    data = request.json
    query = data.get('query', '')
    use_ai = data.get('use_ai', True)

    # 1. AI query processing
    if use_ai:
        processed_query = process_with_ai(query)
    else:
        processed_query = query

    # 2. Run the search
    search_results = perform_search(processed_query)

    # 3. AI result processing
    if use_ai:
        enhanced_results = enhance_results_with_ai(search_results, query)
    else:
        enhanced_results = search_results

    return jsonify({
        'query': query,
        'processed_query': processed_query,
        'results': enhanced_results,
        'ai_enhanced': use_ai
    })

@ai_bp.route('/api/v1/ai/summarize', methods=['POST'])
def summarize_content():
    """Content summarization API."""
    data = request.json
    text = data.get('text', '')
    max_length = data.get('max_length', 200)

    summary = generate_summary(text, max_length)

    return jsonify({
        'original_length': len(text),
        'summary': summary,
        'summary_length': len(summary)
    })

@ai_bp.route('/api/v1/ai/chat', methods=['POST'])
def chat_with_results():
    """Conversation API grounded in search results."""
    data = request.json
    query = data.get('query', '')
    conversation_history = data.get('history', [])

    # 1. Search for relevant information
    search_results = perform_search(query)

    # 2. Extract the relevant content
    relevant_content = extract_relevant_content(search_results, query)

    # 3. Generate the answer
    answer = generate_answer(query, relevant_content, conversation_history)

    return jsonify({
        'query': query,
        'answer': answer,
        'sources': [r['url'] for r in search_results[:3]],
        'timestamp': datetime.now().isoformat()
    })
# Enterprise-specific search engine configuration
engines:
  - name: internal_wiki
    engine: elasticsearch
    shortcut: wiki
    base_url: http://wiki.internal:9200
    index: wiki_pages

  - name: code_repository
    engine: opensearch
    shortcut: code
    base_url: http://git.internal:9200
    index: codebase

  - name: document_store
    engine: whoosh
    shortcut: docs
    index_dir: /var/lib/searxng/document_index
# academic_plugin.py
class AcademicSearchPlugin:
    def process_academic_query(self, query):
        """Process an academic query."""
        # Identify the query type by keyword (Chinese and English cues)
        query_types = {
            'literature_review': ['综述', '研究现状', 'literature review'],
            'methodology': ['方法', 'method', 'methodology'],
            'results': ['结果', 'findings', 'results']
        }
        for qtype, keywords in query_types.items():
            if any(keyword in query.lower() for keyword in keywords):
                return self.expand_academic_query(query, qtype)
        return query

    def expand_academic_query(self, query, query_type):
        """Expand an academic query."""
        expansions = {
            'literature_review': [
                f"{query} 研究现状",
                f"{query} 最新进展",
                f"{query} systematic review"
            ],
            'methodology': [
                f"{query} 实验方法",
                f"{query} 研究设计",
                f"{query} experimental design"
            ]
        }
        return expansions.get(query_type, [query])
# caching.py
import redis
import json
import hashlib
from functools import wraps

class AICacheManager:
    def __init__(self, redis_url="redis://localhost:6379/0"):
        self.redis = redis.from_url(redis_url)
        self.default_ttl = 3600  # 1 hour

    def cache_key(self, func_name, *args, **kwargs):
        """Build a cache key."""
        data = f"{func_name}:{str(args)}:{str(kwargs)}"
        return hashlib.md5(data.encode()).hexdigest()

    def cached(self, ttl=None):
        """Caching decorator."""
        def decorator(func):
            @wraps(func)
            def wrapper(*args, **kwargs):
                # Build the cache key
                cache_key = self.cache_key(func.__name__, *args, **kwargs)

                # Try the cache first
                cached_result = self.redis.get(cache_key)
                if cached_result:
                    return json.loads(cached_result)

                # Run the function
                result = func(*args, **kwargs)

                # Store in the cache
                self.redis.setex(
                    cache_key, ttl or self.default_ttl, json.dumps(result)
                )
                return result
            return wrapper
        return decorator

# Usage example
cache_manager = AICacheManager()

@cache_manager.cached(ttl=1800)  # cache for 30 minutes
def process_query_with_ai(query):
    """AI query processing (cached). Placeholder body."""
    # ... AI processing logic goes here ...
    return processed_result
# async_processor.py
import asyncio
import aiohttp

class AsyncAIProcessor:
    def __init__(self):
        self.session = None

    async def process_multiple_queries(self, queries):
        """Process several queries concurrently."""
        if not self.session:
            self.session = aiohttp.ClientSession()

        tasks = [self.process_single_query(query) for query in queries]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        return results

    async def process_single_query(self, query):
        """Process a single query via the AI service."""
        # Asynchronous call to the AI service
        async with self.session.post(
            "http://ai-service:5000/process",
            json={"query": query},
            timeout=aiohttp.ClientTimeout(total=10),
        ) as response:
            if response.status == 200:
                return await response.json()
            return {"error": "AI service unavailable"}

    def run_sync(self, queries):
        """Synchronous wrapper (for compatibility)."""
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        try:
            return loop.run_until_complete(self.process_multiple_queries(queries))
        finally:
            if self.session:
                loop.run_until_complete(self.session.close())
                self.session = None
            loop.close()
# load_balancer.py
import random
import time
from collections import defaultdict

class AILoadBalancer:
    def __init__(self, endpoints):
        self.endpoints = endpoints
        self.endpoint_stats = defaultdict(
            lambda: {'requests': 0, 'errors': 0, 'avg_response_time': 0, 'last_used': 0}
        )
        self.max_errors = 5
        self.error_window = 300  # 5 minutes

    def get_best_endpoint(self):
        """Pick the best endpoint."""
        # Skip endpoints with too many recent errors
        available_endpoints = []
        current_time = time.time()
        for endpoint in self.endpoints:
            stats = self.endpoint_stats[endpoint]
            # Check the error count
            if stats['errors'] > self.max_errors:
                # Reset the error count once the error window has passed
                if current_time - stats['last_used'] > self.error_window:
                    stats['errors'] = 0
                else:
                    continue
            available_endpoints.append(endpoint)

        if not available_endpoints:
            # Every endpoint is unhealthy: reset and pick one at random
            self.reset_stats()
            return random.choice(self.endpoints)

        # Choose by average response time
        return min(available_endpoints,
                   key=lambda ep: self.endpoint_stats[ep]['avg_response_time'])

    def update_stats(self, endpoint, response_time, success=True):
        """Update endpoint statistics."""
        stats = self.endpoint_stats[endpoint]
        stats['requests'] += 1
        stats['last_used'] = time.time()

        # Exponential moving average of the response time
        alpha = 0.3
        stats['avg_response_time'] = (
            alpha * response_time + (1 - alpha) * stats['avg_response_time']
        )

        if not success:
            stats['errors'] += 1

    def reset_stats(self):
        """Reset all statistics."""
        for endpoint in self.endpoints:
            self.endpoint_stats[endpoint] = {
                'requests': 0, 'errors': 0, 'avg_response_time': 0, 'last_used': 0
            }
# security_settings.yml
security:
  # Authentication
  authentication:
    enabled: true
    method: "basic"  # basic, token, oauth2
    users:
      - username: "admin"
        password_hash: "$2b$12$..."  # bcrypt hash
        permissions: ["admin", "search", "configure"]
      - username: "user"
        password_hash: "$2b$12$..."
        permissions: ["search"]

  # API keys
  api_keys:
    enabled: true
    rotation_days: 30
    max_keys_per_user: 3

  # Request limits
  rate_limiting:
    enabled: true
    anonymous_requests_per_minute: 10
    authenticated_requests_per_minute: 60
    api_key_requests_per_minute: 100

  # Content filtering
  content_filtering:
    enabled: true
    filter_level: "moderate"  # strict, moderate, lenient
    blocked_categories: ["adult", "violence", "hate"]
    safe_search: true

  # Privacy protection
  privacy:
    log_retention_days: 7
    anonymize_ip: true
    strip_query_params: ["api_key", "token", "password"]
    do_not_track: true
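
The password_hash fields above expect bcrypt hashes (the $2b$12$ values are truncated placeholders). A minimal sketch for generating one with the bcrypt package (pip install bcrypt):

# gen_password_hash.py — produce a value for the password_hash fields above
import bcrypt

password = "change-me".encode("utf-8")
# 12 rounds matches the $2b$12$ prefix used in the example config
hashed = bcrypt.hashpw(password, bcrypt.gensalt(rounds=12))
print(hashed.decode())  # e.g. $2b$12$...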
# security.py
import re
from html import escape

class InputSanitizer:
    def __init__(self):
        # Allowed character patterns (the first range covers common CJK)
        self.allowed_patterns = {
            'query': re.compile(r'^[一-龥a-zA-Z0-9_\s\-.,!?;:\'"@#$%&*()+=]{1,500}$'),
            'url': re.compile(r'^https?://[^\s/$.?#].[^\s]*$'),
            'email': re.compile(r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$')
        }

    def sanitize_query(self, query):
        """Sanitize a search query."""
        if not query:
            return ""

        # Collapse extra whitespace
        query = ' '.join(query.split())

        # Check against the allowed pattern
        if not self.allowed_patterns['query'].match(query):
            # Strip disallowed characters
            query = re.sub(r'[^一-龥a-zA-Z0-9_\s\-.,!?;:\'"@#$%&*()+=]', '', query)

        # HTML-escape
        query = escape(query)

        # Truncate to the maximum length
        if len(query) > 500:
            query = query[:500]
        return query

    def sanitize_url(self, url):
        """Sanitize a URL."""
        if not url:
            return ""

        # Validate the URL format
        if not self.allowed_patterns['url'].match(url):
            raise ValueError("Invalid URL format")

        # Reject dangerous protocols
        dangerous_protocols = ['javascript:', 'data:', 'vbscript:']
        for protocol in dangerous_protocols:
            if url.lower().startswith(protocol):
                raise ValueError(f"Dangerous protocol detected: {protocol}")
        return url

    def detect_injection(self, input_str):
        """Detect injection attacks."""
        injection_patterns = [
            (r'(\b(SELECT|INSERT|UPDATE|DELETE|DROP|UNION|EXEC)\b)', 'SQL Injection'),
            (r'(<\s*script\b[^>]*>.*?<\s*/\s*script\s*>)', 'XSS Attack'),
            (r'(\${\w+\([^)]*\)})', 'Template Injection'),
            (r'(\b(eval|exec|compile|globals|locals)\b\s*\([^)]*\))', 'Code Injection')
        ]
        for pattern, attack_type in injection_patterns:
            if re.search(pattern, input_str, re.IGNORECASE):
                # Log only the first 100 characters of the input
                return {'detected': True, 'attack_type': attack_type,
                        'input': input_str[:100]}
        return {'detected': False}
# api_security.py
import datetime
from functools import wraps

import jwt
import redis as redis_lib
from flask import current_app, request, g, jsonify

# Module-level Redis client used by the rate limiter (assumes a local instance)
redis_client = redis_lib.Redis()

# write_audit_log is assumed to forward records to your log store
# (e.g. Elasticsearch or a file).

def require_auth(f):
    """Authentication decorator."""
    @wraps(f)
    def decorated(*args, **kwargs):
        token = None

        # Read the token from the Authorization header
        if 'Authorization' in request.headers:
            auth_header = request.headers['Authorization']
            if auth_header.startswith('Bearer '):
                token = auth_header.split(' ')[1]

        if not token:
            return jsonify({'error': 'Token is missing'}), 401

        try:
            # Verify the token
            data = jwt.decode(
                token, current_app.config['SECRET_KEY'], algorithms=["HS256"]
            )
            g.user_id = data['user_id']
            g.permissions = data.get('permissions', [])
        except jwt.ExpiredSignatureError:
            return jsonify({'error': 'Token has expired'}), 401
        except jwt.InvalidTokenError:
            return jsonify({'error': 'Invalid token'}), 401

        return f(*args, **kwargs)
    return decorated

def rate_limit(max_requests, window_seconds):
    """Rate-limiting decorator."""
    def decorator(f):
        @wraps(f)
        def decorated(*args, **kwargs):
            # Limit per user ID or, failing that, per IP
            identifier = g.get('user_id') or request.remote_addr

            # Track requests in Redis
            key = f"rate_limit:{identifier}:{request.endpoint}"

            # Count this request
            current = redis_client.incr(key)
            if current == 1:
                redis_client.expire(key, window_seconds)

            if current > max_requests:
                return jsonify({'error': 'Too many requests',
                                'retry_after': redis_client.ttl(key)}), 429
            return f(*args, **kwargs)
        return decorated
    return decorator

def audit_log(f):
    """Audit logging decorator."""
    @wraps(f)
    def decorated(*args, **kwargs):
        start_time = datetime.datetime.now()

        # Run the handler
        response = f(*args, **kwargs)

        end_time = datetime.datetime.now()
        duration = (end_time - start_time).total_seconds()

        # Record the audit entry
        audit_data = {
            'timestamp': start_time.isoformat(),
            'user_id': g.get('user_id'),
            'ip_address': request.remote_addr,
            'endpoint': request.endpoint,
            'method': request.method,
            'parameters': dict(request.args),
            'duration_seconds': duration,
            'response_status': response.status_code
        }

        # Write to the log system (e.g. Elasticsearch or a file)
        write_audit_log(audit_data)
        return response
    return decorated
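
require_auth only verifies tokens; issuing them is left to your auth flow. A minimal PyJWT sketch whose user_id and permissions claims match what the decorator reads (the secret key and lifetime here are placeholders):

# issue_token.py — create a token that require_auth above can verify
import datetime
import jwt

def issue_token(user_id, permissions, secret_key, ttl_hours=12):
    payload = {
        'user_id': user_id,
        'permissions': permissions,
        # Expiry claim checked by jwt.decode (raises ExpiredSignatureError)
        'exp': datetime.datetime.utcnow() + datetime.timedelta(hours=ttl_hours),
    }
    return jwt.encode(payload, secret_key, algorithm="HS256")

token = issue_token("admin", ["admin", "search"], "your-secret-key")
print(token)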
# prometheus.yml
global:
  scrape_interval: 15s
  evaluation_interval: 15s

scrape_configs:
  - job_name: 'searxng'
    metrics_path: '/metrics'
    static_configs:
      - targets: ['searxng:8080']

  - job_name: 'ai-processor'
    metrics_path: '/metrics'
    static_configs:
      - targets: ['ai-processor:5000']

  - job_name: 'redis'
    static_configs:
      - targets: ['redis:6379']

  - job_name: 'node-exporter'
    static_configs:
      - targets: ['node-exporter:9100']

alerting:
  alertmanagers:
    - static_configs:
        - targets: ['alertmanager:9093']

rule_files:
  - "alerts.yml"
# alerts.yml
groups:
  - name: searxng-alerts
    rules:
      - alert: HighErrorRate
        expr: rate(searxng_http_errors_total[5m]) > 0.1
        for: 2m
        labels:
          severity: warning
        annotations:
          summary: "High error rate on SearXNG"
          description: "Error rate is {{ $value }} per second"

      - alert: SlowResponseTime
        expr: histogram_quantile(0.95, rate(searxng_request_duration_seconds_bucket[5m])) > 2
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Slow response time on SearXNG"
          description: "95th percentile response time is {{ $value }} seconds"

      - alert: AIServiceDown
        expr: up{job="ai-processor"} == 0
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "AI service is down"
          description: "AI processor service has been down for more than 1 minute"

      - alert: HighMemoryUsage
        expr: (node_memory_MemTotal_bytes - node_memory_MemAvailable_bytes) / node_memory_MemTotal_bytes > 0.8
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "High memory usage"
          description: "Memory usage is at {{ $value | humanizePercentage }}"
#!/bin/bash
# backup-searxng.sh

BACKUP_DIR="/backups/searxng"
DATE=$(date +%Y%m%d_%H%M%S)
RETENTION_DAYS=30

# Create the backup directory
mkdir -p "$BACKUP_DIR/$DATE"

echo "Starting SearXNG backup on $DATE"

# Back up configuration files
echo "Backing up configurations..."
docker cp searxng:/etc/searxng "$BACKUP_DIR/$DATE/searxng-config"

# Back up data
echo "Backing up data..."
docker cp searxng:/var/log/searxng "$BACKUP_DIR/$DATE/searxng-data"

# Back up Redis data (if enabled)
if docker ps | grep -q searxng-redis; then
    echo "Backing up Redis data..."
    docker exec searxng-redis redis-cli save
    docker cp searxng-redis:/data/dump.rdb "$BACKUP_DIR/$DATE/redis-dump.rdb"
fi

# Back up PostgreSQL (if enabled)
if docker ps | grep -q searxng-db; then
    echo "Backing up PostgreSQL database..."
    docker exec searxng-db pg_dump -U searxng searxng > "$BACKUP_DIR/$DATE/searxng-db.sql"
fi

# Back up AI models
echo "Backing up AI models..."
cp -r ./ai-models "$BACKUP_DIR/$DATE/ai-models"

# Write backup metadata
cat > "$BACKUP_DIR/$DATE/backup-info.json" <<EOF
{
  "timestamp": "$(date -Iseconds)",
  "version": "1.0",
  "components": [
    "searxng-config",
    "searxng-data",
    "redis-data",
    "postgresql-db",
    "ai-models"
  ],
  "size": "$(du -sh "$BACKUP_DIR/$DATE" | cut -f1)"
}
EOF

# Compress the backup
echo "Compressing backup..."
tar -czf "$BACKUP_DIR/searxng-backup-$DATE.tar.gz" -C "$BACKUP_DIR" "$DATE"

# Remove the temporary directory
rm -rf "$BACKUP_DIR/$DATE"

# Prune old backups
echo "Cleaning up old backups..."
find "$BACKUP_DIR" -name "searxng-backup-*.tar.gz" -mtime +$RETENTION_DAYS -delete

echo "Backup completed: $BACKUP_DIR/searxng-backup-$DATE.tar.gz"
#!/bin/bash
# update-searxng.sh

echo "Starting SearXNG update process..."

# Stop the services
docker-compose down

# Back up the current state
./backup-searxng.sh

# Pull the latest images
echo "Pulling latest images..."
docker-compose pull

# Update the configuration file (if a new template ships)
if [ -f "settings.yml.example" ]; then
    echo "Checking for configuration updates..."
    # Compare and merge configurations
    cp settings.yml settings.yml.backup
    python3 - <<'PY'
import yaml

with open('settings.yml.example', 'r') as f:
    example = yaml.safe_load(f)
with open('settings.yml', 'r') as f:
    current = yaml.safe_load(f)

# Merge in new top-level keys
for key in example:
    if key not in current:
        current[key] = example[key]

with open('settings.yml', 'w') as f:
    yaml.dump(current, f, default_flow_style=False)
PY
fi

# Start the services
echo "Starting updated services..."
docker-compose up -d

# Wait for the services to come up
echo "Waiting for services to be ready..."
sleep 30

# Run health checks (SearXNG exposes /healthz)
echo "Running health checks..."
curl -f http://localhost:8080/healthz || echo "Health check failed!"

# Verify the AI service
echo "Verifying AI integration..."
curl -X POST http://localhost:5000/ai/process \
  -H "Content-Type: application/json" \
  -d '{"query": "test query"}' || echo "AI service check failed!"

echo "Update completed!"
Deploying an AI-enhanced SearXNG search engine locally provides a powerful, privacy-friendly search solution. By integrating local AI models, users get an intelligent search experience while keeping their data private. This document has provided a complete deployment guide, an explanation of the technical architecture, and optimization recommendations to help you deploy and maintain such a system successfully.

This solution is particularly suitable for enterprises, educational institutions, and research organizations that have high privacy requirements and need customized search functionality.
