# Chunked

* Author(s): [Kevin Perry](https://github.com/kperryua)

[
[Source](https://github.com/apple/swift-async-algorithms/blob/main/Sources/AsyncAlgorithms/AsyncChunkedByGroupSequence.swift),
[Source](https://github.com/apple/swift-async-algorithms/blob/main/Sources/AsyncAlgorithms/AsyncChunkedOnProjectionSequence.swift),
[Source](https://github.com/apple/swift-async-algorithms/blob/main/Sources/AsyncAlgorithms/AsyncChunksOfCountAndSignalSequence.swift),
[Source](https://github.com/apple/swift-async-algorithms/blob/main/Sources/AsyncAlgorithms/AsyncChunksOfCountSequence.swift) |
[Tests](https://github.com/apple/swift-async-algorithms/blob/main/Tests/AsyncAlgorithmsTests/TestChunk.swift)
]

## Introduction

Grouping values from an asynchronous sequence is often useful for tasks that involve writing those values efficiently, or for handling specific structured data inputs.

## Proposed Solution

Chunking operations can be broken down into a few distinct categories: grouping according to a binary predicate that determines whether consecutive elements belong to the same group, projecting an element's property to determine the element's chunk membership, chunking by a discrete count, chunking by another asynchronous sequence (a "signal") that indicates when a chunk should be delimited, or chunking by a combination of count and signal.

### Grouping

Group chunks are determined by passing two consecutive elements to a closure which tests whether they are in the same group. When the `AsyncChunkedByGroupSequence` iterator receives the first element from the base sequence, it will immediately be added to a group. When it receives the second item, it tests whether the previous item and the current item belong to the same group. If they are not in the same group, then the iterator emits the first item's group and a new group is created containing the second item. Items declared to be in the same group accumulate until a new group is declared, or the iterator finds the end of the base sequence. When the base sequence terminates, the final group is emitted. If the base sequence throws an error, `AsyncChunkedByGroupSequence` will rethrow that error immediately and discard any current group.

```swift
extension AsyncSequence {
  public func chunked<Collected: RangeReplaceableCollection>(
    into: Collected.Type,
    by belongInSameGroup: @escaping @Sendable (Element, Element) -> Bool
  ) -> AsyncChunkedByGroupSequence<Self, Collected>
    where Collected.Element == Element

  public func chunked(
    by belongInSameGroup: @escaping @Sendable (Element, Element) -> Bool
  ) -> AsyncChunkedByGroupSequence<Self, [Element]>
}
```

Consider an example where an asynchronous sequence emits the following values: `10, 20, 30, 10, 40, 40, 10, 20`. Suppose the chunked operation is defined as follows:

```swift
let chunks = numbers.chunked { $0 <= $1 }
for await numberChunk in chunks {
  print(numberChunk)
}
```

That snippet will produce the following values:

```swift
[10, 20, 30]
[10, 40, 40]
[10, 20]
```

While `Array` is the default type for chunks, thanks to the overload that takes a `RangeReplaceableCollection` type, the same sample can be chunked into instances of `ContiguousArray`, or any other `RangeReplaceableCollection` instead.

```swift
let chunks = numbers.chunked(into: ContiguousArray.self) { $0 <= $1 }
for await numberChunk in chunks {
  print(numberChunk)
}
```

The `into:` variant is the funnel method for the main implementation; the default overload calls it, passing `[Element].self` as the `into` parameter.
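The grouping rule described above can be illustrated with a small synchronous model. This is only a sketch under stated assumptions: `groupModel` is a hypothetical helper that works on an in-memory array, not the library's asynchronous implementation, and it does not model error propagation.

```swift
// Hypothetical helper: a synchronous model of the grouping rule.
// It is not part of AsyncAlgorithms and ignores async iteration and errors.
func groupModel<T>(_ elements: [T], by belongInSameGroup: (T, T) -> Bool) -> [[T]] {
  var chunks: [[T]] = []
  var current: [T] = []
  for element in elements {
    // Compare the previously received element with the incoming one.
    if let previous = current.last, !belongInSameGroup(previous, element) {
      chunks.append(current)  // emit the finished group
      current = []            // start a new group for the incoming element
    }
    current.append(element)
  }
  // The base ended: emit the final group, if there is one.
  if !current.isEmpty { chunks.append(current) }
  return chunks
}

let groupedNumbers = groupModel([10, 20, 30, 10, 40, 40, 10, 20]) { $0 <= $1 }
print(groupedNumbers) // [[10, 20, 30], [10, 40, 40], [10, 20]]
```

Note that the predicate only ever sees adjacent elements, so a group boundary depends on the last element of the current group, not on the group as a whole.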

### Projection

In some scenarios, chunks are determined not by comparing different elements, but by the element itself. This may be the case when the element has some sort of discriminator that can determine the chunk it belongs to. When two consecutive elements have different projections, the current chunk is emitted and a new chunk is created for the new element.

When the `AsyncChunkedOnProjectionSequence`'s iterator receives `nil` from the base sequence, it emits the final chunk. When the base sequence throws an error, the iterator discards the current chunk and rethrows that error.

Similarly to the `chunked(by:)` method, this algorithm has an optional specification for the `RangeReplaceableCollection` that is used as the type of each chunk.

```swift
extension AsyncSequence {
  public func chunked<Subject : Equatable, Collected: RangeReplaceableCollection>(
    into: Collected.Type,
    on projection: @escaping @Sendable (Element) -> Subject
  ) -> AsyncChunkedOnProjectionSequence<Self, Subject, Collected>

  public func chunked<Subject : Equatable>(
    on projection: @escaping @Sendable (Element) -> Subject
  ) -> AsyncChunkedOnProjectionSequence<Self, Subject, [Element]>
}
```

The following example shows how a sequence of names can be chunked together by their first characters.

```swift
let names = URL(fileURLWithPath: "/tmp/names.txt").lines
let groupedNames = names.chunked(on: \.first!)
for try await (firstLetter, names) in groupedNames {
  print(firstLetter)
  for name in names {
    print("  ", name)
  }
}
```

A special property of this kind of projection chunking is that when an asynchronous sequence's elements are known to be ordered, the output of the chunking asynchronous sequence is suitable for initializing dictionaries using the `AsyncSequence` initializer for `Dictionary`. This is because the projection can be easily designed to match the sorting characteristics and thereby guarantee that the output matches the pattern of an array of pairs of unique "keys" with the chunks as the "values".

In the example above, if the names are known to be ordered then you can take advantage of the uniqueness of each "first character" projection to initialize a `Dictionary` like so:

```swift
let names = URL(fileURLWithPath: "/tmp/names.txt").lines
let nameDirectory = try await Dictionary(uniqueKeysWithValues: names.chunked(on: \.first!))
```
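To see why ordering matters for the dictionary case, here is a minimal synchronous sketch of projection chunking. `projectionModel` and the sample names are hypothetical and exist only for illustration; the sketch uses the standard library's `Dictionary(uniqueKeysWithValues:)` on an array rather than the asynchronous initializer mentioned above.

```swift
// Hypothetical helper: a synchronous model of projection chunking.
// Consecutive elements with equal projections share a chunk.
func projectionModel<T, Subject: Equatable>(
  _ elements: [T],
  on projection: (T) -> Subject
) -> [(Subject, [T])] {
  var chunks: [(Subject, [T])] = []
  for element in elements {
    let subject = projection(element)
    if let lastIndex = chunks.indices.last, chunks[lastIndex].0 == subject {
      // Same projection as the current chunk: keep accumulating.
      chunks[lastIndex].1.append(element)
    } else {
      // Different projection: the previous chunk is complete, start a new one.
      chunks.append((subject, [element]))
    }
  }
  return chunks
}

// Made-up sample data, already sorted by first letter.
let sortedNames = ["Alice", "Amy", "Bob", "Carol", "Cody"]
let nameChunks = projectionModel(sortedNames, on: { $0.first! })
// Safe here because sorted input yields each key exactly once.
let directory = Dictionary(uniqueKeysWithValues: nameChunks)
print(directory.count) // 3

// Unsorted input repeats a key ("A" appears twice), so it would not be
// suitable for a unique-keys initializer.
let unsortedKeys = projectionModel(["Alice", "Bob", "Amy"], on: { $0.first! }).map { String($0.0) }
print(unsortedKeys) // ["A", "B", "A"]
```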

### Count or Signal

Sometimes chunks are determined not by the elements themselves, but by external factors. This final category enables limiting chunks to a specific size and/or delimiting them by another asynchronous sequence which is referred to as a "signal". This particular chunking family is useful for scenarios where the elements are more efficiently processed as chunks than individual elements, regardless of their values.

This family is broken down into two sub-families of methods: ones that employ a signal plus an optional count (which return an `AsyncChunksOfCountOrSignalSequence`), and the ones that only deal with counts (which return an `AsyncChunksOfCountSequence`). Both sub-families have `Collected` as their element type, or `Array` if unspecified. These sub-families have rethrowing behaviors; if the base `AsyncSequence` can throw then the chunks sequence can also throw. Likewise if the base `AsyncSequence` cannot throw then the chunks sequence also cannot throw.

##### Count only

```swift
extension AsyncSequence {
  public func chunks<Collected: RangeReplaceableCollection>(
    ofCount count: Int, 
    into: Collected.Type
  ) -> AsyncChunksOfCountSequence<Self, Collected> 
    where Collected.Element == Element

  public func chunks(
    ofCount count: Int
  ) -> AsyncChunksOfCountSequence<Self, [Element]>
}
```

If a chunk size limit is specified via an `ofCount` parameter, the sequence will produce chunks of type `Collected` with at most the specified number of elements. When a chunk reaches the given size, the asynchronous sequence will emit it immediately.

For example, an asynchronous sequence of `UInt8` bytes can be chunked into at most 1024-byte `Data` instances like so:

```swift
let packets = bytes.chunks(ofCount: 1024, into: Data.self)
for try await packet in packets {
  write(packet)
}
```
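The "at most" wording can be made concrete with a synchronous sketch. `countModel` is a hypothetical helper, and requiring a positive count is an assumption of this sketch rather than a documented guarantee.

```swift
// Hypothetical helper: a synchronous model of count-based chunking.
func countModel<T>(_ elements: [T], ofCount count: Int) -> [[T]] {
  // Assumption of this sketch: a chunk size must be positive.
  precondition(count > 0, "count must be positive")
  var chunks: [[T]] = []
  var current: [T] = []
  for element in elements {
    current.append(element)
    // Emit as soon as the chunk reaches the requested size.
    if current.count == count {
      chunks.append(current)
      current = []
    }
  }
  // The base ended: emit any partial chunk so no element is lost.
  if !current.isEmpty { chunks.append(current) }
  return chunks
}

let countChunks = countModel(Array(0..<10), ofCount: 4)
print(countChunks) // [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9]]
```

Only the final chunk can be shorter than the requested count, and that happens only when the base ends partway through a chunk.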

##### Signal only

```swift
extension AsyncSequence {
  public func chunked<Signal, Collected: RangeReplaceableCollection>(
    by signal: Signal, 
    into: Collected.Type
  ) -> AsyncChunksOfCountOrSignalSequence<Self, Collected, Signal> 
    where Collected.Element == Element

  public func chunked<Signal>(
    by signal: Signal
  ) -> AsyncChunksOfCountOrSignalSequence<Self, [Element], Signal>

  public func chunked<C: Clock, Collected: RangeReplaceableCollection>(
    by timer: AsyncTimerSequence<C>, 
    into: Collected.Type
  ) -> AsyncChunksOfCountOrSignalSequence<Self, Collected, AsyncTimerSequence<C>> 
    where Collected.Element == Element

  public func chunked<C: Clock>(
    by timer: AsyncTimerSequence<C>
  ) -> AsyncChunksOfCountOrSignalSequence<Self, [Element], AsyncTimerSequence<C>>
}
```

If a signal asynchronous sequence is specified, the chunking asynchronous sequence emits chunks whenever the signal emits. The signal's element values are ignored. If the chunking asynchronous sequence hasn't accumulated any elements since its previous emission, then no value is emitted in response to the signal.

Since time is a frequent method of signaling desired delineations of chunks, there is a pre-specialized set of overloads that take `AsyncTimerSequence`. These allow shorthand initialization by using `AsyncTimerSequence`'s static member initializers.

As an example, an asynchronous sequence of log messages can be chunked into arrays of logs in four-second segments like so:

```swift
let fourSecondsOfLogs = logs.chunked(by: .repeating(every: .seconds(4)))
for await chunk in fourSecondsOfLogs {
  send(chunk)
}
```

##### Count or Signal

```swift
extension AsyncSequence {
  public func chunks<Signal, Collected: RangeReplaceableCollection>(
    ofCount count: Int, 
    or signal: Signal, 
    into: Collected.Type
  ) -> AsyncChunksOfCountOrSignalSequence<Self, Collected, Signal> 
    where Collected.Element == Element

  public func chunks<Signal>(
    ofCount count: Int, 
    or signal: Signal
  ) -> AsyncChunksOfCountOrSignalSequence<Self, [Element], Signal>

  public func chunks<C: Clock, Collected: RangeReplaceableCollection>(
    ofCount count: Int, 
    or timer: AsyncTimerSequence<C>, 
    into: Collected.Type
  ) -> AsyncChunksOfCountOrSignalSequence<Self, Collected, AsyncTimerSequence<C>> 
    where Collected.Element == Element

  public func chunks<C: Clock>(
    ofCount count: Int, 
    or timer: AsyncTimerSequence<C>
  ) -> AsyncChunksOfCountOrSignalSequence<Self, [Element], AsyncTimerSequence<C>>
}
```

If both count and signal are specified, the chunking asynchronous sequence emits chunks whenever *either* the chunk reaches the specified size *or* the signal asynchronous sequence emits. When a signal causes a chunk to be emitted, the accumulated element count is reset back to zero. When an `AsyncTimerSequence` is used as a signal, the timer is started from the moment `next()` is called for the first time on `AsyncChunksOfCountOrSignalSequence`'s iterator, and it emits on a regular cadence from that moment. Note that the scheduling of the timer's emission is unaffected by any chunks emitted based on count.

Like the example above, this code emits up to 1024-byte `Data` instances, but a chunk will also be emitted every second.

```swift
let packets = bytes.chunks(ofCount: 1024, or: .repeating(every: .seconds(1)), into: Data.self)
for try await packet in packets {
  write(packet)
}
```

In any configuration of any of the chunking families, when the base asynchronous sequence terminates, one of two things will happen: 1) a partial chunk will be emitted, or 2) no chunk will be emitted (i.e. the iterator received no elements since the emission of the previous chunk). No elements from the base asynchronous sequence are ever discarded, except in the case of a thrown error.
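The emission rules for signals, counts, and termination can be summarized with a small synchronous model. This is a sketch under stated assumptions: `Event` and `countOrSignalModel` are hypothetical names, the interleaving of elements and signal firings is supplied up front as an array, and real timing, cancellation, and errors are not modeled.

```swift
// Hypothetical event type: base elements and signal firings in arrival order.
enum Event<T> {
  case element(T)
  case signal
}

// Hypothetical helper: passing no count models the signal-only family.
func countOrSignalModel<T>(_ events: [Event<T>], ofCount count: Int? = nil) -> [[T]] {
  var chunks: [[T]] = []
  var current: [T] = []
  for event in events {
    switch event {
    case .element(let value):
      current.append(value)
      // Emit as soon as the chunk reaches the size limit, if one was given.
      if let count = count, current.count == count {
        chunks.append(current)
        current = []
      }
    case .signal:
      // A signal with nothing accumulated emits no chunk.
      if !current.isEmpty {
        chunks.append(current)
        current = []  // the accumulated count resets to zero
      }
    }
  }
  // The base terminated: emit a partial chunk if one exists.
  if !current.isEmpty { chunks.append(current) }
  return chunks
}

let events: [Event<Int>] = [
  .element(1), .element(2), .element(3),
  .signal,  // flushes whatever has accumulated
  .signal,  // nothing accumulated since the last emission: no chunk
  .element(4), .element(5),
]
let countOrSignal = countOrSignalModel(events, ofCount: 2)
let signalOnly = countOrSignalModel(events)
print(countOrSignal) // [[1, 2], [3], [4, 5]]
print(signalOnly)    // [[1, 2, 3], [4, 5]]
```

In both runs every element appears in exactly one chunk, which mirrors the statement that no elements are discarded outside of the error case.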

## Detailed Design

### Grouping

```swift
public struct AsyncChunkedByGroupSequence<Base: AsyncSequence, Collected: RangeReplaceableCollection>: AsyncSequence 
  where Collected.Element == Base.Element {
  public typealias Element = Collected
  
  public struct Iterator: AsyncIteratorProtocol {
    public mutating func next() async rethrows -> Collected?
  }
  
  public func makeAsyncIterator() -> Iterator
}

extension AsyncChunkedByGroupSequence: Sendable 
  where Base: Sendable, Base.Element: Sendable { }
  
extension AsyncChunkedByGroupSequence.Iterator: Sendable 
  where Base.AsyncIterator: Sendable, Base.Element: Sendable { }
```

### Projection

```swift
public struct AsyncChunkedOnProjectionSequence<Base: AsyncSequence, Subject: Equatable, Collected: RangeReplaceableCollection>: AsyncSequence where Collected.Element == Base.Element {
  public typealias Element = (Subject, Collected)

  public struct Iterator: AsyncIteratorProtocol {
    public mutating func next() async rethrows -> (Subject, Collected)?
  }

  public func makeAsyncIterator() -> Iterator
}

extension AsyncChunkedOnProjectionSequence: Sendable 
  where Base: Sendable, Base.Element: Sendable { }
extension AsyncChunkedOnProjectionSequence.Iterator: Sendable
  where Base.AsyncIterator: Sendable, Base.Element: Sendable, Subject: Sendable { }
```

### Count

```swift
public struct AsyncChunksOfCountSequence<Base: AsyncSequence, Collected: RangeReplaceableCollection>: AsyncSequence 
  where Collected.Element == Base.Element {
  public typealias Element = Collected

  public struct Iterator: AsyncIteratorProtocol {
    public mutating func next() async rethrows -> Collected?
  }

  public func makeAsyncIterator() -> Iterator
}

extension AsyncChunksOfCountSequence : Sendable where Base : Sendable, Base.Element : Sendable { }
extension AsyncChunksOfCountSequence.Iterator : Sendable where Base.AsyncIterator : Sendable, Base.Element : Sendable { }
```

### Count or Signal

```swift
public struct AsyncChunksOfCountOrSignalSequence<Base: AsyncSequence, Collected: RangeReplaceableCollection, Signal: AsyncSequence>: AsyncSequence, Sendable 
  where 
    Collected.Element == Base.Element, 
    Base: Sendable, Signal: Sendable, 
    Base.AsyncIterator: Sendable, Signal.AsyncIterator: Sendable, 
    Base.Element: Sendable, Signal.Element: Sendable {
  public typealias Element = Collected

  public struct Iterator: AsyncIteratorProtocol, Sendable {
    public mutating func next() async rethrows -> Collected?
  }
  
  public func makeAsyncIterator() -> Iterator
}
```

## Alternatives Considered

Making each chunk an `AsyncSequence`, instead of collecting into a `RangeReplaceableCollection`, was considered. However, it was determined that the throwing behavior of that design would be complex to understand. If that hurdle could be overcome, it might be a future direction worth exploring.

Variants of the `chunked(by:)` (grouping) and `chunked(on:)` (projection) methods could be added that take delimiting `Signal` and `AsyncTimerSequence` inputs, similar to `chunks(ofCount:or:)`. However, it was decided that such functionality was likely to be underutilized and not worth the complication to the already broad surface area of the `chunked` methods.

The name `collect`, which is used in APIs like `Combine`, was considered for this family, since these functions are distinctly similar to those APIs.

## Credits/Inspiration

This transformation function is a heavily inspired analog of the synchronous version [defined in the Swift Algorithms package](https://github.com/apple/swift-algorithms/blob/main/Guides/Chunked.md).