"""Export service — caption and text document generation.""" from __future__ import annotations import os from dataclasses import dataclass, field from typing import Any import pysubs2 @dataclass class ExportSegment: """A segment ready for export.""" text: str start_ms: int end_ms: int speaker: str | None = None @dataclass class ExportRequest: """Input for export operations.""" segments: list[ExportSegment] = field(default_factory=list) speakers: dict[str, str] = field(default_factory=dict) # id → display_name format: str = "srt" # srt, vtt, ass, txt, md output_path: str = "" title: str = "" class ExportService: """Handles export to SRT, WebVTT, ASS, plain text, and Markdown.""" def export(self, request: ExportRequest) -> str: """Export segments to the requested format. Returns the output file path. """ fmt = request.format.lower() if fmt == "srt": return self._export_srt(request) elif fmt in ("vtt", "webvtt"): return self._export_vtt(request) elif fmt == "ass": return self._export_ass(request) elif fmt == "txt": return self._export_txt(request) elif fmt == "md": return self._export_md(request) else: raise ValueError(f"Unsupported export format: {fmt}") def _get_speaker_name(self, speaker: str | None, speakers: dict[str, str]) -> str: """Resolve speaker ID to display name.""" if not speaker: return "Unknown" return speakers.get(speaker, speaker) def _export_srt(self, request: ExportRequest) -> str: """Export to SubRip (.srt) format with speaker prefixes.""" subs = pysubs2.SSAFile() for seg in request.segments: name = self._get_speaker_name(seg.speaker, request.speakers) text = f"[{name}]: {seg.text}" if seg.speaker else seg.text event = pysubs2.SSAEvent( start=seg.start_ms, end=seg.end_ms, text=text, ) subs.append(event) path = request.output_path or "export.srt" subs.save(path, format_="srt") return path def _export_vtt(self, request: ExportRequest) -> str: """Export to WebVTT (.vtt) format with voice tags.""" subs = pysubs2.SSAFile() for seg in request.segments: name = self._get_speaker_name(seg.speaker, request.speakers) # WebVTT voice tags: text text = f"{seg.text}" if seg.speaker else seg.text event = pysubs2.SSAEvent( start=seg.start_ms, end=seg.end_ms, text=text, ) subs.append(event) path = request.output_path or "export.vtt" subs.save(path, format_="vtt") return path def _export_ass(self, request: ExportRequest) -> str: """Export to Advanced SubStation Alpha (.ass) with speaker styles.""" subs = pysubs2.SSAFile() # Create a style per speaker with distinct colors colors = [ "&H0000FFFF", # Yellow "&H00FF00FF", # Magenta "&H00FFFF00", # Cyan "&H000000FF", # Red "&H0000FF00", # Green "&H00FF0000", # Blue "&H0080FF80", # Light green "&H00FF8080", # Light blue ] speaker_styles: dict[str, str] = {} unique_speakers = sorted(set( seg.speaker for seg in request.segments if seg.speaker )) for i, spk in enumerate(unique_speakers): name = self._get_speaker_name(spk, request.speakers) style_name = name.replace(" ", "_") style = pysubs2.SSAStyle() style.primarycolor = pysubs2.Color(*self._parse_ass_color(colors[i % len(colors)])) style.fontsize = 20 style.bold = True subs.styles[style_name] = style speaker_styles[spk] = style_name for seg in request.segments: style = speaker_styles.get(seg.speaker or "", "Default") event = pysubs2.SSAEvent( start=seg.start_ms, end=seg.end_ms, text=seg.text, style=style, ) subs.append(event) path = request.output_path or "export.ass" subs.save(path, format_="ass") return path def _parse_ass_color(self, color_str: str) -> tuple[int, int, int, int]: """Parse ASS color string &HAABBGGRR to (r, g, b, a).""" # Strip &H prefix hex_str = color_str.replace("&H", "").replace("&h", "") val = int(hex_str, 16) a = (val >> 24) & 0xFF b = (val >> 16) & 0xFF g = (val >> 8) & 0xFF r = val & 0xFF return (r, g, b, a) def _export_txt(self, request: ExportRequest) -> str: """Export to plain text with speaker labels.""" lines: list[str] = [] if request.title: lines.append(request.title) lines.append("=" * len(request.title)) lines.append("") current_speaker: str | None = None for seg in request.segments: name = self._get_speaker_name(seg.speaker, request.speakers) if seg.speaker != current_speaker: if lines and lines[-1] != "": lines.append("") lines.append(f"{name}:") current_speaker = seg.speaker lines.append(f" {seg.text}") path = request.output_path or "export.txt" with open(path, "w", encoding="utf-8") as f: f.write("\n".join(lines) + "\n") return path def _export_md(self, request: ExportRequest) -> str: """Export to Markdown with speaker headers and timestamps.""" lines: list[str] = [] if request.title: lines.append(f"# {request.title}") lines.append("") current_speaker: str | None = None for seg in request.segments: name = self._get_speaker_name(seg.speaker, request.speakers) if seg.speaker != current_speaker: lines.append("") lines.append(f"**{name}** _{self._format_timestamp(seg.start_ms)}_") lines.append("") current_speaker = seg.speaker lines.append(seg.text) path = request.output_path or "export.md" with open(path, "w", encoding="utf-8") as f: f.write("\n".join(lines) + "\n") return path def _format_timestamp(self, ms: int) -> str: """Format milliseconds as H:MM:SS or M:SS.""" total_seconds = ms // 1000 h = total_seconds // 3600 m = (total_seconds % 3600) // 60 s = total_seconds % 60 if h > 0: return f"{h}:{m:02d}:{s:02d}" return f"{m}:{s:02d}" def make_export_request(payload: dict[str, Any]) -> ExportRequest: """Create an ExportRequest from IPC payload.""" segments = [ ExportSegment( text=seg["text"], start_ms=seg["start_ms"], end_ms=seg["end_ms"], speaker=seg.get("speaker"), ) for seg in payload.get("segments", []) ] return ExportRequest( segments=segments, speakers=payload.get("speakers", {}), format=payload.get("format", "srt"), output_path=payload.get("output_path", ""), title=payload.get("title", ""), )