File: asynch_copy.ml

package info (click to toggle)
libnbd 1.24.2-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 10,956 kB
  • sloc: ansic: 55,063; ml: 12,364; sh: 8,817; python: 4,757; makefile: 3,036; perl: 165; cpp: 24
file content (103 lines) | stat: -rw-r--r-- 3,512 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
open Unix
open Printf

let (+^) = Int64.add
let (-^) = Int64.sub

let bs = 65536_L
let max_reads_in_flight = 16

let dir_is_read dir = dir land (Int32.to_int NBD.aio_direction_read) <> 0
let dir_is_write dir = dir land (Int32.to_int NBD.aio_direction_write) <> 0

(* Copy between two libnbd handles using asynchronous I/O (AIO). *)
let asynch_copy src dst =
  let size = NBD.get_size dst in

  (* This is our reading position in the source. *)
  let soff = ref 0_L in

  (* This callback is called when any pread from the source has
   * completed.  (buf, size) contains the block of data.  This
   * builds up a queue of write commands which are sent on the
   * next iteration of the loop.
   *)
  let writes = ref [] in
  let read_completed buf offset _ =
    (* Get ready to issue a write command. *)
    writes := (buf, offset) :: !writes;
    (* By returning 1 here we auto-retire the pread command. *)
    1
  in

  (* This callback is called when any pwrite to the destination
   * has completed.
   *)
  let write_completed buf _ =
    (* By returning 1 here we auto-retire the pwrite command. *)
    1
  in

  (* The main loop which runs until we have finished reading and
   * there are no more commands in flight.
   *)
  while !soff < size || NBD.aio_in_flight src > 0 || NBD.aio_in_flight dst > 0
  do
    (* If we're able to submit more reads from the source then do so now. *)
    if !soff < size && NBD.aio_in_flight src < max_reads_in_flight then (
      let bs = min bs (size -^ !soff) in
      let buf = NBD.Buffer.alloc (Int64.to_int bs) in
      ignore (NBD.aio_pread src buf !soff
                ~completion:(read_completed buf !soff));
      soff := !soff +^ bs
    );

    (* If there are any write commands waiting to be issued to
     * the destination, send them now.
     *)
    List.iter (
      fun (buf, offset) ->
        (* Note the size of the write is implicitly stored in buf. *)
        ignore (NBD.aio_pwrite dst buf offset
                  ~completion:(write_completed buf))
    ) !writes;
    writes := [];

    let sfd = NBD.aio_get_fd src
    and dfd = NBD.aio_get_fd dst
    and sdir = NBD.aio_get_direction src
    and ddir = NBD.aio_get_direction dst in
    let rfds = if dir_is_read sdir then [sfd] else [] in
    let rfds = if dir_is_read ddir then dfd :: rfds else rfds in
    let wfds = if dir_is_write sdir then [sfd] else [] in
    let wfds = if dir_is_write ddir then dfd :: wfds else wfds in
    let rfds, wfds, _ = select rfds wfds [] (-1.0) in

    (* These can change since we slept in the select, so we must
     * check them again.
     *)
    let sdir = NBD.aio_get_direction src
    and ddir = NBD.aio_get_direction dst in
    if List.mem sfd rfds && dir_is_read sdir then
      NBD.aio_notify_read src
    else if List.mem sfd wfds && dir_is_write sdir then
      NBD.aio_notify_write src
    else if List.mem dfd rfds && dir_is_read ddir then
      NBD.aio_notify_read dst
    else if List.mem dfd wfds && dir_is_write ddir then
      NBD.aio_notify_write dst
  done

let () =
  let src = NBD.create () in
  NBD.set_handle_name src "src";
  let dst = NBD.create () in
  NBD.set_handle_name dst "dst";
  NBD.connect_command src ["nbdkit"; "-s"; "--exit-with-parent"; "-r";
                           "pattern"; "size=512M"];
  NBD.connect_command dst ["nbdkit"; "-s"; "--exit-with-parent";
                           "memory"; "size=512M"];
  asynch_copy src dst

(* This is a good way to find memory leaks or corruption. *)
let () = Gc.compact ()