File: lazy_value.rb

package info (click to toggle)
ruby-google-cloud-env 2.1.1-1
  • links: PTS, VCS
  • area: main
  • in suites: experimental
  • size: 220 kB
  • sloc: ruby: 984; makefile: 3
file content (1003 lines) | stat: -rw-r--r-- 41,400 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
# frozen_string_literal: true

# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

require "English"

module Google
  module Cloud
    class Env
      ##
      # @private
      #
      # A lazy value box with thread-safe memoization. The first time accessed
      # it will call a given block to compute its value, and will cache that
      # value. Subsequent requests will return the cached value.
      #
      # At most one thread will be allowed to run the computation; if another
      # thread is already in the middle of a computation, any new threads
      # requesting the value will wait until the existing computation is
      # complete, and will use that computation's result rather than kicking
      # off their own computation.
      #
      # If a computation fails with an exception, that exception will also be
      # memoized and reraised on subsequent accesses. A LazyValue can also be
      # configured so subsequent accesses will retry the computation if the
      # previous computation failed. The maximum number of retries is
      # configurable, as is the retry "interval", i.e. the time since the last
      # failure before an access will retry the computation.
      #
      # By default, a computation's memoized value (or final error after
      # retries have been exhausted) is maintained for the lifetime of the Ruby
      # process. However, a computation can also cause its result (or error) to
      # expire after a specified number of seconds, forcing a recomputation on
      # the next access following expiration, by calling
      # {LazyValue.expiring_value} or {LazyValue.raise_expiring_error}.
      #
      # We keep this private for now so we can move it in the future if we need
      # it to be available to other libraries. Currently it should not be used
      # outside of Google::Cloud::Env.
      #
      class LazyValue
        class << self
          ##
          # Creates a special object that can be returned from a computation to
          # indicate that a value expires after the given number of seconds.
          # Any access after the expiration will cause a recomputation.
          #
          # @param lifetime [Numeric] timeout in seconds
          # @param value [Object] the computation result
          #
          def expiring_value lifetime, value
            return value unless lifetime
            ExpiringValue.new lifetime, value
          end

          ##
          # Raise an error that, if it is the final result (i.e. retries have
          # been exhausted), will expire after the given number of seconds. Any
          # access after the expiration will cause a recomputation. If retries
          # will not have been exhausted, expiration is ignored.
          #
          # The error can be specified as an exception object, a string (in
          # which case a RuntimeError will be raised), or a class that descends
          # from Exception (in which case an error of that type will be
          # created, and passed any additional args given).
          #
          # @param lifetime [Numeric] timeout in seconds
          # @param error [String,Exception,Class] the error to raise
          # @param args [Array] any arguments to pass to an error constructor
          #
          def raise_expiring_error lifetime, error, *args
            raise error unless lifetime
            raise ExpiringError, lifetime if error.equal? $ERROR_INFO
            if error.is_a?(Class) && error.ancestors.include?(Exception)
              error = error.new(*args)
            elsif !error.is_a? Exception
              error = RuntimeError.new error.to_s
            end
            begin
              raise error
            rescue error.class
              raise ExpiringError, lifetime
            end
          end
        end

        ##
        # Create a LazyValue.
        #
        # You must pass a block that will be called to compute the value the
        # first time it is accessed. The block should evaluate to the desired
        # value, or raise an exception on error. To specify a value that
        # expires, use {LazyValue.expiring_value}. To raise an exception that
        # expires, use {LazyValue.raise_expiring_error}.
        #
        # You can optionally pass a retry manager, which controls how
        # subsequent accesses might try calling the block again if a compute
        # attempt fails with an exception. A retry manager should either be an
        # instance of {Retries} or an object that duck types it.
        #
        # @param retries [Retries] A retry manager. The default is a retry
        #     manager that tries only once.
        # @param block [Proc] A block that can be called to attempt to compute
        #     the value.
        #
        def initialize retries: nil, &block
          @retries = retries || Retries.new
          @compute_handler = block
          raise ArgumentError, "missing compute handler block" unless block

          # Internally implemented by a state machine, protected by a mutex that
          # ensures state transitions are consistent. The states themselves are
          # implicit in the values of the various instance variables. The
          # following are the major states:
          #
          # 1. **Pending** The value is not known and needs to be computed.
          #     @retries.finished? is false.
          #     @value is nil.
          #     @error is nil if no previous attempt has yet been made to
          #         compute the value, or set to the error that resulted from
          #         the most recent attempt.
          #     @expires_at is set to the monotonic time of the end of the
          #         current retry delay, or nil if the next computation attempt
          #         should happen immediately at the next access.
          #     @computing_thread is nil.
          #     @compute_notify is nil.
          #     @backfill_notify is set if currently backfilling, otherwise nil.
          #     From this state, calling #get will start computation (first
          #     waiting on @backfill_notify if present). Calling #expire! will
          #     have no effect.
          #
          # 2. **Computing** One thread has initiated computation. All other
          #     threads will be blocked (waiting on @compute_notify) until the
          #     computing thread finishes.
          #     @retries.finished? is false.
          #     @value and @error are nil.
          #     @expires_at is set to the monotonic time when computing started.
          #     @computing_thread is set to the thread that is computing.
          #     @compute_notify is set.
          #     @backfill_notify is nil.
          #     From this state, calling #get will cause the thread to wait
          #     (on @compute_notify) for the computing thread to complete.
          #     Calling #expire! will have no effect.
          #     When the computing thread finishes, it will transition either
          #     to Finished if the computation was successful or failed with
          #     no more retries, or back to Pending if computation failed with
          #     at least one retry remaining. It might also set @backfill_notify
          #     if other threads are waiting for completion.
          #
          # 3. **Finished** Computation has succeeded, or has failed and no
          #     more retries remain.
          #     @retries.finished? is true.
          #     either @value or @error is set, and the other is nil, depending
          #         on whether the final state is success or failure. (If both
          #         are nil, it is considered a @value of nil.)
          #     @expires_at is set to the monotonic time of expiration, or nil
          #         if there is no expiration.
          #     @computing_thread is nil.
          #     @compute_notify is nil.
          #     @backfill_notify is set if currently backfilling, otherwise nil.
          #     From this state, calling #get will either return the result or
          #     raise the error. If the current time exceeds @expires_at,
          #     however, it will block on @backfill_notify (if present), and
          #     and then transition to Pending first, and proceed from there.
          #     Calling #expire! will block on @backfill_notify (if present)
          #     and then transition to Pending,
          #
          # @backfill_notify can be set in the Pending or Finished states. This
          # happens when threads that had been waiting on the previous
          # computation are still clearing out and returning their results.
          # Backfill must complete before the next computation attempt can be
          # started from the Pending state, or before an expiration can take
          # place from the Finished state. This prevents an "overlap" situation
          # where a thread that had been waiting for a previous computation,
          # isn't able to return the new result before some other thread starts
          # a new computation or expires the value. Note that it is okay for
          # #set! to be called during backfill; the threads still backfilling
          # will simply return the new value.
          #
          # Note: One might ask if it would be simpler to extend the mutex
          # across the entire computation, having it protect the computation
          # itself, instead of the current approach of having explicit compute
          # and backfill states with notifications and having the mutex protect
          # only the state transition. However, this would not have been able
          # to satisfy the requirement that we be able to detect whether a
          # thread asked for the value during another thread's computation,
          # and thus should "share" in that computation's result even if it's
          # a failure (rather than kicking off a retry). Additionally, we
          # consider it dangerous to have the computation block run inside a
          # mutex, because arbitrary code can run there which might result in
          # deadlocks.
          @mutex = Thread::Mutex.new
          # The evaluated, cached value, which could be nil.
          @value = nil
          # The last error encountered
          @error = nil
          # If non-nil, this is the CLOCK_MONOTONIC time when the current state
          # expires. If the state is finished, this is the time the current
          # value or error expires (while nil means it never expires). If the
          # state is pending, this is the time the wait period before the next
          # retry expires (and nil means there is no delay.) If the state is
          # computing, this is the time when computing started.
          @expires_at = nil
          # Set to a condition variable during computation. Broadcasts when the
          # computation is complete. Any threads wanting to get the value
          # during computation must wait on this first.
          @compute_notify = nil
          # Set to a condition variable during backfill. Broadcasts when the
          # last backfill thread is complete. Any threads wanting to expire the
          # cache or start a new computation during backfill must wait on this
          # first.
          @backfill_notify = nil
          # The number of threads waiting on backfill. Used to determine
          # whether to activate backfill_notify when a computation completes.
          @backfill_count = 0
          # The thread running the current computation. This is tested against
          # new requests to protect against deadlocks where a thread tries to
          # re-enter from its own computation. This is also tested when a
          # computation completes, to ensure that the computation is still
          # relevant (i.e. if #set! interrupts a computation, this is reset to
          # nil).
          @computing_thread = nil
        end

        ##
        # Returns the value. This will either return the value or raise an
        # error indicating failure to compute the value.
        #
        # If the value was previously cached, it will return that cached value,
        # otherwise it will either run the computation to try to determine the
        # value, or wait for another thread that is already running the
        # computation. Thus, this method could block.
        #
        # Any arguments passed will be forwarded to the block if called, but
        # are ignored if a cached value is returned.
        #
        # @return [Object] the value
        # @raise [Exception] if an error happened while computing the value
        #
        def get *extra_args
          @mutex.synchronize do
            # Wait for any backfill to complete, and handle expiration first
            # because it might change the state.
            wait_backfill
            do_expire if should_expire?
            # Main state handling
            if @retries.finished?
              # finished state: return value or error
              return cached_value
            elsif !@compute_notify.nil?
              # computing state: wait for the computing thread to finish then
              # return its result
              wait_compute
              return cached_value
            else
              # pending state
              cur_time = Process.clock_gettime Process::CLOCK_MONOTONIC
              # waiting for the next retry: return current error
              raise @error if @expires_at && cur_time < @expires_at
              # no delay: compute in the current thread
              enter_compute cur_time
              # and continue below
            end
          end

          # Gets here if we just transitioned from pending to compute
          perform_compute extra_args
        end

        ##
        # This method calls {#get} repeatedly until a final result is available
        # or retries have exhausted.
        #
        # Note: this method spins on {#get}, although honoring any retry delay.
        # Thus, it is best to call this only if retries are limited or a retry
        # delay has been configured.
        #
        # @param extra_args [Array] extra arguments to pass to the block
        # @param transient_errors [Array<Class>] An array of exception classes
        #     that will be treated as transient and will allow await to
        #     continue retrying. Exceptions omitted from this list will be
        #     treated as fatal errors and abort the call. Default is
        #     `[StandardError]`.
        # @param max_tries [Integer,nil] The maximum number of times this will
        #     call {#get} before giving up, or nil for a potentially unlimited
        #     number of attempts. Default is 1.
        # @param max_time [Numeric,nil] The maximum time in seconds this will
        #     spend before giving up, or nil (the default) for a potentially
        #     unlimited timeout.
        # @param delay_epsilon [Numeric] An extra delay in seconds to ensure
        #     that retries happen after the retry delay period
        #
        # @return [Object] the value
        # @raise [Exception] if a fatal error happened, or retries have been
        #     exhausted.
        #
        def await *extra_args, transient_errors: nil, max_tries: 1, max_time: nil, delay_epsilon: 0.0001
          transient_errors ||= [StandardError]
          transient_errors = Array transient_errors
          expiry_time = Process.clock_gettime(Process::CLOCK_MONOTONIC) + max_time if max_time
          begin
            get(*extra_args)
          rescue *transient_errors
            # A snapshot of the state. It is possible that another thread has
            # changed this state since we received the error. This is okay
            # because our specification for this method is conservative:
            # whatever we return will have been correct at some point.
            state = internal_state
            # Don't retry unless we're in a state where retries can happen.
            raise if [:failed, :success].include? state[0]
            if max_tries
              # Handle retry countdown
              max_tries -= 1
              raise unless max_tries.positive?
            end
            # Determine the next delay
            delay = determine_await_retry_delay state, expiry_time, delay_epsilon
            # nil means we've exceeded the max time
            raise if delay.nil?
            sleep delay if delay.positive?
            retry
          end
        end

        ##
        # Returns the current low-level state immediately without waiting for
        # computation. Returns a 3-tuple (i.e. a 3-element array) in which the
        # first element is a symbol indicating the overall state, as described
        # below, and the second and third elements are set accordingly.
        #
        # States (the first tuple element) are:
        # * `:pending` - The value has not been computed, or previous
        #   computation attempts have failed but there are retries pending. The
        #   second element will be the most recent error, or nil if no
        #   computation attempt has yet happened. The third element will be the
        #   monotonic time of the end of the current retry delay, or nil if
        #   there will be no delay.
        # * `:computing` - A thread is currently computing the value. The
        #   second element is nil. The third elements is the monotonic time
        #   when the computation started.
        # * `:success` - The computation is finished, and the value is returned
        #   in the second element. The third element may be a numeric value
        #   indicating the expiration monotonic time, or nil for no expiration.
        # * `:failed` - The computation failed finally and no more retries will
        #   be done. The error is returned in the second element. The third
        #   element may be a numeric value indicating the expiration monotonic
        #   time, or nil for no expiration.
        #
        # Future updates may add array elements without warning. Callers should
        # be prepared to ignore additional unexpected elements.
        #
        # @return [Array]
        #
        def internal_state
          @mutex.synchronize do
            if @retries.finished?
              if @error
                [:failed, @error, @expires_at]
              else
                [:success, @value, @expires_at]
              end
            elsif @compute_notify.nil?
              [:pending, @error, @expires_at]
            else
              [:computing, nil, @expires_at]
            end
          end
        end

        ##
        # Force this cache to expire immediately, if computation is complete.
        # Any cached value will be cleared, the retry count is reset, and the
        # next access will call the compute block as if it were the first
        # access. Returns true if this took place. Has no effect and returns
        # false if the computation is not yet complete (i.e. if a thread is
        # currently computing, or if the last attempt failed and retries have
        # not yet been exhausted.)
        #
        # @return [true,false] whether the cache was expired
        #
        def expire!
          @mutex.synchronize do
            wait_backfill
            return false unless @retries.finished?
            do_expire
            true
          end
        end

        ##
        # Set the cache value explicitly and immediately. If a computation is
        # in progress, it is "detached" and its result will no longer be
        # considered.
        #
        # @param value [Object] the value to set
        # @param lifetime [Numeric] the lifetime until expiration in seconds,
        #     or nil (the default) for no expiration.
        # @return [Object] the value
        #
        def set! value, lifetime: nil
          @mutex.synchronize do
            @value = value
            @expires_at = determine_expiry lifetime
            @error = nil
            @retries.finish!
            if @compute_notify.nil?
              enter_backfill
              leave_compute
            end
            value
          end
        end

        private

        ##
        # @private
        # Internal type signaling a value with an expiration
        #
        class ExpiringValue
          def initialize lifetime, value
            @lifetime = lifetime
            @value = value
          end

          attr_reader :lifetime
          attr_reader :value
        end

        ##
        # @private
        # Internal type signaling an error with an expiration.
        #
        class ExpiringError < StandardError
          def initialize lifetime
            super()
            @lifetime = lifetime
          end

          attr_reader :lifetime
        end

        ##
        # @private
        # Perform computation, and transition state on completion.
        # This must be called from outside the mutex.
        # Returns the final value, or raises the final error.
        #
        def perform_compute extra_args
          value = @compute_handler.call(*extra_args)
          @mutex.synchronize do
            handle_success value
          end
        rescue Exception => e # rubocop:disable Lint/RescueException
          @mutex.synchronize do
            handle_failure e
          end
        end

        ##
        # @private
        # Either return the cached value or raise the cached error.
        # This must be called from within the mutex.
        #
        def cached_value
          raise @error if @error
          @value
        end

        ##
        # @private
        # Determine whether we should expire a cached value and compute a new
        # one. Happens in the Finished state if @expires_at is in the past.
        # This must be called from within the mutex.
        #
        def should_expire?
          @retries.finished? && @expires_at && Process.clock_gettime(Process::CLOCK_MONOTONIC) >= @expires_at
        end

        ##
        # @private
        # Reset this cache, transitioning to the Pending state and resetting
        # the retry count.
        # This must be called from within the mutex.
        #
        def do_expire
          @retries.reset!
          @value = @error = @expires_at = nil
        end

        ##
        # @private
        # Wait for backfill to complete if it is in progress, otherwise just
        # return immediately.
        # This must be called from within the mutex.
        #
        def wait_backfill
          @backfill_notify.wait @mutex while @backfill_notify
        end

        ##
        # @private
        # Wait for computation to complete.
        # Also adds the current thread to the backfill list, ensuring that the
        # computing thread will enter the backfill phase on completion. Once
        # computation is done, also checks whether the current thread is the
        # last one to backfill, and if so, turns off backfill mode.
        # This must be called from within the mutex.
        #
        def wait_compute
          if Thread.current.equal? @computing_thread
            raise ThreadError, "deadlock: tried to call LazyValue#get from its own computation"
          end
          @backfill_count += 1
          begin
            @compute_notify.wait @mutex
          ensure
            @backfill_count -= 1
            leave_backfill
          end
        end

        ##
        # @private
        # Initializes compute mode.
        # This must be called from within the mutex.
        #
        def enter_compute cur_time
          @computing_thread = Thread.current
          @compute_notify = Thread::ConditionVariable.new
          @expires_at = cur_time
          @value = @error = nil
        end

        ##
        # @private
        # Finishes compute mode, notifying threads waiting on it.
        # This must be called from within the mutex.
        #
        def leave_compute
          @computing_thread = nil
          @compute_notify.broadcast
          @compute_notify = nil
        end

        ##
        # @private
        # Checks for any threads that need backfill, and if so triggers
        # backfill mode.
        # This must be called from within the mutex.
        #
        def enter_backfill
          return unless @backfill_count.positive?
          @backfill_notify = Thread::ConditionVariable.new
        end

        ##
        # @private
        # Checks whether all threads are done with backfill, and if so notifies
        # threads waiting for backfill to finish.
        # This must be called from within the mutex.
        #
        def leave_backfill
          return unless @backfill_count.zero?
          @backfill_notify.broadcast
          @backfill_notify = nil
        end

        ##
        # @private
        # Sets state to reflect a successful computation (as long as this
        # computation wasn't interrupted by someone calling #set!).
        # Then returns the computed value.
        # This must be called from within the mutex.
        #
        def handle_success value
          expires_at = nil
          if value.is_a? ExpiringValue
            expires_at = determine_expiry value.lifetime
            value = value.value
          end
          if Thread.current.equal? @computing_thread
            @retries.finish!
            @error = nil
            @value = value
            @expires_at = expires_at
            enter_backfill
            leave_compute
          end
          value
        end

        ##
        # @private
        # Sets state to reflect a failed computation (as long as this
        # computation wasn't interrupted by someone calling #set!).
        # Then raises the error.
        # This must be called from within the mutex.
        #
        def handle_failure error
          expires_at = nil
          if error.is_a? ExpiringError
            expires_at = determine_expiry error.lifetime
            error = error.cause
          end
          if Thread.current.equal? @computing_thread
            retry_delay = @retries.next start_time: @expires_at
            @value = nil
            @error = error
            @expires_at =
              if retry_delay.nil?
                # No more retries; use the expiration for the error
                expires_at
              elsif retry_delay.positive?
                determine_expiry retry_delay
              end
            enter_backfill
            leave_compute
          end
          raise error
        end

        ##
        # @private
        # Determines the delay until the next retry during an await
        #
        def determine_await_retry_delay state, expiry_time, delay_epsilon
          cur_time = Process.clock_gettime Process::CLOCK_MONOTONIC
          next_run_time =
            if state[0] == :pending && state[2]
              # Run at end of the current retry delay, plus an epsilon,
              # if in pending state
              state[2] + delay_epsilon
            else
              # Default to run immediately otherwise
              cur_time
            end
          # Signal nil if we're past the max time
          return nil if expiry_time && next_run_time > expiry_time
          # No delay if we're already past the time we want to run
          return 0 if next_run_time < cur_time
          next_run_time - cur_time
        end

        ##
        # @private
        # Determines the expires_at value in monotonic time, given a lifetime.
        #
        def determine_expiry lifetime
          lifetime ? Process.clock_gettime(Process::CLOCK_MONOTONIC) + lifetime : nil
        end
      end

      ##
      # @private
      #
      # This expands on {LazyValue} by providing a lazy key-value dictionary.
      # Each key uses a separate LazyValue; hence multiple keys can be in the
      # process of computation concurrently and independently.
      #
      # We keep this private for now so we can move it in the future if we need
      # it to be available to other libraries. Currently it should not be used
      # outside of Google::Cloud::Env.
      #
      class LazyDict
        ##
        # Create a LazyDict.
        #
        # You must pass a block that will be called to compute the value the
        # first time it is accessed. The block takes the key as an argument and
        # should evaluate to the value for that key, or raise an exception on
        # error. To specify a value that expires, use
        # {LazyValue.expiring_value}. To raise an exception that expires, use
        # {LazyValue.raise_expiring_error}.
        #
        # You can optionally pass a retry manager, which controls how
        # subsequent accesses might try calling the block again if a compute
        # attempt fails with an exception. A retry manager should either be an
        # instance of {Retries} or an object that duck types it.
        #
        # @param retries [Retries,Proc] A retry manager. The default is a retry
        #     manager that tries only once. You can provide either a static
        #     retry manager or a Proc that returns a retry manager.
        # @param block [Proc] A block that can be called to attempt to compute
        #     the value given the key.
        #
        def initialize retries: nil, &block
          @retries = retries
          @compute_handler = block
          @key_values = {}
          @mutex = Thread::Mutex.new
        end

        ##
        # Returns the value for the given key. This will either return the
        # value or raise an error indicating failure to compute the value. If
        # the value was previously cached, it will return that cached value,
        # otherwise it will either run the computation to try to determine the
        # value, or wait for another thread that is already running the
        # computation.
        #
        # Any arguments beyond the initial key argument will be passed to the
        # block if it is called, but are ignored if a cached value is returned.
        #
        # @param key [Object] the key
        # @param extra_args [Array] extra arguments to pass to the block
        # @return [Object] the value
        # @raise [Exception] if an error happened while computing the value
        #
        def get key, *extra_args
          lookup_key(key).get key, *extra_args
        end
        alias [] get

        ##
        # This method calls {#get} repeatedly until a final result is available
        # or retries have exhausted.
        #
        # Note: this method spins on {#get}, although honoring any retry delay.
        # Thus, it is best to call this only if retries are limited or a retry
        # delay has been configured.
        #
        # @param key [Object] the key
        # @param extra_args [Array] extra arguments to pass to the block
        # @param transient_errors [Array<Class>] An array of exception classes
        #     that will be treated as transient and will allow await to
        #     continue retrying. Exceptions omitted from this list will be
        #     treated as fatal errors and abort the call. Default is
        #     `[StandardError]`.
        # @param max_tries [Integer,nil] The maximum number of times this will
        #     call {#get} before giving up, or nil for a potentially unlimited
        #     number of attempts. Default is 1.
        # @param max_time [Numeric,nil] The maximum time in seconds this will
        #     spend before giving up, or nil (the default) for a potentially
        #     unlimited timeout.
        #
        # @return [Object] the value
        # @raise [Exception] if a fatal error happened, or retries have been
        #     exhausted.
        #
        def await key, *extra_args, transient_errors: nil, max_tries: 1, max_time: nil
          lookup_key(key).await key, *extra_args,
                                transient_errors: transient_errors,
                                max_tries: max_tries,
                                max_time: max_time
        end

        ##
        # Returns the current low-level state for the given key. Does not block
        # for computation. See {LazyValue#internal_state} for details.
        #
        # @param key [Object] the key
        # @return [Array] the low-level state
        #
        def internal_state key
          lookup_key(key).internal_state
        end

        ##
        # Force the cache for the given key to expire immediately, if
        # computation is complete.
        #
        # Any cached value will be cleared, the retry count is reset, and the
        # next access will call the compute block as if it were the first
        # access. Returns true if this took place. Has no effect and returns
        # false if the computation is not yet complete (i.e. if a thread is
        # currently computing, or if the last attempt failed and retries have
        # not yet been exhausted.)
        #
        # @param key [Object] the key
        # @return [true,false] whether the cache was expired
        #
        def expire! key
          lookup_key(key).expire!
        end

        ##
        # Force the values for all keys to expire immediately.
        #
        # @return [Array<Object>] A list of keys that were expired. A key is
        #     *not* included if its computation is not yet complete (i.e. if a
        #     thread is currently computing, or if the last attempt failed and
        #     retries have not yet been exhausted.)
        #
        def expire_all!
          all_expired = []
          @mutex.synchronize do
            @key_values.each do |key, value|
              all_expired << key if value.expire!
            end
          end
          all_expired
        end

        ##
        # Set the cache value for the given key explicitly and immediately.
        # If a computation is in progress, it is "detached" and its result will
        # no longer be considered.
        #
        # @param key [Object] the key
        # @param value [Object] the value to set
        # @param lifetime [Numeric] the lifetime until expiration in seconds,
        #     or nil (the default) for no expiration.
        # @return [Object] the value
        #
        def set! key, value, lifetime: nil
          lookup_key(key).set! value, lifetime: lifetime
        end

        private

        ##
        # @private
        # Ensures that exactly one LazyValue exists for the given key, and
        # returns it.
        #
        def lookup_key key
          # Optimization: check for key existence and return quickly without
          # grabbing the mutex. This works because keys are never deleted.
          return @key_values[key] if @key_values.key? key

          @mutex.synchronize do
            if @key_values.key? key
              @key_values[key]
            else
              retries =
                if @retries.respond_to? :reset_dup
                  @retries.reset_dup
                elsif @retries.respond_to? :call
                  @retries.call
                end
              @key_values[key] = LazyValue.new retries: retries, &@compute_handler
            end
          end
        end
      end

      ##
      # @private
      #
      # A simple retry manager with optional delay and backoff. It retries
      # until either a configured maximum number of attempts has been
      # reached, or a configurable total time has elapsed since the first
      # failure.
      #
      # This class is not thread-safe by itself. Access should be protected
      # by an external mutex.
      #
      # We keep this private for now so we can move it in the future if we need
      # it to be available to other libraries. Currently it should not be used
      # outside of Google::Cloud::Env.
      #
      class Retries
        ##
        # Create and initialize a retry manager.
        #
        # @param max_tries [Integer,nil] Maximum number of attempts before we
        #     give up altogether, or nil for no maximum. Default is 1,
        #     indicating one attempt and no retries.
        # @param max_time [Numeric,nil] The maximum amount of time in seconds
        #     until we give up altogether, or nil for no maximum. Default is
        #     nil.
        # @param initial_delay [Numeric] Initial delay between attempts, in
        #     seconds. Default is 0.
        # @param max_delay [Numeric,nil] Maximum delay between attempts, in
        #     seconds, or nil for no max. Default is nil.
        # @param delay_multiplier [Numeric] Multipler applied to the delay
        #     between attempts. Default is 1 for no change.
        # @param delay_adder [Numeric] Value added to the delay between
        #     attempts. Default is 0 for no change.
        # @param delay_includes_time_elapsed [true,false] Whether to deduct any
        #     time already elapsed from the retry delay. Default is false.
        #
        def initialize max_tries: 1,
                       max_time: nil,
                       initial_delay: 0,
                       max_delay: nil,
                       delay_multiplier: 1,
                       delay_adder: 0,
                       delay_includes_time_elapsed: false
          @max_tries = max_tries&.to_i
          raise ArgumentError, "max_tries must be positive" if @max_tries && !@max_tries.positive?
          @max_time = max_time
          raise ArgumentError, "max_time must be positive" if @max_time && !@max_time.positive?
          @initial_delay = initial_delay
          raise ArgumentError, "initial_delay must be nonnegative" if @initial_delay&.negative?
          @max_delay = max_delay
          raise ArgumentError, "max_delay must be nonnegative" if @max_delay&.negative?
          @delay_multiplier = delay_multiplier
          @delay_adder = delay_adder
          @delay_includes_time_elapsed = delay_includes_time_elapsed
          reset!
        end

        ##
        # Create a duplicate in the reset state
        #
        # @return [Retries]
        #
        def reset_dup
          Retries.new max_tries: @max_tries,
                      max_time: @max_time,
                      initial_delay: @initial_delay,
                      max_delay: @max_delay,
                      delay_multiplier: @delay_multiplier,
                      delay_adder: @delay_adder,
                      delay_includes_time_elapsed: @delay_includes_time_elapsed
        end

        ##
        # Returns true if the retry limit has been reached.
        #
        # @return [true,false]
        #
        def finished?
          @current_delay.nil?
        end

        ##
        # Reset to the initial attempt.
        #
        # @return [self]
        #
        def reset!
          @current_delay = :reset
          self
        end

        ##
        # Cause the retry limit to be reached immediately.
        #
        # @return [self]
        #
        def finish!
          @current_delay = nil
          self
        end

        ##
        # Advance to the next attempt.
        #
        # Returns nil if the retry limit has been reached. Otherwise, returns
        # the delay in seconds until the next retry (0 for no delay). Raises an
        # error if the previous call already returned nil.
        #
        # @param start_time [Numeric,nil] Optional start time in monotonic time
        #     units. Used if delay_includes_time_elapsed is set.
        # @return [Numeric,nil]
        #
        def next start_time: nil
          raise "no tries remaining" if finished?
          cur_time = Process.clock_gettime Process::CLOCK_MONOTONIC
          if @current_delay == :reset
            setup_first_retry cur_time
          else
            advance_delay
          end
          advance_retry cur_time
          adjusted_delay start_time, cur_time
        end

        private

        def setup_first_retry cur_time
          @tries_remaining = @max_tries
          @deadline = @max_time ? cur_time + @max_time : nil
          @current_delay = @initial_delay
        end

        def advance_delay
          @current_delay = (@delay_multiplier * @current_delay) + @delay_adder
          @current_delay = @max_delay if @max_delay && @current_delay > @max_delay
        end

        def advance_retry cur_time
          @tries_remaining -= 1 if @tries_remaining
          @current_delay = nil if @tries_remaining&.zero? || (@deadline && cur_time + @current_delay > @deadline)
        end

        def adjusted_delay start_time, cur_time
          delay = @current_delay
          if @delay_includes_time_elapsed && start_time && delay
            delay -= cur_time - start_time
            delay = 0 if delay.negative?
          end
          delay
        end
      end
    end
  end
end