Lean  $LEAN_TAG$
LiveSynchronizer.cs
1 /*
2  * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
3  * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  *
15 */
16 
17 using System;
18 using System.Collections.Generic;
19 using System.Threading;
23 using QuantConnect.Logging;
24 using QuantConnect.Util;
25 
27 {
28  /// <summary>
29  /// Implementation of the <see cref="ISynchronizer"/> interface which provides the mechanism to stream live data to the algorithm
30  /// </summary>
32  {
33  /// <summary>
34  /// Consumer batching timeout in ms
    /// <para>Read from the "consumer-batching-timeout-ms" configuration key; GetPulseDueTime adds it to the time pulse due time</para>
35  /// </summary>
36  public static readonly int BatchingDelay = Config.GetInt("consumer-batching-timeout-ms");
37 
    // time provider obtained from GetTimeProvider() during Initialize and exposed through TimeProvider
38  private ITimeProvider _timeProvider;
    // frontier time provider created in Initialize and handed to SubscriptionSynchronizer.SetTimeProvider
39  private LiveTimeProvider _frontierTimeProvider;
    // schedules the time pulse; its NewEvent handler sets _newLiveDataEmitted
40  private RealTimeScheduleEventService _realTimeScheduleEventService;
    // set by subscription notifications, by the time pulse and by Dispose; StreamData waits on it and resets it
41  private readonly ManualResetEventSlim _newLiveDataEmitted = new ManualResetEventSlim(false);
42 
/// <summary>
/// Continuous UTC time provider; returns the provider stored when the synchronizer was initialized
/// </summary>
public override ITimeProvider TimeProvider
{
    get { return _timeProvider; }
}
47 
/// <summary>
/// Initializes the live synchronizer: sets up the time providers, tracks the new data
/// notifications of the data feed subscriptions and creates the scheduled event used as time pulse
/// </summary>
/// <param name="algorithm">The algorithm instance, forwarded to the base initialization</param>
/// <param name="dataFeedSubscriptionManager">The subscription manager whose added and removed subscriptions are tracked</param>
public override void Initialize(
    IAlgorithm algorithm,
    IDataFeedSubscriptionManager dataFeedSubscriptionManager)
{
    base.Initialize(algorithm, dataFeedSubscriptionManager);

    // the time provider exposed by this synchronizer is the real time provider
    _timeProvider = GetTimeProvider();

    // the subscription synchronizer works with our '_frontierTimeProvider': during warmup it
    // relies on the base time provider, which is subscription based (like backtesting), and
    // once warmup finishes it starts using the real time provider
    _frontierTimeProvider = new LiveTimeProvider(realTime: TimeProvider);
    SubscriptionSynchronizer.SetTimeProvider(_frontierTimeProvider);

    // listen for new data only while a subscription is part of the data feed
    dataFeedSubscriptionManager.SubscriptionAdded +=
        (source, added) => added.NewDataAvailable += OnSubscriptionNewDataAvailable;
    dataFeedSubscriptionManager.SubscriptionRemoved +=
        (source, removed) => removed.NewDataAvailable -= OnSubscriptionNewDataAvailable;

    // this scheduled event is our time pulse: when it fires it signals the new data event
    _realTimeScheduleEventService = new RealTimeScheduleEventService(new RealTimeProvider());
    _realTimeScheduleEventService.NewEvent += (source, eventArgs) => _newLiveDataEmitted.Set();
}
80 
81  /// <summary>
82  /// Returns an enumerable which provides the data to stream to the algorithm
    /// <para>Slices are pulled from the SubscriptionSynchronizer enumerator until cancellation is
    /// requested, the enumerator ends or yields null, or reading from it throws</para>
83  /// </summary>
    /// <param name="cancellationToken">Checked at the top of each loop iteration and after each slice is read; the wait on new data does not observe it</param>
    /// <returns>The time slices to stream; after an enumerator exception a final empty slice is added unless cancellation was requested</returns>
84  public override IEnumerable<TimeSlice> StreamData(CancellationToken cancellationToken)
85  {
    // NOTE(review): embedded line 86 is absent from this listing, so any statement on it is not visible here
87 
88  var shouldSendExtraEmptyPacket = false;
    // DateTime.MinValue makes the 'frontierUtc >= nextEmit' check below pass for the first slice
89  var nextEmit = DateTime.MinValue;
90  var lastLoopStart = DateTime.UtcNow;
91 
    // NOTE(review): embedded line 93 (between the next two lines) is absent from this listing,
    // so the call that produces the enumerable is not visible here
92  var enumerator = SubscriptionSynchronizer
94  .GetEnumerator();
95 
    // true when the last emitted slice was a time pulse; the wait/reset below is then skipped
    // so the following slice is pulled right away
96  var previousWasTimePulse = false;
97  while (!cancellationToken.IsCancellationRequested)
98  {
99  var now = DateTime.UtcNow;
100  if (!previousWasTimePulse)
101  {
102  if (!_newLiveDataEmitted.IsSet
103  // we warmup as fast as we can even if no new data point is available
    // NOTE(review): embedded line 104 (the remainder of this condition) is absent from this listing
105  {
106  // if we just crossed into the next second let's loop again, we will flush any consolidator bar
107  // else we will wait to be notified by the subscriptions or our scheduled event service every second
108  if (lastLoopStart.Second == now.Second)
109  {
110  _realTimeScheduleEventService.ScheduleEvent(TimeSpan.FromMilliseconds(GetPulseDueTime(now)), now);
    // blocks with no timeout and no cancellation token until the event is set; in this file that
    // happens from OnSubscriptionNewDataAvailable, the schedule service NewEvent handler and Dispose
111  _newLiveDataEmitted.Wait();
112  }
113  }
    // clear the signal before reading the next slice; notifications raised from here on are
    // observed by the IsSet check of the next iteration
114  _newLiveDataEmitted.Reset();
115  }
116 
117  lastLoopStart = now;
118 
119  TimeSlice timeSlice;
120  try
121  {
122  if (!enumerator.MoveNext())
123  {
124  // the enumerator ended
125  break;
126  }
127 
128  timeSlice = enumerator.Current;
129  }
130  catch (Exception err)
131  {
132  // notify the algorithm about the error, so it can be reported to the user
133  Algorithm.SetRuntimeError(err, "LiveSynchronizer");
    // request the extra empty slice that is emitted after the loop
134  shouldSendExtraEmptyPacket = true;
135  break;
136  }
137 
138  // stop on a null slice or when cancellation was requested
139  if (timeSlice == null || cancellationToken.IsCancellationRequested) break;
140 
141  var frontierUtc = FrontierTimeProvider.GetUtcNow();
142  // emit on data or if we've elapsed a full second since last emit or there are security changes
143  if (timeSlice.SecurityChanges != SecurityChanges.None
144  || timeSlice.IsTimePulse
145  || timeSlice.Data.Count != 0
146  || frontierUtc >= nextEmit)
147  {
148  previousWasTimePulse = timeSlice.IsTimePulse;
149  yield return timeSlice;
150 
151  // ignore if time pulse because we will emit a slice with the same time just after this one
152  if (!timeSlice.IsTimePulse)
153  {
154  // force emitting every second since the data feed is
155  // the heartbeat of the application
156  nextEmit = frontierUtc.RoundDown(Time.OneSecond).Add(Time.OneSecond);
157  }
158  }
159  }
160 
161  if (shouldSendExtraEmptyPacket)
162  {
163  // send last empty packet list before terminating,
164  // so the algorithm manager has a chance to detect the runtime error
165  // and exit showing the correct error instead of a timeout
166  nextEmit = FrontierTimeProvider.GetUtcNow().RoundDown(Time.OneSecond);
167  if (!cancellationToken.IsCancellationRequested)
168  {
169  var timeSlice = TimeSliceFactory.Create(
170  nextEmit,
171  new List<DataFeedPacket>(),
    // NOTE(review): embedded line 172 (an argument between these two) is absent from this listing
173  new Dictionary<Universe, BaseDataCollection>());
174  yield return timeSlice;
175  }
176  }
177 
    // NOTE(review): not inside a finally block, so the disposal and the log line below only run
    // when this iterator is enumerated to completion - confirm that callers always do so
178  enumerator.DisposeSafely();
179  Log.Trace("LiveSynchronizer.GetEnumerator(): Exited thread.");
180  }
181 
/// <summary>
/// Frees the resources held by this synchronizer: signals the new data event one last time
/// and disposes it together with the real time schedule event service
/// </summary>
public override void Dispose()
{
    // signal first so that a pending wait on the event is released before it gets disposed
    _newLiveDataEmitted.Set();
    _newLiveDataEmitted.DisposeSafely();

    // the schedule service only exists once the synchronizer has been initialized
    var scheduleService = _realTimeScheduleEventService;
    if (scheduleService != null)
    {
        scheduleService.DisposeSafely();
    }
}
191 
/// <summary>
/// Gets the <see cref="ITimeProvider"/> to use, which for the live synchronizer
/// is <see cref="RealTimeProvider.Instance"/>
/// </summary>
/// <returns>The <see cref="ITimeProvider"/> to use</returns>
protected override ITimeProvider GetTimeProvider() => RealTimeProvider.Instance;
201 
/// <summary>
/// Performs additional initialization steps after algorithm initialization: runs the base
/// steps and then initializes the frontier time provider with the base time provider
/// </summary>
protected override void PostInitialize()
{
    base.PostInitialize();

    // deliberately the base implementation: our own override returns the real time provider
    var baseTimeProvider = base.GetTimeProvider();
    _frontierTimeProvider.Initialize(baseTimeProvider);
}
210 
/// <summary>
/// Will return the amount of milliseconds that are missing for the next time pulse
/// </summary>
/// <param name="now">The current time, only its millisecond component is used</param>
/// <returns>The milliseconds left in the current second plus <see cref="BatchingDelay"/></returns>
protected virtual int GetPulseDueTime(DateTime now)
{
    // milliseconds left until the next second starts
    var untilNextSecond = 1000 - now.Millisecond;

    // the configured consumer batching delay is added on top
    return untilNextSecond + BatchingDelay;
}
219 
/// <summary>
/// Handles the new data available event of a subscription by signaling the new data event
/// </summary>
/// <param name="sender">Sender of the event</param>
/// <param name="args">Event information</param>
protected virtual void OnSubscriptionNewDataAvailable(object sender, EventArgs args)
    => _newLiveDataEmitted.Set();
229  }
230 }