File: vfs_lfs_corrupt.py

package info (click to toggle)
giac 1.9.0.93%2Bdfsg2-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 117,732 kB
  • sloc: cpp: 404,272; ansic: 205,462; python: 30,548; javascript: 28,788; makefile: 17,997; yacc: 2,690; lex: 2,464; sh: 705; perl: 314; lisp: 216; asm: 62; java: 41; xml: 36; sed: 16; csh: 7; pascal: 6
file content (106 lines) | stat: -rw-r--r-- 2,555 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
# Test VfsLfs1 and VfsLfs2 on a RAM block device: error handling when blocks are corrupted or the device reports I/O errors

# Probe for the uos module and both VFS classes used below; if either the
# import or an attribute lookup fails, print "SKIP" and exit the script.
try:
    import uos
    # Bare attribute accesses: they raise AttributeError when a class is absent.
    uos.VfsLfs1
    uos.VfsLfs2
except (ImportError, AttributeError):
    print("SKIP")
    raise SystemExit

class RAMBlockDevice:
    """Block device that keeps its contents in a single in-memory bytearray.

    The ``ret`` attribute is the value returned by ``readblocks`` and
    ``writeblocks``; setting it to a non-zero value lets a caller simulate a
    failing device. The data transfer itself is still carried out.
    """

    ERASE_BLOCK_SIZE = 1024

    def __init__(self, blocks):
        # Status code reported by every read/write; 0 until a test changes it.
        self.ret = 0
        # Zero-filled backing store, ``blocks`` erase blocks long.
        self.data = bytearray(blocks * self.ERASE_BLOCK_SIZE)

    def readblocks(self, block, buf, off):
        """Fill ``buf`` from byte ``off`` of ``block``; return ``self.ret``."""
        start = block * self.ERASE_BLOCK_SIZE + off
        for pos in range(len(buf)):
            buf[pos] = self.data[start + pos]
        return self.ret

    def writeblocks(self, block, buf, off):
        """Store ``buf`` at byte ``off`` of ``block``; return ``self.ret``."""
        start = block * self.ERASE_BLOCK_SIZE + off
        for pos, value in enumerate(buf):
            self.data[start + pos] = value
        return self.ret

    def ioctl(self, op, arg):
        """Answer the control requests 4, 5 and 6; any other op yields None."""
        block_size = self.ERASE_BLOCK_SIZE
        if op == 4: # block count
            return len(self.data) // block_size
        elif op == 5: # block size
            return block_size
        elif op == 6: # erase block
            return 0
        return None

def corrupt(bdev, block):
    """Overwrite one erase block of ``bdev.data`` with the pattern 0, 1, ..., 255, 0, 1, ..."""
    size = bdev.ERASE_BLOCK_SIZE
    base = block * size
    for offset in range(size):
        # Each byte holds the low 8 bits of its offset within the block.
        bdev.data[base + offset] = offset % 256

def create_vfs(bdev, vfs_class):
    """Format ``bdev``, mount it with ``vfs_class`` and create the file 'f'.

    The device's ``ret`` status is reset to 0 first. The file is written as
    100 separate writes of 'test'. Returns the mounted filesystem object.
    """
    bdev.ret = 0
    vfs_class.mkfs(bdev)
    mounted = vfs_class(bdev)
    with mounted.open('f', 'w') as out:
        for _ in range(100):
            out.write('test')
    return mounted

def test(bdev, vfs_class):
    """Run every failure scenario for ``vfs_class`` on ``bdev``.

    Each scenario starts from a filesystem rebuilt by create_vfs (which also
    resets ``bdev.ret`` to 0), provokes one failure, and prints a fixed
    marker line when the operation raises OSError.
    """
    print('test', vfs_class)
    fail_code = -5 # EIO

    # Scenario: statvfs after blocks 0 and 1 were overwritten
    fs = create_vfs(bdev, vfs_class)
    for blk in (0, 1):
        corrupt(bdev, blk)
    try:
        print(fs.statvfs(''))
    except OSError:
        print('statvfs OSError')

    # Scenario: device reports an error while reading
    fs = create_vfs(bdev, vfs_class)
    fh = fs.open('f', 'r')
    bdev.ret = fail_code
    try:
        fh.read(10)
    except OSError:
        print('read OSError')

    # Scenario: device reports an error while appending
    fs = create_vfs(bdev, vfs_class)
    fh = fs.open('f', 'a')
    bdev.ret = fail_code
    try:
        fh.write('test')
    except OSError:
        print('write OSError')

    # Scenario: device reports an error while closing a written file
    fs = create_vfs(bdev, vfs_class)
    fh = fs.open('f', 'w')
    fh.write('test')
    bdev.ret = fail_code
    try:
        fh.close()
    except OSError:
        print('close OSError')

    # Scenario: device reports an error while flushing a written file
    fs = create_vfs(bdev, vfs_class)
    fh = fs.open('f', 'w')
    fh.write('test')
    bdev.ret = fail_code
    try:
        fh.flush()
    except OSError:
        print('flush OSError')
    # Clear the injected error before closing the file.
    bdev.ret = 0
    fh.close()

# One 30-block RAM device is shared by both runs; create_vfs reformats it
# before every scenario.
bdev = RAMBlockDevice(30)
test(bdev, uos.VfsLfs1)
test(bdev, uos.VfsLfs2)