Lean  $LEAN_TAG$
BaseDataCollectionAggregatorEnumerator.cs
1 /*
2  * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
3  * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14 */
15 
16 using System;
17 using QuantConnect.Data;
18 using System.Collections;
19 using System.Collections.Generic;
21 
23 {
24  /// <summary>
25  /// Provides an implementation of <see cref="IEnumerator{BaseDataCollection}"/>
26  /// that aggregates an underlying <see cref="IEnumerator{BaseData}"/> into a single
27  /// data packet
28  /// </summary>
29  public class BaseDataCollectionAggregatorEnumerator : IEnumerator<BaseDataCollection>
30  {
31  private bool _endOfStream;
32  private bool _needsMoveNext;
33  private bool _liveMode;
34  private readonly Symbol _symbol;
35  private readonly IEnumerator<BaseData> _enumerator;
36 
37  /// <summary>
38  /// Initializes a new instance of the <see cref="BaseDataCollectionAggregatorEnumerator"/> class
39  /// This will aggregate instances emitted from the underlying enumerator and tag them with the
40  /// specified symbol
41  /// </summary>
42  /// <param name="enumerator">The underlying enumerator to aggregate</param>
43  /// <param name="symbol">The symbol to place on the aggregated collection</param>
44  /// <param name="liveMode">True if running in live mode</param>
45  public BaseDataCollectionAggregatorEnumerator(IEnumerator<BaseData> enumerator, Symbol symbol, bool liveMode = false)
46  {
47  _symbol = symbol;
48  _enumerator = enumerator;
49  _liveMode = liveMode;
50  _needsMoveNext = true;
51  }
52 
53  /// <summary>
54  /// Advances the enumerator to the next element of the collection.
55  /// </summary>
56  /// <returns>
57  /// true if the enumerator was successfully advanced to the next element; false if the enumerator has passed the end of the collection.
58  /// </returns>
59  /// <exception cref="T:System.InvalidOperationException">The collection was modified after the enumerator was created. </exception><filterpriority>2</filterpriority>
60  public bool MoveNext()
61  {
62  if (_endOfStream)
63  {
64  return false;
65  }
66 
67  BaseDataCollection collection = null;
68  while (true)
69  {
70  if (_needsMoveNext)
71  {
72  // move next if we dequeued the last item last time we were invoked
73  if (!_enumerator.MoveNext())
74  {
75  _endOfStream = true;
76  if (!IsValid(collection))
77  {
78  // we don't emit
79  collection = null;
80  }
81  break;
82  }
83  }
84 
85  if (_enumerator.Current == null)
86  {
87  // the underlying returned null, stop here and start again on the next call
88  _needsMoveNext = true;
89  break;
90  }
91 
92  if (collection == null)
93  {
94  // we have new data, set the collection's symbol/times
95  var current = _enumerator.Current;
96  collection = CreateCollection(_symbol, current.Time, current.EndTime);
97  }
98 
99  if (collection.EndTime != _enumerator.Current.EndTime)
100  {
101  // the data from the underlying is at a different time, stop here
102  _needsMoveNext = false;
103  if (IsValid(collection))
104  {
105  // we emit
106  break;
107  }
108  // we try again
109  collection = null;
110  continue;
111  }
112 
113  // this data belongs in this collection, keep going until null or bad time
114  Add(collection, _enumerator.Current);
115  _needsMoveNext = true;
116  }
117 
118  Current = collection;
119  return _liveMode || collection != null;
120  }
121 
122  /// <summary>
123  /// Sets the enumerator to its initial position, which is before the first element in the collection.
124  /// </summary>
125  /// <exception cref="T:System.InvalidOperationException">The collection was modified after the enumerator was created. </exception><filterpriority>2</filterpriority>
126  public void Reset()
127  {
128  _enumerator.Reset();
129  }
130 
131  /// <summary>
132  /// Gets the element in the collection at the current position of the enumerator.
133  /// </summary>
134  /// <returns>
135  /// The element in the collection at the current position of the enumerator.
136  /// </returns>
138  {
139  get; private set;
140  }
141 
142  /// <summary>
143  /// Gets the current element in the collection.
144  /// </summary>
145  /// <returns>
146  /// The current element in the collection.
147  /// </returns>
148  /// <filterpriority>2</filterpriority>
149  object IEnumerator.Current
150  {
151  get { return Current; }
152  }
153 
154  /// <summary>
155  /// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
156  /// </summary>
157  /// <filterpriority>2</filterpriority>
158  public void Dispose()
159  {
160  _enumerator.Dispose();
161  }
162 
163  /// <summary>
164  /// Creates a new, empty <see cref="BaseDataCollection"/>.
165  /// </summary>
166  /// <param name="symbol">The base data collection symbol</param>
167  /// <param name="time">The start time of the collection</param>
168  /// <param name="endTime">The end time of the collection</param>
169  /// <returns>A new, empty <see cref="BaseDataCollection"/></returns>
170  private BaseDataCollection CreateCollection(Symbol symbol, DateTime time, DateTime endTime)
171  {
172  return new BaseDataCollection
173  {
174  Symbol = symbol,
175  Time = time,
176  EndTime = endTime
177  };
178  }
179 
180  /// <summary>
181  /// Adds the specified instance of <see cref="BaseData"/> to the current collection
182  /// </summary>
183  /// <param name="collection">The collection to be added to</param>
184  /// <param name="current">The data to be added</param>
185  private void Add(BaseDataCollection collection, BaseData current)
186  {
187  var baseDataCollection = current as BaseDataCollection;
188  if (_symbol.HasUnderlying && _symbol.Underlying == current.Symbol)
189  {
190  // if the underlying has been aggregated, even if it shouldn't need to be, let's handle it nicely
191  if (baseDataCollection != null)
192  {
193  collection.Underlying = baseDataCollection.Data[0];
194  }
195  else
196  {
197  collection.Underlying = current;
198  }
199  }
200  else
201  {
202  if (baseDataCollection != null)
203  {
204  // datapoint is already aggregated, let's see if it's a single point or a collection we can use already
205  if(baseDataCollection.Data.Count > 1)
206  {
207  collection.Data = baseDataCollection.Data;
208  }
209  else
210  {
211  collection.Data.Add(baseDataCollection.Data[0]);
212  }
213 
214  // Let's keep the underlying in case it's already there
215  collection.Underlying ??= baseDataCollection.Underlying;
216  }
217  else
218  {
219  collection.Data.Add(current);
220  }
221  }
222  }
223 
224  /// <summary>
225  /// Determines if a given data point is valid and can be emitted
226  /// </summary>
227  /// <param name="collection">The collection to be emitted</param>
228  /// <returns>True if its a valid data point</returns>
229  private static bool IsValid(BaseDataCollection collection)
230  {
231  return collection != null && collection.Data?.Count > 0;
232  }
233  }
234 }