Lean  $LEAN_TAG$
EnqueueableEnumerator.cs
1 /*
2  * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
3  * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  *
15 */
16 
17 using System;
18 using System.Threading;
19 using System.Collections;
20 using System.Collections.Generic;
21 
23 {
/// <summary>
/// An implementation of <see cref="IEnumerator{T}"/> that relies on the
/// <see cref="Enqueue"/> method being called and only ends when <see cref="Stop"/>
/// is called
/// </summary>
/// <remarks>
/// Two queues are used. <see cref="Enqueue"/> appends to the producer queue while holding a lock;
/// <see cref="MoveNext"/> drains the consumer queue without locking and, once it runs dry, swaps
/// the two queues under that same lock. Because the consumer queue is read without any
/// synchronization, <see cref="MoveNext"/> is expected to be called from a single thread at a time.
/// </remarks>
/// <typeparam name="T">The item type yielded by the enumerator</typeparam>
public class EnqueueableEnumerator<T> : IEnumerator<T>
{
    private T _current;

    // written under _lock by Stop(); read under _lock, or through Volatile.Read in HasFinished
    private bool _end;

    private readonly bool _isBlocking;

    // number of items left in _consumer, mirrored here so Count never has to touch the
    // queue that MoveNext mutates without holding the lock
    private long _consumerCount;
    private Queue<T> _consumer = new();
    private Queue<T> _producer = new();
    private readonly object _lock = new object();
    private readonly ManualResetEventSlim _resetEvent = new(false);

    /// <summary>
    /// Gets the current number of items held in the internal queues,
    /// or zero once <see cref="Stop"/> has been called
    /// </summary>
    public int Count
    {
        get
        {
            lock (_lock)
            {
                if (_end)
                {
                    return 0;
                }

                return _producer.Count + (int)Interlocked.Read(ref _consumerCount);
            }
        }
    }

    /// <summary>
    /// Returns true once <see cref="Stop"/> (or <see cref="Dispose"/>) has been called
    /// </summary>
    /// <remarks>
    /// The flag is read with <see cref="Volatile.Read(ref bool)"/> so that a thread polling this
    /// property observes the value written by another thread in <see cref="Stop"/>.
    /// </remarks>
    public bool HasFinished => Volatile.Read(ref _end);

    /// <summary>
    /// Initializes a new instance of the <see cref="EnqueueableEnumerator{T}"/> class
    /// </summary>
    /// <param name="blocking">
    /// When true, <see cref="MoveNext"/> waits for an item to be enqueued (or for <see cref="Stop"/>)
    /// instead of returning immediately when no data is available
    /// </param>
    public EnqueueableEnumerator(bool blocking = false)
    {
        _isBlocking = blocking;
    }

    /// <summary>
    /// Enqueues the new data into this enumerator
    /// </summary>
    /// <param name="data">The data to be enqueued</param>
    public void Enqueue(T data)
    {
        lock (_lock)
        {
            _producer.Enqueue(data);

            // most of the time the event is already set, so skip the redundant Set() call
            if (!_resetEvent.IsSet)
            {
                _resetEvent.Set();
            }
        }
    }

    /// <summary>
    /// Signals the enumerator to stop enumerating when the items currently
    /// held inside are gone. Calling this more than once has no further effect.
    /// </summary>
    public void Stop()
    {
        lock (_lock)
        {
            if (_end)
            {
                return;
            }

            _end = true;

            // wake up a consumer that may be waiting in MoveNext, then release the event
            _resetEvent.Set();
            _resetEvent.Dispose();
        }
    }

    /// <summary>
    /// Advances the enumerator to the next element of the collection.
    /// </summary>
    /// <returns>
    /// false only when <see cref="Stop"/> has been called and both internal queues are empty.
    /// Otherwise true; in non-blocking mode this includes the case where no item was available,
    /// in which case <see cref="Current"/> is the default value of <typeparamref name="T"/>.
    /// </returns>
    public bool MoveNext()
    {
        // fast path: read the consumer queue without taking the lock.
        // On failure TryDequeue leaves _current at default(T).
        if (_consumer.TryDequeue(out _current))
        {
            Interlocked.Decrement(ref _consumerCount);
            return true;
        }

        bool ended;
        do
        {
            Queue<T> filled;
            lock (_lock)
            {
                ended = _end;

                // swap queues: producers now write into the drained consumer queue
                filled = _producer;
                _producer = _consumer;

                // publish the size of the swapped-in queue inside the same critical section,
                // so Count (which also takes the lock) never sees the swap half-applied
                Interlocked.Exchange(ref _consumerCount, filled.Count);
            }
            _consumer = filled;

            if (filled.Count > 0)
            {
                _current = filled.Dequeue();
                Interlocked.Decrement(ref _consumerCount);
                break;
            }

            // if we are here no queue has data
            if (ended)
            {
                return false;
            }

            if (!_isBlocking)
            {
                break;
            }

            try
            {
                _resetEvent.Wait(Timeout.Infinite);
                _resetEvent.Reset();
            }
            catch (ObjectDisposedException)
            {
                // Stop() disposed the event; the next pass re-reads _end under the lock
            }
        }
        while (!ended);

        // even if we don't have data to return, we haven't technically
        // passed the end of the collection, so always return true until
        // the enumerator is explicitly disposed or ended
        return true;
    }

    /// <summary>
    /// Not implemented: this enumerator cannot be reset.
    /// </summary>
    /// <exception cref="NotImplementedException">Always thrown.</exception>
    public void Reset()
    {
        throw new NotImplementedException("EnqueableEnumerator.Reset() has not been implemented yet.");
    }

    /// <summary>
    /// Gets the element most recently produced by <see cref="MoveNext"/>.
    /// </summary>
    /// <returns>
    /// The last dequeued item, or the default value of <typeparamref name="T"/> when the last
    /// call to <see cref="MoveNext"/> did not dequeue anything.
    /// </returns>
    public T Current => _current;

    /// <summary>
    /// Gets the current element in the collection.
    /// </summary>
    /// <returns>
    /// The same value as <see cref="Current"/>, as an <see cref="object"/>.
    /// </returns>
    object IEnumerator.Current => Current;

    /// <summary>
    /// Stops the enumerator; equivalent to calling <see cref="Stop"/>.
    /// </summary>
    public void Dispose()
    {
        Stop();
    }
}
214 }