#!/bin/bash
# Deployment script for the StructBERT sentiment-analysis demo.
# This first section creates the application directory and a Python virtual
# environment, then installs the pinned dependencies into it.
#
# Every step that later steps rely on is checked explicitly: without the
# checks a failed `cd` or `venv` creation would make the following `pip`
# commands install into whatever interpreter happens to be on PATH.
echo "开始部署 StructBERT 情感分析服务..."
echo "======================================"
mkdir -p ~/nlp_structbert_sentiment || exit 1
cd ~/nlp_structbert_sentiment || exit 1
echo "步骤 1:创建虚拟环境..."
python3 -m venv venv || exit 1
# shellcheck disable=SC1091 -- the activate script is generated at runtime.
source venv/bin/activate || exit 1
echo "步骤 2:安装依赖包..."
# torch 1.10 and pandas 1.4.2 ship binaries built against the NumPy 1.x ABI;
# install a 1.x NumPy first so later installs do not pull in NumPy 2.
pip install "numpy<2" || exit 1
pip install torch==1.10.0 --index-url https://download.pytorch.org/whl/cpu || exit 1
pip install transformers==4.18.0 || exit 1
# Flask 2.1 imports werkzeug.urls.url_quote, which Werkzeug 3.0 removed, so
# Werkzeug has to be capped explicitly (Flask 2.1 itself sets no upper bound).
pip install flask==2.1.0 "werkzeug<3" || exit 1
pip install gradio==3.4.1 || exit 1
pip install pandas==1.4.2 || exit 1
pip install supervisor==4.2.4 || exit 1
echo "步骤 3:下载模型文件..."
mkdir -p models || exit 1
echo "正在下载模型文件,这可能需要几分钟..."
# Only the model configuration is written here. "num_labels" is set to 3
# because both webui.py and api.py map the classifier output onto three
# labels (negative / neutral / positive); without it transformers builds a
# two-class head and indexing the third probability fails.
cat > models/config.json << 'EOF'
{
  "model_type": "bert",
  "hidden_size": 768,
  "num_hidden_layers": 12,
  "num_attention_heads": 12,
  "vocab_size": 21128,
  "type_vocab_size": 2,
  "max_position_embeddings": 512,
  "num_labels": 3
}
EOF
# NOTE(review): nothing in this script fetches the tokenizer vocabulary or the
# model weights, yet both services load them from ./models at start-up.
# Warn now rather than letting the services crash-loop under Supervisor.
for required_file in vocab.txt pytorch_model.bin; do
  if [[ ! -f "models/${required_file}" ]]; then
    echo "警告:models/${required_file} 不存在,请在启动服务前手动放入模型文件。" >&2
  fi
done
echo "步骤 4:创建 WebUI 应用..."
cd ~/nlp_structbert_sentiment || exit 1
# Generates webui.py, a Gradio front end with a single-text tab and a batch
# tab. The heredoc delimiter is quoted so the shell copies the Python source
# verbatim (no $-expansion).
cat > webui.py << 'EOF'
import gradio as gr
import pandas as pd
from transformers import BertTokenizer, BertForSequenceClassification
import torch
import json

print("正在加载模型...")
model_path = "./models"
tokenizer = BertTokenizer.from_pretrained(model_path)
model = BertForSequenceClassification.from_pretrained(model_path)
model.eval()
labels = ["负面", "中性", "正面"]
# Fail at start-up instead of mislabelling results when the checkpoint's
# classifier head does not match the label list above.
if model.config.num_labels != len(labels):
    raise RuntimeError(
        f"model has {model.config.num_labels} output classes, "
        f"but {len(labels)} labels are configured"
    )

# Column order of the batch result table.
BATCH_COLUMNS = ["文本", "情感倾向", "置信度", "负面概率", "中性概率", "正面概率"]


def analyze_sentiment(text):
    """Classify a single text.

    Returns a dict with the input text, the predicted label, the confidence
    of that label and the probability of every label, all rounded to 4
    decimal places.
    """
    inputs = tokenizer(text, return_tensors="pt", truncation=True, max_length=128)
    with torch.no_grad():
        outputs = model(**inputs)
    # One input sequence, so take row 0 of the (1, num_labels) softmax output.
    probabilities = torch.nn.functional.softmax(outputs.logits, dim=-1)[0]
    pred_label = int(torch.argmax(probabilities).item())
    return {
        "text": text,
        "sentiment": labels[pred_label],
        "confidence": round(probabilities[pred_label].item(), 4),
        "probabilities": {
            label: round(prob.item(), 4)
            for label, prob in zip(labels, probabilities)
        },
    }


def batch_analyze(texts):
    """Classify every non-blank line of ``texts`` and return a DataFrame.

    The frame always carries the BATCH_COLUMNS columns, even when no line
    contained any text, so the result table keeps its headers.
    """
    rows = []
    for line in (texts or "").strip().split('\n'):
        line = line.strip()
        if not line:
            continue
        result = analyze_sentiment(line)
        rows.append({
            "文本": result["text"],
            "情感倾向": result["sentiment"],
            "置信度": result["confidence"],
            "负面概率": result["probabilities"]["负面"],
            "中性概率": result["probabilities"]["中性"],
            "正面概率": result["probabilities"]["正面"],
        })
    return pd.DataFrame(rows, columns=BATCH_COLUMNS)


with gr.Blocks(title="StructBERT 中文情感分析") as demo:
    gr.Markdown("# StructBERT 中文情感分析系统")
    gr.Markdown("输入中文文本,分析情感倾向(正面/负面/中性)")
    with gr.Tab("单文本分析"):
        with gr.Row():
            with gr.Column():
                input_text = gr.Textbox(label="输入文本", placeholder="请输入要分析的中文文本...", lines=3)
                analyze_btn = gr.Button("开始分析", variant="primary")
            with gr.Column():
                output_json = gr.JSON(label="分析结果")
        analyze_btn.click(analyze_sentiment, inputs=input_text, outputs=output_json)
    with gr.Tab("批量分析"):
        with gr.Row():
            with gr.Column():
                batch_input = gr.Textbox(label="批量输入", placeholder="每行一条文本...", lines=10)
                batch_btn = gr.Button("开始批量分析", variant="primary")
            with gr.Column():
                batch_output = gr.Dataframe(label="分析结果", headers=BATCH_COLUMNS)
        batch_btn.click(batch_analyze, inputs=batch_input, outputs=batch_output)
    gr.Markdown("### 使用说明")
    gr.Markdown("""
1. **单文本分析**:在左侧输入文本,点击'开始分析'查看结果
2. **批量分析**:在批量输入框中每行输入一条文本,点击'开始批量分析'
3. **结果说明**:
- 情感倾向:正面、负面或中性
- 置信度:模型对判断的把握程度(0-1 之间)
- 概率分布:三种情感的具体概率值
""")

if __name__ == "__main__":
    demo.launch(server_name="0.0.0.0", server_port=7860)
EOF
echo "步骤 5:创建 API 服务..."
# Generates api.py (Flask) in the current directory, which is expected to be
# the application directory. The heredoc delimiter is quoted so the Python
# source is written verbatim.
cat > api.py << 'EOF'
from flask import Flask, request, jsonify
from transformers import BertTokenizer, BertForSequenceClassification
import torch
import json

app = Flask(__name__)
print("正在加载模型...")
model_path = "./models"
tokenizer = BertTokenizer.from_pretrained(model_path)
model = BertForSequenceClassification.from_pretrained(model_path)
model.eval()
labels = ["negative", "neutral", "positive"]
# Fail at start-up instead of mislabelling results when the checkpoint's
# classifier head does not match the label list above.
if model.config.num_labels != len(labels):
    raise RuntimeError(
        f"model has {model.config.num_labels} output classes, "
        f"but {len(labels)} labels are configured"
    )


def classify_text(text):
    """Run the model on one string and build the JSON-serialisable result.

    Shared by /predict and /batch_predict so both return identical records.
    """
    inputs = tokenizer(text, return_tensors="pt", truncation=True, max_length=128)
    with torch.no_grad():
        outputs = model(**inputs)
    # One input sequence, so take row 0 of the (1, num_labels) softmax output.
    probabilities = torch.nn.functional.softmax(outputs.logits, dim=-1)[0]
    pred_label = int(torch.argmax(probabilities).item())
    return {
        "text": text,
        "sentiment": labels[pred_label],
        "confidence": round(probabilities[pred_label].item(), 4),
        "probabilities": {
            label: round(prob.item(), 4)
            for label, prob in zip(labels, probabilities)
        },
    }


@app.route('/health', methods=['GET'])
def health_check():
    """Health-check endpoint; returns a static status document."""
    return jsonify({"status": "healthy", "model": "structbert-sentiment"})


@app.route('/predict', methods=['POST'])
def predict():
    """Classify the string in the ``text`` field of the JSON request body.

    Client mistakes (body is not a JSON object, missing or non-string text)
    are answered with 400; only failures during inference produce 500.
    """
    # silent=True yields None for a missing or malformed JSON body instead of
    # raising, so the problem can be reported as a client error below.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({"error": "request body must be a JSON object"}), 400
    text = data.get('text', '')
    if not text:
        return jsonify({"error": "text parameter is required"}), 400
    if not isinstance(text, str):
        return jsonify({"error": "text parameter must be a string"}), 400
    try:
        return jsonify(classify_text(text))
    except Exception as e:
        return jsonify({"error": str(e)}), 500


@app.route('/batch_predict', methods=['POST'])
def batch_predict():
    """Classify every string in the ``texts`` list of the JSON request body.

    Responds with ``{"results": [...], "count": n}``; invalid input is
    answered with 400, inference failures with 500.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({"error": "request body must be a JSON object"}), 400
    texts = data.get('texts', [])
    if not texts or not isinstance(texts, list):
        return jsonify({"error": "texts parameter must be a non-empty list"}), 400
    if not all(isinstance(item, str) for item in texts):
        return jsonify({"error": "every item in texts must be a string"}), 400
    try:
        results = [classify_text(item) for item in texts]
        return jsonify({"results": results, "count": len(results)})
    except Exception as e:
        return jsonify({"error": str(e)}), 500


if __name__ == '__main__':
    app.run(host='0.0.0.0', port=8080, debug=False)
EOF
echo "步骤 6:配置 Supervisor 进程管理..."
# The application was installed under the invoking user's home directory, so
# the Supervisor entries must point there as well. The paths are expanded by
# this (unprivileged) shell; only the final write runs through sudo.
app_dir="${HOME}/nlp_structbert_sentiment"
supervisor_conf_dir="/etc/supervisor/conf.d"
if [[ ! -d "${supervisor_conf_dir}" ]]; then
  echo "错误:未找到 ${supervisor_conf_dir},请先安装并配置系统级 Supervisor。" >&2
  exit 1
fi
# Unquoted delimiter on purpose: ${app_dir} must be expanded into the file.
sudo tee "${supervisor_conf_dir}/structbert.conf" > /dev/null << EOF || exit 1
[program:nlp_structbert_api]
command=${app_dir}/venv/bin/python api.py
directory=${app_dir}
autostart=true
autorestart=true
stderr_logfile=/var/log/structbert_api.err.log
stdout_logfile=/var/log/structbert_api.out.log
[program:nlp_structbert_webui]
command=${app_dir}/venv/bin/python webui.py
directory=${app_dir}
autostart=true
autorestart=true
stderr_logfile=/var/log/structbert_webui.err.log
stdout_logfile=/var/log/structbert_webui.out.log
EOF
echo "步骤 7:启动服务..."
# Load the new program definitions; stop here if Supervisor rejects them so
# the success banner below is not printed for a failed deployment.
if ! sudo supervisorctl reread || ! sudo supervisorctl update; then
  echo "错误:Supervisor 配置加载失败,请检查上面的输出。" >&2
  exit 1
fi
# `update` already starts programs declared with autostart=true, in which case
# `start` reports them as already started; that outcome is not a deployment
# failure, so its exit status is deliberately ignored.
sudo supervisorctl start nlp_structbert_api nlp_structbert_webui || true
echo "======================================"
echo "部署完成!"
echo "WebUI 访问地址:http://localhost:7860"
echo "API 访问地址:http://localhost:8080"
echo ""
echo "常用管理命令:"
echo "查看状态:sudo supervisorctl status"
echo "重启 API:sudo supervisorctl restart nlp_structbert_api"
echo "重启 WebUI:sudo supervisorctl restart nlp_structbert_webui"
echo "查看日志:sudo supervisorctl tail -f nlp_structbert_api"
import asyncio
import websockets
import json
import time
import requests
from collections import defaultdict
from datetime import datetime, timedelta
import pandas as pd
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import dash
from dash import dcc, html
from dash.dependencies import Input, Output
import threading
class DanmakuEmotionAnalyzer:
    """Classifies danmaku (on-screen viewer comments) and keeps statistics.

    Each comment is sent to the sentiment HTTP API at ``api_url``; the result
    is recorded in running totals, per-time-window counters and a bounded
    history list that the dashboard reads.
    """

    # Labels the counters know about; any other label is counted as neutral.
    EMOTIONS = ("positive", "negative", "neutral")
    # Maximum number of records kept in ``emotion_history``.
    MAX_HISTORY = 1000

    def __init__(self, api_url="http://localhost:8080"):
        """Create an analyzer that talks to the sentiment API at ``api_url``."""
        self.api_url = api_url
        self.danmaku_buffer = []
        self.emotion_history = []
        self.time_windows = [10, 30, 60]
        self.emotion_stats = {
            "positive": 0,
            "negative": 0,
            "neutral": 0,
            "total": 0,
        }
        # Keyed by (window_size_seconds, window_start_timestamp). The window
        # size is part of the key because the 10/30/60 second windows
        # regularly share a start timestamp (e.g. t=60) and would otherwise
        # be summed into one bucket.
        self.window_stats = defaultdict(lambda: {
            "positive": 0,
            "negative": 0,
            "neutral": 0,
            "total": 0,
        })

    def analyze_emotion(self, text):
        """Return ``(sentiment, confidence)`` for one comment.

        Best effort: any request failure, non-200 response or malformed
        reply yields ``("neutral", 0.5)`` so the stream keeps flowing.
        """
        try:
            response = requests.post(
                f"{self.api_url}/predict", json={"text": text}, timeout=2
            )
            if response.status_code == 200:
                result = response.json()
                return result["sentiment"], result["confidence"]
            return "neutral", 0.5
        except Exception as e:
            print(f"情感分析失败:{e}")
            return "neutral", 0.5

    def process_danmaku(self, danmaku_text, timestamp=None):
        """Classify one comment, update all counters and return its record.

        ``timestamp`` is seconds since the epoch and defaults to now. The
        returned dict has the keys text, emotion, confidence, timestamp and
        time_str (HH:MM:SS in local time).
        """
        if timestamp is None:
            timestamp = time.time()
        emotion, confidence = self.analyze_emotion(danmaku_text)
        # An unexpected label from the API must not raise KeyError in the
        # counters below (which would kill the producer thread).
        if emotion not in self.EMOTIONS:
            emotion = "neutral"
        record = {
            "text": danmaku_text,
            "emotion": emotion,
            "confidence": confidence,
            "timestamp": timestamp,
            "time_str": datetime.fromtimestamp(timestamp).strftime("%H:%M:%S"),
        }
        self.emotion_stats[emotion] += 1
        self.emotion_stats["total"] += 1
        for window in self.time_windows:
            window_start = int(timestamp // window) * window
            bucket = self.window_stats[(window, window_start)]
            bucket[emotion] += 1
            bucket["total"] += 1
        self.emotion_history.append(record)
        if len(self.emotion_history) > self.MAX_HISTORY:
            self.emotion_history.pop(0)
        return record

    def get_realtime_stats(self, window_seconds=30):
        """Summarise the comments of the last ``window_seconds`` seconds.

        Returns the share of each emotion (0..1), the number of comments and
        the dominant emotion; "neutral" is reported when nothing strictly
        dominates or when there are no recent comments.
        """
        cutoff_time = time.time() - window_seconds
        # Copy first: the history may be appended to from another thread.
        history = list(self.emotion_history)
        recent_danmaku = [d for d in history if d["timestamp"] > cutoff_time]
        if not recent_danmaku:
            return {
                "positive_ratio": 0,
                "negative_ratio": 0,
                "neutral_ratio": 0,
                "total_count": 0,
                "emotion_trend": "neutral",
            }
        total = len(recent_danmaku)
        positive = sum(1 for d in recent_danmaku if d["emotion"] == "positive")
        negative = sum(1 for d in recent_danmaku if d["emotion"] == "negative")
        neutral = total - positive - negative
        if positive > negative and positive > neutral:
            trend = "positive"
        elif negative > positive and negative > neutral:
            trend = "negative"
        else:
            trend = "neutral"
        return {
            "positive_ratio": positive / total,
            "negative_ratio": negative / total,
            "neutral_ratio": neutral / total,
            "total_count": total,
            "emotion_trend": trend,
        }

    def get_heatmap_data(self, window_size=60):
        """Return heat-map cells for the whole history.

        The history is grouped into windows of ``window_size`` seconds. For
        every window three cells are emitted (positive, neutral, negative),
        as three parallel lists: window start as HH:MM:SS, emotion name and
        that emotion's share of the window in percent.
        """
        history = list(self.emotion_history)
        if not history:
            return {"timestamps": [], "emotions": [], "intensity": []}
        heatmap_data = defaultdict(lambda: {"positive": 0, "negative": 0, "neutral": 0})
        for record in history:
            window_key = int(record["timestamp"] // window_size) * window_size
            heatmap_data[window_key][record["emotion"]] += 1
        timestamps = []
        emotions = []
        intensity = []
        for window_key, counts in sorted(heatmap_data.items()):
            time_str = datetime.fromtimestamp(window_key).strftime("%H:%M:%S")
            total = sum(counts.values())
            if total > 0:
                for emotion in ["positive", "neutral", "negative"]:
                    timestamps.append(time_str)
                    emotions.append(emotion)
                    intensity.append(counts[emotion] / total * 100)
        return {
            "timestamps": timestamps,
            "emotions": emotions,
            "intensity": intensity,
        }
class DanmakuSimulator:
    """Generates fake danmaku comments for demonstrations."""

    def __init__(self):
        """Set up the sample phrases for each emotion."""
        self.positive_samples = [
            "哈哈哈笑死我了", "太好看了吧", "666", "神仙操作", "爱了爱了",
            "前方高能", "泪目了", "太感动了", "这个特效绝了", "UP 主太有才了",
            "收藏了", "三连了"
        ]
        self.negative_samples = [
            "就这?", "太水了", "无聊", "取关了", "广告太多了", "浪费时间",
            "不好看", "什么鬼", "退钱", "辣眼睛", "太坑了", "失望"
        ]
        self.neutral_samples = [
            "来了", "第一", "打卡", "第几?", "有人吗", "几点开播",
            "这是什么游戏", "背景音乐是什么", "UP 主哪里人", "多久更新一次"
        ]

    def generate_danmaku(self, emotion_probabilities=None):
        """Return ``(text, emotion)`` for one randomly generated comment.

        ``emotion_probabilities`` maps "positive" and "negative" to their
        probabilities; whatever probability mass remains selects "neutral".
        A missing key counts as probability 0. The default mix is
        50 % positive, 20 % negative, 30 % neutral.
        """
        import random

        if emotion_probabilities is None:
            emotion_probabilities = {"positive": 0.5, "negative": 0.2, "neutral": 0.3}
        positive_p = emotion_probabilities.get("positive", 0.0)
        negative_p = emotion_probabilities.get("negative", 0.0)

        rand = random.random()
        if rand < positive_p:
            emotion = "positive"
            samples = self.positive_samples
        elif rand < positive_p + negative_p:
            emotion = "negative"
            samples = self.negative_samples
        else:
            emotion = "neutral"
            samples = self.neutral_samples

        text = random.choice(samples)
        # Occasionally decorate the phrase so repeated samples look less uniform.
        if random.random() < 0.3:
            text = text + "!" * random.randint(1, 3)
        if random.random() < 0.2:
            text = "【" + text + "】"
        return text, emotion
def create_dashboard(analyzer):
    """Build the Dash application that visualises ``analyzer`` in real time.

    The page shows live statistics with a pie chart, a heat map, the ten most
    recent comments and a trend chart. A single callback, triggered by two
    interval timers (2 s and 5 s), refreshes all five outputs.

    Args:
        analyzer: object providing ``get_realtime_stats``,
            ``get_heatmap_data``, ``emotion_stats`` and ``emotion_history``
            as DanmakuEmotionAnalyzer does.

    Returns:
        The configured ``dash.Dash`` application (not yet running).
    """
    emotion_colors = {
        "positive": "#2E86AB", "negative": "#A23B72", "neutral": "#F18F01"
    }
    # (record key, legend label, line colour) in the order the traces are drawn.
    trend_series = (
        ("positive", '正面', '#2E86AB'),
        ("negative", '负面', '#A23B72'),
        ("neutral", '中性', '#F18F01'),
    )

    app = dash.Dash(__name__)
    app.layout = html.Div([
        html.H1("短视频弹幕实时情绪热力图", style={'textAlign': 'center'}),
        html.Div([
            html.Div([
                html.H3("实时情绪统计"),
                html.Div(id="realtime-stats", style={'fontSize': '20px'}),
                dcc.Graph(id="emotion-pie-chart"),
            ], style={'width': '30%', 'display': 'inline-block', 'verticalAlign': 'top'}),
            html.Div([
                html.H3("情绪热力图"),
                dcc.Graph(id="emotion-heatmap"),
                dcc.Interval(interval=2000, id='interval-component', n_intervals=0),
            ], style={'width': '70%', 'display': 'inline-block'}),
        ]),
        html.Div([
            html.H3("最近弹幕"),
            html.Div(id="recent-danmaku", style={
                'height': '200px', 'overflowY': 'scroll', 'border': '1px solid #ddd', 'padding': '10px'
            })
        ]),
        html.Div([
            html.H3("情绪趋势图"),
            dcc.Graph(id="emotion-trend-chart"),
            dcc.Interval(interval=5000, id='trend-interval', n_intervals=0),
        ]),
    ])

    @app.callback(
        [
            Output('realtime-stats', 'children'),
            Output('emotion-pie-chart', 'figure'),
            Output('recent-danmaku', 'children'),
            Output('emotion-heatmap', 'figure'),
            Output('emotion-trend-chart', 'figure')
        ],
        [
            Input('interval-component', 'n_intervals'),
            Input('trend-interval', 'n_intervals')
        ]
    )
    def update_dashboard(n, n_trend):
        """Recompute every panel; the tick counters are only triggers."""
        # Work on one snapshot: the history list is appended to (and trimmed)
        # by the producer thread while this callback runs.
        history = list(analyzer.emotion_history)

        # --- live statistics and pie chart (last 30 seconds) ---
        stats = analyzer.get_realtime_stats(window_seconds=30)
        stats_text = html.Div([
            html.P(f"总弹幕数:{analyzer.emotion_stats['total']}"),
            html.P(f"实时弹幕/30 秒:{stats['total_count']}"),
            html.P(f"正面情绪:{stats['positive_ratio']*100:.1f}%"),
            html.P(f"负面情绪:{stats['negative_ratio']*100:.1f}%"),
            html.P(f"中性情绪:{stats['neutral_ratio']*100:.1f}%"),
            html.P(f"情绪趋势:{stats['emotion_trend']}"),
        ])
        pie_fig = go.Figure(data=[go.Pie(
            labels=['正面', '负面', '中性'],
            values=[stats['positive_ratio'], stats['negative_ratio'], stats['neutral_ratio']],
            hole=.3,
            marker_colors=['#2E86AB', '#A23B72', '#F18F01']
        )])
        pie_fig.update_layout(title_text="实时情绪分布")

        # --- ten most recent comments, newest first ---
        danmaku_list = []
        for dm in reversed(history[-10:]):
            emotion_color = emotion_colors.get(dm["emotion"], "#000000")
            danmaku_list.append(html.P([
                html.Span(f"[{dm['time_str']}] ", style={'color': '#666'}),
                html.Span(dm["text"], style={'color': emotion_color}),
                html.Span(f" ({dm['emotion']})", style={'color': '#999', 'fontSize': '12px'})
            ]))

        # --- heat map over 10-second windows ---
        heatmap_data = analyzer.get_heatmap_data(window_size=10)
        heatmap_fig = go.Figure(data=go.Heatmap(
            z=heatmap_data["intensity"],
            x=heatmap_data["timestamps"],
            y=heatmap_data["emotions"],
            colorscale='RdBu',
            zmin=0, zmax=100,
            hoverongaps=False
        ))
        heatmap_fig.update_layout(
            title="情绪热力图(颜色越深表示比例越高)",
            xaxis_title="时间",
            yaxis_title="情绪类型",
            height=400
        )

        # --- trend: thirty 10-second windows ending now (last 5 minutes) ---
        trend_data = []
        current_time = time.time()
        for i in range(30):
            window_start = current_time - (30 - i) * 10
            window_end = window_start + 10
            window_danmaku = [
                d for d in history if window_start <= d["timestamp"] < window_end
            ]
            if not window_danmaku:
                # Windows without comments are left out of the chart.
                continue
            total = len(window_danmaku)
            point = {"time": datetime.fromtimestamp(window_start).strftime("%H:%M:%S")}
            for key, _label, _color in trend_series:
                count = sum(1 for d in window_danmaku if d["emotion"] == key)
                point[key] = count / total * 100
            trend_data.append(point)

        trend_fig = go.Figure()
        if trend_data:
            times = [point["time"] for point in trend_data]
            for key, label, color in trend_series:
                trend_fig.add_trace(go.Scatter(
                    x=times, y=[point[key] for point in trend_data], mode='lines+markers',
                    name=label, line=dict(color=color, width=2)
                ))
            trend_fig.update_layout(
                title="情绪趋势(最近 5 分钟)",
                xaxis_title="时间",
                yaxis_title="比例 (%)",
                height=300
            )
        else:
            trend_fig.update_layout(title="暂无数据")

        return stats_text, pie_fig, danmaku_list, heatmap_fig, trend_fig

    return app
def simulate_danmaku_stream(analyzer, duration=300):
    """Feed simulated comments into ``analyzer`` for ``duration`` seconds.

    The run is divided into five equally long phases with different emotion
    mixes. One comment is generated every 0.5 to 2.5 seconds, passed to
    ``analyzer.process_danmaku`` and logged to stdout together with the
    emotion the simulator intended (not the one the analyzer detected).
    Blocks until the duration has elapsed.
    """
    # Imported here because the module only imports ``random`` inside its
    # ``__main__`` guard; without this the function raises NameError when the
    # module is imported rather than executed as a script.
    import random

    simulator = DanmakuSimulator()
    emotion_scenarios = [
        {"positive": 0.7, "negative": 0.1, "neutral": 0.2},
        {"positive": 0.4, "negative": 0.3, "neutral": 0.3},
        {"positive": 0.2, "negative": 0.6, "neutral": 0.2},
        {"positive": 0.8, "negative": 0.1, "neutral": 0.1},
        {"positive": 0.5, "negative": 0.2, "neutral": 0.3},
    ]
    start_time = time.time()
    scenario_duration = duration / len(emotion_scenarios)
    while time.time() - start_time < duration:
        elapsed = time.time() - start_time
        # Clamp so rounding at the very end cannot index past the last phase.
        scenario_index = min(int(elapsed / scenario_duration), len(emotion_scenarios) - 1)
        current_scenario = emotion_scenarios[scenario_index]
        danmaku_text, expected_emotion = simulator.generate_danmaku(current_scenario)
        analyzer.process_danmaku(danmaku_text)
        print(f"[{datetime.now().strftime('%H:%M:%S')}] {danmaku_text} -> {expected_emotion}")
        time.sleep(0.5 + random.random() * 2)
    print("弹幕模拟结束")
if __name__ == "__main__":
    # Demo entry point: start the simulated comment stream in a background
    # thread and serve the dashboard on port 8050.
    import random  # module-level name that simulate_danmaku_stream relies on
    import threading

    analyzer = DanmakuEmotionAnalyzer()
    # Daemon thread: it must not keep the process alive once the server stops.
    sim_thread = threading.Thread(
        target=simulate_danmaku_stream, args=(analyzer, 600), daemon=True
    )
    sim_thread.start()
    app = create_dashboard(analyzer)
    # The debug reloader re-executes this script in a child process, which
    # would start a second simulator thread and leave the parent's analyzer
    # feeding a dashboard nobody sees; keep debug tooling but disable reload.
    app.run_server(debug=True, port=8050, use_reloader=False)