""" Audio Processing Module This module provides comprehensive audio processing capabilities including format conversion, quality enhancement, and preprocessing for the speech translation system. """ import os import logging from typing import Optional, Union, Tuple, List from pathlib import Path import numpy as np import librosa import soundfile as sf from pydub import AudioSegment from scipy import signal import torch import torchaudio from ..config import SAMPLE_RATE, MAX_AUDIO_DURATION, AUDIO_FORMATS class AudioProcessor: """Handles audio file processing, conversion, and enhancement.""" def __init__(self, target_sample_rate: int = SAMPLE_RATE): """ Initialize the audio processor. Args: target_sample_rate: Target sample rate for processing """ self.target_sample_rate = target_sample_rate self.max_duration = MAX_AUDIO_DURATION self.supported_formats = AUDIO_FORMATS self.logger = logging.getLogger(__name__) def load_audio( self, audio_path: Union[str, Path], normalize: bool = True, mono: bool = True ) -> np.ndarray: """ Load audio file and convert to target format. Args: audio_path: Path to audio file normalize: Whether to normalize audio amplitude mono: Whether to convert to mono Returns: Audio data as numpy array """ audio_path = Path(audio_path) if not audio_path.exists(): raise FileNotFoundError(f"Audio file not found: {audio_path}") if audio_path.suffix.lower() not in self.supported_formats: raise ValueError(f"Unsupported audio format: {audio_path.suffix}") try: self.logger.debug(f"Loading audio: {audio_path}") # Load audio using librosa (handles most formats) audio_data, sample_rate = librosa.load( str(audio_path), sr=self.target_sample_rate, mono=mono, dtype=np.float32 ) # Validate duration duration = len(audio_data) / self.target_sample_rate if duration > self.max_duration: self.logger.warning(f"Audio duration ({duration:.1f}s) exceeds maximum " f"({self.max_duration}s). Truncating.") audio_data = audio_data[:int(self.max_duration * self.target_sample_rate)] # Normalize amplitude if requested if normalize: audio_data = self.normalize_audio(audio_data) self.logger.debug(f"Loaded audio: duration={duration:.2f}s, " f"sample_rate={self.target_sample_rate}, shape={audio_data.shape}") return audio_data except Exception as e: self.logger.error(f"Failed to load audio {audio_path}: {str(e)}") raise RuntimeError(f"Audio loading failed: {str(e)}") def save_audio( self, audio_data: np.ndarray, output_path: Union[str, Path], sample_rate: Optional[int] = None, format: Optional[str] = None ) -> None: """ Save audio data to file. 
    def save_audio(
        self,
        audio_data: np.ndarray,
        output_path: Union[str, Path],
        sample_rate: Optional[int] = None,
        format: Optional[str] = None
    ) -> None:
        """
        Save audio data to a file.

        Args:
            audio_data: Audio data as numpy array
            output_path: Output file path
            sample_rate: Sample rate (uses target_sample_rate if None)
            format: Audio format (inferred from extension if None)
        """
        output_path = Path(output_path)
        sample_rate = sample_rate or self.target_sample_rate

        try:
            # Create output directory if needed
            output_path.parent.mkdir(parents=True, exist_ok=True)

            # Determine format from extension if not specified
            if format is None:
                format = output_path.suffix.lower().lstrip('.')

            if format in ['wav', 'flac']:
                # For lossless formats, write directly and keep full precision
                sf.write(str(output_path), audio_data, sample_rate, format=format.upper())
            else:
                # For compressed formats, use pydub
                self._save_with_pydub(audio_data, output_path, sample_rate, format)

            self.logger.debug(f"Saved audio to: {output_path}")

        except Exception as e:
            self.logger.error(f"Failed to save audio to {output_path}: {str(e)}")
            raise RuntimeError(f"Audio saving failed: {str(e)}")

    def _save_with_pydub(
        self,
        audio_data: np.ndarray,
        output_path: Path,
        sample_rate: int,
        format: str
    ) -> None:
        """Save audio using pydub for compressed formats."""
        # Clip to [-1, 1] before converting to 16-bit PCM to avoid
        # integer wrap-around on samples outside the valid range
        audio_16bit = (np.clip(audio_data, -1.0, 1.0) * 32767).astype(np.int16)

        # Create AudioSegment from the raw PCM bytes
        audio_segment = AudioSegment(
            audio_16bit.tobytes(),
            frame_rate=sample_rate,
            sample_width=2,
            channels=1
        )

        # Export with format-specific settings
        export_params = {}
        if format == 'mp3':
            export_params['bitrate'] = '192k'
        elif format == 'ogg':
            export_params['codec'] = 'libvorbis'

        audio_segment.export(str(output_path), format=format, **export_params)

    def convert_format(
        self,
        input_path: Union[str, Path],
        output_path: Union[str, Path],
        target_format: str = 'wav'
    ) -> None:
        """
        Convert an audio file to a different format.

        Args:
            input_path: Input audio file path
            output_path: Output audio file path
            target_format: Target audio format
        """
        audio_data = self.load_audio(input_path)

        # Update output path extension if needed
        output_path = Path(output_path)
        if output_path.suffix.lower() != f'.{target_format}':
            output_path = output_path.with_suffix(f'.{target_format}')

        self.save_audio(audio_data, output_path, format=target_format)
        self.logger.info(f"Converted {input_path} to {output_path} ({target_format})")

    def normalize_audio(self, audio_data: np.ndarray, target_db: float = -20.0) -> np.ndarray:
        """
        Normalize audio amplitude to a target RMS level.

        Args:
            audio_data: Input audio data
            target_db: Target RMS level in dB

        Returns:
            Normalized audio data
        """
        # Calculate RMS
        rms = np.sqrt(np.mean(audio_data ** 2))

        if rms > 0:
            # Convert target dB to linear scale
            target_linear = 10 ** (target_db / 20.0)

            # Calculate scaling factor
            scale_factor = target_linear / rms

            # Apply scaling, then clip to keep peaks inside the valid range
            normalized = audio_data * scale_factor
            normalized = np.clip(normalized, -0.95, 0.95)
            return normalized

        return audio_data
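    # Worked example for normalize_audio (illustrative numbers, not from the
    # source): with the default target_db = -20.0, target_linear is
    # 10 ** (-20 / 20) = 0.1. An input with RMS 0.5 is therefore scaled by
    # 0.1 / 0.5 = 0.2, while an input with RMS 0.05 is scaled by 2.0; np.clip
    # then keeps any resulting peaks inside [-0.95, 0.95].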
    def remove_silence(
        self,
        audio_data: np.ndarray,
        threshold_db: float = -40.0,
        frame_length: int = 2048,
        hop_length: int = 512
    ) -> np.ndarray:
        """
        Trim leading and trailing silence from audio.

        Args:
            audio_data: Input audio data
            threshold_db: Silence threshold in dB, relative to the loudest frame
            frame_length: Frame length for analysis
            hop_length: Hop length for analysis

        Returns:
            Audio data with leading and trailing silence removed
        """
        # Calculate frame-wise energy
        frames = librosa.util.frame(
            audio_data,
            frame_length=frame_length,
            hop_length=hop_length
        )
        energy = np.sum(frames ** 2, axis=0)

        # Convert to dB relative to the loudest frame, so the default
        # threshold stays meaningful regardless of input level
        energy_db = librosa.power_to_db(energy, ref=np.max)

        # Find non-silent frames
        non_silent = energy_db > threshold_db

        if not np.any(non_silent):
            self.logger.warning("No non-silent frames found, returning original audio")
            return audio_data

        # Convert the first and last non-silent frame indices to sample indices
        start_frame = np.argmax(non_silent)
        end_frame = len(non_silent) - np.argmax(non_silent[::-1]) - 1

        start_sample = start_frame * hop_length
        end_sample = min(len(audio_data), (end_frame + 1) * hop_length + frame_length)

        return audio_data[start_sample:end_sample]

    def apply_noise_reduction(
        self,
        audio_data: np.ndarray,
        noise_factor: float = 0.1
    ) -> np.ndarray:
        """
        Apply basic noise reduction using spectral subtraction.

        Args:
            audio_data: Input audio data
            noise_factor: Noise reduction factor (0.0 to 1.0)

        Returns:
            Noise-reduced audio data
        """
        # Compute STFT
        stft = librosa.stft(audio_data)
        magnitude, phase = np.abs(stft), np.angle(stft)

        # Estimate the noise spectrum from the first few frames
        # (assumed to contain silence or background noise only)
        noise_frames = min(10, magnitude.shape[1] // 4)
        noise_spectrum = np.mean(magnitude[:, :noise_frames], axis=1, keepdims=True)

        # Subtract the scaled noise estimate, flooring at 10% of the
        # original magnitude to limit musical-noise artifacts
        magnitude_clean = magnitude - (noise_factor * noise_spectrum)
        magnitude_clean = np.maximum(magnitude_clean, 0.1 * magnitude)

        # Reconstruct the signal with the original phase
        stft_clean = magnitude_clean * np.exp(1j * phase)
        audio_clean = librosa.istft(stft_clean)

        return audio_clean

    def resample_audio(
        self,
        audio_data: np.ndarray,
        original_sr: int,
        target_sr: int
    ) -> np.ndarray:
        """
        Resample audio to a different sample rate.

        Args:
            audio_data: Input audio data
            original_sr: Original sample rate
            target_sr: Target sample rate

        Returns:
            Resampled audio data
        """
        if original_sr == target_sr:
            return audio_data

        return librosa.resample(audio_data, orig_sr=original_sr, target_sr=target_sr)

    def split_audio(
        self,
        audio_data: np.ndarray,
        chunk_duration: float = 30.0,
        overlap: float = 0.5
    ) -> List[np.ndarray]:
        """
        Split audio into overlapping chunks.

        Args:
            audio_data: Input audio data
            chunk_duration: Duration of each chunk in seconds
            overlap: Overlap between chunks (0.0 inclusive to 1.0 exclusive)

        Returns:
            List of audio chunks
        """
        # An overlap of 1.0 or more would make the step size non-positive
        # and the loop below would never terminate
        if not 0.0 <= overlap < 1.0:
            raise ValueError(f"overlap must be in [0.0, 1.0), got {overlap}")

        chunk_samples = int(chunk_duration * self.target_sample_rate)
        overlap_samples = int(chunk_samples * overlap)
        step_samples = chunk_samples - overlap_samples

        chunks = []
        start = 0

        while start < len(audio_data):
            end = min(start + chunk_samples, len(audio_data))
            chunk = audio_data[start:end]

            # Zero-pad the last chunk to the full chunk length if needed
            if len(chunk) < chunk_samples:
                chunk = np.pad(chunk, (0, chunk_samples - len(chunk)))

            chunks.append(chunk)

            if end >= len(audio_data):
                break

            start += step_samples

        return chunks

    def get_audio_info(self, audio_path: Union[str, Path]) -> dict:
        """
        Get audio file information.

        Args:
            audio_path: Path to audio file

        Returns:
            Dictionary with audio information
        """
        try:
            # Load at the native sample rate without downmixing, so the
            # reported channel count reflects the file itself; multichannel
            # audio comes back with shape (channels, samples)
            audio_data, sample_rate = librosa.load(str(audio_path), sr=None, mono=False)
            n_samples = audio_data.shape[-1]
            duration = n_samples / sample_rate

            # Get file size
            file_size = Path(audio_path).stat().st_size

            info = {
                'path': str(audio_path),
                'duration': duration,
                'sample_rate': sample_rate,
                'channels': 1 if audio_data.ndim == 1 else audio_data.shape[0],
                'samples': n_samples,
                'file_size': file_size,
                'format': Path(audio_path).suffix.lower(),
                'bit_depth': 'float32',  # librosa loads as float32
                'rms_level': float(np.sqrt(np.mean(audio_data ** 2))),
                'max_level': float(np.max(np.abs(audio_data)))
            }
            return info

        except Exception as e:
            self.logger.error(f"Failed to get audio info for {audio_path}: {str(e)}")
            raise RuntimeError(f"Audio info extraction failed: {str(e)}")
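
# Minimal usage sketch for AudioProcessor (illustrative only; "input.mp3" and
# the chunk output paths are hypothetical file names, not part of the module):
#
#     processor = AudioProcessor()
#     audio = processor.load_audio("input.mp3")        # resampled, mono, normalized
#     audio = processor.remove_silence(audio)          # trim leading/trailing silence
#     audio = processor.apply_noise_reduction(audio)   # basic spectral subtraction
#     for i, chunk in enumerate(processor.split_audio(audio)):
#         processor.save_audio(chunk, f"chunk_{i}.wav")
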
class AudioValidator:
    """Validates audio files and data."""

    def __init__(self, processor: AudioProcessor):
        """
        Initialize the audio validator.

        Args:
            processor: AudioProcessor instance
        """
        self.processor = processor
        self.logger = logging.getLogger(__name__)

    def validate_audio_file(self, audio_path: Union[str, Path]) -> dict:
        """
        Validate a single audio file.

        Args:
            audio_path: Path to audio file

        Returns:
            Dictionary with validation results
        """
        validation_result = {
            'valid': False,
            'errors': [],
            'warnings': [],
            'info': {}
        }

        try:
            # Check if the file exists
            audio_path = Path(audio_path)
            if not audio_path.exists():
                validation_result['errors'].append(f"File does not exist: {audio_path}")
                return validation_result

            # Check file format
            if audio_path.suffix.lower() not in self.processor.supported_formats:
                validation_result['errors'].append(
                    f"Unsupported format: {audio_path.suffix}"
                )
                return validation_result

            # Get audio info
            info = self.processor.get_audio_info(audio_path)
            validation_result['info'] = info

            # Check duration
            if info['duration'] > self.processor.max_duration:
                validation_result['warnings'].append(
                    f"Duration ({info['duration']:.1f}s) exceeds maximum "
                    f"({self.processor.max_duration}s)"
                )

            # Check sample rate
            if info['sample_rate'] < 8000:
                validation_result['warnings'].append(
                    f"Low sample rate ({info['sample_rate']} Hz) may affect quality"
                )

            # Check audio level
            if info['max_level'] < 0.01:
                validation_result['warnings'].append("Audio level is very low")
            elif info['max_level'] > 0.99:
                validation_result['warnings'].append("Audio may be clipped")

            # If we get here, the file is valid
            validation_result['valid'] = True

        except Exception as e:
            validation_result['errors'].append(str(e))

        return validation_result

    def validate_batch(self, audio_files: List[Union[str, Path]]) -> dict:
        """
        Validate multiple audio files.

        Args:
            audio_files: List of audio file paths

        Returns:
            Dictionary with batch validation results
        """
        results = {}
        valid_count = 0

        for audio_file in audio_files:
            result = self.validate_audio_file(audio_file)
            results[str(audio_file)] = result
            if result['valid']:
                valid_count += 1

        return {
            'total_files': len(audio_files),
            'valid_files': valid_count,
            'invalid_files': len(audio_files) - valid_count,
            'results': results
        }
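
if __name__ == "__main__":
    # Minimal smoke-test sketch (illustrative): the file names below are
    # hypothetical; validate_batch reports missing files as errors rather
    # than raising, so this runs even without real audio present.
    logging.basicConfig(level=logging.INFO)

    processor = AudioProcessor()
    validator = AudioValidator(processor)

    report = validator.validate_batch(["sample1.wav", "sample2.mp3"])
    print(f"{report['valid_files']}/{report['total_files']} files passed validation")
    for path, result in report['results'].items():
        for message in result['warnings']:
            print(f"{path}: warning: {message}")
        for message in result['errors']:
            print(f"{path}: error: {message}")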