Lean  $LEAN_TAG$
LiveAuxiliaryDataSynchronizingEnumerator.cs
1 /*
2  * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
3  * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14 */
15 
16 using System;
17 using System.Collections;
18 using System.Collections.Generic;
19 using NodaTime;
20 using QuantConnect.Data;
21 using QuantConnect.Util;
22 
24 {
25  /// <summary>
26  /// Represents an enumerator capable of synchronizing live equity data enumerators in time.
27  /// This assumes that all enumerators have data time stamped in the same time zone.
28  /// </summary>
29  public class LiveAuxiliaryDataSynchronizingEnumerator : IEnumerator<BaseData>
30  {
31  private readonly ITimeProvider _timeProvider;
32  private readonly DateTimeZone _exchangeTimeZone;
33  private readonly List<IEnumerator<BaseData>> _auxDataEnumerators;
34  private readonly IEnumerator<BaseData> _tradeBarAggregator;
35 
36  /// <summary>
37  /// Initializes a new instance of the <see cref="LiveAuxiliaryDataSynchronizingEnumerator"/> class
38  /// </summary>
39  /// <param name="timeProvider">The source of time used to gauge when this enumerator should emit extra bars when null data is returned from the source enumerator</param>
40  /// <param name="exchangeTimeZone">The time zone the raw data is time stamped in</param>
41  /// <param name="tradeBarAggregator">The trade bar aggregator enumerator</param>
42  /// <param name="auxDataEnumerators">The auxiliary data enumerators</param>
43  public LiveAuxiliaryDataSynchronizingEnumerator(ITimeProvider timeProvider, DateTimeZone exchangeTimeZone, IEnumerator<BaseData> tradeBarAggregator, List<IEnumerator<BaseData>> auxDataEnumerators)
44  {
45  _timeProvider = timeProvider;
46  _exchangeTimeZone = exchangeTimeZone;
47  _auxDataEnumerators = auxDataEnumerators;
48  _tradeBarAggregator = tradeBarAggregator;
49  }
50 
51  /// <summary>
52  /// Advances the enumerator to the next element of the collection.
53  /// </summary>
54  /// <returns> true if the enumerator was successfully advanced to the next element; false if the enumerator has passed the end of the collection.</returns>
55  /// <exception cref="T:System.InvalidOperationException">The collection was modified after the enumerator was created.</exception>
56  public bool MoveNext()
57  {
58  // use manual time provider from LiveTradingDataFeed
59  var frontierUtc = _timeProvider.GetUtcNow();
60 
61  // check if any enumerator is ready to emit
62  if (DataPointEmitted(frontierUtc))
63  return true;
64 
65  // advance enumerators with no current data
66  for (var i = 0; i < _auxDataEnumerators.Count; i++)
67  {
68  if (_auxDataEnumerators[i].Current == null)
69  {
70  _auxDataEnumerators[i].MoveNext();
71  }
72  }
73  if (_tradeBarAggregator.Current == null) _tradeBarAggregator.MoveNext();
74 
75  // check if any enumerator is ready to emit
76  if (DataPointEmitted(frontierUtc))
77  return true;
78 
79  Current = null;
80 
81  // IEnumerator contract dictates that we return true unless we're actually
82  // finished with the 'collection' and since this is live, we're never finished
83  return true;
84  }
85 
86  /// <summary>
87  /// Sets the enumerator to its initial position, which is before the first element in the collection.
88  /// </summary>
89  /// <exception cref="T:System.InvalidOperationException">The collection was modified after the enumerator was created.</exception>
90  public void Reset()
91  {
92  foreach (var auxDataEnumerator in _auxDataEnumerators)
93  {
94  auxDataEnumerator.Reset();
95  }
96  _tradeBarAggregator.Reset();
97  }
98 
99  /// <summary>
100  /// Gets the element in the collection at the current position of the enumerator.
101  /// </summary>
102  /// <returns>The element in the collection at the current position of the enumerator.</returns>
103  public BaseData Current { get; private set; }
104 
105  /// <summary>
106  /// Gets the current element in the collection.
107  /// </summary>
108  /// <returns>The current element in the collection.</returns>
109  object IEnumerator.Current => Current;
110 
111  /// <summary>
112  /// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
113  /// </summary>
114  public void Dispose()
115  {
116  foreach (var auxDataEnumerator in _auxDataEnumerators)
117  {
118  auxDataEnumerator.DisposeSafely();
119  }
120  _tradeBarAggregator.DisposeSafely();
121  }
122 
123  private bool DataPointEmitted(DateTime frontierUtc)
124  {
125  // we get the aux enumerator that has the smallest endTime if any
126  IEnumerator<BaseData> auxDataEnumerator = null;
127  for (var i = 0; i < _auxDataEnumerators.Count; i++)
128  {
129  var currentEnum = _auxDataEnumerators[i];
130  if (currentEnum.Current != null)
131  {
132  if (auxDataEnumerator == null)
133  {
134  auxDataEnumerator = currentEnum;
135  }
136  else
137  {
138  auxDataEnumerator = auxDataEnumerator.Current.EndTime > currentEnum.Current.EndTime ? currentEnum : auxDataEnumerator;
139  }
140  }
141  }
142 
143  // check if any enumerator is ready to emit
144  if (auxDataEnumerator?.Current != null && _tradeBarAggregator.Current != null)
145  {
146  var auxDataEndTime = auxDataEnumerator.Current.EndTime.ConvertToUtc(_exchangeTimeZone);
147  var tradeBarEndTime = _tradeBarAggregator.Current.EndTime.ConvertToUtc(_exchangeTimeZone);
148  if (auxDataEndTime < tradeBarEndTime)
149  {
150  if (auxDataEndTime <= frontierUtc)
151  {
152  Current = auxDataEnumerator.Current;
153  auxDataEnumerator.MoveNext();
154  return true;
155  }
156  }
157  else
158  {
159  if (tradeBarEndTime <= frontierUtc)
160  {
161  Current = _tradeBarAggregator.Current;
162  _tradeBarAggregator.MoveNext();
163  return true;
164  }
165  }
166  }
167  else if (auxDataEnumerator?.Current != null)
168  {
169  var auxDataEndTime = auxDataEnumerator.Current.EndTime.ConvertToUtc(_exchangeTimeZone);
170  if (auxDataEndTime <= frontierUtc)
171  {
172  Current = auxDataEnumerator.Current;
173  auxDataEnumerator.MoveNext();
174  return true;
175  }
176  }
177  else if (_tradeBarAggregator.Current != null)
178  {
179  var tradeBarEndTime = _tradeBarAggregator.Current.EndTime.ConvertToUtc(_exchangeTimeZone);
180  if (tradeBarEndTime <= frontierUtc)
181  {
182  Current = _tradeBarAggregator.Current;
183  _tradeBarAggregator.MoveNext();
184  return true;
185  }
186  }
187 
188  return false;
189  }
190  }
191 }