File: ltspfsmounter

package info (click to toggle)
ltspfs 1.5-2
  • links: PTS, VCS
  • area: main
  • in suites: buster
  • size: 356 kB
  • sloc: ansic: 1,957; sh: 232; python: 135; makefile: 37
file content (111 lines) | stat: -rw-r--r-- 3,329 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
#!/usr/bin/python3

import os
import sys
from subprocess import call
import re

# Directories searched for hook scripts, in priority order.
# main() appends every existing */ltspfs/mounter.d directory to this list.
hook_dirs = []

# A hook is only run when its whole file name consists of ASCII letters,
# digits, '-' and '_' (so names such as "foo.bak" or "foo~" are skipped).
_VALID_HOOK_NAME = re.compile(r'[a-zA-Z0-9_-]+')

def run_hooks(mode, mountpoint):
    """Run the hook scripts found in ``hook_dirs``.

    Each script is invoked as ``<script> <mode> <mountpoint>``.  Directories
    are processed in the order they appear in ``hook_dirs``; inside a
    directory the scripts run in sorted name order.  When the same file name
    exists in several directories only the first one encountered is run.

    Parameters:
        mode: first argument handed to every hook.
        mountpoint: second argument handed to every hook.

    Returns:
        None.  Hook failures never propagate: a hook that cannot be started
        is reported on stderr and the remaining hooks still run.  The exit
        status of a hook is ignored.
    """
    executed_hooks = set()
    for hook_dir in hook_dirs:
        try:
            hook_scripts = sorted(os.listdir(hook_dir))
        except OSError:
            # missing, unreadable or not a directory: nothing to run here
            continue
        for script in hook_scripts:
            # only run the first script of a given name
            if script in executed_hooks:
                continue
            if not _VALID_HOOK_NAME.fullmatch(script):
                continue
            # recorded before running, so a failing hook is not retried
            # from a later directory
            executed_hooks.add(script)
            try:
                call([os.path.join(hook_dir, script), mode, mountpoint])
            except Exception as e:
                # be very tolerant of failures with hook scripts, but do not
                # swallow KeyboardInterrupt/SystemExit and leave a trace
                print("hook %s failed:" % os.path.join(hook_dir, script), e,
                      file=sys.stderr)

def get_var(name):
    """Return the value of environment variable ``name``, or None if unset."""
    try:
        return os.environ[name]
    except KeyError:
        return None

def add_ltspfsmount(conn, path, dev, mediaroot):
    """Prepare and mount ``<conn>:<path>`` on ``<mediaroot>/<dev>``.

    Two independent stages, each reporting an OSError on stderr instead of
    raising:

    1. run ``lbmount <dev>``, create the mountpoint directory (mode 0700)
       and run the 'add' hooks; a failure anywhere skips the rest of this
       stage and is reported as "suid mount failed";
    2. run ``ltspfs <conn>:<path> <mediaroot>/<dev>`` with a copy of the
       current ``os.environ``; a failure is reported as "mount failed".

    Stage 2 is attempted even when stage 1 failed.  Exit statuses of the
    external commands are not inspected.
    """
    remote = conn + ':' + path
    target = mediaroot + '/' + dev
    child_env = os.environ.copy()

    # Stage 1: privileged helper, mountpoint directory, hooks.
    try:
        call(['lbmount', dev])
        os.mkdir(target, 0o700)
        run_hooks('add', os.path.join(mediaroot, dev))
    except OSError as err:
        print("suid mount failed:", err, file=sys.stderr)

    # Stage 2: the actual ltspfs mount.
    try:
        call(['ltspfs', remote, target], env=child_env)
    except OSError as err:
        print("mount failed:", err, file=sys.stderr)

def remove_ltspfsmount(root, dev):
    """Unmount ``<root>/<dev>`` and tear down what the add step created.

    Two independent stages, each reporting an OSError on stderr instead of
    raising:

    1. run ``fusermount -uzq <root>/<dev>`` and then the 'remove' hooks;
       a failure is reported as "umount failed";
    2. remove the mountpoint directory and run ``lbmount --umount <dev>``;
       a failure is reported as "suid umount failed".  If the directory
       cannot be removed, ``lbmount --umount`` is not run.

    Exit statuses of the external commands are not inspected.
    """
    target = root + '/' + dev

    # Stage 1: drop the FUSE mount, then notify hooks.
    try:
        call(['fusermount', '-uzq', target])
        run_hooks('remove', os.path.join(root, dev))
    except OSError as err:
        print("umount failed:", err, file=sys.stderr)

    # Stage 2: remove the mountpoint and undo the lbmount.
    try:
        os.rmdir(target)
        call(['lbmount', '--umount', dev])
    except OSError as err:
        print("suid umount failed:", err, file=sys.stderr)

def cleanup(user, mediaroot):
    """Remove every ltspfs mount belonging to ``user`` and exit.

    Reads /proc/mounts, and for each line that starts with "ltspfs" and
    whose mountpoint contains ``user`` as a path component, calls
    ``remove_ltspfsmount(mediaroot, <last component of the mountpoint>)``.
    Afterwards the 'cleanup' hooks are run with an empty mountpoint argument.

    Parameters:
        user: user name that must appear as a directory component of the
            mountpoint.
        mediaroot: directory passed to remove_ltspfsmount() as the root
            under which the device directories live.

    This function does not return: it always ends with ``sys.exit(0)``.
    """
    # Take a snapshot and close the file before unmounting anything, since
    # the removals below change the mount table.
    with open('/proc/mounts', 'r') as mounts:
        known_mounts = mounts.readlines()

    for mount in known_mounts:
        if not mount.startswith('ltspfs'):
            continue
        fields = mount.split()
        if len(fields) < 2:
            # malformed line without a mountpoint field
            continue
        components = fields[1].split('/')
        # Match the user as a whole path component; a plain substring test
        # on the line would also hit mount options or longer user names.
        if user in components:
            remove_ltspfsmount(mediaroot, components[-1])
    run_hooks('cleanup', '')
    sys.exit(0)

def main():
    """Command-line entry point: ``ltspfsmounter mountpoint add|remove|cleanup``.

    Steps:
      * require at least two arguments and a writable /dev/fuse, otherwise
        exit with status 1;
      * register every existing ``ltspfs/mounter.d`` directory below /etc,
        /usr/share and /usr/lib in ``hook_dirs``;
      * derive the media root ``/media/<USER>`` and the device name (last
        component of the mountpoint argument);
      * take the first field of SSH_CONNECTION as the connection address,
        falling back to 127.0.0.1 (and exporting that fallback) when the
        variable is unset or empty;
      * dispatch to add_ltspfsmount(), remove_ltspfsmount() or cleanup().

    Exits with status 1 on usage errors, a missing USER, an empty device
    name for add/remove, or an unknown command.
    """
    if len(sys.argv) < 3:
        print('usage: %s mountpoint add|remove|cleanup' % sys.argv[0],
              file=sys.stderr)
        sys.exit(1)

    if not os.access('/dev/fuse', os.W_OK):
        sys.stderr.write('/dev/fuse not writeable\n')
        sys.exit(1)

    # check if hook dirs are present
    for prefix in ('/etc', '/usr/share', '/usr/lib'):
        hook_dir = os.path.join(prefix, 'ltspfs/mounter.d')
        if os.path.isdir(hook_dir):
            hook_dirs.append(hook_dir)

    path = sys.argv[1]
    command = sys.argv[2]
    username = get_var('USER')
    if not username:
        # without a user name the media root would become "/media/None"
        sys.stderr.write('USER is not set\n')
        sys.exit(1)
    mediaroot = "/media/%s" % username

    ssh_fields = (get_var('SSH_CONNECTION') or '').split()
    if ssh_fields:
        conn = ssh_fields[0]
    else:
        conn = "127.0.0.1"
        # Assign through os.environ rather than os.putenv(): putenv() does
        # not update os.environ, so the os.environ.copy() that
        # add_ltspfsmount() passes to ltspfs would miss the variable.
        os.environ['SSH_CONNECTION'] = '127.0.0.1'

    # tolerate a trailing slash, which would otherwise give an empty name
    dev = os.path.basename(path.rstrip('/'))

    if command in ('add', 'remove') and not dev:
        sys.stderr.write('invalid mountpoint: %s\n' % path)
        sys.exit(1)

    if command == 'add':
        add_ltspfsmount(conn, path, dev, mediaroot)
    elif command == 'remove':
        remove_ltspfsmount(mediaroot, dev)
    elif command == 'cleanup':
        cleanup(username, mediaroot)
    else:
        print('unknown command', file=sys.stderr)
        sys.exit(1)

if __name__ == "__main__":
    main()