1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158
|
(** Flows are used to represent byte streams, such as open files and network sockets.
A {!source} provides a stream of bytes. A {!sink} consumes a stream.
A {!two_way} can do both.
To read structured data (e.g. a line at a time), wrap a source using {!Buf_read}. *)
open Std
(** {2 Types} *)
type source_ty = [`R | `Flow]
type 'a source = ([> source_ty] as 'a) r
(** A readable flow provides a stream of bytes. *)
type sink_ty = [`W | `Flow]
type 'a sink = ([> sink_ty] as 'a) r
(** A writeable flow accepts a stream of bytes. *)
type shutdown_ty = [`Shutdown]
type 'a shutdown = ([> shutdown_ty] as 'a) r
type 'a read_method = ..
(** Sources can offer a list of ways to read them, in order of preference. *)
type shutdown_command = [
| `Receive (** Indicate that no more reads will be done *)
| `Send (** Indicate that no more writes will be done *)
| `All (** Indicate that no more reads or writes will be done *)
]
(** {2 Reading} *)
val single_read : _ source -> Cstruct.t -> int
(** [single_read src buf] reads one or more bytes into [buf].
It returns the number of bytes read (which may be less than the
buffer size even if there is more data to be read).
- Use {!read_exact} instead if you want to fill [buf] completely.
- Use {!Buf_read.line} to read complete lines.
- Use {!copy} to stream data directly from a source to a sink.
[buf] must not be zero-length.
@raise End_of_file if there is no more data to read *)
val read_exact : _ source -> Cstruct.t -> unit
(** [read_exact src dst] keeps reading into [dst] until it is full.
@raise End_of_file if the buffer could not be filled. *)
val string_source : string -> source_ty r
(** [string_source s] is a source that gives the bytes of [s]. *)
val cstruct_source : Cstruct.t list -> source_ty r
(** [cstruct_source cs] is a source that gives the bytes of [cs]. *)
type 't read_method += Read_source_buffer of ('t -> (Cstruct.t list -> int) -> unit)
(** If a source offers [Read_source_buffer rsb] then the user can call [rsb t fn]
to borrow a view of the source's buffers. [fn] returns the number of bytes it consumed.
[rsb] will raise [End_of_file] if no more data will be produced.
If no data is currently available, [rsb] will wait for some to become available before calling [fn].
[fn] must not continue to use the buffers after it returns. *)
(** {2 Writing} *)
val write : _ sink -> Cstruct.t list -> unit
(** [write dst bufs] writes all bytes from [bufs].
You should not perform multiple concurrent writes on the same flow
(the output may get interleaved).
This is a low level API. Consider using:
- {!Buf_write} to combine multiple small writes.
- {!copy} for bulk transfers, as it allows some extra optimizations. *)
val single_write : _ sink -> Cstruct.t list -> int
(** [single_write dst bufs] writes at least one byte from [bufs] and returns the number of bytes written. *)
val copy : _ source -> _ sink -> unit
(** [copy src dst] copies data from [src] to [dst] until end-of-file. *)
val copy_string : string -> _ sink -> unit
(** [copy_string s = copy (string_source s)] *)
val buffer_sink : Buffer.t -> sink_ty r
(** [buffer_sink b] is a sink that adds anything sent to it to [b].
To collect data as a cstruct, use {!Buf_read} instead. *)
(** {2 Bidirectional streams} *)
type two_way_ty = [source_ty | sink_ty | shutdown_ty]
type 'a two_way = ([> two_way_ty] as 'a) r
val shutdown : _ two_way -> shutdown_command -> unit
(** [shutdown t cmd] indicates that the caller has finished reading or writing [t]
(depending on [cmd]).
This is useful in some protocols to indicate that you have finished sending the request,
and that the remote peer should now send the response. *)
(** {2 Closing}
Flows are usually attached to switches and closed automatically when the switch
finishes. However, it can be useful to close them sooner manually in some cases. *)
val close : [> `Close] r -> unit
(** Alias of {!Resource.close}. *)
(** {2 Provider Interface} *)
module Pi : sig
module type SOURCE = sig
type t
val read_methods : t read_method list
val single_read : t -> Cstruct.t -> int
end
module type SINK = sig
type t
val single_write : t -> Cstruct.t list -> int
val copy : t -> src:_ source -> unit
(** [copy t ~src] allows for optimising copy operations.
If you have no optimisations, you can use {!Eio.Flow.Pi.simple_copy} to implement this using {!single_write}. *)
end
module type SHUTDOWN = sig
type t
val shutdown : t -> shutdown_command -> unit
end
val source : (module SOURCE with type t = 't) -> ('t, source_ty) Resource.handler
val sink : (module SINK with type t = 't) -> ('t, sink_ty) Resource.handler
val shutdown : (module SHUTDOWN with type t = 't) -> ('t, shutdown_ty) Resource.handler
module type TWO_WAY = sig
include SHUTDOWN
include SOURCE with type t := t
include SINK with type t := t
end
val two_way : (module TWO_WAY with type t = 't) -> ('t, two_way_ty) Resource.handler
type (_, _, _) Resource.pi +=
| Source : ('t, (module SOURCE with type t = 't), [> source_ty]) Resource.pi
| Sink : ('t, (module SINK with type t = 't), [> sink_ty]) Resource.pi
| Shutdown : ('t, (module SHUTDOWN with type t = 't), [> shutdown_ty]) Resource.pi
val simple_copy : single_write:('t -> Cstruct.t list -> int) -> 't -> src:_ source -> unit
(** [simple_copy ~single_write] implements {!SINK}'s [copy] API using [single_write]. *)
end
|