File: inotify.py

package info (click to toggle)
cockpit-podman 121-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 277,516 kB
  • sloc: javascript: 695,995; cpp: 11,141; python: 7,835; sh: 923; makefile: 156; xml: 90
file content (60 lines) | stat: -rw-r--r-- 1,961 bytes parent folder | download | duplicates (8)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
#
#
# Copyright (C) 2017 Red Hat, Inc.
# SPDX-License-Identifier: LGPL-2.1-or-later


import ctypes
import os
import struct
import sys

# inotify event mask bits, one flag per bit.  The names and values follow
# the Linux inotify API; see inotify(7) for the exact semantics.
IN_CLOSE_WRITE = 1 << 3     # file opened for writing was closed
IN_MOVED_FROM = 1 << 6      # entry was moved out of the watched directory
IN_MOVED_TO = 1 << 7        # entry was moved into the watched directory
IN_CREATE = 1 << 8          # entry was created in the watched directory
IN_DELETE = 1 << 9          # entry was deleted from the watched directory
IN_DELETE_SELF = 1 << 10    # the watched file/directory itself was deleted
IN_MOVE_SELF = 1 << 11      # the watched file/directory itself was moved
IN_IGNORED = 1 << 15        # the watch was removed (explicitly or implicitly)


class Inotify:
    """Minimal ctypes wrapper around the libc inotify functions.

    The inotify file descriptor is available as ``fd``.  Failures to add or
    remove a watch are reported on stderr instead of being raised.
    """

    # Fixed-size header of one event record as read from the inotify fd:
    # int wd, uint32 mask, uint32 cookie, uint32 len (see inotify(7)).
    _EVENT_HEADER = struct.Struct('iIII')

    def __init__(self):
        """Bind the libc inotify functions and create an inotify instance.

        The return value of inotify_init() is stored in ``fd`` unchecked;
        a negative value means that creating the instance failed.
        """
        # CDLL(None) resolves symbols from the running process (i.e. libc);
        # use_errno=True is what makes ctypes.get_errno() meaningful below.
        self._libc = ctypes.CDLL(None, use_errno=True)
        self._get_errno_func = ctypes.get_errno

        self._libc.inotify_init.argtypes = []
        self._libc.inotify_init.restype = ctypes.c_int
        self._libc.inotify_add_watch.argtypes = [ctypes.c_int, ctypes.c_char_p,
                                                 ctypes.c_uint32]
        self._libc.inotify_add_watch.restype = ctypes.c_int
        self._libc.inotify_rm_watch.argtypes = [ctypes.c_int, ctypes.c_int]
        self._libc.inotify_rm_watch.restype = ctypes.c_int

        self.fd = self._libc.inotify_init()

    def add_watch(self, path, mask):
        """Watch *path* for the events selected by the bit mask *mask*.

        Returns the value of inotify_add_watch(): the watch descriptor, or a
        negative number on failure (a message is then written to stderr).
        """
        # Encode into a separate name so the error message below still shows
        # the path the caller passed in.  os.fsencode() uses the filesystem
        # encoding and round-trips names that are not valid in it.
        encoded_path = os.fsencode(path)
        wd = self._libc.inotify_add_watch(self.fd, encoded_path, mask)
        if wd < 0:
            sys.stderr.write("can't add watch for %s: %s\n" % (path, os.strerror(self._get_errno_func())))
        return wd

    def rem_watch(self, wd):
        """Remove the watch *wd*; a failure is reported on stderr."""
        if self._libc.inotify_rm_watch(self.fd, wd) < 0:
            sys.stderr.write("can't remove watch: %s\n" % (os.strerror(self._get_errno_func())))

    def process(self, callback):
        """Read pending events once and dispatch each one to *callback*.

        Performs a single os.read() on ``fd`` and, for every event record in
        the returned data, calls ``callback(wd, mask, name)`` where *name* is
        the event's file name as ``str`` (empty if the event carries none).
        """
        header = self._EVENT_HEADER
        buf = os.read(self.fd, 4096)
        pos = 0
        while pos < len(buf):
            wd, mask, _cookie, name_len = header.unpack_from(buf, pos)
            pos += header.size
            raw_name = buf[pos:pos + name_len]
            pos += name_len
            # The name field is padded with NUL bytes up to name_len.  File
            # names are arbitrary bytes, so decode with os.fsdecode() rather
            # than a strict decode that would raise on non-UTF-8 names.
            callback(wd, mask, os.fsdecode(raw_name.rstrip(b'\0')))

    def run(self, callback):
        """Dispatch events to *callback* via process() in an endless loop."""
        while True:
            self.process(callback)