Lean  $LEAN_TAG$
ScheduledEnumerator.cs
1 /*
2  * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
3  * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  *
15 */
16 
17 using System;
18 using NodaTime;
19 using QuantConnect.Data;
20 using System.Collections;
21 using System.Collections.Generic;
22 
24 {
/// <summary>
/// This enumerator will filter out data of the underlying enumerator based on a provided schedule.
/// Will respect the schedule above the data, meaning will let older data through if the underlying provides none for the schedule date
/// </summary>
public class ScheduledEnumerator : IEnumerator<BaseData>
{
    private readonly IEnumerator<BaseData> _underlyingEnumerator;
    private readonly IEnumerator<DateTime> _scheduledTimes;
    private readonly ITimeProvider _frontierTimeProvider;
    private readonly DateTimeZone _scheduleTimeZone;

    // best data point seen so far for the schedule time we are currently waiting on
    private BaseData _underlyingCandidateDataPoint;

    // set once the schedule enumerator has no more times; from then on '_scheduledTimes.Current' must not be read
    private bool _scheduledTimesEnded;

    /// <summary>
    /// The current data point. Will be null when <see cref="MoveNext"/> had nothing to emit.
    /// </summary>
    public BaseData Current { get; private set; }

    object IEnumerator.Current => Current;

    /// <summary>
    /// Creates a new instance
    /// </summary>
    /// <param name="underlyingEnumerator">The underlying enumerator to filter</param>
    /// <param name="scheduledTimes">The scheduled times to emit new data points</param>
    /// <param name="frontierTimeProvider">Time provider used to decide whether a scheduled time has been reached.
    /// Can be null, in which case a candidate data point is emitted as soon as it's available</param>
    /// <param name="scheduleTimeZone">The time zone used to convert the scheduled times to UTC
    /// before comparing them with the frontier time</param>
    /// <param name="startTime">the underlying request start time</param>
    /// <exception cref="ArgumentNullException">If <paramref name="underlyingEnumerator"/>
    /// or <paramref name="scheduledTimes"/> is null</exception>
    public ScheduledEnumerator(IEnumerator<BaseData> underlyingEnumerator,
        IEnumerable<DateTime> scheduledTimes,
        ITimeProvider frontierTimeProvider,
        DateTimeZone scheduleTimeZone,
        DateTime startTime)
    {
        // fail fast with a meaningful exception instead of a NullReferenceException later on
        if (underlyingEnumerator == null)
        {
            throw new ArgumentNullException(nameof(underlyingEnumerator));
        }
        if (scheduledTimes == null)
        {
            throw new ArgumentNullException(nameof(scheduledTimes));
        }

        _scheduleTimeZone = scheduleTimeZone;
        _frontierTimeProvider = frontierTimeProvider;
        _underlyingEnumerator = underlyingEnumerator;
        _scheduledTimes = scheduledTimes.GetEnumerator();
        // move our schedule enumerator to current start time
        MoveScheduleForward(startTime);
    }

    /// <summary>
    /// Advances the enumerator to the next element of the collection.
    /// </summary>
    /// <remarks>
    /// Returning true does not imply a data point was emitted: <see cref="Current"/> is null when there
    /// is no candidate yet or the scheduled time has not been reached.
    /// </remarks>
    /// <returns> True if the enumerator was successfully advanced to the next element;
    /// false if the enumerator has passed the end of the collection.
    /// </returns>
    public bool MoveNext()
    {
        if (_scheduledTimesEnded)
        {
            Current = null;
            return false;
        }

        // lets get our candidate data point to emit
        if (_underlyingCandidateDataPoint == null)
        {
            if (_underlyingEnumerator.Current != null && _underlyingEnumerator.Current.EndTime <= _scheduledTimes.Current)
            {
                _underlyingCandidateDataPoint = _underlyingEnumerator.Current;
            }
            else if (Current != null)
            {
                // we will keep the last data point, even if we already emitted it, there could be a case where the user has a schedule in a
                // period where there's not new data (or it's far in the future) so let's just FF the previous point
                _underlyingCandidateDataPoint = Current.Clone(fillForward: true);
            }
        }

        // lets try to get a better candidate
        if (_underlyingEnumerator.Current == null
            || _underlyingEnumerator.Current.EndTime < _scheduledTimes.Current)
        {
            bool pullAgain;
            do
            {
                pullAgain = false;
                if (!_underlyingEnumerator.MoveNext())
                {
                    if (_underlyingCandidateDataPoint != null)
                    {
                        // if we still have a candidate wait till we emit him before stopping
                        break;
                    }
                    Current = null;
                    return false;
                }

                if (_underlyingEnumerator.Current != null)
                {
                    if (_underlyingEnumerator.Current.EndTime <= _scheduledTimes.Current)
                    {
                        // lets try again
                        pullAgain = true;
                        // we got another data point which is a newer candidate to emit so let use it instead
                        // and drop the previous
                        _underlyingCandidateDataPoint = _underlyingEnumerator.Current;
                    }
                    else if (_underlyingCandidateDataPoint == null)
                    {
                        // this is the first data point we got and it's After our schedule, let's move our schedule forward
                        // NOTE(review): only a single schedule step is taken here, so the next scheduled time may still be
                        // before this data point's end time -- confirm this is intended
                        _underlyingCandidateDataPoint = _underlyingEnumerator.Current;
                        MoveScheduleForward();

                        if (_scheduledTimesEnded)
                        {
                            // there is no further scheduled time to emit this data point at. We have to stop here because
                            // the schedule enumerator 'Current' value is undefined once it has ended, using it would
                            // stamp the data point with a stale or default time
                            Current = null;
                            return false;
                        }
                    }
                }
            } while (pullAgain);
        }

        if (_underlyingCandidateDataPoint != null
            // if we are at or past the schedule time we try to emit, in backtest this emits right away, since time is data driven, in live though
            // we don't emit right away because the underlying might provide us with a newer data point
            && _scheduledTimes.Current.ConvertToUtc(_scheduleTimeZone) <= GetUtcNow())
        {
            Current = _underlyingCandidateDataPoint;
            // we align the data endtime with the schedule, we respect the schedule above the data time. In backtesting,
            // time is driven by the data, so let's make sure we emit at the scheduled time even if the data is older
            Current.EndTime = _scheduledTimes.Current;
            if (Current.Time > Current.EndTime)
            {
                // keep the data point consistent, it can't start after it ends
                Current.Time = _scheduledTimes.Current;
            }

            MoveScheduleForward();
            _underlyingCandidateDataPoint = null;
            return true;
        }

        // nothing to emit yet, but we are not done
        Current = null;
        return true;
    }

    /// <summary>
    /// Resets the underlying enumerator. The schedule position, the pending candidate
    /// and <see cref="Current"/> are left untouched.
    /// </summary>
    public void Reset()
    {
        _underlyingEnumerator.Reset();
    }

    /// <summary>
    /// Disposes of the schedule and the underlying enumerators
    /// </summary>
    public void Dispose()
    {
        _scheduledTimes.Dispose();
        _underlyingEnumerator.Dispose();
    }

    /// <summary>
    /// Available in live trading only, in backtesting frontier is driven and synced already by the data itself
    /// so we can't hold data here based on it
    /// </summary>
    /// <returns>The frontier provider UTC time if a provider was given, else <see cref="DateTime.MaxValue"/>
    /// so that any scheduled time is considered reached</returns>
    private DateTime GetUtcNow()
    {
        if (_frontierTimeProvider != null)
        {
            return _frontierTimeProvider.GetUtcNow();
        }
        return DateTime.MaxValue;
    }

    /// <summary>
    /// Advances the schedule enumerator at least once. If a frontier is provided it will keep advancing until
    /// the scheduled time is at or after the frontier, or the schedule ends.
    /// Updates <see cref="_scheduledTimesEnded"/> accordingly.
    /// </summary>
    /// <param name="frontier">Optional time before which scheduled times are skipped</param>
    private void MoveScheduleForward(DateTime? frontier = null)
    {
        do
        {
            _scheduledTimesEnded = !_scheduledTimes.MoveNext();
        }
        while (!_scheduledTimesEnded && frontier.HasValue && _scheduledTimes.Current < frontier.Value);
    }
}
197 }