18 using System.Collections.Concurrent;
19 using System.Collections.Generic;
20 using System.Threading;
32 private readonly BlockingCollection<T> _collection;
33 private readonly ManualResetEventSlim _processingCompletedEvent;
34 private readonly
object _lock =
new object();
42 get {
return _processingCompletedEvent.WaitHandle; }
50 get {
return _collection.Count; }
62 return _collection.Count > 0 || !_processingCompletedEvent.IsSet;
83 _collection =
new BlockingCollection<T>(boundedCapacity);
86 _processingCompletedEvent =
new ManualResetEventSlim(
true);
93 public void Add(T item)
95 Add(item, CancellationToken.None);
103 public void Add(T item, CancellationToken cancellationToken)
109 _processingCompletedEvent.Reset();
110 added = _collection.TryAdd(item, 0, cancellationToken);
115 _collection.Add(item, cancellationToken);
124 _collection.CompleteAdding();
143 while (!_collection.IsCompleted)
152 tookItem = _collection.TryTake(out item, 0, cancellationToken);
154 catch (OperationCanceledException)
175 if (_collection.Count == 0)
178 _processingCompletedEvent.Set();
185 tookItem = _collection.TryTake(out item, Timeout.Infinite, cancellationToken);
187 catch (OperationCanceledException)
201 _processingCompletedEvent.Set();
210 _collection.Dispose();
211 _processingCompletedEvent.Dispose();