25 using System.Collections.Generic;
/// <summary>
/// Number of days subtracted from the warmup end time to bound the history-based warmup
/// request. Read from the "maximum-warmup-history-days-look-back" config key; 5 is passed
/// as the second argument (presumably the fallback when the key is absent -- confirm).
/// </summary>
43 private static readonly
int MaximumWarmupHistoryDaysLookBack =
Config.
GetInt(
"maximum-warmup-history-days-look-back", 5);
// UTC time-of-day threshold compared against the frontier time for scheduled universes:
// 11 hours plus N minutes, where N is the seconds component (0-59) of DateTime.UtcNow at
// the moment this field is initialized.
// NOTE(review): the per-instance minute offset looks like deliberate jitter -- confirm.
62 private readonly TimeSpan _scheduledUniverseUtcTimeShift = TimeSpan.FromMinutes(11 * 60 + DateTime.UtcNow.Second);
// Keys of unsupported configurations that were already reported. The same instance is
// also used as the lock object around the add-and-report step.
63 private readonly HashSet<string> _unsupportedConfigurations =
new();
// NOTE(review): the condition guarding this throw is outside the excerpt; per its message
// it is raised when the feed is not given a LiveNodePacket.
88 throw new ArgumentException(
"The LiveTradingDataFeed requires a LiveNodePacket.");
// Store the collaborators passed to Initialize on the instance.
91 _algorithm = algorithm;
94 _dataProvider = dataProvider;
95 _mapFileProvider = mapFileProvider;
96 _factorFileProvider = factorFileProvider;
97 _channelProvider = dataChannelProvider;
// Hand the job to the data queue handler; the null-conditional skips the call when it is null.
103 _dataQueueHandler?.
SetJob(_job);
// Start the custom exchange.
106 _customExchange.
Start();
// Let the base class run its own initialization with the same arguments.
110 base.Initialize(algorithm, job, resultHandler, mapFileProvider, factorFileProvider, dataProvider, subscriptionManager, dataFeedTimeProvider, dataChannelProvider);
// Two branches of a conditional expression: one builds a universe subscription, the other
// a data subscription. The selecting condition is outside the excerpt.
125 ? CreateUniverseSubscription(request)
126 : CreateDataSubscription(request);
// On any exception: log it with the failing configuration, then report it to the
// algorithm as a runtime error.
128 catch (Exception err)
130 Log.
Error(err, $
"CreateSubscription(): Failed configuration: '{request.Configuration}'");
132 _algorithm.SetRuntimeError(err, $
"Failed to subscribe to {request.Configuration.Symbol}");
// Unsubscribe the subscription's configuration from the data queue handler.
153 _dataQueueHandler.UnsubscribeWithMapping(subscription.
Configuration);
// Trace the beginning of the shutdown sequence.
165 Log.
Trace(
"LiveTradingDataFeed.Exit(): Start. Setting cancellation token...");
// Detach this feed's handler from the manager's UnsupportedConfiguration event.
168 manager.UnsupportedConfiguration -= HandleUnsupportedConfigurationEvent;
// Stop the custom exchange; the null-conditional skips the call if it was never created.
170 _customExchange?.
Stop();
// Trace the end of the shutdown sequence.
171 Log.
Trace(
"LiveTradingDataFeed.Exit(): Exit Finished.");
// Attach this feed's handler to the UnsupportedConfiguration event raised by 'result'.
185 result.UnsupportedConfiguration += HandleUnsupportedConfigurationEvent;
// Enumerator that will back the subscription; starts null and is assigned below.
// NOTE(review): the branch conditions between these statements are outside the excerpt.
210 IEnumerator<BaseData> enumerator =
null;
// Build an enumerator stack from the factory using the feed's data provider.
220 var enumeratorStack = factory.CreateEnumerator(request, _dataProvider);
// Push the data point into the enqueueable, then signal the subscription (if it has been
// assigned yet) that new data is available.
225 enqueable.Enqueue(data);
227 subscription?.OnNewDataAvailable();
// The enqueueable becomes the enumerator for this path.
230 enumerator = enqueable;
// Auxiliary-data enumerators gathered for this request.
234 var auxEnumerators =
new List<IEnumerator<BaseData>>();
237 _factorFileProvider, request.
StartTimeLocal, out var auxDataEnumator))
239 auxEnumerators.Add(auxDataEnumator);
// Handler that notifies the subscription, when non-null, each time it is invoked.
242 EventHandler handler = (_, _) => subscription?.OnNewDataAvailable();
// Obtain the enumerator through Subscribe, passing IsExpired as the expiry check.
243 enumerator = Subscribe(request.
Configuration, handler, IsExpired);
// Only relevant when at least one auxiliary enumerator was collected.
245 if (auxEnumerators.Count > 0)
// Only relevant when the configuration asks for scaled prices in live mode.
253 if (request.
Configuration.PricesShouldBeScaled(liveMode:
true))
// Pass the enumerator through GetWarmupEnumerator and keep the result.
284 enumerator = GetWarmupEnumerator(request, enumerator);
// Create the subscription; lambdas above that captured 'subscription' see it from here on.
288 subscription =
new Subscription(request, subscriptionDataEnumerator, timeZoneOffsetProvider);
// Resolve the map file for the configuration and read the symbol's delisting date from it.
300 var mapFile = _mapFileProvider.ResolveMapFile(dataConfig);
301 var delistingDate = dataConfig.
Symbol.GetDelistingDate(mapFile);
/// <summary>
/// Returns an enumerator of <see cref="BaseData"/> for the given configuration.
/// NOTE(review): the body is outside this excerpt; presumably it subscribes through the
/// data queue handler -- confirm.
/// </summary>
/// <param name="dataConfig">Configuration to subscribe to.</param>
/// <param name="newDataAvailableHandler">Handler callers use to be notified of new data.</param>
/// <param name="isExpired">Predicate over the configuration; callers pass IsExpired or a lambda that always returns false.</param>
305 private IEnumerator<BaseData> Subscribe(
SubscriptionDataConfig dataConfig, EventHandler newDataAvailableHandler, Func<SubscriptionDataConfig, bool> isExpired)
// Declared up front so the lambdas below can capture it; it is only assigned at the end,
// which is why every use inside a lambda goes through the null-conditional operator.
316 Subscription subscription =
null;
// Enumerator backing the universe subscription; each universe kind below assigns it.
// NOTE(review): most branch conditions are outside the excerpt; the trace messages name the kinds.
325 IEnumerator<BaseData> enumerator =
null;
// User defined universe.
328 if (timeTriggered !=
null)
330 Log.
Trace($
"LiveTradingDataFeed.CreateUniverseSubscription(): Creating user defined universe: {config.Symbol.ID}");
334 enumerator = enumeratorFactory.CreateEnumerator(request, _dataProvider);
// Register the enumerator with the custom exchange through an EnumeratorHandler bound to
// the enqueueable, then expose the enqueueable as the enumerator.
339 _customExchange.
AddEnumerator(
new EnumeratorHandler(config.Symbol, enumerator, enqueueable));
340 enumerator = enqueueable;
// Universe identified by its data type name.
346 Log.
Trace($
"LiveTradingDataFeed.CreateUniverseSubscription(): Creating {config.Type.Name} universe: {config.Symbol.ID}");
354 TimeSpan.FromMinutes(10)
356 var enumeratorStack = factory.CreateEnumerator(request, _dataProvider);
// The exchange's data callback enqueues each point and notifies the subscription.
361 _customExchange.
AddEnumerator(config.Symbol, aggregator, handleData: data =>
363 enqueable.Enqueue(data);
364 subscription?.OnNewDataAvailable();
// Frontier-aware wrapper with a custom step predicate: true when the hour is 6 through 22
// inclusive and the day is not Saturday.
// NOTE(review): the time zone of 'time' is not visible here -- confirm.
367 enumerator = GetConfiguredFrontierAwareEnumerator(enqueable, tzOffsetProvider,
369 time => time.Hour < 23 && time.Hour > 5 && time.DayOfWeek != DayOfWeek.Saturday);
// Option chain universe.
373 Log.
Trace(
"LiveTradingDataFeed.CreateUniverseSubscription(): Creating option chain universe: " + config.Symbol.ID);
// 'configure' builds an enumerator per sub-request: it subscribes the sub-request's
// configuration with an expiry predicate that always returns false.
375 Func<SubscriptionRequest, IEnumerator<BaseData>> configure = (subRequest) =>
383 var input = Subscribe(subRequest.Configuration, (sender, args) => subscription?.OnNewDataAvailable(), (_) =>
false);
385 subRequest.Configuration.ExtendedMarketHours, localEndTime, subRequest.Configuration.Resolution,
386 subRequest.Configuration.DataTimeZone, useDailyStrictEndTimes, request.
Configuration.
Type);
392 enumerator = enumeratorFactory.CreateEnumerator(request, _dataProvider);
// Futures chain universe.
398 Log.
Trace(
"LiveTradingDataFeed.CreateUniverseSubscription(): Creating futures chain universe: " + config.Symbol.ID);
// Look up the universe provider for the Future security type.
400 var symbolUniverse = GetUniverseProvider(
SecurityType.Future);
// Custom universe.
407 Log.
Trace(
"LiveTradingDataFeed.CreateUniverseSubscription(): Creating custom universe: " + config.Symbol.ID);
410 var enumeratorStack = factory.CreateEnumerator(request, _dataProvider);
// Same exchange/enqueueable hand-off as in the user defined universe case.
414 _customExchange.
AddEnumerator(
new EnumeratorHandler(config.Symbol, enumerator, enqueueable));
415 enumerator = enqueueable;
// Add the schedule wrapper with a predicate time provider whose predicate is true once
// the UTC time of day is later than _scheduledUniverseUtcTimeShift.
418 enumerator =
AddScheduleWrapper(request, enumerator,
new PredicateTimeProvider(_frontierTimeProvider, (currentUtcDateTime) => {
420 return currentUtcDateTime.TimeOfDay > _scheduledUniverseUtcTimeShift;
// Pass the enumerator through GetWarmupEnumerator and keep the result.
423 enumerator = GetWarmupEnumerator(request, enumerator);
// Assign the captured 'subscription' variable; lambdas above observe it from now on.
428 subscription =
new Subscription(request, subscriptionDataEnumerator, tzOffsetProvider);
/// <summary>
/// When the warmup request passes the checks below, replaces the live enumerator with a
/// concatenation of a warmup enumerator followed by the live enumerator; the (possibly
/// replaced) live enumerator is what gets returned.
/// NOTE(review): how <c>warmupRequest</c> is derived from <paramref name="request"/> is outside this excerpt.
/// </summary>
/// <param name="request">The subscription request being served.</param>
/// <param name="liveEnumerator">The live data enumerator to put after the warmup data.</param>
/// <returns>The enumerator to use for the subscription.</returns>
436 private IEnumerator<BaseData> GetWarmupEnumerator(
SubscriptionRequest request, IEnumerator<BaseData> liveEnumerator)
// Visible conditions: there is at least one tradable day, the warmup span covers at least
// one period of the configured resolution, and the security type / resolution / tick type
// combination is valid. Other conditions may sit on lines not shown.
444 if (warmupRequest.TradableDaysInDataTimeZone.Any()
447 && warmupRequest.EndTimeUtc - warmupRequest.StartTimeUtc >= warmupRequest.Configuration.Resolution.ToTimeSpan()
449 &&
LeanData.
IsValidConfiguration(warmupRequest.Configuration.SecurityType, warmupRequest.Configuration.Resolution, warmupRequest.Configuration.TickType))
// The history-based warmup starts no earlier than MaximumWarmupHistoryDaysLookBack days
// before the warmup end; a narrower request is built only when that cutoff is later than
// the original start.
453 var historyWarmup = warmupRequest;
454 var warmupHistoryStartDate = warmupRequest.EndTimeUtc.AddDays(-MaximumWarmupHistoryDaysLookBack);
455 if (warmupHistoryStartDate > warmupRequest.StartTimeUtc)
457 historyWarmup =
new SubscriptionRequest(warmupRequest, startTimeUtc: warmupHistoryStartDate);
// One tracker instance is passed to both warmup sources: the file-based one records its
// last data point and the history one reads it.
462 var lastPointTracker =
new LastPointTracker();
// File-based warmup first, then history warmup, with CanEmitNull set to false.
466 new ConcatEnumerator(
true, GetFileBasedWarmupEnumerator(warmupRequest, lastPointTracker), GetHistoryWarmupEnumerator(historyWarmup, lastPointTracker)) { CanEmitNull =
false },
// Add the schedule wrapper, passing null as the third argument.
470 synchronizedWarmupEnumerator =
AddScheduleWrapper(warmupRequest, synchronizedWarmupEnumerator,
null);
// Let through nulls and any point whose end time is at or before the warmup end (local time).
473 synchronizedWarmupEnumerator =
new FilterEnumerator<BaseData>(synchronizedWarmupEnumerator, data => data ==
null || data.EndTime <= warmupRequest.EndTimeLocal);
// Warmup data first, live data after.
476 liveEnumerator =
new ConcatEnumerator(
true, synchronizedWarmupEnumerator, liveEnumerator);
479 return liveEnumerator;
/// <summary>
/// Builds the file-based warmup enumerator for a warmup request and records the data
/// points it sees in <paramref name="lastPointTracker"/>.
/// NOTE(review): most of the body is outside this excerpt; the summary reflects only the visible statements.
/// </summary>
/// <param name="warmup">The warmup subscription request.</param>
/// <param name="lastPointTracker">Receives the latest data point via its LastDataPoint property.</param>
485 private IEnumerator<BaseData> GetFileBasedWarmupEnumerator(
SubscriptionRequest warmup, LastPointTracker lastPointTracker)
// Starts null; assigned on lines not shown.
487 IEnumerator<BaseData> result =
null;
// '&&' binds tighter than '||': true for null, or for a point that ends before the
// warmup end (local time) and is not fill-forward.
494 if (data ==
null || data.EndTime < warmup.
EndTimeLocal && !data.IsFillForward)
// Remember this point as the last one seen.
498 lastPointTracker.LastDataPoint = data;
// Log the exception together with the warmup configuration.
507 Log.
Error(e, $
"File based warmup: {warmup.Configuration}");
/// <summary>
/// Builds the history-provider-based warmup enumerator for a warmup request, moving the
/// history start forward when <paramref name="lastPointTracker"/> already holds a later point.
/// NOTE(review): several lines of the body are outside this excerpt.
/// </summary>
/// <param name="warmup">The warmup subscription request.</param>
/// <param name="lastPointTracker">Source of the last data point already emitted; may be null.</param>
515 private IEnumerator<BaseData> GetHistoryWarmupEnumerator(
SubscriptionRequest warmup, LastPointTracker lastPointTracker)
517 IEnumerator<BaseData> result;
// Universe path: the universe enumerator calls back into this method for each underlying
// request, sharing the same tracker.
521 result = CreateUniverseEnumerator(warmup, createUnderlyingEnumerator: (req, _) => GetHistoryWarmupEnumerator(req, lastPointTracker));
// SelectMany over a single-element array: LINQ's deferred execution means the lambda body
// runs when the sequence is enumerated, not when this statement executes.
527 result =
new[] { warmup }.SelectMany(_ =>
// Only adjust the start when a previous data point has been recorded.
530 if (lastPointTracker !=
null && lastPointTracker.LastDataPoint !=
null)
532 var lastPointExchangeTime = lastPointTracker.LastDataPoint.Time;
// For daily resolution, drop the time-of-day part.
533 if (warmup.Configuration.Resolution == Resolution.Daily)
536 lastPointExchangeTime = lastPointExchangeTime.Date;
// Move the history start forward to the last point's UTC time when that is later.
540 if (utcLastPointTime > startTimeUtc)
544 Log.Debug($
"LiveTradingDataFeed.GetHistoryWarmupEnumerator(): Adjusting history warmup start time to {utcLastPointTime} from {startTimeUtc} for {warmup.Configuration}");
546 startTimeUtc = utcLastPointTime;
// Ask the algorithm's history provider for the single request and project each slice.
552 return _algorithm.HistoryProvider.GetHistory(new[] { historyRequest }, _algorithm.
TimeZone).Select(slice =>
// Pull the data of the requested type out of the slice.
556 var data = slice.Get(historyRequest.DataType);
// Log the exception together with the warmup configuration.
561 Log.
Error(e, $
"History warmup: {warmup.Configuration}");
// Return an empty sequence.
570 return Enumerable.Empty<
BaseData>();
// '&&' binds tighter than '||': true for null, or for a point that ends before the
// warmup end (local time) and is not fill-forward.
576 data => data ==
null || data.EndTime < warmup.
EndTimeLocal && !data.IsFillForward);
/// <summary>
/// Wraps an enumerator using a step time provider built from the frontier time provider
/// and a caller-supplied predicate.
/// NOTE(review): the rest of the body, including how the wrapper is constructed and returned, is outside this excerpt.
/// </summary>
/// <param name="enumerator">The enumerator to wrap.</param>
/// <param name="tzOffsetProvider">Time zone offset provider for the wrapper.</param>
/// <param name="customStepEvaluator">Predicate over a DateTime handed to the PredicateTimeProvider.</param>
587 private IEnumerator<BaseData> GetConfiguredFrontierAwareEnumerator(
588 IEnumerator<BaseData> enumerator,
589 TimeZoneOffsetProvider tzOffsetProvider,
590 Func<DateTime, bool> customStepEvaluator)
// Time provider that combines the frontier time provider with the custom predicate.
592 var stepTimeProvider =
new PredicateTimeProvider(_frontierTimeProvider, customStepEvaluator);
// Raised for a security type the DataQueueHandler does not support; the check that leads
// here is outside this excerpt.
601 throw new NotSupportedException($
"The DataQueueHandler does not support {securityType}.");
// Nothing is reported unless an algorithm instance is set.
608 if (_algorithm !=
null)
// The set doubles as the lock object guarding the add-and-report step.
610 lock (_unsupportedConfigurations)
// Key made of market, security type and data type name.
612 var key = $
"{config.Symbol.ID.Market} {config.Symbol.ID.SecurityType} {config.Type.Name}";
// HashSet.Add returns true only when the key was not present, so each key is reported once.
613 if (_unsupportedConfigurations.Add(key))
615 Log.
Trace($
"LiveTradingDataFeed.HandleUnsupportedConfigurationEvent(): detected unsupported configuration: {config}");
// Also send the warning to the algorithm's debug output.
617 _algorithm.
Debug($
"Warning: {key} data not supported. Please consider reviewing the data providers selection.");
/// <summary>
/// Exchange enumerator handler that forwards each data point to an enqueueable and stops
/// that enqueueable when the enumerator finishes.
/// </summary>
626 private class EnumeratorHandler : BaseDataExchange.EnumeratorHandler
// The base handler's data callback is the enqueueable's Enqueue method.
629 : base(symbol, enumerator, handleData: enqueueable.Enqueue)
// Stop the enqueueable once the EnumeratorFinished event fires.
631 EnumeratorFinished += (_, _) => enqueueable.
Stop();
/// <summary>
/// Mutable holder for a single data point, passed to both warmup enumerator builders.
/// </summary>
635 private class LastPointTracker
/// <summary>
/// The last data point recorded; null until one is assigned.
/// </summary>
637 public BaseData LastDataPoint {
get;
set; }