File: context.pyx

package info (click to toggle)
python-av 16.0.1-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 1,684 kB
  • sloc: python: 7,607; sh: 182; ansic: 174; makefile: 135
file content (174 lines) | stat: -rw-r--r-- 5,357 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
cimport libav as lib

import weakref

from av.audio.frame cimport alloc_audio_frame
from av.dictionary cimport _Dictionary
from av.dictionary import Dictionary
from av.error cimport err_check
from av.filter.link cimport alloc_filter_pads
from av.frame cimport Frame
from av.utils cimport avrational_to_fraction
from av.video.frame cimport alloc_video_frame


cdef object _cinit_sentinel = object()


cdef FilterContext wrap_filter_context(Graph graph, Filter filter, lib.AVFilterContext *ptr):
    cdef FilterContext self = FilterContext(_cinit_sentinel)
    self._graph = weakref.ref(graph)
    self.filter = filter
    self.ptr = ptr
    return self


cdef class FilterContext:
    def __cinit__(self, sentinel):
        if sentinel is not _cinit_sentinel:
            raise RuntimeError("cannot construct FilterContext")

    def __repr__(self):
        if self.ptr != NULL:
            name = repr(self.ptr.name) if self.ptr.name != NULL else "<NULL>"
        else:
            name = "None"

        parent = self.filter.ptr.name if self.filter and self.filter.ptr != NULL else None
        return f"<av.FilterContext {name} of {parent!r} at 0x{id(self):x}>"

    @property
    def name(self):
        if self.ptr.name != NULL:
            return self.ptr.name

    @property
    def inputs(self):
        if self._inputs is None:
            self._inputs = alloc_filter_pads(self.filter, self.ptr.input_pads, True, self)
        return self._inputs

    @property
    def outputs(self):
        if self._outputs is None:
            self._outputs = alloc_filter_pads(self.filter, self.ptr.output_pads, False, self)
        return self._outputs

    def init(self, args=None, **kwargs):
        if self.inited:
            raise ValueError("already inited")
        if args and kwargs:
            raise ValueError("cannot init from args and kwargs")

        cdef _Dictionary dict_ = None
        cdef char *c_args = NULL
        if args or not kwargs:
            if args:
                c_args = args
            err_check(lib.avfilter_init_str(self.ptr, c_args))
        else:
            dict_ = Dictionary(kwargs)
            err_check(lib.avfilter_init_dict(self.ptr, &dict_.ptr))

        self.inited = True
        if dict_:
            raise ValueError(f"unused config: {', '.join(sorted(dict_))}")

    def link_to(self, FilterContext input_, int output_idx=0, int input_idx=0):
        err_check(lib.avfilter_link(self.ptr, output_idx, input_.ptr, input_idx))
    
    @property
    def graph(self):
        if (graph := self._graph()):
            return graph
        else:
            raise RuntimeError("graph is unallocated")

    def push(self, Frame frame):
        cdef int res

        if frame is None:
            with nogil:
                res = lib.av_buffersrc_write_frame(self.ptr, NULL)
            err_check(res)
            return
        elif self.filter.name in ("abuffer", "buffer"):
            with nogil:
                res = lib.av_buffersrc_write_frame(self.ptr, frame.ptr)
            err_check(res)
            return

        # Delegate to the input.
        if len(self.inputs) != 1:
            raise ValueError(
                f"cannot delegate push without single input; found {len(self.inputs)}"
            )
        if not self.inputs[0].link:
            raise ValueError("cannot delegate push without linked input")
        self.inputs[0].linked.context.push(frame)

    def pull(self):
        cdef Frame frame
        cdef int res

        if self.filter.name == "buffersink":
            frame = alloc_video_frame()
        elif self.filter.name == "abuffersink":
            frame = alloc_audio_frame()
        else:
            # Delegate to the output.
            if len(self.outputs) != 1:
                raise ValueError(
                    f"cannot delegate pull without single output; found {len(self.outputs)}"
                )
            if not self.outputs[0].link:
                raise ValueError("cannot delegate pull without linked output")
            return self.outputs[0].linked.context.pull()

        self.graph.configure()

        with nogil:
            res = lib.av_buffersink_get_frame(self.ptr, frame.ptr)
        err_check(res)

        frame._init_user_attributes()
        frame.time_base = avrational_to_fraction(&self.ptr.inputs[0].time_base)
        return frame

    def process_command(self, cmd, arg=None, int res_len=1024, int flags=0):
        if not cmd:
            raise ValueError("Invalid cmd")

        cdef char *c_cmd = NULL
        cdef char *c_arg = NULL

        c_cmd = cmd
        if arg is not None:
            c_arg = arg

        cdef char *c_res = NULL
        cdef int ret
        cdef bytearray res_buf = None
        cdef unsigned char[:] view
        cdef bytes b
        cdef int nul

        if res_len > 0:
            res_buf = bytearray(res_len)
            view = res_buf
            c_res = <char*>&view[0]
        else:
            c_res = NULL

        with nogil:
            ret = lib.avfilter_process_command(self.ptr, c_cmd, c_arg, c_res, res_len, flags)
        err_check(ret)

        if res_buf is not None:
            b = bytes(res_buf)
            nul = b.find(b'\x00')
            if nul >= 0:
                b = b[:nul]
            if b:
                return b.decode("utf-8", "strict")
        return None