Lean  $LEAN_TAG$
ScannableEnumerator.cs
1 /*
2  * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
3  * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  *
15 */
16 
17 using NodaTime;
18 using QuantConnect.Data;
20 using System;
21 using System.Collections;
22 using System.Collections.Concurrent;
23 using System.Collections.Generic;
24 
26 {
27  /// <summary>
28  /// An implementation of <see cref="IEnumerator{T}"/> that relies on "consolidated" data
29  /// </summary>
30  /// <typeparam name="T">The item type yielded by the enumerator</typeparam>
31  public class ScannableEnumerator<T> : IEnumerator<T> where T : class, IBaseData
32  {
33  private T _current;
34  private bool _consolidated;
35  private bool _isPeriodBase;
36  private bool _validateInputType;
37  private Type _consolidatorInputType;
38  private readonly DateTimeZone _timeZone;
39  private readonly ConcurrentQueue<T> _queue;
40  private readonly ITimeProvider _timeProvider;
41  private readonly EventHandler _newDataAvailableHandler;
42  private readonly IDataConsolidator _consolidator;
43 
44  /// <summary>
45  /// Gets the element in the collection at the current position of the enumerator.
46  /// </summary>
47  /// <returns>
48  /// The element in the collection at the current position of the enumerator.
49  /// </returns>
50  public T Current => _current;
51 
52  /// <summary>
53  /// Gets the current element in the collection.
54  /// </summary>
55  /// <returns>
56  /// The current element in the collection.
57  /// </returns>
58  /// <filterpriority>2</filterpriority>
59  object IEnumerator.Current => Current;
60 
61  /// <summary>
62  /// Initializes a new instance of the <see cref="ScannableEnumerator{T}"/> class
63  /// </summary>
64  /// <param name="consolidator">Consolidator taking BaseData updates and firing events containing new 'consolidated' data</param>
65  /// <param name="timeZone">The time zone the raw data is time stamped in</param>
66  /// <param name="timeProvider">The time provider instance used to determine when bars are completed and can be emitted</param>
67  /// <param name="newDataAvailableHandler">The event handler for a new available data point</param>
68  /// <param name="isPeriodBased">The consolidator is period based, this will enable scanning on <see cref="MoveNext"/></param>
69  public ScannableEnumerator(IDataConsolidator consolidator, DateTimeZone timeZone, ITimeProvider timeProvider, EventHandler newDataAvailableHandler, bool isPeriodBased = true)
70  {
71  _timeZone = timeZone;
72  _timeProvider = timeProvider;
73  _consolidator = consolidator;
74  _isPeriodBase = isPeriodBased;
75  _queue = new ConcurrentQueue<T>();
76  _consolidatorInputType = consolidator.InputType;
77  _validateInputType = _consolidatorInputType != typeof(BaseData);
78  _newDataAvailableHandler = newDataAvailableHandler ?? ((s, e) => { });
79 
80  _consolidator.DataConsolidated += DataConsolidatedHandler;
81  }
82 
83  /// <summary>
84  /// Updates the consolidator
85  /// </summary>
86  /// <param name="data">The data to consolidate</param>
87  public void Update(T data)
88  {
89  // if the input type of the consolidator isn't generic we validate it's correct before sending it in
90  if (_validateInputType && data.GetType() != _consolidatorInputType)
91  {
92  return;
93  }
94 
95  if (_isPeriodBase)
96  {
97  // we only need to lock if it's period base since the move next call could trigger a scan
98  lock (_consolidator)
99  {
100  _consolidator.Update(data);
101  }
102  }
103  else
104  {
105  _consolidator.Update(data);
106  }
107  }
108 
109  /// <summary>
110  /// Enqueues the new data into this enumerator
111  /// </summary>
112  /// <param name="data">The data to be enqueued</param>
113  private void Enqueue(T data)
114  {
115  _queue.Enqueue(data);
116  }
117 
118  /// <summary>
119  /// Advances the enumerator to the next element of the collection.
120  /// </summary>
121  /// <returns>
122  /// true if the enumerator was successfully advanced to the next element; false if the enumerator has passed the end of the collection.
123  /// </returns>
124  /// <exception cref="T:System.InvalidOperationException">The collection was modified after the enumerator was created. </exception><filterpriority>2</filterpriority>
125  public bool MoveNext()
126  {
127  if (!_queue.TryDequeue(out _current) && _isPeriodBase)
128  {
129  _consolidated = false;
130  lock (_consolidator)
131  {
132  // if there is a working bar we will try to pull it out if the time is right, each consolidator knows when it's right
133  var localTime = _timeProvider.GetUtcNow().ConvertFromUtc(_timeZone);
134  _consolidator.Scan(localTime);
135  }
136 
137  if (_consolidated)
138  {
139  _queue.TryDequeue(out _current);
140  }
141  }
142 
143  // even if we don't have data to return, we haven't technically
144  // passed the end of the collection, so always return true until
145  // the enumerator is explicitly disposed or ended
146  return true;
147  }
148 
149  /// <summary>
150  /// Sets the enumerator to its initial position, which is before the first element in the collection.
151  /// </summary>
152  /// <exception cref="T:System.InvalidOperationException">The collection was modified after the enumerator was created. </exception><filterpriority>2</filterpriority>
153  public void Reset()
154  {
155  }
156 
157  /// <summary>
158  /// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
159  /// </summary>
160  /// <filterpriority>2</filterpriority>
161  public void Dispose()
162  {
163  _consolidator.DataConsolidated -= DataConsolidatedHandler;
164  }
165 
166  private void DataConsolidatedHandler(object sender, IBaseData data)
167  {
168  var dataPoint = data as T;
169  _consolidated = true;
170  Enqueue(dataPoint);
171  _newDataAvailableHandler(sender, new NewDataAvailableEventArgs { DataPoint = dataPoint });
172  }
173  }
174 }