Lean  $LEAN_TAG$
Synchronizer.cs
1 /*
2  * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
3  * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  *
15 */
16 
17 using System;
18 using System.Collections.Generic;
19 using System.Threading;
20 using NodaTime;
22 using QuantConnect.Logging;
23 using QuantConnect.Util;
24 
26 {
27  /// <summary>
28  /// Implementation of the <see cref="ISynchronizer"/> interface which provides the mechanism to stream data to the algorithm
29  /// </summary>
30  public class Synchronizer : ISynchronizer, IDataFeedTimeProvider, IDisposable
31  {
32  private DateTimeZone _dateTimeZone;
33 
34  /// <summary>
35  /// The algorithm instance
36  /// </summary>
37  protected IAlgorithm Algorithm { get; set; }
38 
39  /// <summary>
40  /// The subscription manager
41  /// </summary>
43 
44  /// <summary>
45  /// The subscription synchronizer
46  /// </summary>
48 
49  /// <summary>
50  /// The time slice factory
51  /// </summary>
52  protected TimeSliceFactory TimeSliceFactory { get; set; }
53 
54  /// <summary>
55  /// Continuous UTC time provider, only valid for live trading see <see cref="LiveSynchronizer"/>
56  /// </summary>
57  public virtual ITimeProvider TimeProvider => null;
58 
59  /// <summary>
60  /// Time provider which returns current UTC frontier time
61  /// </summary>
63 
64  /// <summary>
65  /// Initializes the instance of the Synchronizer class
66  /// </summary>
67  public virtual void Initialize(
68  IAlgorithm algorithm,
69  IDataFeedSubscriptionManager dataFeedSubscriptionManager)
70  {
71  SubscriptionManager = dataFeedSubscriptionManager;
72  Algorithm = algorithm;
75  }
76 
77  /// <summary>
78  /// Returns an enumerable which provides the data to stream to the algorithm
79  /// </summary>
80  public virtual IEnumerable<TimeSlice> StreamData(CancellationToken cancellationToken)
81  {
83 
84  // GetTimeProvider() will call GetInitialFrontierTime() which
85  // will consume added subscriptions so we need to do this after initialization
87 
88  var previousEmitTime = DateTime.MaxValue;
89 
90  var enumerator = SubscriptionSynchronizer
92  .GetEnumerator();
93  var previousWasTimePulse = false;
94  // this is a just in case flag to stop looping if time does not advance
95  var retried = false;
96  while (!cancellationToken.IsCancellationRequested)
97  {
98  TimeSlice timeSlice;
99  try
100  {
101  if (!enumerator.MoveNext())
102  {
103  // the enumerator ended
104  break;
105  }
106  timeSlice = enumerator.Current;
107  }
108  catch (Exception err)
109  {
110  // notify the algorithm about the error, so it can be reported to the user
111  Algorithm.SetRuntimeError(err, "Synchronizer");
112  break;
113  }
114 
115  // check for cancellation
116  if (timeSlice == null || cancellationToken.IsCancellationRequested) break;
117 
118  if (timeSlice.IsTimePulse && Algorithm.UtcTime == timeSlice.Time)
119  {
120  previousWasTimePulse = timeSlice.IsTimePulse;
121  // skip time pulse when algorithms already at that time
122  continue;
123  }
124 
125  // SubscriptionFrontierTimeProvider will return twice the same time if there are no more subscriptions or if Subscription.Current is null
126  if (timeSlice.Time != previousEmitTime || previousWasTimePulse || timeSlice.UniverseData.Count != 0)
127  {
128  previousEmitTime = timeSlice.Time;
129  previousWasTimePulse = timeSlice.IsTimePulse;
130  // if we emitted, clear retry flag
131  retried = false;
132  yield return timeSlice;
133  }
134  else
135  {
136  // if the slice has data lets retry just once more... this could happen
137  // with subscriptions added after initialize using algorithm.AddSecurity() API,
138  // where the subscription start time is the current time loop (but should just happen once)
139  if (!timeSlice.Slice.HasData || retried)
140  {
141  // there's no more data to pull off, we're done (frontier is max value and no security changes)
142  break;
143  }
144  retried = true;
145  }
146  }
147 
148  enumerator.DisposeSafely();
149  Log.Trace("Synchronizer.GetEnumerator(): Exited thread.");
150  }
151 
152  /// <summary>
153  /// Performs additional initialization steps after algorithm initialization
154  /// </summary>
155  protected virtual void PostInitialize()
156  {
157  SubscriptionSynchronizer.SubscriptionFinished += (sender, subscription) =>
158  {
159  SubscriptionManager.RemoveSubscription(subscription.Configuration);
160  if (Log.DebuggingEnabled)
161  {
162  Log.Debug("Synchronizer.SubscriptionFinished(): Finished subscription:" +
163  $"{subscription.Configuration} at {FrontierTimeProvider.GetUtcNow()} UTC");
164  }
165  };
166 
167  // this is set after the algorithm initializes
168  _dateTimeZone = Algorithm.TimeZone;
169  TimeSliceFactory = new TimeSliceFactory(_dateTimeZone);
171  }
172 
173  /// <summary>
174  /// Gets the <see cref="ITimeProvider"/> to use. By default this will load the
175  /// <see cref="RealTimeProvider"/> for live mode, else <see cref="SubscriptionFrontierTimeProvider"/>
176  /// </summary>
177  /// <returns>The <see cref="ITimeProvider"/> to use</returns>
178  protected virtual ITimeProvider GetTimeProvider()
179  {
180  return new SubscriptionFrontierTimeProvider(GetInitialFrontierTime(), SubscriptionManager);
181  }
182 
183  private DateTime GetInitialFrontierTime()
184  {
185  var frontier = DateTime.MaxValue;
186  foreach (var subscription in SubscriptionManager.DataFeedSubscriptions)
187  {
188  var current = subscription.Current;
189  if (current == null)
190  {
191  continue;
192  }
193 
194  // we need to initialize both the frontier time and the offset provider, in order to do
195  // this we'll first convert the current.EndTime to UTC time, this will allow us to correctly
196  // determine the offset in ticks using the OffsetProvider, we can then use this to recompute
197  // the UTC time. This seems odd, but is necessary given Noda time's lenient mapping, the
198  // OffsetProvider exists to give forward marching mapping
199 
200  // compute the initial frontier time
201  if (current.EmitTimeUtc < frontier)
202  {
203  frontier = current.EmitTimeUtc;
204  }
205  }
206 
207  if (frontier == DateTime.MaxValue)
208  {
209  // here we use Time and not StartDate because Time will be before the start during warmup period.
210  frontier = Algorithm.Time.ConvertToUtc(_dateTimeZone);
211  }
212  return frontier;
213  }
214 
215  /// <summary>
216  /// Free resources
217  /// </summary>
218  public virtual void Dispose()
219  {
220  }
221  }
222 }