25 using System.Collections.Generic;
// Number of days used to bound the history-based warmup: GetWarmupEnumerator subtracts it from the
// warmup end time to obtain the earliest start of the history request.
// Read from the "maximum-warmup-history-days-look-back" configuration key; 5 is passed as the second
// argument (presumably the fallback when the key is not set — confirm against Config.GetInt).
43 private static readonly
int MaximumWarmupHistoryDaysLookBack =
Config.
GetInt(
"maximum-warmup-history-days-look-back", 5);
// UTC time-of-day threshold compared against the current UTC time by the scheduled universe predicate.
// It is 11 hours plus as many minutes as the seconds component of the UTC clock when this instance is
// created (0-59), so the value varies from instance to instance.
// NOTE(review): the clock-derived offset looks like intentional jitter — confirm before changing it.
62 private readonly TimeSpan _scheduledUniverseUtcTimeShift = TimeSpan.FromMinutes(11 * 60 + DateTime.UtcNow.Second);
// Keys of the unsupported configurations that were already reported. The same object is used as the
// lock while the set is checked and updated, so each key is logged and surfaced only once.
63 private readonly HashSet<string> _unsupportedConfigurations =
new();
// Per its message, this exception signals that the feed needs a LiveNodePacket job
// (the guarding condition is not part of this excerpt).
88 throw new ArgumentException(
"The LiveTradingDataFeed requires a LiveNodePacket.");
// Keep references to the collaborators received as arguments.
91 _algorithm = algorithm;
94 _dataProvider = dataProvider;
95 _mapFileProvider = mapFileProvider;
96 _factorFileProvider = factorFileProvider;
97 _channelProvider = dataChannelProvider;
// Hand the job over to the data queue handler; the null-conditional call makes this a no-op when there is none.
103 _dataQueueHandler?.
SetJob(_job);
// Start the custom exchange that universe subscriptions later add their enumerators to.
106 _customExchange.
Start();
// Forward every argument, unchanged, to the base implementation.
110 base.Initialize(algorithm, job, resultHandler, mapFileProvider, factorFileProvider, dataProvider, subscriptionManager, dataFeedTimeProvider, dataChannelProvider);
// Two builders exist: one for universe subscriptions and one for plain data subscriptions.
// The condition that selects between them is not part of this excerpt.
125 ? CreateUniverseSubscription(request)
126 : CreateDataSubscription(request);
// A failure is not rethrown here: it is logged with the offending configuration and then
// reported to the algorithm as a runtime error naming the symbol.
128 catch (Exception err)
130 Log.
Error(err, $
"CreateSubscription(): Failed configuration: '{request.Configuration}'");
132 _algorithm.SetRuntimeError(err, $
"Failed to subscribe to {request.Configuration.Symbol}");
153 _dataQueueHandler.UnsubscribeWithMapping(subscription.
Configuration);
// Shutdown sequence: trace the start, detach handlers, stop the exchange, trace completion.
165 Log.
Trace(
"LiveTradingDataFeed.Exit(): Start. Setting cancellation token...");
// Detach the handler that was attached with += when the manager was set up, so it no longer calls back into this feed.
168 manager.UnsupportedConfiguration -= HandleUnsupportedConfigurationEvent;
// The exchange may be null at this point, hence the null-conditional call.
170 _customExchange?.
Stop();
171 Log.
Trace(
"LiveTradingDataFeed.Exit(): Exit Finished.");
185 result.UnsupportedConfiguration += HandleUnsupportedConfigurationEvent;
// NOTE(review): these lines presumably belong to CreateDataSubscription; the method header is not part of this excerpt.
// The enumerator starts out null and is assigned by one of the paths below.
210 IEnumerator<BaseData> enumerator =
null;
// Path that builds the enumerator through a factory and the data provider.
220 var enumeratorStack = factory.CreateEnumerator(request, _dataProvider);
// Each data point is pushed into the enqueueable enumerator and the subscription is notified;
// 'subscription' is only assigned further down, hence the null-conditional call.
225 enqueable.Enqueue(data);
227 subscription?.OnNewDataAvailable();
230 enumerator = enqueable;
// Auxiliary data enumerators are collected here and only used when at least one was added.
234 var auxEnumerators =
new List<IEnumerator<BaseData>>();
237 _factorFileProvider, request.
StartTimeLocal, out var auxDataEnumator))
239 auxEnumerators.Add(auxDataEnumator);
// Path that subscribes through Subscribe(): the handler forwards new-data notifications to the
// subscription, and IsExpired is passed as the expiry check for the configuration.
242 EventHandler handler = (_, _) => subscription?.OnNewDataAvailable();
243 enumerator = Subscribe(request.
Configuration, handler, IsExpired);
245 if (auxEnumerators.Count > 0)
// Price scaling is evaluated in live mode.
253 if (request.
Configuration.PricesShouldBeScaled(liveMode:
true))
// Wrap the live enumerator with the warmup enumerator before the subscription is created.
280 enumerator = GetWarmupEnumerator(request, enumerator);
// Assigning 'subscription' here is what makes the earlier captured 'subscription?.' calls effective.
284 subscription =
new Subscription(request, subscriptionDataEnumerator, timeZoneOffsetProvider);
296 var mapFile = _mapFileProvider.ResolveMapFile(dataConfig);
297 var delistingDate = dataConfig.
Symbol.GetDelistingDate(mapFile);
301 private IEnumerator<BaseData> Subscribe(
SubscriptionDataConfig dataConfig, EventHandler newDataAvailableHandler, Func<SubscriptionDataConfig, bool> isExpired)
// Both locals start out null: 'enumerator' is assigned by exactly one of the universe branches below and
// 'subscription' only at the very end, which is why the callbacks use 'subscription?.'.
312 Subscription subscription =
null;
321 IEnumerator<BaseData> enumerator =
null;
// Branch 1: user defined universe (taken when 'timeTriggered' is not null).
324 if (timeTriggered !=
null)
326 Log.
Trace($
"LiveTradingDataFeed.CreateUniverseSubscription(): Creating user defined universe: {config.Symbol.ID}");
330 enumerator = enumeratorFactory.CreateEnumerator(request, _dataProvider);
// The factory enumerator is handed to the custom exchange together with an enqueueable enumerator,
// and the enqueueable becomes the enumerator used from here on.
335 _customExchange.
AddEnumerator(
new EnumeratorHandler(config.Symbol, enumerator, enqueueable));
336 enumerator = enqueueable;
// Branch 2: universe identified by its data type name (see the trace message).
342 Log.
Trace($
"LiveTradingDataFeed.CreateUniverseSubscription(): Creating {config.Type.Name} universe: {config.Symbol.ID}");
// A 10 minute interval; the call that consumes it is not part of this excerpt.
350 TimeSpan.FromMinutes(10)
352 var enumeratorStack = factory.CreateEnumerator(request, _dataProvider);
// Data handled by the custom exchange is enqueued and the subscription is notified.
357 _customExchange.
AddEnumerator(config.Symbol, aggregator, handleData: data =>
359 enqueable.Enqueue(data);
360 subscription?.OnNewDataAvailable();
363 enumerator = GetConfiguredFrontierAwareEnumerator(enqueable, tzOffsetProvider,
// Step predicate: true for hours 6 through 22 on any day except Saturday.
365 time => time.Hour < 23 && time.Hour > 5 && time.DayOfWeek != DayOfWeek.Saturday);
// Branch 3: option chain universe.
369 Log.
Trace(
"LiveTradingDataFeed.CreateUniverseSubscription(): Creating option chain universe: " + config.Symbol.ID);
371 Func<SubscriptionRequest, IEnumerator<BaseData>> configure = (subRequest) =>
// The expiry predicate passed to Subscribe always returns false; the result is wrapped in a
// LiveFillForwardEnumerator driven by the frontier time provider.
375 var input = Subscribe(subRequest.Configuration, (sender, args) => subscription?.OnNewDataAvailable(), (_) =>
false);
376 return new LiveFillForwardEnumerator(_frontierTimeProvider, input, subRequest.Security.Exchange, fillForwardResolution, subRequest.Configuration.ExtendedMarketHours,
377 localEndTime, subRequest.Configuration.Resolution, subRequest.Configuration.DataTimeZone, useDailyStrictEndTimes);
383 enumerator = enumeratorFactory.CreateEnumerator(request, _dataProvider);
// Branch 4: futures chain universe, backed by the universe provider for SecurityType.Future.
389 Log.
Trace(
"LiveTradingDataFeed.CreateUniverseSubscription(): Creating futures chain universe: " + config.Symbol.ID);
391 var symbolUniverse = GetUniverseProvider(
SecurityType.Future);
// Branch 5: custom universe; same exchange/enqueueable wiring as the user defined branch.
398 Log.
Trace(
"LiveTradingDataFeed.CreateUniverseSubscription(): Creating custom universe: " + config.Symbol.ID);
401 var enumeratorStack = factory.CreateEnumerator(request, _dataProvider);
405 _customExchange.
AddEnumerator(
new EnumeratorHandler(config.Symbol, enumerator, enqueueable));
406 enumerator = enqueueable;
// Schedule wrapper whose time predicate only holds once the UTC time of day is past
// _scheduledUniverseUtcTimeShift.
409 enumerator =
AddScheduleWrapper(request, enumerator,
new PredicateTimeProvider(_frontierTimeProvider, (currentUtcDateTime) => {
411 return currentUtcDateTime.TimeOfDay > _scheduledUniverseUtcTimeShift;
// Wrap with the warmup enumerator, then create the subscription.
414 enumerator = GetWarmupEnumerator(request, enumerator);
419 subscription =
new Subscription(request, subscriptionDataEnumerator, tzOffsetProvider);
/// <summary>
/// Builds the enumerator handed to the subscription: when the warmup request passes the checks below,
/// warmup data (file based followed by history based) is placed in front of the live enumerator;
/// the value returned is always <paramref name="liveEnumerator"/>, either as received or as that concatenation.
/// </summary>
/// <param name="request">The subscription request the warmup request is derived from (derivation not shown in this excerpt)</param>
/// <param name="liveEnumerator">The live data enumerator to run after the warmup data</param>
/// <returns>The enumerator to use for the subscription</returns>
427 private IEnumerator<BaseData> GetWarmupEnumerator(
SubscriptionRequest request, IEnumerator<BaseData> liveEnumerator)
// Conditions checked: there is at least one tradable day, the warmup window spans at least one
// resolution step, and the security type / resolution / tick type combination is valid.
435 if (warmupRequest.TradableDaysInDataTimeZone.Any()
438 && warmupRequest.EndTimeUtc - warmupRequest.StartTimeUtc >= warmupRequest.Configuration.Resolution.ToTimeSpan()
440 &&
LeanData.
IsValidConfiguration(warmupRequest.Configuration.SecurityType, warmupRequest.Configuration.Resolution, warmupRequest.Configuration.TickType))
// The history based warmup uses the full warmup request unless that request starts earlier than
// MaximumWarmupHistoryDaysLookBack days before its end; in that case its start is moved forward.
444 var historyWarmup = warmupRequest;
445 var warmupHistoryStartDate = warmupRequest.EndTimeUtc.AddDays(-MaximumWarmupHistoryDaysLookBack);
446 if (warmupHistoryStartDate > warmupRequest.StartTimeUtc)
448 historyWarmup =
new SubscriptionRequest(warmupRequest, startTimeUtc: warmupHistoryStartDate);
// One tracker instance is shared by both warmup sources: the file based one records the last point
// it lets through and the history based one reads it to adjust its start time.
453 var lastPointTracker =
new LastPointTracker();
// File based warmup is enumerated first, then the history based one; CanEmitNull is turned off.
457 new ConcatEnumerator(
true, GetFileBasedWarmupEnumerator(warmupRequest, lastPointTracker), GetHistoryWarmupEnumerator(historyWarmup, lastPointTracker)) { CanEmitNull =
false },
// Schedule wrapper applied with a null time provider argument.
461 synchronizedWarmupEnumerator =
AddScheduleWrapper(warmupRequest, synchronizedWarmupEnumerator,
null);
// Drop points that end after the warmup end time; null values are let through.
464 synchronizedWarmupEnumerator =
new FilterEnumerator<BaseData>(synchronizedWarmupEnumerator, data => data ==
null || data.EndTime <= warmupRequest.EndTimeLocal);
// Warmup data first, live data afterwards.
467 liveEnumerator =
new ConcatEnumerator(
true, synchronizedWarmupEnumerator, liveEnumerator);
470 return liveEnumerator;
/// <summary>
/// Creates the file based part of the warmup data for the given warmup request.
/// </summary>
/// <param name="warmup">The warmup subscription request</param>
/// <param name="lastPointTracker">Receives the last data point accepted by the filter below</param>
/// <returns>The file based warmup enumerator; NOTE(review): 'result' starts as null, so null may be returned when creation fails — confirm</returns>
476 private IEnumerator<BaseData> GetFileBasedWarmupEnumerator(
SubscriptionRequest warmup, LastPointTracker lastPointTracker)
478 IEnumerator<BaseData> result =
null;
// Accepts null values, and data that ends strictly before the warmup end time and is not fill forward
// ('&&' binds tighter than '||', so the null check stands on its own).
485 if (data ==
null || data.EndTime < warmup.
EndTimeLocal && !data.IsFillForward)
// Remember the accepted point so the history based warmup can start from it.
489 lastPointTracker.LastDataPoint = data;
// Failures are logged together with the configuration.
498 Log.
Error(e, $
"File based warmup: {warmup.Configuration}");
/// <summary>
/// Creates the history provider based part of the warmup data for the given warmup request.
/// </summary>
/// <param name="warmup">The warmup subscription request</param>
/// <param name="lastPointTracker">Optional tracker holding the last point emitted by the file based warmup; may be null</param>
/// <returns>The history based warmup enumerator</returns>
506 private IEnumerator<BaseData> GetHistoryWarmupEnumerator(
SubscriptionRequest warmup, LastPointTracker lastPointTracker)
508 IEnumerator<BaseData> result;
// Universe path: the universe enumerator is built with this same method as the factory for its
// underlying enumerators (the condition selecting this path is not part of this excerpt).
512 result = CreateUniverseEnumerator(warmup, createUnderlyingEnumerator: (req, _) => GetHistoryWarmupEnumerator(req, lastPointTracker));
// Wrapping the request in a one element array and using SelectMany makes the lambda run lazily,
// when the result is enumerated (standard LINQ deferred execution).
518 result =
new[] { warmup }.SelectMany(_ =>
// If a previous warmup source already produced a point, use it to move the history start forward.
521 if (lastPointTracker !=
null && lastPointTracker.LastDataPoint !=
null)
523 var lastPointExchangeTime = lastPointTracker.LastDataPoint.Time;
// For daily resolution only the date part of the last point's time is used.
524 if (warmup.Configuration.Resolution == Resolution.Daily)
527 lastPointExchangeTime = lastPointExchangeTime.Date;
// The start is only ever moved later, never earlier.
531 if (utcLastPointTime > startTimeUtc)
535 Log.Debug($
"LiveTradingDataFeed.GetHistoryWarmupEnumerator(): Adjusting history warmup start time to {utcLastPointTime} from {startTimeUtc} for {warmup.Configuration}");
537 startTimeUtc = utcLastPointTime;
// Ask the algorithm's history provider for the single request, using the algorithm time zone,
// and extract the requested data type from every slice.
543 return _algorithm.HistoryProvider.GetHistory(new[] { historyRequest }, _algorithm.
TimeZone).Select(slice =>
547 var data = slice.Get(historyRequest.DataType);
// Failures are logged together with the configuration; an empty sequence is returned further down.
552 Log.
Error(e, $
"History warmup: {warmup.Configuration}");
561 return Enumerable.Empty<
BaseData>();
// Same acceptance rule as the file based warmup: null values, or non fill forward data ending
// strictly before the warmup end time.
567 data => data ==
null || data.EndTime < warmup.
EndTimeLocal && !data.IsFillForward);
/// <summary>
/// Wraps an enumerator using a time provider that is built from the frontier time provider and
/// the supplied step predicate. The rest of the wrapping is not part of this excerpt.
/// </summary>
/// <param name="enumerator">The source enumerator</param>
/// <param name="tzOffsetProvider">The time zone offset provider</param>
/// <param name="customStepEvaluator">Predicate over a DateTime handed to the PredicateTimeProvider</param>
/// <returns>The configured enumerator</returns>
578 private IEnumerator<BaseData> GetConfiguredFrontierAwareEnumerator(
579 IEnumerator<BaseData> enumerator,
580 TimeZoneOffsetProvider tzOffsetProvider,
581 Func<DateTime, bool> customStepEvaluator)
583 var stepTimeProvider =
new PredicateTimeProvider(_frontierTimeProvider, customStepEvaluator);
592 throw new NotSupportedException($
"The DataQueueHandler does not support {securityType}.");
// Nothing is reported when there is no algorithm to notify.
599 if (_algorithm !=
null)
// The set doubles as the lock object guarding its own check-and-add.
601 lock (_unsupportedConfigurations)
// The key is built from market, security type and data type name, so different symbols that share
// those three values produce the same key.
603 var key = $
"{config.Symbol.ID.Market} {config.Symbol.ID.SecurityType} {config.Type.Name}";
// HashSet.Add returns true only the first time a key is added, so each key is reported once.
604 if (_unsupportedConfigurations.Add(key))
606 Log.
Trace($
"LiveTradingDataFeed.HandleUnsupportedConfigurationEvent(): detected unsupported configuration: {config}");
// Also surface the warning to the algorithm through its Debug channel.
608 _algorithm.
Debug($
"Warning: {key} data not supported. Please consider reviewing the data providers selection.");
/// <summary>
/// Enumerator handler that forwards handled data into an enqueueable enumerator and stops that
/// enqueueable when the handled enumerator finishes.
/// </summary>
617 private class EnumeratorHandler : BaseDataExchange.EnumeratorHandler
// The enqueueable's Enqueue method is used as the base handler's data callback.
620 : base(symbol, enumerator, handleData: enqueueable.Enqueue)
// Stop the enqueueable once the EnumeratorFinished event fires.
622 EnumeratorFinished += (_, _) => enqueueable.
Stop();
/// <summary>
/// Mutable holder for a single data point. In this file it is written by the file based warmup
/// filter and read by the history based warmup to adjust its start time.
/// </summary>
626 private class LastPointTracker
// The last data point recorded; null until one has been set.
628 public BaseData LastDataPoint {
get;
set; }