1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174
|
from __future__ import annotations
from dataclasses import dataclass
from fractions import Fraction
from pathlib import Path
import av
from auto_editor.utils.log import Log
def mux(input: Path, output: Path, stream: int) -> None:
    """Re-encode one audio stream of ``input`` into ``output`` as ``pcm_s16le``.

    Args:
        input: Media file to read from.
        output: File to write the re-encoded audio to.
        stream: Index into the input container's audio streams.

    Both containers are opened as context managers so they are closed even
    when stream lookup, decoding, encoding or muxing raises.
    """
    # Exit order matches the original explicit closes: output first, then input.
    with av.open(input, "r") as src, av.open(output, "w") as dst:
        audio_in = src.streams.audio[stream]
        audio_out = dst.add_stream("pcm_s16le")

        for frame in src.decode(audio_in):
            dst.mux(audio_out.encode(frame))

        # Encoding ``None`` drains whatever the encoder still has buffered.
        dst.mux(audio_out.encode(None))
@dataclass(slots=True, frozen=True)
class VideoStream:
width: int
height: int
codec: str
fps: Fraction
duration: float
sar: Fraction
time_base: Fraction | None
pix_fmt: str | None
color_range: int
color_space: int
color_primaries: int
color_transfer: int
bitrate: int
lang: str | None
@dataclass(slots=True, frozen=True)
class AudioStream:
codec: str
samplerate: int
layout: str
channels: int
duration: float
bitrate: int
lang: str | None
@dataclass(slots=True, frozen=True)
class SubtitleStream:
codec: str
ext: str
lang: str | None
@dataclass(slots=True, frozen=True)
class FileInfo:
path: Path
bitrate: int
duration: float
description: str | None
videos: tuple[VideoStream, ...]
audios: tuple[AudioStream, ...]
subtitles: tuple[SubtitleStream, ...]
def get_res(self) -> tuple[int, int]:
if self.videos:
return self.videos[0].width, self.videos[0].height
return 1920, 1080
def get_fps(self) -> Fraction:
if self.videos:
return self.videos[0].fps
return Fraction(30)
def get_sr(self) -> int:
if self.audios:
return self.audios[0].samplerate
return 48000
def __repr__(self) -> str:
return f"@{self.path.name}"
def _video_stream_info(v, log: Log) -> VideoStream:
    """Build a VideoStream record from one entry of ``cont.streams.video``."""
    if v.duration is not None and v.time_base is not None:
        vdur = float(v.duration * v.time_base)
    else:
        vdur = 0.0

    fps = v.average_rate
    # png/mjpeg/webp streams with a missing or sub-1 frame rate are given 25.
    if (fps is None or fps < 1) and v.name in {"png", "mjpeg", "webp"}:
        fps = Fraction(25)
    # Any other stream with no usable rate falls back to 30.
    if fps is None or fps == 0:
        fps = Fraction(30)

    sar = Fraction(1) if v.sample_aspect_ratio is None else v.sample_aspect_ratio
    cc = v.codec_context

    if v.name is None:
        log.error(f"Can't detect codec for video stream {v}")

    return VideoStream(
        v.width,
        v.height,
        v.name,
        fps,
        vdur,
        sar,
        v.time_base,
        getattr(v.format, "name", None),
        cc.color_range,
        cc.colorspace,
        cc.color_primaries,
        cc.color_trc,
        0 if v.bit_rate is None else v.bit_rate,
        v.language,
    )


def _audio_stream_info(a) -> AudioStream:
    """Build an AudioStream record from one entry of ``cont.streams.audio``."""
    adur = 0.0
    if a.duration is not None and a.time_base is not None:
        adur = float(a.duration * a.time_base)

    a_cc = a.codec_context
    return AudioStream(
        a_cc.codec.canonical_name,
        0 if a_cc.sample_rate is None else a_cc.sample_rate,
        a.layout.name,
        a_cc.channels,
        adur,
        0 if a_cc.bit_rate is None else a_cc.bit_rate,
        a.language,
    )


def initFileInfo(path: str, log: Log) -> FileInfo:
    """Open ``path`` with PyAV and summarize it as a ``FileInfo``.

    Args:
        path: Location of the media file to inspect.
        log: Logger whose ``error`` method is called when the file is
            missing, is a directory, contains invalid data, or has a video
            stream whose codec cannot be detected.

    Returns:
        A ``FileInfo`` holding the container's bitrate, duration and
        ``description`` metadata plus one record per video, audio and
        subtitle stream. Missing bitrates, sample rates and durations are
        reported as zero.

    The container is closed before returning and also when anything raises
    while the streams are being inspected.
    """
    try:
        cont = av.open(path, "r")
    except av.error.FileNotFoundError:
        log.error(f"Input file doesn't exist: {path}")
    except av.error.IsADirectoryError:
        log.error(f"Expected a media file, but got a directory: {path}")
    except av.error.InvalidDataError:
        log.error(f"Invalid data when processing: {path}")
    # NOTE(review): ``cont`` is unbound below if ``log.error`` returns; this
    # presumably relies on it aborting (raise/exit) - confirm against Log.

    # Built once instead of once per subtitle stream; unknown codecs use "vtt".
    sub_exts = {"mov_text": "srt", "ass": "ass", "webvtt": "vtt"}

    try:
        videos: tuple[VideoStream, ...] = tuple(
            _video_stream_info(v, log) for v in cont.streams.video
        )
        audios: tuple[AudioStream, ...] = tuple(
            _audio_stream_info(a) for a in cont.streams.audio
        )

        sub_list: list[SubtitleStream] = []
        for s in cont.streams.subtitles:
            codec = s.codec_context.name
            sub_list.append(
                SubtitleStream(codec, sub_exts.get(codec, "vtt"), s.language)
            )
        subtitles: tuple[SubtitleStream, ...] = tuple(sub_list)

        desc = cont.metadata.get("description", None)
        bitrate = 0 if cont.bit_rate is None else cont.bit_rate
        # 0.0 rather than 0 so the value matches FileInfo.duration's float type.
        dur = 0.0 if cont.duration is None else cont.duration / av.time_base
    finally:
        cont.close()

    return FileInfo(Path(path), bitrate, dur, desc, videos, audios, subtitles)
|