import torch
from transformers import pipeline
import torchaudio
# Smoke test: build a Whisper speech-recognition pipeline and try it once on
# a local sample file.

# Use the first CUDA device when torch can see one; otherwise stay on the CPU.
if torch.cuda.is_available():
    device = "cuda:0"
else:
    device = "cpu"
print(f"使用设备:{device}")

pipe = pipeline(
    task="automatic-speech-recognition",
    model="openai/whisper-large-v3",
    device=device,
)

try:
    result = pipe("test_audio.mp3")
    print("识别结果:", result["text"])
except FileNotFoundError:
    # The sample recording is optional; a missing file only skips the check.
    print("找不到测试音频文件,跳过识别测试")
import torch
from transformers import pipeline, AutoModelForSeq2SeqLM, AutoTokenizer
from typing import Optional, Dict, Tuple
import warnings
# Install a blanket "ignore" filter: no category or module is specified, so
# every warning raised after this point is suppressed for the whole process.
warnings.filterwarnings("ignore")
class MultilingualTranslationSystem:
    """Speech transcription plus text translation built on Hugging Face pipelines.

    Audio is transcribed with a Whisper ASR pipeline. The transcript is then
    translated with one translation pipeline per language pair, keyed as
    ``"<source>-<target>"`` (for example ``"en-zh"``). A pair without its own
    model is routed through English, provided both hops are available.

    Translation failures are not raised: the methods return a bracketed marker
    string starting with ``"[翻译失败"`` in place of the translated text.
    """

    # The ASR pipeline is expected to report Whisper's detected language as a
    # lower-case English name (e.g. "english"), while translation pairs are
    # keyed by two-letter codes. Names missing here are passed through as-is.
    _LANGUAGE_NAME_TO_CODE = {
        "english": "en",
        "chinese": "zh",
        "german": "de",
        "japanese": "ja",
        "french": "fr",
        "korean": "ko",
    }

    def __init__(
        self,
        whisper_model: str = "openai/whisper-large-v3",
        translation_models: Optional[Dict[str, str]] = None,
        device: Optional[str] = None,
    ):
        """Load the ASR pipeline and preload the most common translation models.

        Args:
            whisper_model: Model id or path of the Whisper checkpoint.
            translation_models: Mapping of ``"src-tgt"`` pair keys to model
                ids. Defaults to the table from ``_get_default_models``.
            device: Device string such as ``"cuda:0"`` or ``"cpu"``. When
                omitted, the first CUDA device is used if torch reports one.
        """
        self.device = device or ("cuda:0" if torch.cuda.is_available() else "cpu")
        print(f"初始化翻译系统,使用设备:{self.device}")
        print("加载 Whisper 语音识别模型...")
        self.asr_pipe = pipeline(
            "automatic-speech-recognition",
            model=whisper_model,
            device=self.device,
            # Half precision only on CUDA devices; full precision elsewhere.
            torch_dtype=torch.float16 if "cuda" in self.device else torch.float32,
        )
        print("加载翻译模型...")
        self.translation_models = translation_models or self._get_default_models()
        self.translators = {}
        self._preload_translation_models()

    def _get_default_models(self) -> Dict[str, str]:
        """Return the default ``"src-tgt"`` -> model id table."""
        return {
            "en-zh": "Helsinki-NLP/opus-mt-en-zh",
            "zh-en": "Helsinki-NLP/opus-mt-zh-en",
            "en-de": "Helsinki-NLP/opus-mt-en-de",
            "de-en": "Helsinki-NLP/opus-mt-de-en",
            "en-ja": "Helsinki-NLP/opus-mt-en-ja",
            "ja-en": "Helsinki-NLP/opus-mt-ja-en",
            "en-fr": "Helsinki-NLP/opus-mt-en-fr",
            "fr-en": "Helsinki-NLP/opus-mt-fr-en",
            "en-ko": "Helsinki-NLP/opus-mt-en-ko",
            "ko-en": "Helsinki-NLP/opus-mt-ko-en",
        }

    def _preload_translation_models(self):
        """Eagerly load the most common pairs; a failed load is only logged."""
        common_pairs = ["en-zh", "zh-en", "en-de", "de-en"]
        for pair in common_pairs:
            if pair not in self.translation_models:
                continue
            try:
                print(f"预加载翻译模型:{pair}")
                model_name = self.translation_models[pair]
                self.translators[pair] = pipeline(
                    "translation", model=model_name, device=self.device
                )
            except Exception as e:
                # Best effort: the pair is retried lazily on first use.
                print(f"加载模型 {pair} 失败:{e}")

    @classmethod
    def _extract_language(cls, result) -> str:
        """Pull a language code out of an ASR result dict.

        Looks at top-level ``"language"`` / ``"lang"`` keys first, then at the
        ``"language"`` entry of each item in ``"chunks"``. The first non-empty
        value wins. It is lower-cased and mapped from a language name to a
        two-letter code where known. Returns ``"unknown"`` when nothing
        usable is present.
        """
        candidates = [result.get("language"), result.get("lang")]
        for chunk in result.get("chunks") or []:
            if isinstance(chunk, dict):
                candidates.append(chunk.get("language"))
        for value in candidates:
            if value:
                name = str(value).strip().lower()
                if name:
                    return cls._LANGUAGE_NAME_TO_CODE.get(name, name)
        return "unknown"

    def transcribe_audio(self, audio_path: str) -> Tuple[str, str]:
        """Transcribe an audio file.

        Args:
            audio_path: Path of the audio file handed to the ASR pipeline.

        Returns:
            ``(text, language)`` where ``text`` is the stripped transcript and
            ``language`` is the detected language code, or ``"unknown"`` when
            the pipeline output carries no language information.
        """
        print(f"开始转录:{audio_path}")
        # The plain pipeline output only carries "text"; the detected language
        # is reported per chunk and only when timestamps are requested too.
        result = self.asr_pipe(
            audio_path,
            return_timestamps=True,
            return_language=True,
            generate_kwargs={"task": "transcribe"},
        )
        text = result["text"].strip()
        language = self._extract_language(result)
        print(f"转录完成 - 语言:{language}, 文本长度:{len(text)} 字符")
        return text, language

    def _translate_direct(self, text: str, pair_key: str) -> Tuple[bool, str]:
        """Translate with the model for exactly ``pair_key``, never pivoting.

        The translator is loaded on first use when the pair is configured.

        Returns:
            ``(ok, text)``. On success ``text`` is the translation; on failure
            it is the ``"[翻译失败...]"`` marker describing what went wrong.
        """
        if not text or not text.strip():
            return True, ""
        translator = self.translators.get(pair_key)
        if translator is None:
            model_name = self.translation_models.get(pair_key)
            if model_name is None:
                print(f"不支持的语言对:{pair_key}")
                return False, f"[翻译失败:不支持的语言对 {pair_key}]"
            print(f"动态加载翻译模型:{pair_key}")
            try:
                translator = pipeline("translation", model=model_name, device=self.device)
            except Exception as e:
                print(f"加载翻译模型失败:{e}")
                return False, f"[翻译失败:无法加载模型 {pair_key}]"
            self.translators[pair_key] = translator
        try:
            result = translator(text, max_length=512)
            return True, result[0]["translation_text"]
        except Exception as e:
            print(f"翻译过程出错:{e}")
            return False, f"[翻译失败:{str(e)}]"

    def translate_text(self, text: str, source_lang: str, target_lang: str) -> str:
        """Translate ``text`` from ``source_lang`` to ``target_lang``.

        Uses the direct model for the pair when one is loaded or configured.
        Otherwise the text is routed through English, unless the pair already
        involves English, in which case there is nothing to pivot through.

        Returns:
            The translated text, ``""`` for empty or whitespace-only input,
            the unchanged text when source and target are the same language
            and no dedicated model exists, or a ``"[翻译失败...]"`` marker.
        """
        if not text or not text.strip():
            return ""
        pair_key = f"{source_lang}-{target_lang}"
        if pair_key in self.translators or pair_key in self.translation_models:
            return self._translate_direct(text, pair_key)[1]
        if source_lang == target_lang:
            return text
        if "en" in (source_lang, target_lang):
            # Pivoting through English would just request this same pair
            # again, so report the missing pair instead of recursing forever.
            print(f"不支持的语言对:{pair_key}")
            return f"[翻译失败:不支持的语言对 {pair_key}]"
        print(f"没有直接的 {pair_key} 翻译模型,使用英语中转")
        return self._translate_via_english(text, source_lang, target_lang)

    def _translate_via_english(self, text: str, source_lang: str, target_lang: str) -> str:
        """Translate in two hops, source -> English -> target.

        A hop is skipped when that side is already English. If the first hop
        fails, its failure marker is returned as-is and the second hop is not
        attempted, so the marker is never fed into a translator.
        """
        en_text = text
        if source_lang != "en":
            ok, en_text = self._translate_direct(text, f"{source_lang}-en")
            if not ok:
                return en_text
        if target_lang == "en":
            return en_text
        return self._translate_direct(en_text, f"en-{target_lang}")[1]

    def process_audio_translation(
        self,
        audio_path: str,
        target_language: str = "zh",
        source_language: Optional[str] = None,
    ) -> Dict[str, str]:
        """Transcribe an audio file and translate the transcript.

        Args:
            audio_path: Path of the audio file to process.
            target_language: Language code to translate into.
            source_language: Optional language code of the audio. When given,
                it is used in place of the language detected during
                transcription.

        Returns:
            A dict with ``original_text``, ``detected_language``,
            ``translated_text`` and ``target_language``. ``translated_text``
            equals ``original_text`` when source and target match (compared
            case-insensitively).
        """
        print(f"\n开始处理音频翻译:{audio_path}")
        print(f"目标语言:{target_language}")
        original_text, detected_lang = self.transcribe_audio(audio_path)
        if source_language:
            detected_lang = source_language
        if detected_lang.lower() != target_language.lower():
            translated_text = self.translate_text(original_text, detected_lang, target_language)
        else:
            translated_text = original_text
            print("源语言与目标语言相同,跳过翻译")
        return {
            "original_text": original_text,
            "detected_language": detected_lang,
            "translated_text": translated_text,
            "target_language": target_language,
        }
def main():
    """Entry point: demonstrate how the translation system is used.

    Builds the translation system, then walks through the sample cases. Cases
    whose file name contains a known fragment use a canned result; any other
    case is run through the real audio-translation flow. A failure in one
    case is printed and does not stop the remaining cases.
    """
    separator = "=" * 50
    print(separator)
    print("多语言翻译系统初始化")
    print(separator)
    translator = MultilingualTranslationSystem()

    print("\n示例 1: 英语 -> 中文")
    print("-" * 30)

    # (file-name fragment, original text, language, translated text),
    # checked in order; the first fragment found in the path wins.
    canned_outputs = (
        (
            "en.mp3",
            "Hello everyone, welcome to today's meeting.",
            "en",
            "大家好,欢迎参加今天的会议。",
        ),
        (
            "de.mp3",
            "Guten Tag, heute sprechen wir über künstliche Intelligenz.",
            "de",
            "Good day, today we will talk about artificial intelligence.",
        ),
    )
    test_cases = [
        {"name": "商务会议片段", "audio_path": "meeting_en.mp3", "target_lang": "zh"},
        {"name": "技术讲座片段", "audio_path": "lecture_de.mp3", "target_lang": "en"},
    ]

    for index, case in enumerate(test_cases, start=1):
        print(f"\n处理测试用例 {index}: {case['name']}")
        try:
            for fragment, original, language, translated in canned_outputs:
                if fragment in case["audio_path"]:
                    result = {
                        "original_text": original,
                        "detected_language": language,
                        "translated_text": translated,
                        "target_language": case["target_lang"],
                    }
                    break
            else:
                # No canned result matched: run the real pipeline.
                result = translator.process_audio_translation(
                    case["audio_path"], case["target_lang"]
                )
            print(f"检测到的语言:{result['detected_language']}")
            print(f"原始文本:{result['original_text'][:100]}...")
            print(f"翻译文本:{result['translated_text'][:100]}...")
        except Exception as error:
            print(f"处理失败:{error}")


# Run the demo only when the file is executed as a script.
if __name__ == "__main__":
    main()
def process_long_audio(audio_path, chunk_duration=30, sample_rate=16000):
    """Load an audio file and split it into consecutive fixed-length chunks.

    Args:
        audio_path: Path of the audio file, passed to ``librosa.load``.
        chunk_duration: Length of each chunk in seconds. Must be positive and
            cover at least one sample at the loaded sample rate.
        sample_rate: Sample rate requested from ``librosa.load``.

    Returns:
        A list of slices of the loaded audio array, in order. Every chunk has
        ``int(chunk_duration * sr)`` samples except possibly the last, which
        holds the remainder.

    Raises:
        ValueError: If ``chunk_duration`` is not positive or is too short to
            contain a single sample.
    """
    # Validate before the (slow) load so bad arguments fail immediately.
    # A negative duration would otherwise silently produce zero chunks.
    if chunk_duration <= 0:
        raise ValueError(f"chunk_duration must be positive, got {chunk_duration!r}")

    import librosa

    audio, sr = librosa.load(audio_path, sr=sample_rate)
    chunk_size = int(chunk_duration * sr)
    if chunk_size < 1:
        raise ValueError(
            f"chunk_duration {chunk_duration!r} is shorter than one sample at {sr} Hz"
        )

    total_duration = len(audio) / sr
    print(f"音频总时长:{total_duration:.1f}秒,将分段处理")
    # Slicing clamps at the end of the array, so the last chunk is simply shorter.
    chunks = [
        audio[start:start + chunk_size]
        for start in range(0, len(audio), chunk_size)
    ]
    print(f"分成 {len(chunks)} 段处理")
    return chunks