File: CombineLatestStateMachine.swift

package info (click to toggle)
swiftlang 6.1.3-4
  • links: PTS, VCS
  • area: main
  • in suites: forky
  • size: 2,791,644 kB
  • sloc: cpp: 9,901,738; ansic: 2,201,433; asm: 1,091,827; python: 308,252; objc: 82,166; f90: 80,126; lisp: 38,358; pascal: 25,559; sh: 20,429; ml: 5,058; perl: 4,745; makefile: 4,484; awk: 3,535; javascript: 3,018; xml: 918; fortran: 664; cs: 573; ruby: 396
file content (723 lines) | stat: -rw-r--r-- 25,852 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
//===----------------------------------------------------------------------===//
//
// This source file is part of the Swift Async Algorithms open source project
//
// Copyright (c) 2022 Apple Inc. and the Swift project authors
// Licensed under Apache License v2.0 with Runtime Library Exception
//
// See https://swift.org/LICENSE.txt for license information
//
//===----------------------------------------------------------------------===//

import DequeModule

/// State machine for combine latest
struct CombineLatestStateMachine<
  Base1: AsyncSequence,
  Base2: AsyncSequence,
  Base3: AsyncSequence
>: Sendable where
  Base1: Sendable,
  Base2: Sendable,
  Base3: Sendable,
  Base1.Element: Sendable,
  Base2.Element: Sendable,
  Base3.Element: Sendable {
  typealias DownstreamContinuation = UnsafeContinuation<Result<(
    Base1.Element,
    Base2.Element,
    Base3.Element?
  )?, Error>, Never>

  private enum State: Sendable {
    /// Small wrapper for the state of an upstream sequence.
    struct Upstream<Element: Sendable>: Sendable {
      /// The upstream continuation.
      var continuation: UnsafeContinuation<Void, Error>?
      /// The produced upstream element.
      var element: Element?
      /// Indicates wether the upstream finished/threw already
      var isFinished: Bool
    }

    /// The initial state before a call to `next` happened.
    case initial(base1: Base1, base2: Base2, base3: Base3?)

    /// The state while we are waiting for downstream demand.
    case waitingForDemand(
      task: Task<Void, Never>,
      upstreams: (Upstream<Base1.Element>, Upstream<Base2.Element>, Upstream<Base3.Element>),
      buffer: Deque<(Base1.Element, Base2.Element, Base3.Element?)>
    )

    /// The state while we are consuming the upstream and waiting until we get a result from all upstreams.
    case combining(
      task: Task<Void, Never>,
      upstreams: (Upstream<Base1.Element>, Upstream<Base2.Element>, Upstream<Base3.Element>),
      downstreamContinuation: DownstreamContinuation,
      buffer: Deque<(Base1.Element, Base2.Element, Base3.Element?)>
    )

    case upstreamsFinished(
      buffer: Deque<(Base1.Element, Base2.Element, Base3.Element?)>
    )

    case upstreamThrew(
      error: Error
    )

    /// The state once the downstream consumer stopped, i.e. by dropping all references
    /// or by getting their `Task` cancelled.
    case finished

    /// Internal state to avoid CoW.
    case modifying
  }

  private var state: State

  private let numberOfUpstreamSequences: Int

  /// Initializes a new `StateMachine`.
  init(
    base1: Base1,
    base2: Base2,
    base3: Base3?
  ) {
    self.state = .initial(
      base1: base1,
      base2: base2,
      base3: base3
    )

    if base3 == nil {
      self.numberOfUpstreamSequences = 2
    } else {
      self.numberOfUpstreamSequences = 3
    }
  }

  /// Actions returned by `iteratorDeinitialized()`.
  enum IteratorDeinitializedAction {
    /// Indicates that the `Task` needs to be cancelled and
    /// the upstream continuations need to be resumed with a `CancellationError`.
    case cancelTaskAndUpstreamContinuations(
      task: Task<Void, Never>,
      upstreamContinuations: [UnsafeContinuation<Void, Error>]
    )
  }

  mutating func iteratorDeinitialized() -> IteratorDeinitializedAction? {
    switch self.state {
    case .initial:
      // Nothing to do here. No demand was signalled until now
      return .none

    case .combining:
      // An iterator was deinitialized while we have a suspended continuation.
      preconditionFailure("Internal inconsistency current state \(self.state) and received iteratorDeinitialized()")

    case .waitingForDemand(let task, let upstreams, _):
      // The iterator was dropped which signals that the consumer is finished.
      // We can transition to finished now and need to clean everything up.
      self.state = .finished

      return .cancelTaskAndUpstreamContinuations(
        task: task,
        upstreamContinuations: [upstreams.0.continuation, upstreams.1.continuation, upstreams.2.continuation].compactMap { $0 }
      )

    case .upstreamThrew, .upstreamsFinished:
      // The iterator was dropped so we can transition to finished now.
      self.state = .finished

      return .none

    case .finished:
      // We are already finished so there is nothing left to clean up.
      // This is just the references dropping afterwards.
      return .none

    case .modifying:
      preconditionFailure("Invalid state")
    }
  }

  mutating func taskIsStarted(
    task: Task<Void, Never>,
    downstreamContinuation: DownstreamContinuation
  ) {
    switch self.state {
    case .initial:
      // The user called `next` and we are starting the `Task`
      // to consume the upstream sequences
      self.state = .combining(
        task: task,
        upstreams: (.init(isFinished: false), .init(isFinished: false), .init(isFinished: false)),
        downstreamContinuation: downstreamContinuation,
        buffer: .init()
      )

    case .combining, .waitingForDemand, .upstreamThrew, .upstreamsFinished, .finished:
      // We only allow a single task to be created so this must never happen.
      preconditionFailure("Internal inconsistency current state \(self.state) and received taskStarted()")

    case .modifying:
      preconditionFailure("Invalid state")
    }
  }

  /// Actions returned by `childTaskSuspended()`.
  enum ChildTaskSuspendedAction {
    /// Indicates that the continuation should be resumed which will lead to calling `next` on the upstream.
    case resumeContinuation(
      upstreamContinuation: UnsafeContinuation<Void, Error>
    )
    /// Indicates that the continuation should be resumed with an Error because another upstream sequence threw.
    case resumeContinuationWithError(
      upstreamContinuation: UnsafeContinuation<Void, Error>,
      error: Error
    )
  }

  mutating func childTaskSuspended(baseIndex: Int, continuation: UnsafeContinuation<Void, Error>) -> ChildTaskSuspendedAction? {
    switch self.state {
    case .initial:
      // Child tasks are only created after we transitioned to `zipping`
      preconditionFailure("Internal inconsistency current state \(self.state) and received childTaskSuspended()")

    case .upstreamsFinished:
      preconditionFailure("Internal inconsistency current state \(self.state) and received upstreamThrew()")

    case .waitingForDemand(let task, var upstreams, let buffer):
      self.state = .modifying

      switch baseIndex {
      case 0:
        upstreams.0.continuation = continuation

      case 1:
        upstreams.1.continuation = continuation

      case 2:
        upstreams.2.continuation = continuation

      default:
        preconditionFailure("Internal inconsistency current state \(self.state) and received childTaskSuspended() with base index \(baseIndex)")
      }

      self.state = .waitingForDemand(
        task: task,
        upstreams: upstreams,
        buffer: buffer
      )

      return .none

    case .combining:
      // We are currently combining and need to resume any upstream until we transition to waitingForDemand

      return .resumeContinuation(upstreamContinuation: continuation)

    case .upstreamThrew, .finished:
      // Since cancellation is cooperative it might be that child tasks are still getting
      // suspended even though we already cancelled them. We must tolerate this and just resume
      // the continuation with an error.
      return .resumeContinuationWithError(
        upstreamContinuation: continuation,
        error: CancellationError()
      )

    case .modifying:
      preconditionFailure("Invalid state")
    }
  }

  /// Actions returned by `elementProduced()`.
  enum ElementProducedAction {
    /// Indicates that the downstream continuation should be resumed with the element.
    case resumeContinuation(
      downstreamContinuation: DownstreamContinuation,
      result: Result<(Base1.Element, Base2.Element, Base3.Element?)?, Error>
    )
  }

  mutating func elementProduced(_ result: (Base1.Element?, Base2.Element?, Base3.Element?)) -> ElementProducedAction? {
    switch self.state {
    case .initial:
      // Child tasks that are producing elements are only created after we transitioned to `zipping`
      preconditionFailure("Internal inconsistency current state \(self.state) and received elementProduced()")

    case .upstreamsFinished:
      preconditionFailure("Internal inconsistency current state \(self.state) and received upstreamThrew()")

    case .waitingForDemand(let task, var upstreams, var buffer):
      // We got an element in late. This can happen since we race the upstreams.
      // We have to store the new tuple in our buffer and remember the upstream states.

      self.state = .modifying

      switch result {
      case (.some(let first), .none, .none):
        buffer.append((first, upstreams.1.element!, upstreams.2.element))
        upstreams.0.element = first

      case (.none, .some(let second), .none):
        buffer.append((upstreams.0.element!, second, upstreams.2.element))
        upstreams.1.element = second

      case (.none, .none, .some(let third)):
        buffer.append((upstreams.0.element!, upstreams.1.element!, third))
        upstreams.2.element = third

      default:
        preconditionFailure("Internal inconsistency current state \(self.state) and received elementProduced()")
      }

      self.state = .waitingForDemand(
        task: task,
        upstreams: upstreams,
        buffer: buffer
      )

      return .none

    case .combining(let task, var upstreams, let downstreamContinuation, let buffer):
      precondition(buffer.isEmpty, "Internal inconsistency current state \(self.state) and the buffer is not empty")
      self.state = .modifying

      switch result {
      case (.some(let first), .none, .none):
        upstreams.0.element = first

      case (.none, .some(let second), .none):
        upstreams.1.element = second

      case (.none, .none, .some(let third)):
        upstreams.2.element = third

      default:
        preconditionFailure("Internal inconsistency current state \(self.state) and received elementProduced()")
      }

      // Implementing this for the two arities without variadic generics is a bit awkward sadly.
      if let first = upstreams.0.element,
         let second = upstreams.1.element,
         let third = upstreams.2.element {
        // We got an element from each upstream so we can resume the downstream now
        self.state = .waitingForDemand(
          task: task,
          upstreams: upstreams,
          buffer: buffer
        )

        return .resumeContinuation(
          downstreamContinuation: downstreamContinuation,
          result: .success((first, second, third))
        )

      } else if let first = upstreams.0.element,
                let second = upstreams.1.element,
                self.numberOfUpstreamSequences == 2 {
        // We got an element from each upstream so we can resume the downstream now
        self.state = .waitingForDemand(
          task: task,
          upstreams: upstreams,
          buffer: buffer
        )

        return .resumeContinuation(
          downstreamContinuation: downstreamContinuation,
          result: .success((first, second, nil))
        )
      } else {
        // We are still waiting for one of the upstreams to produce an element
        self.state = .combining(
          task: task,
          upstreams: (
            .init(continuation: upstreams.0.continuation, element: upstreams.0.element, isFinished: upstreams.0.isFinished),
            .init(continuation: upstreams.1.continuation, element: upstreams.1.element, isFinished: upstreams.1.isFinished),
            .init(continuation: upstreams.2.continuation, element: upstreams.2.element, isFinished: upstreams.2.isFinished)
          ),
          downstreamContinuation: downstreamContinuation,
          buffer: buffer
        )

        return .none
      }

    case .upstreamThrew, .finished:
      // Since cancellation is cooperative it might be that child tasks
      // are still producing elements after we finished.
      // We are just going to drop them since there is nothing we can do
      return .none

    case .modifying:
      preconditionFailure("Invalid state")
    }
  }

  /// Actions returned by `upstreamFinished()`.
  enum UpstreamFinishedAction {
    /// Indicates the task and the upstream continuations should be cancelled.
    case cancelTaskAndUpstreamContinuations(
      task: Task<Void, Never>,
      upstreamContinuations: [UnsafeContinuation<Void, Error>]
    )
    /// Indicates that the downstream continuation should be resumed with `nil` and
    /// the task and the upstream continuations should be cancelled.
    case resumeContinuationWithNilAndCancelTaskAndUpstreamContinuations(
      downstreamContinuation: DownstreamContinuation,
      task: Task<Void, Never>,
      upstreamContinuations: [UnsafeContinuation<Void, Error>]
    )
  }

  mutating func upstreamFinished(baseIndex: Int) -> UpstreamFinishedAction? {
    switch self.state {
    case .initial:
      preconditionFailure("Internal inconsistency current state \(self.state) and received upstreamFinished()")

    case .upstreamsFinished:
      preconditionFailure("Internal inconsistency current state \(self.state) and received upstreamFinished()")

    case .waitingForDemand(let task, var upstreams, let buffer):
      // One of the upstreams finished.

      self.state = .modifying

      switch baseIndex {
      case 0:
        upstreams.0.isFinished = true

      case 1:
        upstreams.1.isFinished = true

      case 2:
        upstreams.2.isFinished = true

      default:
        preconditionFailure("Internal inconsistency current state \(self.state) and received upstreamFinished() with base index \(baseIndex)")
      }

      if upstreams.0.isFinished && upstreams.1.isFinished && upstreams.2.isFinished {
        // All upstreams finished we can transition to either finished or upstreamsFinished now
        if buffer.isEmpty {
          self.state = .finished
        } else {
          self.state = .upstreamsFinished(buffer: buffer)
        }

        return .cancelTaskAndUpstreamContinuations(
          task: task,
          upstreamContinuations: [upstreams.0.continuation, upstreams.1.continuation, upstreams.2.continuation].compactMap { $0 }
        )
      } else if upstreams.0.isFinished && upstreams.1.isFinished && self.numberOfUpstreamSequences == 2 {
        // All upstreams finished we can transition to either finished or upstreamsFinished now
        if buffer.isEmpty {
          self.state = .finished
        } else {
          self.state = .upstreamsFinished(buffer: buffer)
        }

        return .cancelTaskAndUpstreamContinuations(
          task: task,
          upstreamContinuations: [upstreams.0.continuation, upstreams.1.continuation, upstreams.2.continuation].compactMap { $0 }
        )
      } else {
        self.state = .waitingForDemand(
          task: task,
          upstreams: upstreams,
          buffer: buffer
        )
        return .none
      }

    case .combining(let task, var upstreams, let downstreamContinuation, let buffer):
      // One of the upstreams finished.

      self.state = .modifying

      // We need to track if an empty upstream finished.
      // If that happens we can transition to finish right away.
      let emptyUpstreamFinished: Bool
      switch baseIndex {
      case 0:
        upstreams.0.isFinished = true
        emptyUpstreamFinished = upstreams.0.element == nil

      case 1:
        upstreams.1.isFinished = true
        emptyUpstreamFinished = upstreams.1.element == nil

      case 2:
        upstreams.2.isFinished = true
        emptyUpstreamFinished = upstreams.2.element == nil

      default:
        preconditionFailure("Internal inconsistency current state \(self.state) and received upstreamFinished() with base index \(baseIndex)")
      }

      // Implementing this for the two arities without variadic generics is a bit awkward sadly.
      if emptyUpstreamFinished {
        // All upstreams finished
        self.state = .finished

        return .resumeContinuationWithNilAndCancelTaskAndUpstreamContinuations(
          downstreamContinuation: downstreamContinuation,
          task: task,
          upstreamContinuations: [upstreams.0.continuation, upstreams.1.continuation, upstreams.2.continuation].compactMap { $0 }
        )

      } else if upstreams.0.isFinished && upstreams.1.isFinished && upstreams.2.isFinished {
        // All upstreams finished
        self.state = .finished

        return .resumeContinuationWithNilAndCancelTaskAndUpstreamContinuations(
          downstreamContinuation: downstreamContinuation,
          task: task,
          upstreamContinuations: [upstreams.0.continuation, upstreams.1.continuation, upstreams.2.continuation].compactMap { $0 }
        )

      } else if upstreams.0.isFinished && upstreams.1.isFinished && self.numberOfUpstreamSequences == 2 {
        // All upstreams finished
        self.state = .finished

        return .resumeContinuationWithNilAndCancelTaskAndUpstreamContinuations(
          downstreamContinuation: downstreamContinuation,
          task: task,
          upstreamContinuations: [upstreams.0.continuation, upstreams.1.continuation, upstreams.2.continuation].compactMap { $0 }
        )
      } else {
        self.state = .combining(
          task: task,
          upstreams: upstreams,
          downstreamContinuation: downstreamContinuation,
          buffer: buffer
        )
        return .none
      }

    case .upstreamThrew, .finished:
      // This is just everything finishing up, nothing to do here
      return .none

    case .modifying:
      preconditionFailure("Invalid state")
    }
  }

  /// Actions returned by `upstreamThrew()`.
  enum UpstreamThrewAction {
    /// Indicates the task and the upstream continuations should be cancelled.
    case cancelTaskAndUpstreamContinuations(
      task: Task<Void, Never>,
      upstreamContinuations: [UnsafeContinuation<Void, Error>]
    )
    /// Indicates that the downstream continuation should be resumed with the `error` and
    /// the task and the upstream continuations should be cancelled.
    case resumeContinuationWithErrorAndCancelTaskAndUpstreamContinuations(
      downstreamContinuation: DownstreamContinuation,
      error: Error,
      task: Task<Void, Never>,
      upstreamContinuations: [UnsafeContinuation<Void, Error>]
    )
  }

  mutating func upstreamThrew(_ error: Error) -> UpstreamThrewAction? {
    switch self.state {
    case .initial:
      preconditionFailure("Internal inconsistency current state \(self.state) and received upstreamThrew()")

    case .upstreamsFinished:
      // We need to tolerate multiple upstreams failing
      return .none

    case .waitingForDemand(let task, let upstreams, _):
      // An upstream threw. We can cancel everything now and transition to finished.
      // We just need to store the error for the next downstream demand
      self.state = .upstreamThrew(
        error: error
      )

      return .cancelTaskAndUpstreamContinuations(
        task: task,
        upstreamContinuations: [upstreams.0.continuation, upstreams.1.continuation, upstreams.2.continuation].compactMap { $0 }
      )

    case .combining(let task, let upstreams, let downstreamContinuation, _):
      // One of our upstreams threw. We need to transition to finished ourselves now
      // and resume the downstream continuation with the error. Furthermore, we need to cancel all of
      // the upstream work.
      self.state = .finished

      return .resumeContinuationWithErrorAndCancelTaskAndUpstreamContinuations(
        downstreamContinuation: downstreamContinuation,
        error: error,
        task: task,
        upstreamContinuations: [upstreams.0.continuation, upstreams.1.continuation, upstreams.2.continuation].compactMap { $0 }
      )

    case .upstreamThrew, .finished:
      // This is just everything finishing up, nothing to do here
      return .none

    case .modifying:
      preconditionFailure("Invalid state")
    }
  }

  /// Actions returned by `cancelled()`.
  enum CancelledAction {
    /// Indicates that the downstream continuation needs to be resumed and
    /// task and the upstream continuations should be cancelled.
    case resumeDownstreamContinuationWithNilAndCancelTaskAndUpstreamContinuations(
      downstreamContinuation: DownstreamContinuation,
      task: Task<Void, Never>,
      upstreamContinuations: [UnsafeContinuation<Void, Error>]
    )
    /// Indicates that the task and the upstream continuations should be cancelled.
    case cancelTaskAndUpstreamContinuations(
      task: Task<Void, Never>,
      upstreamContinuations: [UnsafeContinuation<Void, Error>]
    )
  }

  mutating func cancelled() -> CancelledAction? {
    switch self.state {
    case .initial:
      state = .finished

      return .none

    case .waitingForDemand(let task, let upstreams, _):
      // The downstream task got cancelled so we need to cancel our upstream Task
      // and resume all continuations. We can also transition to finished.
      self.state = .finished

      return .cancelTaskAndUpstreamContinuations(
        task: task,
        upstreamContinuations: [upstreams.0.continuation, upstreams.1.continuation, upstreams.2.continuation].compactMap { $0 }
      )

    case .combining(let task, let upstreams, let downstreamContinuation, _):
      // The downstream Task got cancelled so we need to cancel our upstream Task
      // and resume all continuations. We can also transition to finished.
      self.state = .finished

      return .resumeDownstreamContinuationWithNilAndCancelTaskAndUpstreamContinuations(
        downstreamContinuation: downstreamContinuation,
        task: task,
        upstreamContinuations: [upstreams.0.continuation, upstreams.1.continuation, upstreams.2.continuation].compactMap { $0 }
      )

    case .upstreamsFinished:
      // We can transition to finished now
      self.state = .finished

      return .none

    case .upstreamThrew, .finished:
      // We are already finished so nothing to do here:

      return .none

    case .modifying:
      preconditionFailure("Invalid state")
    }
  }

  /// Actions returned by `next()`.
  enum NextAction {
    /// Indicates that a new `Task` should be created that consumes the sequence.
    case startTask(Base1, Base2, Base3?)
    /// Indicates that all upstream continuations should be resumed.
    case resumeUpstreamContinuations(
      upstreamContinuation: [UnsafeContinuation<Void, Error>]
    )
    /// Indicates that the downstream continuation should be resumed with the result.
    case resumeContinuation(
      downstreamContinuation: DownstreamContinuation,
      result: Result<(Base1.Element, Base2.Element, Base3.Element?)?, Error>
    )
    /// Indicates that the downstream continuation should be resumed with `nil`.
    case resumeDownstreamContinuationWithNil(DownstreamContinuation)
  }

  mutating func next(for continuation: DownstreamContinuation) -> NextAction {
    switch self.state {
    case .initial(let base1, let base2, let base3):
      // This is the first time we get demand singalled so we have to start the task
      // The transition to the next state is done in the taskStarted method
      return .startTask(base1, base2, base3)

    case .combining:
      // We already got demand signalled and have suspended the downstream task
      // Getting a second next calls means the iterator was transferred across Tasks which is not allowed
      preconditionFailure("Internal inconsistency current state \(self.state) and received next()")

    case .waitingForDemand(let task, var upstreams, var buffer):
      // We got demand signalled now we have to check if there is anything buffered.
      // If not we have to transition to combining and need to resume all upstream continuations now
      self.state = .modifying

      if let element = buffer.popFirst() {
        self.state = .waitingForDemand(
          task: task,
          upstreams: upstreams,
          buffer: buffer
        )

        return .resumeContinuation(
          downstreamContinuation: continuation,
          result: .success(element)
        )
      } else {
        let upstreamContinuations = [upstreams.0.continuation, upstreams.1.continuation, upstreams.2.continuation].compactMap { $0 }
        upstreams.0.continuation = nil
        upstreams.1.continuation = nil
        upstreams.2.continuation = nil

        self.state = .combining(
          task: task,
          upstreams: upstreams,
          downstreamContinuation: continuation,
          buffer: buffer
        )

        return .resumeUpstreamContinuations(
          upstreamContinuation: upstreamContinuations
        )
      }

    case .upstreamsFinished(var buffer):
      self.state = .modifying

      if let element = buffer.popFirst() {
        self.state = .upstreamsFinished(buffer: buffer)

        return .resumeContinuation(
          downstreamContinuation: continuation,
          result: .success(element)
        )
      } else {
        self.state = .finished

        return .resumeDownstreamContinuationWithNil(continuation)
      }

    case .upstreamThrew(let error):
      // One of the upstreams threw and we have to return this error now.
      self.state = .finished

      return .resumeContinuation(downstreamContinuation: continuation, result: .failure(error))

    case .finished:
      // We are already finished so we are just returning `nil`
      return .resumeDownstreamContinuationWithNil(continuation)

    case .modifying:
      preconditionFailure("Invalid state")
    }
  }
}