1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81
|
#!/usr/bin/env python3
if __name__ == '__main__':
    # When executed directly, hand this very file to pytest and forward any
    # extra command-line arguments; the process exit status is pytest's.
    import sys
    import pytest
    pytest_args = [__file__, *sys.argv[1:]]
    sys.exit(pytest.main(pytest_args))
import os
import socket
import struct
import subprocess
import sys
import time
from os.path import join as pjoin
import pytest
from util import base_cmdline, basename
# Opcode that tst_init places in the second field of the request header.
FUSE_OP_INIT = 26
# Protocol version sent in the INIT request payload; tst_init also asserts
# that the first field of the reply payload equals FUSE_MAJOR_VERSION.
FUSE_MAJOR_VERSION = 7
FUSE_MINOR_VERSION = 38
# struct format strings for the wire messages. The '<' prefix selects
# little-endian byte order with standard sizes and no implicit padding.
# NOTE(review): these presumably mirror the kernel's fuse_in_header /
# fuse_out_header / fuse_init_in / fuse_init_out layouts -- confirm against
# the FUSE protocol header before changing them.
# Request header (40 bytes): tst_init fills the first three fields with
# total message length, opcode and unique request id, and zeroes the rest.
fuse_in_header_fmt = '<IIQQIIII'
# Reply header (16 bytes): tst_init unpacks it as
# (total message length, <ignored signed field>, unique request id).
fuse_out_header_fmt = '<IiQ'
# INIT request payload (64 bytes): five 32-bit fields, of which tst_init sets
# the first two to major/minor version, followed by 44 pad bytes.
fuse_init_in_fmt = '<IIIII44x'
# INIT reply payload (64 bytes, including 28 trailing pad bytes); tst_init
# only inspects field 0.
fuse_init_out_fmt = '<IIIIHHIIHHI28x'
def sock_recvall(sock: socket.socket, bufsize: int) -> bytes:
    """Receive exactly *bufsize* bytes from *sock*.

    A single ``recv`` call may return fewer bytes than requested, so this
    keeps reading until the full amount has arrived.

    Args:
        sock: Connected socket to read from.
        bufsize: Number of bytes to read. Values <= 0 yield ``b""``.

    Returns:
        The *bufsize* bytes read, in order.

    Raises:
        ConnectionError: If the peer closes the connection before
            *bufsize* bytes have been received.
    """
    chunks = []
    remaining = bufsize
    while remaining > 0:
        chunk = sock.recv(remaining)
        if not chunk:
            # recv() returning b"" means EOF; without this check the loop
            # would spin forever because no further data can ever arrive.
            raise ConnectionError(
                f"socket closed with {remaining} of {bufsize} bytes "
                "still outstanding")
        chunks.append(chunk)
        remaining -= len(chunk)
    return b"".join(chunks)
def tst_init(sock: socket.socket):
    """Send an INIT request over *sock* and sanity-check the reply.

    The request consists of a header (total length, FUSE_OP_INIT, a fixed
    unique id, remaining fields zero) followed by an INIT payload carrying
    FUSE_MAJOR_VERSION / FUSE_MINOR_VERSION. The reply must echo the unique
    id, and the first field of its payload must equal FUSE_MAJOR_VERSION.

    Raises:
        AssertionError: If the reply's unique id or major version differ
            from what was sent.
    """
    request_id = 10

    # Build the request: header first, then the INIT payload.
    total_request_len = (struct.calcsize(fuse_in_header_fmt)
                         + struct.calcsize(fuse_init_in_fmt))
    header = struct.pack(fuse_in_header_fmt, total_request_len,
                         FUSE_OP_INIT, request_id, 0, 0, 0, 0, 0)
    payload = struct.pack(fuse_init_in_fmt, FUSE_MAJOR_VERSION,
                          FUSE_MINOR_VERSION, 0, 0, 0)
    sock.sendall(header + payload)

    # The reply header carries the total reply length, which tells us how
    # many payload bytes follow.
    reply_header = sock_recvall(sock, struct.calcsize(fuse_out_header_fmt))
    reply_len, _, reply_id = struct.unpack(fuse_out_header_fmt, reply_header)
    assert reply_id == request_id

    reply_body = sock_recvall(sock, reply_len - len(reply_header))
    init_fields = struct.unpack(fuse_init_out_fmt, reply_body)
    assert init_fields[0] == FUSE_MAJOR_VERSION
def test_hello_uds(output_checker):
    """Start the hello_ll_uds example and perform an INIT handshake with it.

    The example is launched as a child process with its stdout/stderr routed
    to *output_checker*. After a short start-up delay the test connects to
    the example's Unix domain socket and runs tst_init over it.

    Cleanup (closing the socket, stopping the child, removing the socket
    file) runs even when the handshake fails, so a failing run does not leak
    a server process or leave a stale socket behind for the next run.

    Args:
        output_checker: Fixture exposing an ``fd`` attribute used as the
            child's stdout and stderr.
    """
    sock_path = "/tmp/libfuse-hello-ll.sock"
    cmdline = base_cmdline + [pjoin(basename, 'example', 'hello_ll_uds')]
    print(cmdline)
    uds_process = subprocess.Popen(cmdline, stdout=output_checker.fd,
                                   stderr=output_checker.fd)
    try:
        # Give the server time to create and listen on its socket.
        time.sleep(1)
        with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as sock:
            sock.settimeout(1)
            sock.connect(sock_path)
            tst_init(sock)
    finally:
        uds_process.terminate()
        try:
            uds_process.wait(1)
        except subprocess.TimeoutExpired:
            uds_process.kill()
            # Reap the killed child so it does not linger as a zombie.
            uds_process.wait()
        try:
            os.remove(sock_path)
        except FileNotFoundError:
            # The server never created the socket or already removed it;
            # do not let that mask the real test outcome.
            pass
|