File: inotify.py

package info (click to toggle)
git-cola 4.14.0-1
  • links: PTS
  • area: main
  • in suites: forky, sid
  • size: 6,812 kB
  • sloc: python: 37,625; sh: 298; makefile: 223; xml: 102; tcl: 62
file content (86 lines) | stat: -rw-r--r-- 2,195 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
import ctypes
import ctypes.util
import errno
import os

# constant from Linux include/uapi/linux/limits.h
NAME_MAX = 255

# constants from Linux include/uapi/linux/inotify.h
IN_MODIFY = 0x00000002
IN_ATTRIB = 0x00000004
IN_CLOSE_WRITE = 0x00000008
IN_MOVED_FROM = 0x00000040
IN_MOVED_TO = 0x00000080
IN_CREATE = 0x00000100
IN_DELETE = 0x00000200

IN_Q_OVERFLOW = 0x00004000

IN_ONLYDIR = 0x01000000
IN_EXCL_UNLINK = 0x04000000
IN_ISDIR = 0x80000000


class inotify_event(ctypes.Structure):
    _fields_ = [
        ('wd', ctypes.c_int),
        ('mask', ctypes.c_uint32),
        ('cookie', ctypes.c_uint32),
        ('len', ctypes.c_uint32),
    ]


MAX_EVENT_SIZE = ctypes.sizeof(inotify_event) + NAME_MAX + 1


def _errcheck(result, func, arguments):
    if result >= 0:
        return result
    err = ctypes.get_errno()
    if err == errno.EINTR:
        return func(*arguments)
    raise OSError(err, os.strerror(err))


try:
    _libc = ctypes.CDLL(ctypes.util.find_library('c'), use_errno=True)
    _read = _libc.read
    init = _libc.inotify_init
    add_watch = _libc.inotify_add_watch
    rm_watch = _libc.inotify_rm_watch
except AttributeError:
    raise ImportError('Could not load inotify functions from libc')


_read.argtypes = [ctypes.c_int, ctypes.c_void_p, ctypes.c_size_t]
_read.errcheck = _errcheck

init.argtypes = []
init.errcheck = _errcheck

add_watch.argtypes = [ctypes.c_int, ctypes.c_char_p, ctypes.c_uint32]
add_watch.errcheck = _errcheck

rm_watch.argtypes = [ctypes.c_int, ctypes.c_int]
rm_watch.errcheck = _errcheck


def read_events(inotify_fd, count=64):
    buf = ctypes.create_string_buffer(MAX_EVENT_SIZE * count)
    num = _read(inotify_fd, buf, ctypes.sizeof(buf))

    addr = ctypes.addressof(buf)
    while num:
        assert num >= ctypes.sizeof(inotify_event)
        event = inotify_event.from_address(addr)
        addr += ctypes.sizeof(inotify_event)
        num -= ctypes.sizeof(inotify_event)
        if event.len:
            assert num >= event.len
            name = ctypes.string_at(addr)
            addr += event.len
            num -= event.len
        else:
            name = None
        yield event.wd, event.mask, event.cookie, name