1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141
|
// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
#if !NO_PERF
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
namespace System.Reactive.Linq.ObservableImpl
{
class TakeLastBuffer<TSource> : Producer<IList<TSource>>
{
private readonly IObservable<TSource> _source;
private readonly int _count;
private readonly TimeSpan _duration;
private readonly IScheduler _scheduler;
public TakeLastBuffer(IObservable<TSource> source, int count)
{
_source = source;
_count = count;
}
public TakeLastBuffer(IObservable<TSource> source, TimeSpan duration, IScheduler scheduler)
{
_source = source;
_duration = duration;
_scheduler = scheduler;
}
protected override IDisposable Run(IObserver<IList<TSource>> observer, IDisposable cancel, Action<IDisposable> setSink)
{
if (_scheduler == null)
{
var sink = new _(this, observer, cancel);
setSink(sink);
return _source.SubscribeSafe(sink);
}
else
{
var sink = new Impl(this, observer, cancel);
setSink(sink);
return sink.Run();
}
}
class _ : Sink<IList<TSource>>, IObserver<TSource>
{
private readonly TakeLastBuffer<TSource> _parent;
private Queue<TSource> _queue;
public _(TakeLastBuffer<TSource> parent, IObserver<IList<TSource>> observer, IDisposable cancel)
: base(observer, cancel)
{
_parent = parent;
_queue = new Queue<TSource>();
}
public void OnNext(TSource value)
{
_queue.Enqueue(value);
if (_queue.Count > _parent._count)
_queue.Dequeue();
}
public void OnError(Exception error)
{
base._observer.OnError(error);
base.Dispose();
}
public void OnCompleted()
{
var res = new List<TSource>(_queue.Count);
while (_queue.Count > 0)
res.Add(_queue.Dequeue());
base._observer.OnNext(res);
base._observer.OnCompleted();
base.Dispose();
}
}
class Impl : Sink<IList<TSource>>, IObserver<TSource>
{
private readonly TakeLastBuffer<TSource> _parent;
private Queue<System.Reactive.TimeInterval<TSource>> _queue;
public Impl(TakeLastBuffer<TSource> parent, IObserver<IList<TSource>> observer, IDisposable cancel)
: base(observer, cancel)
{
_parent = parent;
_queue = new Queue<System.Reactive.TimeInterval<TSource>>();
}
private IStopwatch _watch;
public IDisposable Run()
{
_watch = _parent._scheduler.StartStopwatch();
return _parent._source.SubscribeSafe(this);
}
public void OnNext(TSource value)
{
var now = _watch.Elapsed;
_queue.Enqueue(new System.Reactive.TimeInterval<TSource>(value, now));
Trim(now);
}
public void OnError(Exception error)
{
base._observer.OnError(error);
base.Dispose();
}
public void OnCompleted()
{
var now = _watch.Elapsed;
Trim(now);
var res = new List<TSource>(_queue.Count);
while (_queue.Count > 0)
res.Add(_queue.Dequeue().Value);
base._observer.OnNext(res);
base._observer.OnCompleted();
base.Dispose();
}
private void Trim(TimeSpan now)
{
while (_queue.Count > 0 && now - _queue.Peek().Interval >= _parent._duration)
_queue.Dequeue();
}
}
}
}
#endif
|