File: 590-aio-copy.py

package info (click to toggle)
libnbd 1.22.5-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 10,636 kB
  • sloc: ansic: 53,855; ml: 12,311; sh: 8,499; python: 4,595; makefile: 2,902; perl: 165; cpp: 24
file content (132 lines) | stat: -rw-r--r-- 4,874 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
# libnbd Python bindings
# Copyright Red Hat
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA

import select
import nbd
import os

nbdkit = os.getenv("NBDKIT", "nbdkit")
disk_size = 512 * 1024 * 1024
bs = 65536
max_reads_in_flight = 16
bytes_read = 0
bytes_written = 0


def asynch_copy(src, dst):
    size = src.get_size()

    # This is our reading position in the source.
    soff = 0

    writes = []

    # This callback is called when any pread from the source
    # has completed.
    def read_completed(buf, offset, error):
        if error.value != 0:
            raise RuntimeError(f"read: {os.strerror(error.value)}")
        global bytes_read
        bytes_read += buf.size()
        wr = (buf, offset)
        writes.append(wr)
        # By returning 1 here we auto-retire the pread command.
        return 1

    # This callback is called when any pwrite to the destination
    # has completed.
    def write_completed(buf, error):
        if error.value != 0:
            raise RuntimeError(f"write: {os.strerror(error.value)}")
        global bytes_written
        bytes_written += buf.size()
        # By returning 1 here we auto-retire the pwrite command.
        return 1

    # The main loop which runs until we have finished reading and
    # there are no more commands in flight.
    while (soff < size or src.aio_in_flight() > 0 or dst.aio_in_flight() > 0
           or len(writes) > 0):
        # If we're able to submit more reads from the source
        # then do so now.
        if soff < size and src.aio_in_flight() < max_reads_in_flight:
            bufsize = min(bs, size - soff)
            buf = nbd.Buffer(bufsize)
            # NB: Python lambdas are BROKEN.
            # https://stackoverflow.com/questions/2295290
            src.aio_pread(buf, soff,
                          lambda err, buf=buf, soff=soff:
                          read_completed(buf, soff, err))
            soff += bufsize

        # If there are any write commands waiting to be issued
        # to the destination, send them now.
        for buf, offset in writes:
            # See above link about broken Python lambdas.
            dst.aio_pwrite(buf, offset,
                           lambda err, buf=buf:
                           write_completed(buf, err))
        writes = []

        poll = select.poll()

        sfd = src.aio_get_fd()
        dfd = dst.aio_get_fd()

        sevents = 0
        devents = 0
        if src.aio_get_direction() & nbd.AIO_DIRECTION_READ:
            sevents += select.POLLIN
        if src.aio_get_direction() & nbd.AIO_DIRECTION_WRITE:
            sevents += select.POLLOUT
        if dst.aio_get_direction() & nbd.AIO_DIRECTION_READ:
            devents += select.POLLIN
        if dst.aio_get_direction() & nbd.AIO_DIRECTION_WRITE:
            devents += select.POLLOUT
        poll.register(sfd, sevents)
        poll.register(dfd, devents)
        for (fd, revents) in poll.poll():
            # The direction of each handle can change since we
            # slept in the select.
            if fd == sfd and revents & select.POLLIN and \
                    src.aio_get_direction() & nbd.AIO_DIRECTION_READ:
                src.aio_notify_read()
            elif fd == sfd and revents & select.POLLOUT and \
                    src.aio_get_direction() & nbd.AIO_DIRECTION_WRITE:
                src.aio_notify_write()
            elif fd == dfd and revents & select.POLLIN and \
                    dst.aio_get_direction() & nbd.AIO_DIRECTION_READ:
                dst.aio_notify_read()
            elif fd == dfd and revents & select.POLLOUT and \
                    dst.aio_get_direction() & nbd.AIO_DIRECTION_WRITE:
                dst.aio_notify_write()


src = nbd.NBD()
src.set_handle_name("src")
dst = nbd.NBD()
dst.set_handle_name("dst")
src.connect_command([nbdkit, "-s", "--exit-with-parent", "-r",
                     "pattern", "size=%d" % disk_size])
dst.connect_command([nbdkit, "-s", "--exit-with-parent",
                     "memory", "size=%d" % disk_size])
asynch_copy(src, dst)

print("bytes read: %d written: %d size: %d" %
      (bytes_read, bytes_written, disk_size))
assert bytes_read == disk_size
assert bytes_written == disk_size