Integrating Qwen3-ASR-0.6B with GitHub Actions: Automated Testing in Practice
1. Introduction
Imagine you have just updated a speech recognition model to the latest version and are about to deploy it to production. Suddenly, users report that recognition accuracy for a particular dialect has dropped, or that long audio files now trigger errors. Only then do you realize that your manual testing covered far too few scenarios to catch these problems in advance.
This is exactly why we need automated testing. For a speech recognition model like Qwen3-ASR-0.6B, manual testing is not just time-consuming; it also makes comprehensive, consistent coverage nearly impossible. In this article we will look at how to build a complete automated test pipeline for Qwen3-ASR-0.6B using GitHub Actions.
Qwen3-ASR-0.6B is an interesting model: with only 0.6 billion parameters, it supports recognition across 52 languages and dialects and can even handle songs with background music. A lightweight yet capable model like this is well suited to scenarios that demand fast deployment and real-time response. But a broad feature set also means high testing complexity: different languages, different audio qualities, and different scenarios all need to be verified.
The biggest benefit of using GitHub Actions for this is that it automates all of that tedious testing. Every code update or change to the model weights can automatically trigger a full test suite, ensuring no change introduces new problems. Test results are visible at a glance, so issues are caught early instead of surfacing through user complaints.
2. Why Automated Testing Is Needed
2.1 Testing Challenges for Speech Recognition Models
Testing a speech recognition model is quite different from ordinary software testing, with a few particular difficulties.
First is test data. You need a wide variety of audio samples: different languages, different accents, different background noise levels, different durations. Collecting and organizing these samples by hand is laborious enough, let alone re-running them after every update.
Second is environment consistency. Recognition results are affected by many factors, such as the audio's sample rate, encoding format, and volume. If the environment differs between test runs, results can fluctuate widely, making it hard to tell whether a change came from the model or from the environment.
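One cheap way to enforce that consistency is to validate every sample's format before it ever reaches the model. The helper below is a minimal sketch (the function name and expected values are illustrative, not part of any project API) that checks a WAV file against a fixed contract using only Python's standard `wave` module:

```python
import wave

def check_audio_format(path, expected_rate=16000, expected_channels=1):
    """Return a list of format problems for a WAV file (empty list = OK)."""
    problems = []
    with wave.open(path, "rb") as wav:
        if wav.getframerate() != expected_rate:
            problems.append(f"sample rate {wav.getframerate()} != {expected_rate}")
        if wav.getnchannels() != expected_channels:
            problems.append(f"{wav.getnchannels()} channels != {expected_channels}")
        if wav.getsampwidth() != 2:  # expect 16-bit PCM
            problems.append(f"sample width {wav.getsampwidth() * 8} bits != 16")
    return problems
```

Running this over the whole test set at the start of a pipeline turns "mysterious accuracy fluctuation" into an explicit, actionable failure message.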
Third is performance testing. Qwen3-ASR-0.6B is claimed to reach roughly 2000x real-time throughput at 128-way concurrency, processing 5 hours of audio in about 10 seconds. How do you verify numbers like that? Manual testing simply cannot; you need dedicated benchmarking tools and sustained load tests.
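As a rough sanity check on what such figures mean, the standard metric is the real-time factor (RTF): processing time divided by audio duration, where lower is better. The arithmetic below is only an illustration of the claimed numbers, not a measurement:

```python
def rtf(processing_seconds, audio_seconds):
    """Real-time factor: processing time / audio duration (lower is better)."""
    return processing_seconds / audio_seconds

# The claimed figure: about 5 hours of audio processed in roughly 10 seconds
audio_seconds = 5 * 3600        # 18000 s of audio
processing_seconds = 10
speedup = 1 / rtf(processing_seconds, audio_seconds)
print(f"RTF = {rtf(processing_seconds, audio_seconds):.5f}, about {speedup:.0f}x real time")
```

So "5 hours in 10 seconds" corresponds to an RTF of roughly 0.00056, on the order of the 2000x throughput claim; a benchmark suite should measure RTF directly rather than trust the headline number.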
2.2 What GitHub Actions Brings
GitHub Actions addresses these problems well. It provides a standardized runtime environment, so every test executes under the same configuration and results stay comparable. You can also configure scheduled jobs, for example running the full suite every night so a report is waiting in the morning.
More importantly, GitHub Actions integrates deeply with the code repository. You can trigger tests on every commit, every pull request, and every release. Problems are caught before they reach the main branch, rather than after broken code has been merged.
For open-source projects there is an extra benefit: the testing process is fully transparent. Anyone can see how the tests run, what data they use, and what results they produce. That goes a long way toward building user trust, because everyone can verify the model has been tested rigorously.
3. Setting Up the Basic Test Environment
3.1 Creating the Test Repository Structure
Let's start from the very beginning: a dedicated GitHub repository for testing. Its structure should be clear so it is easy to maintain and extend later.
```
qwen3-asr-test/
├── .github/
│   └── workflows/
│       ├── unit-tests.yml           # unit test workflow
│       ├── integration-tests.yml    # integration test workflow
│       └── performance-tests.yml    # performance test workflow
├── tests/
│   ├── unit/                        # unit tests
│   │   ├── test_model_loading.py
│   │   ├── test_audio_preprocessing.py
│   │   └── test_transcription.py
│   ├── integration/                 # integration tests
│   │   ├── test_multilingual.py
│   │   ├── test_long_audio.py
│   │   └── test_noisy_audio.py
│   └── performance/                 # performance tests
│       ├── benchmark_throughput.py
│       └── benchmark_latency.py
├── test_data/                       # test data
│   ├── audio_samples/
│   │   ├── english/
│   │   ├── chinese/
│   │   └── multilingual/
│   └── expected_results/            # expected outputs
├── scripts/                         # helper scripts
│   ├── setup_test_env.sh
│   ├── download_test_data.py
│   └── generate_test_report.py
└── requirements.txt                 # Python dependencies
```

This structure may look complex, but the logic is straightforward: `.github/workflows` holds the GitHub Actions configuration files, `tests` is split by test type, `test_data` holds the audio files used by the tests, and `scripts` holds helper scripts.
3.2 Configuring a Basic Workflow
Let's configure the simplest possible test workflow first, to verify that the environment can be set up at all.
```yaml
# .github/workflows/basic-test.yml
name: Basic Model Test

on:
  push:
    branches: [ main ]
  pull_request:
    branches: [ main ]
  schedule:
    - cron: '0 2 * * *'  # run daily at 2:00 AM

jobs:
  test-model-loading:
    runs-on: ubuntu-latest
    strategy:
      matrix:
        python-version: ['3.9', '3.10', '3.11']
    steps:
      - uses: actions/checkout@v4

      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: ${{ matrix.python-version }}

      - name: Install dependencies
        run: |
          python -m pip install --upgrade pip
          pip install torch torchaudio --index-url https://download.pytorch.org/whl/cu118
          pip install qwen-asr transformers
          pip install pytest pytest-cov

      - name: Run basic model loading test
        run: |
          python -m pytest tests/unit/test_model_loading.py -v
```

This workflow does a few things. First it declares the triggers: code pushes, pull requests, and a scheduled run. Then it defines a test job that runs on Ubuntu across several Python versions. Finally come the concrete steps: check out the code, install Python, install the dependencies, and run the test.
You may have noticed that we use a matrix strategy to test multiple Python versions. Some users are still on older Python releases, and we want to make sure the model works in those environments too.
4. Implementing Audio Sample Tests
4.1 Preparing the Test Dataset
For testing a speech recognition model, the test data is the heart of everything. We need a representative set of audio samples covering the scenarios the model supports.
```python
# scripts/download_test_data.py
import hashlib
import json
from pathlib import Path

import requests


class TestDataManager:
    def __init__(self, data_dir="test_data/audio_samples"):
        self.data_dir = Path(data_dir)
        self.data_dir.mkdir(parents=True, exist_ok=True)

        # Metadata for the test samples
        self.test_samples = [
            {
                "name": "english_clean",
                "url": "https://example.com/samples/english_clean.wav",
                "language": "English",
                "expected_text": "This is a test audio sample for speech recognition.",
                "duration": 5.0,
                "category": "clean_speech",
            },
            {
                "name": "chinese_dialect",
                "url": "https://example.com/samples/sichuan_dialect.wav",
                "language": "Sichuan Dialect",
                "expected_text": "这是一个四川话的测试音频样本。",
                "duration": 6.0,
                "category": "dialect",
            },
            {
                "name": "noisy_background",
                "url": "https://example.com/samples/noisy_cafe.wav",
                "language": "English",
                "expected_text": "Testing speech recognition in noisy environments.",
                "duration": 8.0,
                "category": "noisy",
            },
            {
                "name": "long_audio",
                "url": "https://example.com/samples/lecture_10min.wav",
                "language": "Chinese",
                "expected_text": None,  # for long audio, only check that processing succeeds
                "duration": 600.0,
                "category": "long_form",
            },
        ]

    def download_sample(self, sample):
        """Download a single audio sample."""
        file_path = self.data_dir / f"{sample['name']}.wav"
        if file_path.exists():
            # Verify the integrity of the cached file
            if self._verify_file(file_path, sample.get("md5")):
                print(f"Sample {sample['name']} already exists and is valid")
                return file_path

        print(f"Downloading {sample['name']}...")
        response = requests.get(sample["url"], stream=True)
        response.raise_for_status()

        with open(file_path, "wb") as f:
            for chunk in response.iter_content(chunk_size=8192):
                f.write(chunk)
        return file_path

    def _verify_file(self, file_path, expected_md5):
        """Verify file integrity against an MD5 checksum, if one is provided."""
        if not expected_md5:
            return True
        with open(file_path, "rb") as f:
            file_hash = hashlib.md5(f.read()).hexdigest()
        return file_hash == expected_md5

    def prepare_all_samples(self):
        """Download and register all test samples."""
        downloaded_files = []
        for sample in self.test_samples:
            try:
                file_path = self.download_sample(sample)
                downloaded_files.append({
                    "path": str(file_path),
                    "metadata": sample,
                })
            except Exception as e:
                print(f"Failed to download {sample['name']}: {e}")

        # Write the test manifest
        self._generate_manifest(downloaded_files)
        return downloaded_files

    def _generate_manifest(self, files):
        """Generate the test-data manifest."""
        manifest = []
        for file_info in files:
            manifest.append({
                "audio_file": file_info["path"],
                "language": file_info["metadata"]["language"],
                "expected_text": file_info["metadata"]["expected_text"],
                "category": file_info["metadata"]["category"],
            })
        manifest_path = self.data_dir / "manifest.json"
        with open(manifest_path, "w", encoding="utf-8") as f:
            json.dump(manifest, f, ensure_ascii=False, indent=2)


if __name__ == "__main__":
    manager = TestDataManager()
    files = manager.prepare_all_samples()
    print(f"Prepared {len(files)} test samples")
```

This script downloads and manages the test data. We define several typical categories of samples: clean English speech, a Chinese dialect recording, audio with background noise, and a long recording. Each sample carries metadata, including the expected transcription.
In a real project you can keep these audio files in the repository's `test_data` directory, or draw on public speech datasets. The key is to keep the test data diverse and representative.
4.2 Writing the Sample Test Script
With the test data in place, we can write the test script.
```python
# tests/integration/test_audio_samples.py
import json
from pathlib import Path

import pytest
import torch
from qwen_asr import Qwen3ASRModel


class TestAudioSamples:
    @pytest.fixture(scope="class")
    def model(self):
        """Load the Qwen3-ASR-0.6B model once per test class."""
        return Qwen3ASRModel.from_pretrained(
            "Qwen/Qwen3-ASR-0.6B",
            dtype=torch.bfloat16,
            device_map="auto",
            max_inference_batch_size=8,
            max_new_tokens=256,
        )

    @pytest.fixture
    def test_manifest(self):
        """Load the test-data manifest."""
        manifest_path = Path("test_data/audio_samples/manifest.json")
        with open(manifest_path, "r", encoding="utf-8") as f:
            return json.load(f)

    def test_clean_speech_recognition(self, model, test_manifest):
        """Check recognition accuracy on clean speech."""
        clean_samples = [s for s in test_manifest if s["category"] == "clean_speech"]
        for sample in clean_samples:
            audio_path = sample["audio_file"]
            expected_text = sample["expected_text"]

            results = model.transcribe(audio=audio_path, language=sample["language"])
            assert len(results) > 0, f"No results for {audio_path}"
            transcribed_text = results[0].text.strip()

            # Simplified accuracy check; a real project should use a proper WER
            if expected_text:
                similarity = self._calculate_similarity(transcribed_text, expected_text)
                assert similarity > 0.8, (
                    f"Recognition accuracy too low for {audio_path}: {similarity}"
                )

    def test_multilingual_support(self, model, test_manifest):
        """Check multilingual support and automatic language detection."""
        multilingual_samples = [
            s for s in test_manifest if s["language"] not in ["English", "Chinese"]
        ]
        for sample in multilingual_samples:
            audio_path = sample["audio_file"]
            language = sample["language"]

            # language=None enables automatic language detection
            results = model.transcribe(audio=audio_path, language=None)
            assert len(results) > 0, f"No results for {audio_path}"
            detected_language = results[0].language

            # Note: a dialect may be detected as its parent language
            if language in ["Sichuan Dialect"]:
                assert detected_language in ["Chinese", language], (
                    f"Language detection failed for {audio_path}: {detected_language}"
                )
            else:
                assert detected_language == language, (
                    f"Language detection failed for {audio_path}: {detected_language}"
                )

    def test_long_audio_processing(self, model, test_manifest):
        """Check that long-form audio is handled."""
        long_samples = [s for s in test_manifest if s["category"] == "long_form"]
        for sample in long_samples:
            audio_path = sample["audio_file"]
            # Qwen3-ASR supports audio up to 20 minutes long
            results = model.transcribe(audio=audio_path, language=sample["language"])
            assert len(results) > 0, f"Failed to process long audio: {audio_path}"
            # A long recording should yield a non-trivial transcript
            assert len(results[0].text) > 10, (
                f"Transcription too short for long audio: {audio_path}"
            )

    def test_noisy_audio_robustness(self, model, test_manifest):
        """Check robustness on audio with background noise."""
        noisy_samples = [s for s in test_manifest if s["category"] == "noisy"]
        for sample in noisy_samples:
            audio_path = sample["audio_file"]
            expected_text = sample["expected_text"]

            results = model.transcribe(audio=audio_path, language=sample["language"])
            assert len(results) > 0, f"No results for noisy audio: {audio_path}"

            if expected_text:
                transcribed_text = results[0].text.strip()
                similarity = self._calculate_similarity(transcribed_text, expected_text)
                # Allow some accuracy degradation under noise
                assert similarity > 0.6, (
                    f"Recognition in noisy environment too low: {similarity}"
                )

    def _calculate_similarity(self, text1, text2):
        """Simplified similarity: word-level Jaccard overlap.
        A real project should use a more precise measure such as WER."""
        words1 = set(text1.lower().split())
        words2 = set(text2.lower().split())
        if not words1 or not words2:
            return 0.0
        intersection = words1.intersection(words2)
        union = words1.union(words2)
        return len(intersection) / len(union)


if __name__ == "__main__":
    import sys
    sys.exit(pytest.main([__file__, "-v"]))
```

This test class covers several key scenarios: clean speech recognition, multilingual support, long-audio processing, and noise robustness. Each test method verifies one specific capability.
Note that we use pytest fixtures to manage test resources such as model loading and manifest reading. This keeps the test code clean and makes those resources easy to reuse.
4.3 Configuring the Automated Test Workflow
Now let's wire the test script into GitHub Actions.
```yaml
# .github/workflows/audio-sample-tests.yml
name: Audio Sample Tests

on:
  push:
    branches: [ main, develop ]
    paths:
      - 'models/**'
      - 'tests/integration/**'
      - '.github/workflows/audio-sample-tests.yml'
  pull_request:
    branches: [ main ]
  workflow_dispatch:  # allow manual triggering

jobs:
  audio-tests:
    runs-on: ubuntu-latest
    container:
      image: pytorch/pytorch:2.2.0-cuda12.1-cudnn8-runtime
      options: --gpus all
    steps:
      - uses: actions/checkout@v4

      - name: Cache test data
        uses: actions/cache@v3
        with:
          path: test_data/audio_samples
          key: ${{ runner.os }}-test-data-${{ hashFiles('scripts/download_test_data.py') }}

      - name: Set up Python environment
        run: |
          apt-get update && apt-get install -y libsndfile1 ffmpeg
          pip install --upgrade pip
          pip install qwen-asr[vllm] transformers torchaudio
          pip install pytest pytest-cov pytest-xdist
          pip install jiwer  # for computing WER

      - name: Prepare test data
        run: |
          python scripts/download_test_data.py
          # verify the test data is complete
          if [ ! -f "test_data/audio_samples/manifest.json" ]; then
            echo "Test data preparation failed"
            exit 1
          fi

      - name: Run audio sample tests
        run: |
          python -m pytest tests/integration/test_audio_samples.py \
            -v \
            --cov=qwen_asr \
            --cov-report=xml \
            --cov-report=html \
            -n auto  # run tests in parallel on all CPU cores

      - name: Upload test coverage
        uses: codecov/codecov-action@v3
        with:
          file: ./coverage.xml
          flags: unittests

      - name: Upload test report
        if: always()
        uses: actions/upload-artifact@v3
        with:
          name: test-report
          path: |
            coverage.xml
            htmlcov/
          retention-days: 30
```

(Note that the Python environment is set up before the test data is downloaded, since the download script depends on `requests`.) This workflow has a few noteworthy points:
- It runs in a Docker container with GPU access, because the model runs much faster on a GPU. Be aware that standard GitHub-hosted runners do not include GPUs; the `--gpus all` option only works on a self-hosted GPU runner or one of GitHub's GPU-enabled larger runners, which must be configured explicitly.
- Test data is cached, so it is not re-downloaded on every run.
- The `jiwer` library is installed to compute word error rate (WER), the standard evaluation metric for speech recognition.
- `pytest-xdist` runs the tests in parallel to shorten wall-clock time.
- Coverage and test reports are uploaded as artifacts after the run for later analysis.
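To make the WER metric concrete without pulling in `jiwer`, here is a minimal self-contained implementation of the same idea: the word-level Levenshtein distance between reference and hypothesis, divided by the reference length. This is a sketch for illustration; in the pipeline itself we rely on `jiwer`.

```python
def wer(reference: str, hypothesis: str) -> float:
    """Word error rate: word-level edit distance / number of reference words."""
    ref = reference.split()
    hyp = hypothesis.split()
    # Rolling-array Levenshtein distance over words
    d = list(range(len(hyp) + 1))
    for i, r in enumerate(ref, start=1):
        prev, d[0] = d[0], i
        for j, h in enumerate(hyp, start=1):
            cur = min(
                d[j] + 1,           # deletion
                d[j - 1] + 1,       # insertion
                prev + (r != h),    # substitution (0 cost if words match)
            )
            prev, d[j] = d[j], cur
    return d[len(hyp)] / len(ref) if ref else 0.0

print(wer("hello world", "hello there world"))  # → 0.5 (one insertion, two reference words)
```

A WER of 0 means a perfect transcript; values above 1 are possible when the hypothesis contains many spurious words.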
5. Performance Benchmarking
5.1 Designing the Performance Test Plan
Performance testing differs from functional testing: it focuses on processing speed, resource usage, and concurrency. For a production-oriented model like Qwen3-ASR-0.6B, it matters a great deal.
```python
# tests/performance/benchmark_throughput.py
import asyncio
import json
import time
import wave
from pathlib import Path

import numpy as np
import torch
from qwen_asr import Qwen3ASRModel


class PerformanceBenchmark:
    def __init__(self, model_path="Qwen/Qwen3-ASR-0.6B"):
        self.model_path = model_path
        self.model = None

    def setup_model(self, use_vllm=True):
        """Initialize the model, optionally with the vLLM backend."""
        if use_vllm:
            # The vLLM backend usually gives better throughput
            self.model = Qwen3ASRModel.LLM(
                model=self.model_path,
                gpu_memory_utilization=0.8,
                max_inference_batch_size=128,
                max_new_tokens=4096,
            )
        else:
            # Transformers backend
            self.model = Qwen3ASRModel.from_pretrained(
                self.model_path,
                dtype=torch.bfloat16,
                device_map="auto",
                max_inference_batch_size=32,
                max_new_tokens=256,
            )

    def benchmark_single_audio(self, audio_path, num_runs=10):
        """Benchmark latency on a single audio file."""
        if not self.model:
            self.setup_model()

        # One untimed warm-up run, then `num_runs` timed runs
        self.model.transcribe(audio=audio_path, language=None)

        latencies = []
        for _ in range(num_runs):
            start_time = time.time()
            self.model.transcribe(audio=audio_path, language=None)
            latencies.append(time.time() - start_time)

        duration = self._get_audio_duration(audio_path)
        return {
            "audio_duration": duration,
            "num_runs": len(latencies),
            "avg_latency": float(np.mean(latencies)),
            "min_latency": float(np.min(latencies)),
            "max_latency": float(np.max(latencies)),
            "std_latency": float(np.std(latencies)),
            # Real-time factor: processing time / audio duration
            "rtf": float(np.mean(latencies)) / duration if duration else 0,
        }

    def benchmark_batch_processing(self, audio_paths, batch_sizes=(1, 4, 8, 16)):
        """Benchmark batched transcription at several batch sizes."""
        if not self.model:
            self.setup_model()

        results = {}
        total_audio_duration = sum(self._get_audio_duration(p) for p in audio_paths)
        for batch_size in batch_sizes:
            print(f"Testing batch size: {batch_size}")
            batches = [
                audio_paths[i:i + batch_size]
                for i in range(0, len(audio_paths), batch_size)
            ]
            batch_latencies = []
            for batch in batches:
                start_time = time.time()
                self.model.transcribe(audio=batch, language=None)
                batch_latencies.append(time.time() - start_time)

            avg_latency = float(np.mean(batch_latencies))
            results[batch_size] = {
                "avg_batch_latency": avg_latency,
                "throughput": len(audio_paths) / sum(batch_latencies),
                "rtf": avg_latency / (total_audio_duration / len(batches)),
            }
        return results

    def benchmark_concurrent_requests(self, audio_path, num_concurrent=32):
        """Benchmark handling of concurrent requests."""
        if not self.model:
            self.setup_model(use_vllm=True)  # vLLM suits high concurrency better

        async def transcribe_async():
            return await self.model.transcribe_async(audio=audio_path, language=None)

        async def run_concurrent(num_requests):
            tasks = [transcribe_async() for _ in range(num_requests)]
            start_time = time.time()
            await asyncio.gather(*tasks)
            return time.time() - start_time

        concurrency_results = {}
        for concurrency in [1, 8, 16, 32, 64, 128]:
            if concurrency > num_concurrent:
                break
            print(f"Testing concurrency: {concurrency}")
            try:
                total_time = asyncio.run(run_concurrent(concurrency))
                concurrency_results[concurrency] = {
                    "total_time": total_time,
                    "throughput": concurrency / total_time,
                    "avg_latency_per_request": total_time / concurrency,
                }
            except Exception as e:
                print(f"Failed at concurrency {concurrency}: {e}")
                break
        return concurrency_results

    def _get_audio_duration(self, audio_path):
        """Get audio duration in seconds (simplified).
        A real project should use librosa or torchaudio for accuracy."""
        try:
            with wave.open(str(audio_path), "rb") as wav_file:
                return wav_file.getnframes() / float(wav_file.getframerate())
        except Exception:
            return 5.0  # fall back to an estimate for non-WAV files

    def run_full_benchmark(self, test_audio_dir="test_data/audio_samples"):
        """Run the full benchmark suite."""
        audio_files = list(Path(test_audio_dir).glob("*.wav"))[:10]  # at most 10 files
        if not audio_files:
            print("No audio files found for benchmarking")
            return None

        print(f"Found {len(audio_files)} audio files for benchmarking")
        benchmark_results = {
            "timestamp": time.strftime("%Y-%m-%d %H:%M:%S"),
            "model": self.model_path,
            "single_audio_tests": [],
            "batch_processing": {},
            "concurrent_requests": {},
        }

        # Single-audio latency
        print("\n=== Single Audio Tests ===")
        for audio_file in audio_files[:3]:  # first 3 files
            print(f"Testing {audio_file.name}...")
            stats = self.benchmark_single_audio(str(audio_file))
            stats["audio_file"] = audio_file.name
            benchmark_results["single_audio_tests"].append(stats)
            print(f"  Avg Latency: {stats['avg_latency']:.3f}s")
            print(f"  RTF: {stats['rtf']:.3f}")

        # Batch processing
        print("\n=== Batch Processing Tests ===")
        batch_results = self.benchmark_batch_processing(
            [str(f) for f in audio_files[:8]]  # 8 files for the batch test
        )
        benchmark_results["batch_processing"] = batch_results
        for batch_size, stats in batch_results.items():
            print(f"  Batch Size {batch_size}: Throughput = {stats['throughput']:.2f} audios/s")

        # Concurrency (against a single audio file)
        print("\n=== Concurrent Request Tests ===")
        concurrency_results = self.benchmark_concurrent_requests(
            str(audio_files[0]), num_concurrent=64
        )
        benchmark_results["concurrent_requests"] = concurrency_results
        for concurrency, stats in concurrency_results.items():
            print(f"  Concurrency {concurrency}: Throughput = {stats['throughput']:.2f} requests/s")

        output_file = f"benchmark_results_{int(time.time())}.json"
        with open(output_file, "w") as f:
            json.dump(benchmark_results, f, indent=2)
        print(f"\nBenchmark results saved to {output_file}")
        return benchmark_results


if __name__ == "__main__":
    benchmark = PerformanceBenchmark()
    results = benchmark.run_full_benchmark()
```

This benchmark class covers three main scenarios: single-audio latency, batch processing, and concurrent request handling. Each test records the key performance metrics: latency, throughput, and real-time factor (RTF).
5.2 Configuring the Performance Test Workflow
Performance tests are slow and need GPU resources, so we give them a dedicated workflow.
```yaml
# .github/workflows/performance-tests.yml
name: Performance Benchmark

on:
  schedule:
    - cron: '0 3 * * 0'  # every Sunday at 3:00 AM
  workflow_dispatch:  # allow manual triggering
  push:
    branches: [ main ]
    paths:
      - 'models/**'
      - '.github/workflows/performance-tests.yml'

jobs:
  performance-benchmark:
    runs-on: gpu-large  # label of a large GPU (e.g. self-hosted) runner
    timeout-minutes: 120  # benchmarks can take a long time
    steps:
      - uses: actions/checkout@v4

      - name: Set up GPU environment
        run: |
          nvidia-smi  # verify GPU availability
          python -c "import torch; print(f'CUDA available: {torch.cuda.is_available()}')"

      - name: Install dependencies
        run: |
          pip install --upgrade pip
          pip install torch torchaudio --index-url https://download.pytorch.org/whl/cu118
          pip install qwen-asr[vllm] transformers
          pip install numpy pandas matplotlib
          pip install jiwer librosa  # for audio processing

      - name: Run performance benchmarks
        run: |
          python tests/performance/benchmark_throughput.py
          # generate the performance report
          python scripts/generate_performance_report.py

      - name: Compare with baseline
        run: |
          # compare against historical performance data
          python scripts/compare_performance.py \
            --current benchmark_results_latest.json \
            --baseline benchmark_results_baseline.json

      - name: Upload benchmark results
        uses: actions/upload-artifact@v3
        with:
          name: performance-results
          path: |
            benchmark_results_*.json
            performance_report.html
          retention-days: 90

      - name: Check for performance regression
        run: |
          # check for performance regressions
          python scripts/check_performance_regression.py
        continue-on-error: true  # some jitter is normal; don't block the workflow
```

This performance workflow has a few distinguishing features:
- It runs on a `gpu-large` runner, ensuring enough GPU memory for the high-concurrency tests.
- It has a 120-minute timeout, since benchmarks can run for quite a while.
- It runs weekly on a schedule rather than on every commit, to avoid wasting resources.
- It compares results against historical data and checks for regressions, so performance drops are caught promptly.
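The regression check itself can be very simple. The sketch below shows the kind of comparison a script like `check_performance_regression.py` might perform; the function name, metric names, and 10% tolerance are illustrative assumptions, not the actual script's contents.

```python
def find_regressions(current: dict, baseline: dict, tolerance: float = 0.10):
    """Flag metrics that dropped more than `tolerance` below baseline.

    Both dicts map metric name -> value, where higher is better
    (e.g. throughput in requests/s)."""
    regressions = []
    for name, base_value in baseline.items():
        cur_value = current.get(name)
        if cur_value is None or base_value <= 0:
            continue  # metric missing or baseline unusable; skip
        drop = (base_value - cur_value) / base_value
        if drop > tolerance:
            regressions.append((name, base_value, cur_value, drop))
    return regressions
```

The workflow can then print the flagged metrics and exit non-zero when the list is non-empty; with `continue-on-error: true`, the failure is visible in the run summary without blocking the pipeline.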
6. Regression Testing and Continuous Monitoring
6.1 Implementing Regression Tests
The purpose of regression testing is to make sure new changes don't break existing functionality. For a speech recognition model, that means tracking changes in recognition accuracy over time.
```python
# tests/regression/test_regression.py
import json
from datetime import datetime
from pathlib import Path

import torch
from jiwer import wer
from qwen_asr import Qwen3ASRModel


class RegressionTester:
    def __init__(self, baseline_file="regression_baseline.json"):
        self.baseline_file = Path(baseline_file)
        self.baseline_data = self._load_baseline()
        self.model = None

    def _load_baseline(self):
        """Load baseline results, if any exist."""
        if self.baseline_file.exists():
            with open(self.baseline_file, "r") as f:
                return json.load(f)
        return {}

    def setup_model(self, model_path="Qwen/Qwen3-ASR-0.6B"):
        """Load the model under test."""
        self.model = Qwen3ASRModel.from_pretrained(
            model_path,
            dtype=torch.bfloat16,
            device_map="auto",
            max_inference_batch_size=8,
            max_new_tokens=256,
        )

    def run_regression_test(self, test_suite="test_data/regression_suite.json"):
        """Run the regression suite and compare against the baseline."""
        if not self.model:
            self.setup_model()

        # Load the test suite
        with open(test_suite, "r") as f:
            test_cases = json.load(f)

        current_results = {
            "timestamp": datetime.now().isoformat(),
            "model": "Qwen/Qwen3-ASR-0.6B",
            "test_cases": [],
        }
        all_passed = True
        regression_issues = []

        for test_case in test_cases:
            audio_file = test_case["audio_file"]
            expected_text = test_case["expected_text"]
            language = test_case.get("language")
            threshold = test_case.get("similarity_threshold", 0.9)

            results = self.model.transcribe(audio=audio_file, language=language)

            if not results:
                test_result = {
                    "audio_file": audio_file,
                    "status": "FAILED",
                    "error": "No transcription results",
                }
                all_passed = False
                regression_issues.append(f"{audio_file}: No results")
            else:
                transcribed_text = results[0].text.strip()
                similarity = self._calculate_similarity(transcribed_text, expected_text)
                test_result = {
                    "audio_file": audio_file,
                    "expected_text": expected_text,
                    "transcribed_text": transcribed_text,
                    "similarity": similarity,
                    "threshold": threshold,
                    "status": "PASS" if similarity >= threshold else "FAIL",
                }
                if similarity < threshold:
                    all_passed = False
                    regression_issues.append(
                        f"{audio_file}: Similarity {similarity:.3f} < {threshold}"
                    )

            current_results["test_cases"].append(test_result)

        # Compare with the baseline
        comparison = self._compare_with_baseline(current_results)

        # Save the current results
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        result_file = f"regression_results_{timestamp}.json"
        with open(result_file, "w") as f:
            json.dump(current_results, f, indent=2)

        return {
            "all_passed": all_passed,
            "regression_issues": regression_issues,
            "comparison": comparison,
            "result_file": result_file,
        }

    def _calculate_similarity(self, text1, text2):
        """Text similarity as 1 - WER, clamped to non-negative values."""
        error_rate = wer(text2, text1)  # reference first, hypothesis second
        return max(1 - error_rate, 0.0)

    def _compare_with_baseline(self, current_results):
        """Compare current results against the stored baseline."""
        if not self.baseline_data:
            return {"status": "NO_BASELINE"}

        comparison = {
            "status": "COMPARED",
            "improved": 0,
            "regressed": 0,
            "unchanged": 0,
            "details": [],
        }
        baseline_cases = {
            case["audio_file"]: case
            for case in self.baseline_data.get("test_cases", [])
        }

        for current_case in current_results["test_cases"]:
            audio_file = current_case["audio_file"]
            if audio_file not in baseline_cases:
                continue
            baseline_similarity = baseline_cases[audio_file].get("similarity", 0)
            current_similarity = current_case.get("similarity", 0)
            diff = current_similarity - baseline_similarity
            detail = {
                "audio_file": audio_file,
                "baseline_similarity": baseline_similarity,
                "current_similarity": current_similarity,
                "difference": diff,
            }
            if diff > 0.01:  # improved by more than 1 point
                comparison["improved"] += 1
                detail["trend"] = "IMPROVED"
            elif diff < -0.01:  # regressed by more than 1 point
                comparison["regressed"] += 1
                detail["trend"] = "REGRESSED"
            else:
                comparison["unchanged"] += 1
                detail["trend"] = "UNCHANGED"
            comparison["details"].append(detail)
        return comparison

    def update_baseline(self, result_file):
        """Promote a result file to be the new baseline."""
        with open(result_file, "r") as f:
            new_results = json.load(f)

        # Keep only the fields needed for future comparisons
        baseline = {
            "timestamp": new_results["timestamp"],
            "model": new_results["model"],
            "test_cases": [
                {
                    "audio_file": case["audio_file"],
                    "expected_text": case.get("expected_text"),
                    "similarity": case.get("similarity", 0),
                }
                for case in new_results["test_cases"]
            ],
        }
        with open(self.baseline_file, "w") as f:
            json.dump(baseline, f, indent=2)
        print(f"Baseline updated from {result_file}")


def test_regression_suite():
    """Run the regression suite as a pytest test."""
    tester = RegressionTester()
    result = tester.run_regression_test()

    print("\nRegression Test Results:")
    print(f"All passed: {result['all_passed']}")
    print(f"Issues found: {len(result['regression_issues'])}")
    if result["regression_issues"]:
        print("\nRegression Issues:")
        for issue in result["regression_issues"]:
            print(f"  - {issue}")

    if result["comparison"]["status"] == "COMPARED":
        print("\nComparison with baseline:")
        print(f"  Improved: {result['comparison']['improved']}")
        print(f"  Regressed: {result['comparison']['regressed']}")
        print(f"  Unchanged: {result['comparison']['unchanged']}")

    # Fail the test if any regression was found
    assert result["all_passed"], f"Regression test failed: {result['regression_issues']}"
    return result


if __name__ == "__main__":
    result = test_regression_suite()
    # If everything passed, optionally promote the results to the new baseline
    if result["all_passed"] and not result["regression_issues"]:
        print("\nAll tests passed. Updating baseline...")
        RegressionTester().update_baseline(result["result_file"])
```

This regression-testing system tracks changes in recognition accuracy and compares them against baseline data. When accuracy falls below a threshold, or drops relative to the baseline, the case is flagged as a regression.
6.2 Configuring Continuous Monitoring
Beyond regression tests, we can also set up continuous monitoring to check the model's key indicators on a schedule.
```yaml
# .github/workflows/monitoring.yml
name: Model Monitoring

on:
  schedule:
    - cron: '0 */6 * * *'  # every 6 hours
  workflow_dispatch:

jobs:
  model-monitoring:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4

      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.10'

      - name: Install monitoring dependencies
        run: |
          pip install qwen-asr torch torchaudio
          pip install pandas numpy matplotlib
          pip install prometheus-client  # for metrics collection

      - name: Run model health check
        run: |
          python scripts/monitoring/health_check.py
          # verify the model loads and can run inference
          python scripts/monitoring/model_availability.py

      - name: Collect performance metrics
        run: |
          # collect current metrics
          python scripts/monitoring/collect_metrics.py \
            --output metrics.json
          # compare against historical data
          python scripts/monitoring/analyze_trends.py \
            --current metrics.json \
            --history historical_metrics/

      - name: Check for anomalies
        run: |
          # flag anomalous metrics
          python scripts/monitoring/detect_anomalies.py \
            --metrics metrics.json \
            --thresholds anomaly_thresholds.json
        continue-on-error: true

      - name: Send notification if issues found
        if: failure()
        run: |
          # notify e.g. Slack or email
          python scripts/monitoring/send_alert.py \
            --message "Model monitoring detected issues"

      - name: Update monitoring dashboard
        run: |
          # refresh the monitoring dashboard data
          python scripts/monitoring/update_dashboard.py \
            --metrics metrics.json \
            --output dashboard.json

      - name: Upload monitoring results
        uses: actions/upload-artifact@v3
        with:
          name: monitoring-results
          path: |
            metrics.json
            dashboard.json
            health_check_report.html
          retention-days: 30
```

This monitoring workflow periodically checks the model's health, collects performance metrics, detects anomalies, and sends notifications when problems are found. That way potential issues surface early: the model service going down, performance degrading, accuracy drifting, and so on.
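The health-check script can be built around a small harness that runs a set of named checks and produces a structured report. The sketch below is one possible shape for `scripts/monitoring/health_check.py`; the function name and report layout are assumptions, and real checks would load the model and transcribe a short reference clip.

```python
import time

def run_health_checks(checks):
    """Run named check callables (each returning truthy for healthy)
    and collect a structured report. Exceptions count as failures."""
    report = {
        "timestamp": time.strftime("%Y-%m-%d %H:%M:%S"),
        "checks": {},
        "healthy": True,
    }
    for name, check in checks.items():
        try:
            ok = bool(check())
            report["checks"][name] = {"ok": ok}
        except Exception as e:
            # A crashing check is recorded, not allowed to kill the run
            ok = False
            report["checks"][name] = {"ok": False, "error": str(e)}
        if not ok:
            report["healthy"] = False
    return report
```

In the workflow, the script would exit non-zero when `report["healthy"]` is false, which is what triggers the `if: failure()` notification step.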
7. Conclusion
Combining Qwen3-ASR-0.6B with GitHub Actions for automated testing really does save a lot of effort. Scenarios that used to be tested by hand are now fully automated, and every code update runs the complete test suite.
In practice, this setup brings a few clear benefits. First, test coverage is comprehensive: audio sample tests, performance benchmarks, and regression tests are all in place, so no scenario goes unchecked. Second, results are reliable, because every run executes in the same environment and stays comparable. Third, problems are caught early: regressions and performance drops are flagged as soon as they are introduced, instead of surfacing through user feedback.