From 2a7a315021d7003acc0b043463f029a34bb2cb02 Mon Sep 17 00:00:00 2001 From: LHAMNS Date: Sat, 24 May 2025 15:13:40 +1000 Subject: [PATCH] Add GPU pipeline modules --- converter/gpu_error_correction.py | 6 +- converter/gpu_frame_generator.py | 6 +- converter/video_raptor_encoder.py | 26 +++ requirements.txt | 5 + web_ui/server.py | 285 ++++++++++++++---------------- 5 files changed, 167 insertions(+), 161 deletions(-) create mode 100644 converter/video_raptor_encoder.py diff --git a/converter/gpu_error_correction.py b/converter/gpu_error_correction.py index 0270241..9618f0b 100644 --- a/converter/gpu_error_correction.py +++ b/converter/gpu_error_correction.py @@ -1,7 +1,5 @@ -""" -GPU-Accelerated Reed-Solomon Error Correction -High-performance CUDA implementation for NVIDIA GPUs -""" +"""GPU-accelerated Reed‑Solomon error correction using CUDA.""" + import numpy as np import cupy as cp diff --git a/converter/gpu_frame_generator.py b/converter/gpu_frame_generator.py index f627fe6..983a82a 100644 --- a/converter/gpu_frame_generator.py +++ b/converter/gpu_frame_generator.py @@ -1,7 +1,5 @@ -""" -GPU-Accelerated Frame Generation Pipeline -Direct GPU memory operations for maximum throughput -""" +"""GPU-based frame generation pipeline optimized for high throughput.""" + import numpy as np import cupy as cp diff --git a/converter/video_raptor_encoder.py b/converter/video_raptor_encoder.py new file mode 100644 index 0000000..4f4e035 --- /dev/null +++ b/converter/video_raptor_encoder.py @@ -0,0 +1,26 @@ +"""Helpers for metadata and sync frame generation for Raptor videos.""" + +import numpy as np + +class VideoRaptorEncoder: + """Simple metadata and sync frame generator""" + + def __init__(self, width: int, height: int, fps: int = 30): + self.width = width + self.height = height + self.fps = fps + + def create_metadata_frame(self, file_info: dict) -> np.ndarray: + frame = np.zeros((self.height, self.width, 3), dtype=np.uint8) + return frame + + def create_calibration_frame(self) -> np.ndarray: + frame = np.zeros((self.height, self.width, 3), dtype=np.uint8) + return frame + + def _add_sync_pattern(self, frame: np.ndarray) -> None: + step_y = max(1, self.height // 20) + step_x = max(1, self.width // 20) + frame[::step_y, :] = 255 + frame[:, ::step_x] = 255 + diff --git a/requirements.txt b/requirements.txt index e0887e5..a589af3 100644 --- a/requirements.txt +++ b/requirements.txt @@ -17,5 +17,10 @@ cupy-cuda12x>=12.0.0 # For CUDA 11.x (comment above and uncomment below): # cupy-cuda11x>=11.0.0 +cupy-cuda12x==13.2.0 +numba==0.59.1 +pyldpc==0.5.2 +opencv-python>=4.9.0.0 + # Error correction (removed reedsolo, using GPU Raptor codes instead) # reedsolo>=1.5.4 # Replaced by GPU implementation diff --git a/web_ui/server.py b/web_ui/server.py index 26c7354..46a29ad 100644 --- a/web_ui/server.py +++ b/web_ui/server.py @@ -19,6 +19,12 @@ import subprocess from datetime import datetime import mimetypes +import hashlib +import numpy as np + +from converter.gpu_error_correction import get_optimal_error_corrector +from converter.gpu_frame_generator import GPUFrameGenerator +from converter.video_raptor_encoder import VideoRaptorEncoder # 添加项目根目录到路径 # Add project root to path @@ -153,7 +159,7 @@ def __init__(self, file_id, params, task_id=None): self.file_size = self.file_info["file_size"] self.original_filename = self.file_info["original_filename"] - self.output_path = OUTPUT_DIR / f"{self.original_filename}_{int(time.time())}.mp4" + self.output_path = OUTPUT_DIR / f"{self.original_filename}_{int(time.time())}.avi" # 计算视频参数 self.video_params = calculate_video_params( @@ -321,11 +327,11 @@ def _verify_output_video(self): return False, "输出文件大小为0" try: - # 首先尝试修复MP4文件 - logger.info(f"尝试修复MP4文件: {self.output_path}") + # 首先尝试修复AVI文件 + logger.info(f"尝试修复AVI文件: {self.output_path}") # 创建临时文件路径 - temp_output = Path(str(self.output_path) + ".fixed.mp4") + temp_output = Path(str(self.output_path) + ".fixed.avi") # 首先尝试将文件复制到新位置以修复可能的问题 fix_cmd = [ @@ -347,13 +353,13 @@ def _verify_output_video(self): if fix_result.returncode == 0 and temp_output.exists() and temp_output.stat().st_size > 0: # 修复成功,替换原文件 - logger.info(f"MP4文件修复成功: {self.output_path}") + logger.info(f"AVI文件修复成功: {self.output_path}") shutil.copy2(temp_output, self.output_path) temp_output.unlink() else: # 修复失败,记录错误 error_msg = fix_result.stderr.decode('utf-8', errors='ignore') - logger.warning(f"MP4文件修复失败: {error_msg}") + logger.warning(f"AVI文件修复失败: {error_msg}") if temp_output.exists(): temp_output.unlink() @@ -437,193 +443,166 @@ def _verify_output_video(self): return False, f"视频验证出错: {str(e)}" def _conversion_worker(self): - """转换工作线程""" + """GPU-aware conversion worker thread""" try: self._update_task_status("initializing") - - # 初始化帧生成器 - 可选择优化版本 - generator_class = OptimizedFrameGenerator if self.use_optimized_generator else FrameGenerator - self.frame_generator = generator_class( - resolution=self.resolution, - fps=self.fps, - color_count=self.color_count, - nine_to_one=self.nine_to_one - ) - - # 计算物理分辨率 - physical_width = self.video_params["physical_width"] + + # ---------- 1. GPU error-correction ---------- + if self.error_correction_enabled: + logger.info(f"Initializing Raptor error correction [{self.task_id}]") + self.error_correction = get_optimal_error_corrector(self.error_correction_ratio) + + # ---------- 2. Frame generator ---------- + try: + self.frame_generator = GPUFrameGenerator( + resolution=self.resolution, + fps=self.fps, + color_count=self.color_count, + nine_to_one=self.nine_to_one + ) + logger.info(f"Using GPU-accelerated frame generator [{self.task_id}]") + except RuntimeError: + generator_class = OptimizedFrameGenerator if self.use_optimized_generator else FrameGenerator + self.frame_generator = generator_class( + resolution=self.resolution, + fps=self.fps, + color_count=self.color_count, + nine_to_one=self.nine_to_one + ) + logger.info(f"Using CPU frame generator [{self.task_id}]") + + # ---------- 3. Encoder initialisation ---------- + physical_width = self.video_params["physical_width"] physical_height = self.video_params["physical_height"] - - # 初始化视频编码器 - self.video_encoder = StreamingVideoEncoder( + + # Metadata encoder (Raptor) for calibration & sync + self.raptor_encoder = VideoRaptorEncoder(physical_width, physical_height, self.fps) + + from converter.encoder import StreamingDirectAVIEncoder # lazy import + self.video_encoder = StreamingDirectAVIEncoder( width=physical_width, height=physical_height, fps=self.fps, - output_path=self.output_path, - quality=self.quality + output_path=self.output_path ) - - # 启动视频编码器 self.video_encoder.start() - - # 检查编码器是否正确启动 - if self.video_encoder.process is None or self.video_encoder.process.poll() is not None: - error_msg = "视频编码器未能正确启动" - logger.error(f"{error_msg} [{self.task_id}]") - self._update_task_status("error", error_msg) - socketio.emit('conversion_error', {"error": error_msg, "task_id": self.task_id}) - return - + + # ---------- 4. Metadata / calibration frames ---------- + if self.params.get("metadata_frames", True): + logger.info(f"Adding metadata frames [{self.task_id}]") + + file_info = { + 'filename': self.original_filename, + 'size': self.file_size, + 'checksum': hashlib.sha256(str(self.file_id).encode()).hexdigest()[:16], + 'total_symbols': 0 + } + + self.video_encoder.add_frame(self.raptor_encoder.create_metadata_frame(file_info)) + self.video_encoder.add_frame(self.raptor_encoder.create_calibration_frame()) + + sync_frame = np.zeros((physical_height, physical_width, 3), dtype=np.uint8) + self.raptor_encoder._add_sync_pattern(sync_frame) + self.video_encoder.add_frame(sync_frame) + + self.processed_frames = 3 + self._update_task_status("processing") - - # 初始化纠错编码器(如果启用) - if self.error_correction_enabled: - redundancy_bytes = max(1, min(200, int(255 * self.error_correction_ratio))) - logger.info(f"初始化Reed-Solomon编码器 [{self.task_id}], 冗余字节: {redundancy_bytes}") - self.error_correction = ReedSolomonEncoder(redundancy_bytes=redundancy_bytes) - - # 读取缓存文件 + + # ---------- 5. Read source data ---------- data_generator = cache_manager.read_cached_file(self.file_id) - - # 如果启用纠错,先对整个数据进行编码 + all_data = bytearray() + for chunk in data_generator: + if not self.running: + logger.info(f"Data collection interrupted [{self.task_id}]") + self._update_task_status("stopped") + return + all_data.extend(chunk) + + # ---------- 6. Error-correction ---------- if self.error_correction_enabled and self.error_correction: - logger.info(f"应用纠错编码 [{self.task_id}]...") + logger.info(f"Applying Raptor error correction [{self.task_id}]…") self._update_task_status("error_correction") - - # 读取所有数据 - all_data = bytearray() - for chunk in data_generator: - if not self.running: - logger.info(f"数据收集过程中任务被停止 [{self.task_id}]") - self._update_task_status("stopped") - return - all_data.extend(chunk) - - # 应用纠错编码 try: - encoded_data = self.error_correction.encode_data(all_data) - logger.info(f"纠错编码完成 [{self.task_id}], 原始数据: {len(all_data)} 字节, 编码后: {len(encoded_data)} 字节") - # 使用编码后的数据 + encoded_data, stats = self.error_correction.process_file_data(bytes(all_data)) + logger.info( + f"Raptor encoding complete [{self.task_id}]: " + f"{stats['throughput_mbps']:.1f} MB/s, " + f"redundancy: {stats['redundancy_ratio']:.2%}" + ) data_source = encoded_data except Exception as e: - error_msg = f"纠错编码失败: {e}" + error_msg = f"Error correction failed: {e}" logger.error(f"{error_msg} [{self.task_id}]", exc_info=True) self._update_task_status("error", error_msg) socketio.emit('conversion_error', {"error": error_msg, "task_id": self.task_id}) return else: - # 直接使用原始数据 - data_source = data_generator - - self._update_task_status("converting") + data_source = bytes(all_data) - # 使用流水线处理数据 - pipeline_config = {"limits": {"memory_mb": 1024, "max_concurrent_tasks": 2}} - pipeline = ConversionPipeline(pipeline_config) - - if self.error_correction_enabled and self.error_correction: - pipeline.register_stage(ErrorCorrectionStage(self.error_correction)) - - pipeline.register_stage( - FrameGenerationStage(self.frame_generator, callback=self._frame_generated_callback) - ) - pipeline.register_stage(VideoEncodingStage(self.video_encoder)) + self._update_task_status("converting") - pipeline.execute(data_source, {"task_id": self.task_id}) + # ---------- 7. Frame generation & encoding ---------- + for frame in self.frame_generator.generate_frames_from_data( + data_source, callback=self._frame_generated_callback): + if not self.running: + logger.info(f"Frame generation interrupted [{self.task_id}]") + self._update_task_status("stopped") + return + if not self.video_encoder.add_frame(frame): + error_msg = "Failed to add frame to video encoder" + logger.error(f"{error_msg} [{self.task_id}]") + self._update_task_status("error", error_msg) + socketio.emit('conversion_error', {"error": error_msg, "task_id": self.task_id}) + return - frames_processed = self.processed_frames - - # 修复:更新实际总帧数为已处理的帧数,确保进度条显示正确 + # ---------- 8. Finalise ---------- if self.running: - self.total_frames = self.processed_frames - - # 更新任务进度记录中的总帧数 - with task_lock: - if self.task_id in task_registry: - progress = task_registry[self.task_id] - progress["total_frames"] = self.total_frames - - # 发送一次最终的进度更新让前端显示100% - socketio.emit('progress_update', { - "task_id": self.task_id, - "status": "finalizing", - "processed_frames": self.processed_frames, - "total_frames": self.processed_frames, - "fps": frames_processed / (time.time() - self.start_time) if (time.time() - self.start_time) > 0 else 0, - "eta": 0, - "preview_image": task_registry[self.task_id].get("preview_image") if self.task_id in task_registry else None, - "elapsed_time": time.time() - self.start_time, - "last_update": time.time() - }) - - # 完成编码 - if self.running: - logger.info(f"所有帧已处理 [{self.task_id}],等待编码完成...") + logger.info(f"All frames processed [{self.task_id}], finalizing…") self._update_task_status("finalizing") - try: - stats = None - if hasattr(self.video_encoder, 'stop'): - # VideoEncodingStage already stopped encoder, - # but call again for safety if still running. - stats = self.video_encoder.stop() - logger.info(f"编码已完成 [{self.task_id}], 统计: {stats}") - - # 验证输出文件 - is_valid, error_msg = self._verify_output_video() + stats = self.video_encoder.stop() + logger.info(f"Encoding complete [{self.task_id}]: {stats}") + + is_valid, err = self._verify_output_video() self.output_file_verified = is_valid - if not is_valid: - logger.error(f"输出文件验证失败 [{self.task_id}]: {error_msg}") - self._update_task_status("error", f"输出文件验证失败: {error_msg}") - socketio.emit('conversion_error', { - "error": f"输出文件验证失败: {error_msg}", - "task_id": self.task_id - }) + logger.error(f"Output verification failed [{self.task_id}]: {err}") + self._update_task_status("error", f"Output verification failed: {err}") + socketio.emit('conversion_error', + {"error": f"Output verification failed: {err}", + "task_id": self.task_id}) return - - # 更新任务进度 + self._update_task_status("completed", output_path=str(self.output_path)) - - # 发送完成通知 - socketio.emit('conversion_complete', { - "output_file": str(self.output_path), - "filename": self.output_path.name, - "duration": time.time() - self.start_time, - "frames": self.processed_frames, - "task_id": self.task_id - }) - + socketio.emit('conversion_complete', + {"output_file": str(self.output_path), + "filename": self.output_path.name, + "duration": time.time() - self.start_time, + "frames": self.processed_frames, + "task_id": self.task_id}) except Exception as e: - error_msg = f"编码完成过程出错: {e}" + error_msg = f"Encoding completion error: {e}" logger.error(f"{error_msg} [{self.task_id}]", exc_info=True) self._update_task_status("error", error_msg) - socketio.emit('conversion_error', {"error": error_msg, "task_id": self.task_id}) - + socketio.emit('conversion_error', + {"error": error_msg, "task_id": self.task_id}) + except Exception as e: - error_msg = f"转换过程出错: {e}" + error_msg = f"Conversion error: {e}" logger.error(f"{error_msg} [{self.task_id}]", exc_info=True) - - # 更新任务进度 self._update_task_status("error", error_msg) - - # 发送错误通知 socketio.emit('conversion_error', {"error": error_msg, "task_id": self.task_id}) - + finally: - # 确保标记任务为非运行状态 self.running = False - - # 确保视频编码器已停止 if hasattr(self, 'video_encoder') and self.video_encoder: try: - if hasattr(self.video_encoder, 'running') and self.video_encoder.running: + if getattr(self.video_encoder, 'running', False): self.video_encoder.stop() except Exception as e: - logger.error(f"停止视频编码器时出错 [{self.task_id}]: {e}") - - # 清理任务 - self.event.set() # 确保等待该任务的线程能继续 + logger.error(f"Error stopping video encoder [{self.task_id}]: {e}") + self.event.set() @app.route('/') @@ -883,14 +862,14 @@ def download_file_by_task(task_id): # 获取原始文件名 if task_id in conversion_tasks: original_filename = conversion_tasks[task_id].original_filename - filename = f"{os.path.splitext(original_filename)[0]}.mp4" + filename = f"{os.path.splitext(original_filename)[0]}.avi" else: filename = file_path.name # 获取MIME类型 mime_type, _ = mimetypes.guess_type(file_path) if not mime_type: - mime_type = 'application/octet-stream' + mime_type = 'video/x-msvideo' return send_file( file_path, @@ -915,7 +894,7 @@ def download_file_by_name(filename): # 获取MIME类型 mime_type, _ = mimetypes.guess_type(file_path) if not mime_type: - mime_type = 'application/octet-stream' + mime_type = 'video/x-msvideo' return send_file( file_path,