Lean  $LEAN_TAG$
SynchronizingEnumerator.cs
1 /*
2  * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
3  * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  *
15 */
16 
17 using System;
18 using System.Collections;
19 using System.Collections.Generic;
20 using System.Linq;
21 using QuantConnect.Data;
22 
24 {
/// <summary>
/// Represents an enumerator capable of synchronizing other enumerators of type T in time.
/// This assumes that all enumerators have data time stamped in the same time zone
/// </summary>
/// <typeparam name="T">The type of data we want, for example, <see cref="BaseData"/> or <see cref="Slice"/>, etc...</typeparam>
public abstract class SynchronizingEnumerator<T> : IEnumerator<T>
{
    // the lazily evaluated iterator that merges the underlying enumerators; replaced on Reset
    private IEnumerator<T> _syncer;
    // snapshot of the enumerators handed to the constructor
    private readonly IEnumerator<T>[] _enumerators;

    /// <summary>
    /// Gets the Timestamp for the data
    /// </summary>
    /// <param name="instance">The (non-null) data instance whose time is requested</param>
    /// <returns>The time used to order the instance against data from the other enumerators</returns>
    protected abstract DateTime GetInstanceTime(T instance);

    /// <summary>
    /// Gets the element in the collection at the current position of the enumerator.
    /// </summary>
    /// <returns>
    /// The element in the collection at the current position of the enumerator,
    /// or default(T) once <see cref="MoveNext"/> has returned false or after <see cref="Reset"/>.
    /// </returns>
    public T Current
    {
        get; private set;
    }

    /// <summary>
    /// Gets the current element in the collection.
    /// </summary>
    /// <returns>
    /// The current element in the collection.
    /// </returns>
    object IEnumerator.Current
    {
        get { return Current; }
    }

    /// <summary>
    /// Initializes a new instance of the <see cref="SynchronizingEnumerator{T}"/> class
    /// </summary>
    /// <param name="enumerators">The enumerators to be synchronized. NOTE: Assumes the same time zone for all data</param>
    /// <exception cref="ArgumentNullException"><paramref name="enumerators"/> is null</exception>
    protected SynchronizingEnumerator(params IEnumerator<T>[] enumerators)
        : this((IEnumerable<IEnumerator<T>>)enumerators)
    {
    }

    /// <summary>
    /// Initializes a new instance of the <see cref="SynchronizingEnumerator{T}"/> class
    /// </summary>
    /// <param name="enumerators">The enumerators to be synchronized. NOTE: Assumes the same time zone for all data</param>
    /// <exception cref="ArgumentNullException"><paramref name="enumerators"/> is null</exception>
    protected SynchronizingEnumerator(IEnumerable<IEnumerator<T>> enumerators)
    {
        if (enumerators == null)
        {
            throw new ArgumentNullException("enumerators");
        }

        _enumerators = enumerators.ToArray();
        // the synchronizer is an iterator block, so nothing is pulled from the underlying
        // enumerators (and GetInstanceTime is not invoked) until the first MoveNext call
        _syncer = GetSynchronizedEnumerator(_enumerators);
    }

    /// <summary>
    /// Advances the enumerator to the next element of the collection.
    /// </summary>
    /// <returns>
    /// true if the enumerator was successfully advanced to the next element; false if the enumerator has passed the end of the collection.
    /// </returns>
    /// <exception cref="T:System.InvalidOperationException">The collection was modified after the enumerator was created. </exception>
    public bool MoveNext()
    {
        var moveNext = _syncer.MoveNext();
        // never expose a stale element once the synchronized stream has ended
        Current = moveNext ? _syncer.Current : default(T);
        return moveNext;
    }

    /// <summary>
    /// Sets the enumerator to its initial position, which is before the first element in the collection.
    /// </summary>
    /// <exception cref="T:System.InvalidOperationException">The collection was modified after the enumerator was created. </exception>
    public void Reset()
    {
        foreach (var enumerator in _enumerators)
        {
            enumerator.Reset();
        }

        // don't call syncer.reset since the impl will just throw;
        // release the stale iterator and build a fresh one instead
        _syncer.Dispose();
        _syncer = GetSynchronizedEnumerator(_enumerators);

        // we are positioned before the first element again, drop the previous element
        Current = default(T);
    }

    /// <summary>
    /// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
    /// Disposes every underlying enumerator as well as the synchronizing iterator.
    /// </summary>
    public void Dispose()
    {
        foreach (var enumerator in _enumerators)
        {
            enumerator.Dispose();
        }
        _syncer.Dispose();
    }

    /// <summary>
    /// Synchronization system for the enumerator:
    /// selects the strategy used to merge the enumerators, currently always the brute force method.
    /// </summary>
    /// <param name="enumerators">The enumerators to be synchronized</param>
    /// <returns>A single enumerator emitting the data of all the provided enumerators</returns>
    private IEnumerator<T> GetSynchronizedEnumerator(IEnumerator<T>[] enumerators)
    {
        return GetBruteForceMethod(enumerators);
    }

    /// <summary>
    /// Brute force implementation for synchronizing the enumerator.
    /// Will remove enumerators returning false to the call to MoveNext.
    /// Will not remove enumerators with Current Null returning true to the call to MoveNext
    /// </summary>
    /// <param name="enumerators">The enumerators to be synchronized</param>
    /// <returns>
    /// An enumerator that repeatedly emits, from each active enumerator, every non-null element
    /// whose time is at or before the current frontier and then advances the frontier to the
    /// earliest pending time
    /// </returns>
    private IEnumerator<T> GetBruteForceMethod(IEnumerator<T>[] enumerators)
    {
        // prime every enumerator and locate the earliest time stamp, this is the initial frontier
        var ticks = DateTime.MaxValue.Ticks;
        var collection = new HashSet<IEnumerator<T>>();
        foreach (var enumerator in enumerators)
        {
            if (enumerator.MoveNext())
            {
                if (enumerator.Current != null)
                {
                    ticks = Math.Min(ticks, GetInstanceTime(enumerator.Current).Ticks);
                }
                collection.Add(enumerator);
            }
            else
            {
                // empty from the start, it will never take part in the synchronization
                enumerator.Dispose();
            }
        }

        var frontier = new DateTime(ticks);
        var toRemove = new List<IEnumerator<T>>();
        while (collection.Count > 0)
        {
            var nextFrontierTicks = DateTime.MaxValue.Ticks;
            foreach (var enumerator in collection)
            {
                var exhausted = false;
                while (enumerator.Current == null || GetInstanceTime(enumerator.Current) <= frontier)
                {
                    if (enumerator.Current != null)
                    {
                        yield return enumerator.Current;
                    }
                    if (!enumerator.MoveNext())
                    {
                        // removal is deferred, the set can't be modified while it's being enumerated
                        exhausted = true;
                        toRemove.Add(enumerator);
                        break;
                    }
                    if (enumerator.Current == null)
                    {
                        // nothing available right now, give the other enumerators a turn
                        break;
                    }
                }

                // Current is undefined once MoveNext has returned false: depending on the
                // implementation it throws or still holds the last (already emitted) element,
                // so an exhausted enumerator must not contribute to the next frontier
                if (!exhausted && enumerator.Current != null)
                {
                    nextFrontierTicks = Math.Min(nextFrontierTicks, GetInstanceTime(enumerator.Current).Ticks);
                }
            }

            if (toRemove.Count > 0)
            {
                foreach (var enumerator in toRemove)
                {
                    collection.Remove(enumerator);
                }
                toRemove.Clear();
            }

            frontier = new DateTime(nextFrontierTicks);
            if (frontier == DateTime.MaxValue)
            {
                // no remaining enumerator offered a time stamp for the next pass, we're done
                break;
            }
        }
    }
}
204 }