Lean  $LEAN_TAG$
LiveTradingDataFeed.cs
1 /*
2  * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
3  * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  *
15 */
16 
17 using System;
18 using System.Linq;
19 using QuantConnect.Data;
20 using QuantConnect.Util;
21 using QuantConnect.Logging;
22 using QuantConnect.Packets;
25 using System.Collections.Generic;
34 
36 {
37  /// <summary>
38  /// Provides an implementation of <see cref="IDataFeed"/> that is designed to deal with
39  /// live, remote data sources
40  /// </summary>
42  {
43  private static readonly int MaximumWarmupHistoryDaysLookBack = Config.GetInt("maximum-warmup-history-days-look-back", 5);
44 
45  private LiveNodePacket _job;
46 
47  // used to get current time
48  private ITimeProvider _timeProvider;
49  private IAlgorithm _algorithm;
50  private ITimeProvider _frontierTimeProvider;
51  private IDataProvider _dataProvider;
52  private IMapFileProvider _mapFileProvider;
53  private IDataQueueHandler _dataQueueHandler;
54  private BaseDataExchange _customExchange;
55  private SubscriptionCollection _subscriptions;
56  private IFactorFileProvider _factorFileProvider;
57  private IDataChannelProvider _channelProvider;
58  // in live trading we delay scheduled universe selection between 11 & 12 hours after midnight UTC so that we allow new selection data to be piped in
59  // NY goes from -4/-5 UTC time, so:
60  // 11 UTC - 4 => 7am NY
61  // 12 UTC - 4 => 8am NY
62  private readonly TimeSpan _scheduledUniverseUtcTimeShift = TimeSpan.FromMinutes(11 * 60 + DateTime.UtcNow.Second);
63  private readonly HashSet<string> _unsupportedConfigurations = new();
64 
65  /// <summary>
66  /// Public flag indicator that the thread is still busy.
67  /// </summary>
68  public bool IsActive
69  {
70  get; private set;
71  }
72 
73  /// <summary>
74  /// Initializes the data feed for the specified job and algorithm
75  /// </summary>
76  public override void Initialize(IAlgorithm algorithm,
78  IResultHandler resultHandler,
79  IMapFileProvider mapFileProvider,
80  IFactorFileProvider factorFileProvider,
81  IDataProvider dataProvider,
82  IDataFeedSubscriptionManager subscriptionManager,
83  IDataFeedTimeProvider dataFeedTimeProvider,
84  IDataChannelProvider dataChannelProvider)
85  {
86  if (!(job is LiveNodePacket))
87  {
88  throw new ArgumentException("The LiveTradingDataFeed requires a LiveNodePacket.");
89  }
90 
91  _algorithm = algorithm;
92  _job = (LiveNodePacket)job;
93  _timeProvider = dataFeedTimeProvider.TimeProvider;
94  _dataProvider = dataProvider;
95  _mapFileProvider = mapFileProvider;
96  _factorFileProvider = factorFileProvider;
97  _channelProvider = dataChannelProvider;
98  _frontierTimeProvider = dataFeedTimeProvider.FrontierTimeProvider;
99  _customExchange = GetBaseDataExchange();
100  _subscriptions = subscriptionManager.DataFeedSubscriptions;
101 
102  _dataQueueHandler = GetDataQueueHandler();
103  _dataQueueHandler?.SetJob(_job);
104 
105  // run the custom data exchange
106  _customExchange.Start();
107 
108  IsActive = true;
109 
110  base.Initialize(algorithm, job, resultHandler, mapFileProvider, factorFileProvider, dataProvider, subscriptionManager, dataFeedTimeProvider, dataChannelProvider);
111  }
112 
113  /// <summary>
114  /// Creates a new subscription to provide data for the specified security.
115  /// </summary>
116  /// <param name="request">Defines the subscription to be added, including start/end times the universe and security</param>
117  /// <returns>The created <see cref="Subscription"/> if successful, null otherwise</returns>
119  {
120  Subscription subscription = null;
121  try
122  {
123  // create and add the subscription to our collection
124  subscription = request.IsUniverseSubscription
125  ? CreateUniverseSubscription(request)
126  : CreateDataSubscription(request);
127  }
128  catch (Exception err)
129  {
130  Log.Error(err, $"CreateSubscription(): Failed configuration: '{request.Configuration}'");
131  // kill the algorithm, this shouldn't happen
132  _algorithm.SetRuntimeError(err, $"Failed to subscribe to {request.Configuration.Symbol}");
133  }
134 
135  return subscription;
136  }
137 
138  /// <summary>
139  /// Removes the subscription from the data feed, if it exists
140  /// </summary>
141  /// <param name="subscription">The subscription to remove</param>
142  public override void RemoveSubscription(Subscription subscription)
143  {
144  var symbol = subscription.Configuration.Symbol;
145 
146  // remove the subscriptions
147  if (!_channelProvider.ShouldStreamSubscription(subscription.Configuration))
148  {
149  _customExchange.RemoveEnumerator(symbol);
150  }
151  else
152  {
153  _dataQueueHandler.UnsubscribeWithMapping(subscription.Configuration);
154  }
155  }
156 
157  /// <summary>
158  /// External controller calls to signal a terminate of the thread.
159  /// </summary>
160  public override void Exit()
161  {
162  if (IsActive)
163  {
164  IsActive = false;
165  Log.Trace("LiveTradingDataFeed.Exit(): Start. Setting cancellation token...");
166  if (_dataQueueHandler is DataQueueHandlerManager manager)
167  {
168  manager.UnsupportedConfiguration -= HandleUnsupportedConfigurationEvent;
169  }
170  _customExchange?.Stop();
171  Log.Trace("LiveTradingDataFeed.Exit(): Exit Finished.");
172 
173  base.Exit();
174  }
175  }
176 
177  /// <summary>
178  /// Gets the <see cref="IDataQueueHandler"/> to use by default <see cref="DataQueueHandlerManager"/>
179  /// </summary>
180  /// <remarks>Useful for testing</remarks>
181  /// <returns>The loaded <see cref="IDataQueueHandler"/></returns>
183  {
184  var result = new DataQueueHandlerManager(_algorithm.Settings);
185  result.UnsupportedConfiguration += HandleUnsupportedConfigurationEvent;
186  return result;
187  }
188 
189  /// <summary>
190  /// Gets the <see cref="BaseDataExchange"/> to use
191  /// </summary>
192  /// <remarks>Useful for testing</remarks>
194  {
195  return new BaseDataExchange("CustomDataExchange") { SleepInterval = 100 };
196  }
197 
198  /// <summary>
199  /// Creates a new subscription for the specified security
200  /// </summary>
201  /// <param name="request">The subscription request</param>
202  /// <returns>A new subscription instance of the specified security</returns>
203  private Subscription CreateDataSubscription(SubscriptionRequest request)
204  {
205  Subscription subscription = null;
206 
207  var localEndTime = request.EndTimeUtc.ConvertFromUtc(request.Security.Exchange.TimeZone);
208  var timeZoneOffsetProvider = new TimeZoneOffsetProvider(request.Configuration.ExchangeTimeZone, request.StartTimeUtc, request.EndTimeUtc);
209 
210  IEnumerator<BaseData> enumerator = null;
211  if (!_channelProvider.ShouldStreamSubscription(request.Configuration))
212  {
213  if (!Tiingo.IsAuthCodeSet)
214  {
215  // we're not using the SubscriptionDataReader, so be sure to set the auth token here
216  Tiingo.SetAuthCode(Config.Get("tiingo-auth-token"));
217  }
218 
219  var factory = new LiveCustomDataSubscriptionEnumeratorFactory(_timeProvider, _algorithm.ObjectStore);
220  var enumeratorStack = factory.CreateEnumerator(request, _dataProvider);
221 
222  var enqueable = new EnqueueableEnumerator<BaseData>();
223  _customExchange.AddEnumerator(request.Configuration.Symbol, enumeratorStack, handleData: data =>
224  {
225  enqueable.Enqueue(data);
226 
227  subscription?.OnNewDataAvailable();
228  });
229 
230  enumerator = enqueable;
231  }
232  else
233  {
234  var auxEnumerators = new List<IEnumerator<BaseData>>();
235 
236  if (LiveAuxiliaryDataEnumerator.TryCreate(request.Configuration, _timeProvider, request.Security.Cache, _mapFileProvider,
237  _factorFileProvider, request.StartTimeLocal, out var auxDataEnumator))
238  {
239  auxEnumerators.Add(auxDataEnumator);
240  }
241 
242  EventHandler handler = (_, _) => subscription?.OnNewDataAvailable();
243  enumerator = Subscribe(request.Configuration, handler, IsExpired);
244 
245  if (auxEnumerators.Count > 0)
246  {
247  enumerator = new LiveAuxiliaryDataSynchronizingEnumerator(_timeProvider, request.Configuration.ExchangeTimeZone, enumerator, auxEnumerators);
248  }
249  }
250 
251  // scale prices before 'SubscriptionFilterEnumerator' since it updates securities realtime price
252  // and before fill forwarding so we don't happen to apply twice the factor
253  if (request.Configuration.PricesShouldBeScaled(liveMode: true))
254  {
255  enumerator = new PriceScaleFactorEnumerator(
256  enumerator,
257  request.Configuration,
258  _factorFileProvider,
259  liveMode: true);
260  }
261 
262  if (request.Configuration.FillDataForward)
263  {
264  var fillForwardResolution = _subscriptions.UpdateAndGetFillForwardResolution(request.Configuration);
265  var useDailyStrictEndTimes = LeanData.UseDailyStrictEndTimes(_algorithm.Settings, request, request.Configuration.Symbol, request.Configuration.Increment);
266 
267  enumerator = new LiveFillForwardEnumerator(_frontierTimeProvider, enumerator, request.Security.Exchange, fillForwardResolution, request.Configuration.ExtendedMarketHours,
268  localEndTime, request.Configuration.Resolution, request.Configuration.DataTimeZone, useDailyStrictEndTimes);
269  }
270 
271  // make our subscriptions aware of the frontier of the data feed, prevents future data from spewing into the feed
272  enumerator = new FrontierAwareEnumerator(enumerator, _frontierTimeProvider, timeZoneOffsetProvider);
273 
274  // define market hours and user filters to incoming data after the frontier enumerator so during warmup we avoid any realtime data making it's way into the securities
276  {
277  enumerator = new SubscriptionFilterEnumerator(enumerator, request.Security, localEndTime, request.Configuration.ExtendedMarketHours, true, request.ExchangeHours);
278  }
279 
280  enumerator = GetWarmupEnumerator(request, enumerator);
281 
282  var subscriptionDataEnumerator = new SubscriptionDataEnumerator(request.Configuration, request.Security.Exchange.Hours, timeZoneOffsetProvider,
283  enumerator, request.IsUniverseSubscription, _algorithm.Settings.DailyPreciseEndTime);
284  subscription = new Subscription(request, subscriptionDataEnumerator, timeZoneOffsetProvider);
285 
286  return subscription;
287  }
288 
289  /// <summary>
290  /// Helper method to determine if the symbol associated with the requested configuration is expired or not
291  /// </summary>
292  /// <remarks>This is useful during warmup where we can be requested to add some already expired asset. We want to skip sending it
293  /// to our live <see cref="_dataQueueHandler"/> instance to avoid explosions. But we do want to add warmup enumerators</remarks>
294  private bool IsExpired(SubscriptionDataConfig dataConfig)
295  {
296  var mapFile = _mapFileProvider.ResolveMapFile(dataConfig);
297  var delistingDate = dataConfig.Symbol.GetDelistingDate(mapFile);
298  return _timeProvider.GetUtcNow().Date > delistingDate.ConvertToUtc(dataConfig.ExchangeTimeZone);
299  }
300 
301  private IEnumerator<BaseData> Subscribe(SubscriptionDataConfig dataConfig, EventHandler newDataAvailableHandler, Func<SubscriptionDataConfig, bool> isExpired)
302  {
303  return new LiveSubscriptionEnumerator(dataConfig, _dataQueueHandler, newDataAvailableHandler, isExpired);
304  }
305 
306  /// <summary>
307  /// Creates a new subscription for universe selection
308  /// </summary>
309  /// <param name="request">The subscription request</param>
310  private Subscription CreateUniverseSubscription(SubscriptionRequest request)
311  {
312  Subscription subscription = null;
313 
314  // TODO : Consider moving the creating of universe subscriptions to a separate, testable class
315 
316  // grab the relevant exchange hours
317  var config = request.Universe.Configuration;
318  var localEndTime = request.EndTimeUtc.ConvertFromUtc(request.Security.Exchange.TimeZone);
319  var tzOffsetProvider = new TimeZoneOffsetProvider(request.Configuration.ExchangeTimeZone, request.StartTimeUtc, request.EndTimeUtc);
320 
321  IEnumerator<BaseData> enumerator = null;
322 
323  var timeTriggered = request.Universe as ITimeTriggeredUniverse;
324  if (timeTriggered != null)
325  {
326  Log.Trace($"LiveTradingDataFeed.CreateUniverseSubscription(): Creating user defined universe: {config.Symbol.ID}");
327 
328  // spoof a tick on the requested interval to trigger the universe selection function
329  var enumeratorFactory = new TimeTriggeredUniverseSubscriptionEnumeratorFactory(timeTriggered, MarketHoursDatabase.FromDataFolder(), _frontierTimeProvider);
330  enumerator = enumeratorFactory.CreateEnumerator(request, _dataProvider);
331 
332  enumerator = new FrontierAwareEnumerator(enumerator, _timeProvider, tzOffsetProvider);
333 
334  var enqueueable = new EnqueueableEnumerator<BaseData>();
335  _customExchange.AddEnumerator(new EnumeratorHandler(config.Symbol, enumerator, enqueueable));
336  enumerator = enqueueable;
337  }
338  else if (config.Type.IsAssignableTo(typeof(ETFConstituentUniverse)) ||
339  config.Type.IsAssignableTo(typeof(FundamentalUniverse)) ||
340  (request.Universe is OptionChainUniverse && request.Configuration.SecurityType != SecurityType.FutureOption))
341  {
342  Log.Trace($"LiveTradingDataFeed.CreateUniverseSubscription(): Creating {config.Type.Name} universe: {config.Symbol.ID}");
343 
344  // Will try to pull data from the data folder every 10min, file with yesterdays date.
345  // If lean is started today it will trigger initial coarse universe selection
346  var factory = new LiveCustomDataSubscriptionEnumeratorFactory(_timeProvider,
347  _algorithm.ObjectStore,
348  // we adjust time to the previous tradable date
349  time => Time.GetStartTimeForTradeBars(request.Security.Exchange.Hours, time, Time.OneDay, 1, false, config.DataTimeZone, _algorithm.Settings.DailyPreciseEndTime),
350  TimeSpan.FromMinutes(10)
351  );
352  var enumeratorStack = factory.CreateEnumerator(request, _dataProvider);
353 
354  // aggregates each coarse data point into a single BaseDataCollection
355  var aggregator = new BaseDataCollectionAggregatorEnumerator(enumeratorStack, config.Symbol, true);
356  var enqueable = new EnqueueableEnumerator<BaseData>();
357  _customExchange.AddEnumerator(config.Symbol, aggregator, handleData: data =>
358  {
359  enqueable.Enqueue(data);
360  subscription?.OnNewDataAvailable();
361  });
362 
363  enumerator = GetConfiguredFrontierAwareEnumerator(enqueable, tzOffsetProvider,
364  // advance time if before 23pm or after 5am and not on Saturdays
365  time => time.Hour < 23 && time.Hour > 5 && time.DayOfWeek != DayOfWeek.Saturday);
366  }
367  else if (request.Universe is OptionChainUniverse)
368  {
369  Log.Trace("LiveTradingDataFeed.CreateUniverseSubscription(): Creating option chain universe: " + config.Symbol.ID);
370 
371  Func<SubscriptionRequest, IEnumerator<BaseData>> configure = (subRequest) =>
372  {
373  var useDailyStrictEndTimes = LeanData.UseDailyStrictEndTimes(_algorithm.Settings, request, request.Configuration.Symbol, request.Configuration.Increment);
374  var fillForwardResolution = _subscriptions.UpdateAndGetFillForwardResolution(subRequest.Configuration);
375  var input = Subscribe(subRequest.Configuration, (sender, args) => subscription?.OnNewDataAvailable(), (_) => false);
376  return new LiveFillForwardEnumerator(_frontierTimeProvider, input, subRequest.Security.Exchange, fillForwardResolution, subRequest.Configuration.ExtendedMarketHours,
377  localEndTime, subRequest.Configuration.Resolution, subRequest.Configuration.DataTimeZone, useDailyStrictEndTimes);
378  };
379 
380  var symbolUniverse = GetUniverseProvider(request.Configuration.SecurityType);
381 
382  var enumeratorFactory = new OptionChainUniverseSubscriptionEnumeratorFactory(configure, symbolUniverse, _timeProvider);
383  enumerator = enumeratorFactory.CreateEnumerator(request, _dataProvider);
384 
385  enumerator = new FrontierAwareEnumerator(enumerator, _frontierTimeProvider, tzOffsetProvider);
386  }
387  else if (request.Universe is FuturesChainUniverse)
388  {
389  Log.Trace("LiveTradingDataFeed.CreateUniverseSubscription(): Creating futures chain universe: " + config.Symbol.ID);
390 
391  var symbolUniverse = GetUniverseProvider(SecurityType.Future);
392 
393  enumerator = new DataQueueFuturesChainUniverseDataCollectionEnumerator(request, symbolUniverse, _timeProvider);
394  enumerator = new FrontierAwareEnumerator(enumerator, _frontierTimeProvider, tzOffsetProvider);
395  }
396  else
397  {
398  Log.Trace("LiveTradingDataFeed.CreateUniverseSubscription(): Creating custom universe: " + config.Symbol.ID);
399 
400  var factory = new LiveCustomDataSubscriptionEnumeratorFactory(_timeProvider, _algorithm.ObjectStore);
401  var enumeratorStack = factory.CreateEnumerator(request, _dataProvider);
402  enumerator = new BaseDataCollectionAggregatorEnumerator(enumeratorStack, config.Symbol, liveMode: true);
403 
404  var enqueueable = new EnqueueableEnumerator<BaseData>();
405  _customExchange.AddEnumerator(new EnumeratorHandler(config.Symbol, enumerator, enqueueable));
406  enumerator = enqueueable;
407  }
408 
409  enumerator = AddScheduleWrapper(request, enumerator, new PredicateTimeProvider(_frontierTimeProvider, (currentUtcDateTime) => {
410  // will only let time advance after it's passed the live time shift frontier
411  return currentUtcDateTime.TimeOfDay > _scheduledUniverseUtcTimeShift;
412  }));
413 
414  enumerator = GetWarmupEnumerator(request, enumerator);
415 
416  // create the subscription
417  var subscriptionDataEnumerator = new SubscriptionDataEnumerator(request.Configuration, request.Security.Exchange.Hours, tzOffsetProvider,
418  enumerator, request.IsUniverseSubscription, _algorithm.Settings.DailyPreciseEndTime);
419  subscription = new Subscription(request, subscriptionDataEnumerator, tzOffsetProvider);
420 
421  return subscription;
422  }
423 
424  /// <summary>
425  /// Build and apply the warmup enumerators when required
426  /// </summary>
427  private IEnumerator<BaseData> GetWarmupEnumerator(SubscriptionRequest request, IEnumerator<BaseData> liveEnumerator)
428  {
429  if (_algorithm.IsWarmingUp)
430  {
431  var warmupRequest = new SubscriptionRequest(request, endTimeUtc: _timeProvider.GetUtcNow(),
432  // we will not fill forward each warmup enumerators separately but concatenated bellow
433  configuration: new SubscriptionDataConfig(request.Configuration, fillForward: false,
434  resolution: _algorithm.Settings.WarmupResolution));
435  if (warmupRequest.TradableDaysInDataTimeZone.Any()
436  // make sure there is at least room for a single bar of the requested resolution, else can cause issues with some history providers
437  // this could happen when we create some internal subscription whose start time is 'Now', which we don't really want to warmup
438  && warmupRequest.EndTimeUtc - warmupRequest.StartTimeUtc >= warmupRequest.Configuration.Resolution.ToTimeSpan()
439  // since we change the resolution, let's validate it's still valid configuration (example daily equity quotes are not!)
440  && LeanData.IsValidConfiguration(warmupRequest.Configuration.SecurityType, warmupRequest.Configuration.Resolution, warmupRequest.Configuration.TickType))
441  {
442  // since we will source data locally and from the history provider, let's limit the history request size
443  // by setting a start date respecting the 'MaximumWarmupHistoryDaysLookBack'
444  var historyWarmup = warmupRequest;
445  var warmupHistoryStartDate = warmupRequest.EndTimeUtc.AddDays(-MaximumWarmupHistoryDaysLookBack);
446  if (warmupHistoryStartDate > warmupRequest.StartTimeUtc)
447  {
448  historyWarmup = new SubscriptionRequest(warmupRequest, startTimeUtc: warmupHistoryStartDate);
449  }
450 
451  // let's keep track of the last point we got from the file based enumerator and start our history enumeration from this point
452  // this is much more efficient since these duplicated points will be dropped by the filter righ away causing memory usage spikes
453  var lastPointTracker = new LastPointTracker();
454 
455  var synchronizedWarmupEnumerator = TryAddFillForwardEnumerator(warmupRequest,
456  // we concatenate the file based and history based warmup enumerators, dropping duplicate time stamps
457  new ConcatEnumerator(true, GetFileBasedWarmupEnumerator(warmupRequest, lastPointTracker), GetHistoryWarmupEnumerator(historyWarmup, lastPointTracker)) { CanEmitNull = false },
458  // if required by the original request, we will fill forward the Synced warmup data
460  _algorithm.Settings.WarmupResolution);
461  synchronizedWarmupEnumerator = AddScheduleWrapper(warmupRequest, synchronizedWarmupEnumerator, null);
462 
463  // don't let future data past. We let null pass because that's letting the next enumerator know we've ended because we always return true in live
464  synchronizedWarmupEnumerator = new FilterEnumerator<BaseData>(synchronizedWarmupEnumerator, data => data == null || data.EndTime <= warmupRequest.EndTimeLocal);
465 
466  // the order here is important, concat enumerator will keep the last enumerator given and dispose of the rest
467  liveEnumerator = new ConcatEnumerator(true, synchronizedWarmupEnumerator, liveEnumerator);
468  }
469  }
470  return liveEnumerator;
471  }
472 
473  /// <summary>
474  /// File based warmup enumerator
475  /// </summary>
476  private IEnumerator<BaseData> GetFileBasedWarmupEnumerator(SubscriptionRequest warmup, LastPointTracker lastPointTracker)
477  {
478  IEnumerator<BaseData> result = null;
479  try
480  {
481  result = new FilterEnumerator<BaseData>(CreateEnumerator(warmup),
482  data =>
483  {
484  // don't let future data past, nor fill forward, that will be handled after merging with the history request response
485  if (data == null || data.EndTime < warmup.EndTimeLocal && !data.IsFillForward)
486  {
487  if (data != null)
488  {
489  lastPointTracker.LastDataPoint = data;
490  }
491  return true;
492  }
493  return false;
494  });
495  }
496  catch (Exception e)
497  {
498  Log.Error(e, $"File based warmup: {warmup.Configuration}");
499  }
500  return result;
501  }
502 
503  /// <summary>
504  /// History based warmup enumerator
505  /// </summary>
506  private IEnumerator<BaseData> GetHistoryWarmupEnumerator(SubscriptionRequest warmup, LastPointTracker lastPointTracker)
507  {
508  IEnumerator<BaseData> result;
509  if (warmup.IsUniverseSubscription)
510  {
511  // we ignore the fill forward time span argument because we will fill forwared the concatenated file and history based enumerators next in the stack
512  result = CreateUniverseEnumerator(warmup, createUnderlyingEnumerator: (req, _) => GetHistoryWarmupEnumerator(req, lastPointTracker));
513  }
514  else
515  {
516  // we create an enumerable of which we get the enumerator to defer the creation of the history request until the file based enumeration ended
517  // and potentially the 'lastPointTracker' is available to adjust our start time
518  result = new[] { warmup }.SelectMany(_ =>
519  {
520  var startTimeUtc = warmup.StartTimeUtc;
521  if (lastPointTracker != null && lastPointTracker.LastDataPoint != null)
522  {
523  var lastPointExchangeTime = lastPointTracker.LastDataPoint.Time;
524  if (warmup.Configuration.Resolution == Resolution.Daily)
525  {
526  // time could be 9.30 for example using strict daily end times, but we just want the date in this case
527  lastPointExchangeTime = lastPointExchangeTime.Date;
528  }
529 
530  var utcLastPointTime = lastPointExchangeTime.ConvertToUtc(warmup.ExchangeHours.TimeZone);
531  if (utcLastPointTime > startTimeUtc)
532  {
533  if (Log.DebuggingEnabled)
534  {
535  Log.Debug($"LiveTradingDataFeed.GetHistoryWarmupEnumerator(): Adjusting history warmup start time to {utcLastPointTime} from {startTimeUtc} for {warmup.Configuration}");
536  }
537  startTimeUtc = utcLastPointTime;
538  }
539  }
540  var historyRequest = new Data.HistoryRequest(warmup.Configuration, warmup.ExchangeHours, startTimeUtc, warmup.EndTimeUtc);
541  try
542  {
543  return _algorithm.HistoryProvider.GetHistory(new[] { historyRequest }, _algorithm.TimeZone).Select(slice =>
544  {
545  try
546  {
547  var data = slice.Get(historyRequest.DataType);
548  return (BaseData)data[warmup.Configuration.Symbol];
549  }
550  catch (Exception e)
551  {
552  Log.Error(e, $"History warmup: {warmup.Configuration}");
553  }
554  return null;
555  });
556  }
557  catch
558  {
559  // some history providers could throw if they do not support a type
560  }
561  return Enumerable.Empty<BaseData>();
562  }).GetEnumerator();
563  }
564 
565  return new FilterEnumerator<BaseData>(result,
566  // don't let future data past, nor fill forward, that will be handled after merging with the file based enumerator
567  data => data == null || data.EndTime < warmup.EndTimeLocal && !data.IsFillForward);
568  }
569 
570  /// <summary>
571  /// Will wrap the provided enumerator with a <see cref="FrontierAwareEnumerator"/>
572  /// using a <see cref="PredicateTimeProvider"/> that will advance time based on the provided
573  /// function
574  /// </summary>
575  /// <remarks>Won't advance time if now.Hour is bigger or equal than 23pm, less or equal than 5am or Saturday.
576  /// This is done to prevent universe selection occurring in those hours so that the subscription changes
577  /// are handled correctly.</remarks>
578  private IEnumerator<BaseData> GetConfiguredFrontierAwareEnumerator(
579  IEnumerator<BaseData> enumerator,
580  TimeZoneOffsetProvider tzOffsetProvider,
581  Func<DateTime, bool> customStepEvaluator)
582  {
583  var stepTimeProvider = new PredicateTimeProvider(_frontierTimeProvider, customStepEvaluator);
584 
585  return new FrontierAwareEnumerator(enumerator, stepTimeProvider, tzOffsetProvider);
586  }
587 
588  private IDataQueueUniverseProvider GetUniverseProvider(SecurityType securityType)
589  {
590  if (_dataQueueHandler is not IDataQueueUniverseProvider or DataQueueHandlerManager { HasUniverseProvider: false })
591  {
592  throw new NotSupportedException($"The DataQueueHandler does not support {securityType}.");
593  }
594  return (IDataQueueUniverseProvider)_dataQueueHandler;
595  }
596 
597  private void HandleUnsupportedConfigurationEvent(object _, SubscriptionDataConfig config)
598  {
599  if (_algorithm != null)
600  {
601  lock (_unsupportedConfigurations)
602  {
603  var key = $"{config.Symbol.ID.Market} {config.Symbol.ID.SecurityType} {config.Type.Name}";
604  if (_unsupportedConfigurations.Add(key))
605  {
606  Log.Trace($"LiveTradingDataFeed.HandleUnsupportedConfigurationEvent(): detected unsupported configuration: {config}");
607 
608  _algorithm.Debug($"Warning: {key} data not supported. Please consider reviewing the data providers selection.");
609  }
610  }
611  }
612  }
613 
614  /// <summary>
615  /// Overrides methods of the base data exchange implementation
616  /// </summary>
617  private class EnumeratorHandler : BaseDataExchange.EnumeratorHandler
618  {
619  public EnumeratorHandler(Symbol symbol, IEnumerator<BaseData> enumerator, EnqueueableEnumerator<BaseData> enqueueable)
620  : base(symbol, enumerator, handleData: enqueueable.Enqueue)
621  {
622  EnumeratorFinished += (_, _) => enqueueable.Stop();
623  }
624  }
625 
626  private class LastPointTracker
627  {
628  public BaseData LastDataPoint { get; set; }
629  }
630  }
631 }