Signed-off-by: wuhaobin <2307299049@qq.com>
main
wuhaobin 4 months ago
parent fa9c6181fb
commit ecdae446b0

@ -0,0 +1,314 @@
#!/usr/bin/env python3
"""
音频分离服务诊断工具
检查Spleeter和FFmpeg是否正常工作
"""
import requests
import json
import os
import subprocess
import sys
def check_server_status():
"""检查服务器状态"""
print("🔍 检查服务器状态...")
try:
response = requests.get("http://localhost:8081/api/audio/health", timeout=5)
if response.status_code == 200:
result = response.json()
status = result.get('status', 'UNKNOWN')
ffmpeg_status = result.get('ffmpeg', 'UNKNOWN')
spleeter_status = result.get('spleeter', 'UNKNOWN')
print(f"✅ 服务器正常运行 (状态: {status})")
print(f" FFmpeg: {ffmpeg_status}")
print(f" Spleeter: {spleeter_status}")
return True
else:
print(f"⚠️ 服务器响应异常,状态码: {response.status_code}")
return False
except requests.exceptions.ConnectionError:
print("❌ 无法连接到服务器")
return False
except Exception as e:
print(f"❌ 检查服务器状态时出错: {e}")
return False
def check_spleeter_availability():
"""检查Spleeter可用性"""
print("\n🔍 检查Spleeter可用性...")
try:
response = requests.get("http://localhost:8081/api/audio/test-spleeter", timeout=30)
if response.status_code == 200:
result = response.json()
available = result.get('available', False)
print(f"Spleeter状态: {'✅ 可用' if available else '❌ 不可用'}")
return available
else:
print(f"⚠️ Spleeter测试请求失败状态码: {response.status_code}")
return False
except Exception as e:
print(f"❌ 检查Spleeter可用性时出错: {e}")
return False
def check_ffmpeg_availability():
"""检查FFmpeg可用性"""
print("\n🔍 检查FFmpeg可用性...")
try:
response = requests.get("http://localhost:8081/api/audio/test-ffmpeg", timeout=10)
if response.status_code == 200:
result = response.json()
available = result.get('available', False)
print(f"FFmpeg状态: {'✅ 可用' if available else '❌ 不可用'}")
return available
else:
print(f"⚠️ FFmpeg测试请求失败状态码: {response.status_code}")
return False
except Exception as e:
print(f"❌ 检查FFmpeg可用性时出错: {e}")
return False
def check_python_environment():
"""检查Python环境"""
print("\n🔍 检查Python环境...")
try:
response = requests.get("http://localhost:8081/api/audio/python-info", timeout=10)
if response.status_code == 200:
info = response.text
print("✅ Python环境信息:")
print(info)
return True
else:
print(f"⚠️ Python信息请求失败状态码: {response.status_code}")
return False
except Exception as e:
print(f"❌ 检查Python环境时出错: {e}")
return False
def test_audio_separation():
"""测试音频分离功能"""
print("\n🧪 测试音频分离功能...")
# 查找测试音频文件
test_files = [
"test_audio.wav",
"test_audio.mp3",
"李荣浩 - 不将就.mp3",
"林俊杰 - 江南.mp3",
"林俊杰 - 江南.wav"
]
test_audio_path = None
for file in test_files:
if os.path.exists(file):
test_audio_path = file
break
if not test_audio_path:
print("❌ 未找到可用的测试音频文件")
print("💡 请确保以下文件之一存在:")
for file in test_files:
print(f" - {file}")
return False
print(f"📁 使用测试文件: {test_audio_path}")
print(f"📊 文件大小: {os.path.getsize(test_audio_path) / 1024 / 1024:.2f} MB")
try:
with open(test_audio_path, 'rb') as audio_file:
files = {'audioFile': (os.path.basename(test_audio_path), audio_file, 'audio/mpeg')}
print("📤 上传音频文件进行分离...")
response = requests.post("http://localhost:8081/api/audio/separate", files=files, timeout=120)
if response.status_code == 200:
result = response.json()
print("✅ 音频分离请求成功")
if result.get('success'):
print("🎯 分离结果详情:")
print(f" ✅ 成功: {result['success']}")
print(f" 🎤 人声文件: {result.get('vocalsPath', 'N/A')}")
print(f" 🎵 伴奏文件: {result.get('accompanimentPath', 'N/A')}")
print(f" 💬 消息: {result.get('message', 'N/A')}")
# 检查文件是否存在
vocals_path = result.get('vocalsPath')
accompaniment_path = result.get('accompanimentPath')
if vocals_path and os.path.exists(vocals_path):
vocals_size = os.path.getsize(vocals_path)
print(f" ✅ 人声文件存在,大小: {vocals_size / 1024 / 1024:.2f} MB")
if vocals_size > 1024: # 大于1KB
print(" 🎉 人声文件大小正常")
else:
print(" ⚠️ 人声文件可能过小")
else:
print(" ❌ 人声文件不存在")
if accompaniment_path and os.path.exists(accompaniment_path):
accomp_size = os.path.getsize(accompaniment_path)
print(f" ✅ 伴奏文件存在,大小: {accomp_size / 1024 / 1024:.2f} MB")
if accomp_size > 1024: # 大于1KB
print(" 🎉 伴奏文件大小正常")
else:
print(" ⚠️ 伴奏文件可能过小")
else:
print(" ❌ 伴奏文件不存在")
return True
else:
print("❌ 分离失败")
print(f"错误信息: {result.get('message', '未知错误')}")
return False
else:
print(f"❌ 请求失败,状态码: {response.status_code}")
print(f"响应内容: {response.text}")
return False
except requests.exceptions.Timeout:
print("❌ 请求超时,音频分离可能耗时过长")
return False
except Exception as e:
print(f"❌ 测试音频分离时出错: {e}")
return False
def check_direct_spleeter():
"""直接检查Spleeter命令行"""
print("\n🔧 直接检查Spleeter命令行...")
# 测试Python命令
python_commands = ["python", "python3", "py"]
working_python = None
for cmd in python_commands:
try:
result = subprocess.run([cmd, "--version"], capture_output=True, text=True, timeout=10)
if result.returncode == 0:
working_python = cmd
print(f"{cmd} 可用: {result.stdout.strip()}")
break
except:
print(f"{cmd} 不可用")
if not working_python:
print("❌ 未找到可用的Python命令")
return False
# 测试Spleeter导入
try:
result = subprocess.run([working_python, "-c", "import spleeter; print('Spleeter导入成功')"],
capture_output=True, text=True, timeout=30)
if result.returncode == 0:
print("✅ Spleeter导入成功")
return True
else:
print(f"❌ Spleeter导入失败: {result.stderr}")
return False
except Exception as e:
print(f"❌ 测试Spleeter导入时出错: {e}")
return False
def check_ffmpeg_direct():
"""直接检查FFmpeg命令行"""
print("\n🔧 直接检查FFmpeg命令行...")
ffmpeg_commands = ["ffmpeg", "ffmpeg.exe"]
working_ffmpeg = None
for cmd in ffmpeg_commands:
try:
result = subprocess.run([cmd, "-version"], capture_output=True, text=True, timeout=10)
if result.returncode == 0:
working_ffmpeg = cmd
# 提取版本信息
lines = result.stdout.split('\n')
if lines:
version_line = lines[0]
print(f"{cmd} 可用: {version_line}")
break
except:
print(f"{cmd} 不可用")
if working_ffmpeg:
return True
else:
print("❌ 未找到可用的FFmpeg命令")
return False
def main():
print("=" * 60)
print("🎵 音频分离服务诊断工具")
print("=" * 60)
# 检查服务器状态
if not check_server_status():
print("\n💡 服务器未启动,请运行以下命令:")
print("cd f:\\traeprojects\\DeAudio\\project\\src\\daudio")
print(".\\mvnw spring-boot:run")
return
# 检查组件可用性
print("\n" + "-" * 40)
print("🔧 检查依赖组件")
print("-" * 40)
spleeter_available = check_spleeter_availability()
ffmpeg_available = check_ffmpeg_availability()
python_ok = check_python_environment()
# 直接命令行检查
print("\n" + "-" * 40)
print("🔧 直接命令行检查")
print("-" * 40)
spleeter_direct = check_direct_spleeter()
ffmpeg_direct = check_ffmpeg_direct()
# 测试音频分离
print("\n" + "-" * 40)
print("🧪 功能测试")
print("-" * 40)
separation_ok = test_audio_separation()
# 诊断总结
print("\n" + "=" * 60)
print("📊 诊断总结")
print("=" * 60)
issues = []
if not spleeter_available:
issues.append("❌ Spleeter在服务中不可用")
if not ffmpeg_available:
issues.append("❌ FFmpeg在服务中不可用")
if not separation_ok:
issues.append("❌ 音频分离功能异常")
if not spleeter_direct:
issues.append("❌ 命令行Spleeter不可用")
if not ffmpeg_direct:
issues.append("❌ 命令行FFmpeg不可用")
if issues:
print("发现以下问题:")
for issue in issues:
print(f" {issue}")
print("\n💡 解决方案建议:")
if not spleeter_direct:
print(" 1. 安装Spleeter: pip install spleeter")
if not ffmpeg_direct:
print(" 2. 安装FFmpeg并添加到PATH环境变量")
if spleeter_direct and not spleeter_available:
print(" 3. 检查Java服务中的Python路径配置")
else:
print("✅ 所有检查项正常,音频分离服务应该可以正常工作")
print("\n" + "=" * 60)
if __name__ == "__main__":
main()

@ -0,0 +1,204 @@
#!/usr/bin/env python3
"""
Spleeter服务监控脚本
用于监控Spleeter服务的健康状态并在服务异常时自动重启
"""
import requests
import time
import subprocess
import os
import sys
import logging
from datetime import datetime
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('spleeter_monitor.log'),
logging.StreamHandler(sys.stdout)
]
)
logger = logging.getLogger(__name__)
class SpleeterMonitor:
def __init__(self):
self.service_url = "http://localhost:5000/api/health"
self.service_process = None
self.max_retries = 5
self.retry_delay = 10 # 秒
self.health_check_interval = 30 # 秒
self.startup_timeout = 60 # 秒
# Spleeter服务配置
self.spleeter_dir = os.path.dirname(os.path.abspath(__file__))
self.spleeter_script = "spleeter_service.py"
def is_service_healthy(self):
"""检查Spleeter服务是否健康"""
try:
response = requests.get(self.service_url, timeout=5)
if response.status_code == 200:
data = response.json()
if data.get('status') == 'UP':
return True
return False
except requests.exceptions.RequestException as e:
logger.debug(f"服务健康检查失败: {e}")
return False
def start_service(self):
"""启动Spleeter服务"""
try:
logger.info("正在启动Spleeter服务...")
# 构建启动命令
cmd = ["python", self.spleeter_script]
# 启动服务进程
self.service_process = subprocess.Popen(
cmd,
cwd=self.spleeter_dir,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
universal_newlines=True
)
logger.info(f"Spleeter服务已启动PID: {self.service_process.pid}")
# 等待服务启动完成
start_time = time.time()
while time.time() - start_time < self.startup_timeout:
if self.is_service_healthy():
logger.info("Spleeter服务启动成功")
return True
time.sleep(2)
logger.error("Spleeter服务启动超时")
return False
except Exception as e:
logger.error(f"启动Spleeter服务失败: {e}")
return False
def stop_service(self):
"""停止Spleeter服务"""
if self.service_process and self.service_process.poll() is None:
try:
logger.info("正在停止Spleeter服务...")
self.service_process.terminate()
# 等待进程结束
try:
self.service_process.wait(timeout=10)
except subprocess.TimeoutExpired:
logger.warning("服务正常终止超时,强制终止...")
self.service_process.kill()
self.service_process.wait()
logger.info("Spleeter服务已停止")
return True
except Exception as e:
logger.error(f"停止Spleeter服务失败: {e}")
return False
return True
def restart_service(self):
"""重启Spleeter服务"""
logger.info("正在重启Spleeter服务...")
# 先停止服务
self.stop_service()
# 等待一段时间确保服务完全停止
time.sleep(3)
# 重新启动服务
return self.start_service()
def monitor_service(self):
"""监控Spleeter服务"""
consecutive_failures = 0
while True:
try:
# 检查服务健康状态
if self.is_service_healthy():
consecutive_failures = 0
logger.debug("服务健康检查通过")
else:
consecutive_failures += 1
logger.warning(f"服务健康检查失败 (连续失败次数: {consecutive_failures})")
# 如果连续失败次数达到阈值,重启服务
if consecutive_failures >= 3:
logger.error("服务连续健康检查失败,尝试重启...")
if self.restart_service():
consecutive_failures = 0
logger.info("服务重启成功")
else:
logger.error("服务重启失败")
# 等待下一次检查
time.sleep(self.health_check_interval)
except KeyboardInterrupt:
logger.info("收到中断信号,正在停止监控...")
break
except Exception as e:
logger.error(f"监控过程中发生错误: {e}")
time.sleep(self.health_check_interval)
def run(self):
"""运行监控器"""
logger.info("=== Spleeter服务监控器启动 ===")
logger.info(f"监控目录: {self.spleeter_dir}")
logger.info(f"健康检查间隔: {self.health_check_interval}")
# 检查服务是否已运行
if self.is_service_healthy():
logger.info("检测到Spleeter服务已在运行")
else:
logger.info("未检测到Spleeter服务尝试启动...")
if not self.start_service():
logger.error("Spleeter服务启动失败监控器退出")
return
# 开始监控
try:
self.monitor_service()
except KeyboardInterrupt:
logger.info("监控器被用户中断")
finally:
# 清理资源
self.stop_service()
logger.info("=== Spleeter服务监控器停止 ===")
def main():
"""主函数"""
monitor = SpleeterMonitor()
# 解析命令行参数
if len(sys.argv) > 1:
if sys.argv[1] == "start":
monitor.start_service()
elif sys.argv[1] == "stop":
monitor.stop_service()
elif sys.argv[1] == "restart":
monitor.restart_service()
elif sys.argv[1] == "status":
if monitor.is_service_healthy():
print("Spleeter服务状态: 运行中")
else:
print("Spleeter服务状态: 停止")
else:
print("用法: python spleeter_monitor.py [start|stop|restart|status|monitor]")
else:
# 默认运行监控模式
monitor.run()
if __name__ == "__main__":
main()

@ -0,0 +1,264 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
独立的Spleeter HTTP服务
为Java应用提供音频分离API接口
"""
import os
import sys
import json
import logging
import subprocess
import threading
from pathlib import Path
from flask import Flask, request, jsonify, send_file
from werkzeug.utils import secure_filename
# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
app = Flask(__name__)
# 配置
UPLOAD_FOLDER = 'uploads'
OUTPUT_FOLDER = 'separated'
TEMP_FOLDER = 'temp'
ALLOWED_EXTENSIONS = {'wav', 'mp3', 'm4a', 'flac', 'aac'}
PORT = 5000
# 创建必要的目录
for folder in [UPLOAD_FOLDER, OUTPUT_FOLDER, TEMP_FOLDER]:
Path(folder).mkdir(exist_ok=True)
def allowed_file(filename):
"""检查文件扩展名是否允许"""
return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS
def run_spleeter(input_file, output_dir, model='2stems'):
"""运行Spleeter分离音频"""
try:
# 构建Spleeter命令 - 正确的参数顺序
cmd = [
sys.executable, '-m', 'spleeter', 'separate',
'-p', f'spleeter:{model}',
'-o', output_dir,
'-c', 'wav',
input_file
]
logger.info(f"执行Spleeter命令: {' '.join(cmd)}")
# 执行命令
result = subprocess.run(cmd, capture_output=True, text=True, timeout=300)
if result.returncode == 0:
logger.info("Spleeter分离成功")
return True, result.stdout
else:
logger.error(f"Spleeter分离失败: {result.stderr}")
return False, result.stderr
except subprocess.TimeoutExpired:
logger.error("Spleeter执行超时")
return False, "执行超时"
except Exception as e:
logger.error(f"Spleeter执行异常: {str(e)}")
return False, str(e)
def enhance_audio_with_ffmpeg(input_file, output_file, audio_type='vocals'):
"""使用FFmpeg优化音频"""
try:
if audio_type == 'vocals':
# 人声优化:高通滤波、压缩、均衡
cmd = [
'ffmpeg', '-i', input_file,
'-af', 'highpass=f=80,acompressor=threshold=0.1:ratio=4:attack=20:release=300,equalizer=f=1000:width_type=h:width=200:gain=3',
'-y', output_file
]
else:
# 伴奏优化:低通滤波、压缩
cmd = [
'ffmpeg', '-i', input_file,
'-af', 'lowpass=f=8000,acompressor=threshold=0.05:ratio=3:attack=50:release=500',
'-y', output_file
]
logger.info(f"执行FFmpeg优化: {' '.join(cmd)}")
result = subprocess.run(cmd, capture_output=True, text=True, timeout=60)
if result.returncode == 0:
logger.info("FFmpeg优化成功")
return True, "优化成功"
else:
logger.error(f"FFmpeg优化失败: {result.stderr}")
return False, result.stderr
except Exception as e:
logger.error(f"FFmpeg优化异常: {str(e)}")
return False, str(e)
@app.route('/api/health', methods=['GET'])
def health_check():
"""健康检查接口"""
try:
# 检查Python和Spleeter
result = subprocess.run([sys.executable, '-c', 'import spleeter'],
capture_output=True, text=True)
spleeter_ok = result.returncode == 0
# 检查FFmpeg
result = subprocess.run(['ffmpeg', '-version'],
capture_output=True, text=True)
ffmpeg_ok = result.returncode == 0
return jsonify({
'status': 'UP',
'python': sys.executable,
'spleeter': 'AVAILABLE' if spleeter_ok else 'NOT_AVAILABLE',
'ffmpeg': 'AVAILABLE' if ffmpeg_ok else 'NOT_AVAILABLE'
})
except Exception as e:
return jsonify({'status': 'DOWN', 'error': str(e)}), 500
@app.route('/api/separate', methods=['POST'])
def separate_audio():
"""音频分离接口"""
try:
# 检查文件
if 'file' not in request.files:
return jsonify({'error': '没有上传文件'}), 400
file = request.files['file']
if file.filename == '':
return jsonify({'error': '没有选择文件'}), 400
if not allowed_file(file.filename):
return jsonify({'error': '不支持的文件格式'}), 400
# 获取参数
model = request.form.get('model', '2stems')
enhance = request.form.get('enhance', 'true').lower() == 'true'
# 保存上传文件
filename = secure_filename(file.filename)
input_path = os.path.join(UPLOAD_FOLDER, filename)
file.save(input_path)
logger.info(f"开始分离音频: {filename}, 模型: {model}, 优化: {enhance}")
# 创建输出目录
base_name = os.path.splitext(filename)[0]
output_dir = os.path.join(OUTPUT_FOLDER, base_name)
Path(output_dir).mkdir(exist_ok=True)
# 执行Spleeter分离
success, message = run_spleeter(input_path, output_dir, model)
if not success:
return jsonify({'error': f'Spleeter分离失败: {message}'}), 500
# 检查分离结果
vocals_file = os.path.join(output_dir, f'{base_name}_vocals.wav')
accompaniment_file = os.path.join(output_dir, f'{base_name}_accompaniment.wav')
if not os.path.exists(vocals_file) or not os.path.exists(accompaniment_file):
# 尝试其他可能的文件名格式
for file in Path(output_dir).glob('*_vocals.*'):
vocals_file = str(file)
for file in Path(output_dir).glob('*_accompaniment.*'):
accompaniment_file = str(file)
# 应用FFmpeg优化
if enhance:
enhanced_vocals = os.path.join(output_dir, f'{base_name}_vocals_enhanced.wav')
enhanced_accompaniment = os.path.join(output_dir, f'{base_name}_accompaniment_enhanced.wav')
# 优化人声
success, msg = enhance_audio_with_ffmpeg(vocals_file, enhanced_vocals, 'vocals')
if success:
vocals_file = enhanced_vocals
# 优化伴奏
success, msg = enhance_audio_with_ffmpeg(accompaniment_file, enhanced_accompaniment, 'accompaniment')
if success:
accompaniment_file = enhanced_accompaniment
# 返回结果
result = {
'success': True,
'task_id': base_name,
'vocals_file': os.path.basename(vocals_file),
'accompaniment_file': os.path.basename(accompaniment_file),
'output_dir': output_dir,
'message': '音频分离完成'
}
logger.info(f"音频分离完成: {result}")
return jsonify(result)
except Exception as e:
logger.error(f"音频分离异常: {str(e)}")
return jsonify({'error': f'服务器内部错误: {str(e)}'}), 500
@app.route('/api/download/<task_id>/<file_type>', methods=['GET'])
def download_file(task_id, file_type):
"""下载分离后的文件"""
try:
file_type = file_type.lower()
valid_types = ['vocals', 'accompaniment', 'vocals_enhanced', 'accompaniment_enhanced']
if file_type not in valid_types:
return jsonify({'error': '无效的文件类型'}), 400
# 查找文件
output_dir = os.path.join(OUTPUT_FOLDER, task_id)
if not os.path.exists(output_dir):
return jsonify({'error': '任务不存在'}), 404
# 查找匹配的文件
pattern = f"*{file_type}*"
files = list(Path(output_dir).glob(pattern))
if not files:
return jsonify({'error': '文件不存在'}), 404
file_path = str(files[0])
return send_file(file_path, as_attachment=True)
except Exception as e:
logger.error(f"文件下载异常: {str(e)}")
return jsonify({'error': str(e)}), 500
@app.route('/api/cleanup/<task_id>', methods=['DELETE'])
def cleanup_task(task_id):
"""清理任务文件"""
try:
import shutil
output_dir = os.path.join(OUTPUT_FOLDER, task_id)
if os.path.exists(output_dir):
shutil.rmtree(output_dir)
logger.info(f"清理任务文件: {task_id}")
return jsonify({'success': True, 'message': '清理完成'})
except Exception as e:
logger.error(f"清理任务异常: {str(e)}")
return jsonify({'error': str(e)}), 500
if __name__ == '__main__':
logger.info(f"启动Spleeter HTTP服务端口: {PORT}")
logger.info(f"上传目录: {UPLOAD_FOLDER}")
logger.info(f"输出目录: {OUTPUT_FOLDER}")
# 检查依赖
try:
import spleeter
logger.info("Spleeter导入成功")
except ImportError:
logger.error("Spleeter导入失败请检查安装")
app.run(host='0.0.0.0', port=PORT, debug=False)

@ -0,0 +1,40 @@
package com.cauc.deaudio.service;
import com.cauc.deaudio.model.AudioSeparationResult;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.mock.web.MockMultipartFile;
import org.springframework.web.multipart.MultipartFile;
import static org.junit.jupiter.api.Assertions.assertEquals;
@SpringBootTest
public class AudioSeparationServiceTest {
@Test
public void testSeparateVocalsWithModelParameter() {
// 创建一个模拟的音频文件
MultipartFile mockFile = new MockMultipartFile(
"audio",
"test.mp3",
"audio/mpeg",
"test audio content".getBytes()
);
// 创建服务实例
AudioSeparationService service = new AudioSeparationService();
// 测试调用带有model参数的方法
// 这个测试可能会失败因为Spleeter不可用但至少可以验证方法调用不会因为参数不匹配而失败
try {
AudioSeparationResult result = service.separateVocals(mockFile, "2stems");
// 如果Spleeter不可用结果应该是失败的
assertEquals(false, result.isSuccess());
} catch (Exception e) {
// 如果有异常,检查是否是参数不匹配的异常
if (e.getMessage().contains("no suitable method found for separateVocals")) {
throw new AssertionError("方法参数不匹配问题未解决", e);
}
}
}
}

@ -0,0 +1,88 @@
@echo off
chcp 65001 >nul
echo ========================================
echo DeAudio 服务启动脚本
echo ========================================
echo.
REM 检查Python是否可用
python --version >nul 2>&1
if errorlevel 1 (
echo [错误] 未找到Python请确保Python已安装并添加到PATH
pause
exit /b 1
)
echo [信息] 检测到Python环境
echo.
REM 检查Spleeter服务是否已在运行
echo [信息] 检查Spleeter服务状态...
netstat -ano | findstr ":5000" | findstr "LISTENING" >nul
if errorlevel 1 (
echo [信息] Spleeter服务未运行正在启动...
REM 启动Spleeter服务
start "Spleeter Service" python spleeter_service.py
REM 等待服务启动
echo [信息] 等待Spleeter服务启动...
timeout /t 5 /nobreak >nul
REM 检查服务是否成功启动
python -c "import requests; r = requests.get('http://localhost:5000/api/health', timeout=5); print('[成功] Spleeter服务已启动' if r.status_code == 200 else '[警告] Spleeter服务启动异常')" 2>nul
if errorlevel 1 (
echo [警告] Spleeter服务健康检查失败但进程已启动
)
) else (
echo [信息] Spleeter服务已在运行
)
echo.
REM 检查Spring Boot应用是否已在运行
echo [信息] 检查DeAudio应用状态...
netstat -ano | findstr ":8081" | findstr "LISTENING" >nul
if errorlevel 1 (
echo [信息] DeAudio应用未运行正在启动...
REM 切换到Spring Boot应用目录并启动
cd src\daudio
REM 检查Maven Wrapper是否存在
if exist mvnw (
echo [信息] 使用Maven Wrapper启动Spring Boot应用...
start "DeAudio Application" cmd /k "mvnw spring-boot:run"
) else (
echo [错误] 未找到mvnw文件请确保Maven Wrapper存在
pause
exit /b 1
)
REM 等待应用启动
echo [信息] 等待DeAudio应用启动...
timeout /t 10 /nobreak >nul
REM 检查应用是否成功启动
curl -s -o nul -w "%%{http_code}" http://localhost:8081/ >nul
if errorlevel 1 (
echo [警告] DeAudio应用启动检查失败但进程已启动
) else (
echo [成功] DeAudio应用已启动
)
cd ..\..
) else (
echo [信息] DeAudio应用已在运行
)
echo.
echo ========================================
echo [完成] 服务启动完成
echo.
echo 访问地址:
echo - DeAudio网页界面: http://localhost:8081/audio-processing.html
echo - Spleeter API服务: http://localhost:5000/api/health
echo.
echo 按任意键退出...
pause >nul

Binary file not shown.

@ -0,0 +1,178 @@
#!/usr/bin/env python3
"""
测试FFmpeg音频分离结果优化功能
"""
import requests
import json
import time
import os
def test_ffmpeg_enhancement():
"""测试FFmpeg后处理优化功能"""
# 服务器地址
base_url = "http://localhost:8081"
# 测试音频文件路径(使用一个较小的音频文件进行测试)
test_audio_path = "test_audio.mp3"
# 检查测试文件是否存在
if not os.path.exists(test_audio_path):
print(f"❌ 测试音频文件不存在: {test_audio_path}")
print("请准备一个测试音频文件或使用现有的音频文件")
return
print("🎵 开始测试FFmpeg音频分离结果优化功能")
print(f"📁 测试文件: {test_audio_path}")
print(f"📊 文件大小: {os.path.getsize(test_audio_path) / 1024 / 1024:.2f} MB")
# 上传音频文件进行分离
print("\n📤 上传音频文件进行分离...")
try:
with open(test_audio_path, 'rb') as audio_file:
files = {'audioFile': (os.path.basename(test_audio_path), audio_file, 'audio/mpeg')}
start_time = time.time()
response = requests.post(f"{base_url}/api/audio/separate", files=files)
end_time = time.time()
if response.status_code == 200:
result = response.json()
print("✅ 音频分离请求成功")
print(f"⏱️ 处理时间: {end_time - start_time:.2f}")
# 检查分离结果
if result.get('success'):
print("\n🎯 分离结果详情:")
print(f"✅ 成功: {result['success']}")
print(f"🎤 人声文件: {result.get('vocalsPath', 'N/A')}")
print(f"🎵 伴奏文件: {result.get('accompanimentPath', 'N/A')}")
print(f"💬 消息: {result.get('message', 'N/A')}")
# 检查是否包含FFmpeg优化信息
if "FFmpeg" in result.get('message', ''):
print("\n🎉 FFmpeg后处理优化已成功应用")
else:
print("\n⚠️ 未检测到FFmpeg优化信息可能使用了标准处理流程")
# 检查文件是否存在
vocals_path = result.get('vocalsPath')
accompaniment_path = result.get('accompanimentPath')
if vocals_path and os.path.exists(vocals_path):
print(f"✅ 人声文件存在,大小: {os.path.getsize(vocals_path) / 1024 / 1024:.2f} MB")
else:
print("❌ 人声文件不存在")
if accompaniment_path and os.path.exists(accompaniment_path):
print(f"✅ 伴奏文件存在,大小: {os.path.getsize(accompaniment_path) / 1024 / 1024:.2f} MB")
else:
print("❌ 伴奏文件不存在")
else:
print("❌ 分离失败")
print(f"错误信息: {result.get('message', '未知错误')}")
else:
print(f"❌ 请求失败,状态码: {response.status_code}")
print(f"响应内容: {response.text}")
except requests.exceptions.ConnectionError:
print("❌ 无法连接到服务器请确保Spring Boot应用正在运行")
print("💡 运行命令: cd f:\\traeprojects\\DeAudio\\project\\src\\daudio && .\\mvnw spring-boot:run")
except Exception as e:
print(f"❌ 测试过程中出现错误: {e}")
def test_server_status():
"""测试服务器状态"""
print("🔍 检查服务器状态...")
try:
response = requests.get("http://localhost:8081/api/audio/status", timeout=5)
if response.status_code == 200:
print("✅ 服务器正常运行")
return True
else:
print(f"⚠️ 服务器响应异常,状态码: {response.status_code}")
return False
except requests.exceptions.ConnectionError:
print("❌ 服务器未启动或无法连接")
return False
except Exception as e:
print(f"❌ 检查服务器状态时出错: {e}")
return False
def test_ffmpeg_availability():
"""测试FFmpeg可用性"""
print("🔍 检查FFmpeg可用性...")
try:
response = requests.get("http://localhost:8081/api/audio/test-ffmpeg", timeout=10)
if response.status_code == 200:
result = response.json()
print(f"FFmpeg状态: {'✅ 可用' if result.get('available') else '❌ 不可用'}")
return result.get('available', False)
else:
print(f"⚠️ FFmpeg测试请求失败状态码: {response.status_code}")
return False
except Exception as e:
print(f"❌ 测试FFmpeg可用性时出错: {e}")
return False
def test_spleeter_availability():
"""测试Spleeter可用性"""
print("🔍 检查Spleeter可用性...")
try:
response = requests.get("http://localhost:8081/api/audio/test-spleeter", timeout=10)
if response.status_code == 200:
result = response.json()
print(f"Spleeter状态: {'✅ 可用' if result.get('available') else '❌ 不可用'}")
return result.get('available', False)
else:
print(f"⚠️ Spleeter测试请求失败状态码: {response.status_code}")
return False
except Exception as e:
print(f"❌ 测试Spleeter可用性时出错: {e}")
return False
if __name__ == "__main__":
print("=" * 60)
print("🎵 FFmpeg音频分离结果优化功能测试")
print("=" * 60)
# 检查服务器状态
if not test_server_status():
print("\n💡 请先启动服务器:")
print("cd f:\\traeprojects\\DeAudio\\project\\src\\daudio")
print(".\\mvnw spring-boot:run")
exit(1)
# 测试依赖组件可用性
print("\n" + "-" * 40)
print("🔧 检查依赖组件")
print("-" * 40)
ffmpeg_available = test_ffmpeg_availability()
spleeter_available = test_spleeter_availability()
if not ffmpeg_available:
print("\n⚠️ FFmpeg不可用优化功能将无法正常工作")
print("💡 请确保FFmpeg已正确安装并配置在系统PATH中")
if not spleeter_available:
print("\n⚠️ Spleeter不可用音频分离功能将无法正常工作")
print("💡 请确保Python环境已正确安装spleeter包")
# 执行FFmpeg优化测试
print("\n" + "-" * 40)
print("🧪 执行FFmpeg优化测试")
print("-" * 40)
test_ffmpeg_enhancement()
print("\n" + "=" * 60)
print("🎉 测试完成")
print("=" * 60)
Loading…
Cancel
Save