18 using System.Collections.Generic;
19 using System.Collections.Specialized;
40 private readonly
bool _liveMode;
41 private bool _sentUniverseScheduleWarning;
44 private List<SubscriptionDataConfig> _subscriptionDataConfigsEnumerator;
48 private readonly Dictionary<SubscriptionDataConfig, SubscriptionDataConfig> _subscriptionManagerSubscriptions =
new();
77 _timeKeeper = timeKeeper;
78 _marketHoursDatabase = marketHoursDatabase;
80 _registeredTypesProvider = registeredTypesProvider;
81 _dataPermissionManager = dataPermissionManager;
88 case NotifyCollectionChangedAction.Add:
89 foreach (var universe
in args.NewItems.OfType<
Universe>())
91 var config = universe.Configuration;
102 _marketHoursDatabase.GetExchangeHours(config),
114 var universeType = universe.GetType();
131 const int maximumLookback = 60;
133 var startLocalTime = start.ConvertFromUtc(security.Exchange.TimeZone);
134 if (universe.UniverseSettings.Schedule.Initialized)
140 if (universe.UniverseSettings.Schedule.Get(startLocalTime.Date, startLocalTime.Date).Any())
144 startLocalTime = startLocalTime.AddDays(-1);
145 if (++loopCount >= maximumLookback)
148 startLocalTime = algorithm.
UtcTime.ConvertFromUtc(security.Exchange.TimeZone);
149 if (!_sentUniverseScheduleWarning)
152 _sentUniverseScheduleWarning =
true;
153 algorithm.
Debug($
"Warning: Found no valid start time for scheduled universe, will use default");
156 }
while (loopCount < maximumLookback);
161 Time.
OneDay, 1, extendedMarketHours:
false, config.DataTimeZone,
163 start = startLocalTime.ConvertToUtc(security.Exchange.TimeZone);
176 case NotifyCollectionChangedAction.Remove:
177 foreach (var universe
in args.OldItems.OfType<
Universe>())
181 if (!universe.DisposeRequested)
189 throw new NotImplementedException(
"The specified action is not implemented: " + args.Action);
200 .Where(subscription => subscription.Configuration.FillDataForward && subscription.Configuration.Resolution !=
Resolution.Tick)
201 .SelectMany(subscription => subscription.SubscriptionRequests)
204 if(requests.Count > 0)
206 Log.
Trace($
"DataManager(): Fill forward resolution has changed from {changedEvent.Old} to {changedEvent.New} at utc: {algorithm.UtcTime}. " +
207 $
"Restarting {requests.Count} subscriptions...");
213 foreach (var request
in requests)
217 RemoveSubscriptionInternal(request.Configuration, universe: request.Universe, forceSubscriptionRemoval:
true);
221 foreach (var request
in requests)
226 var startUtc = algorithm.
UtcTime;
229 if (!algorithm.
GetLocked() && request.StartTimeUtc < startUtc)
231 startUtc = request.StartTimeUtc;
242 #region IDataFeedSubscriptionManager
261 catch (Exception err)
263 Log.
Error(err,
"DataManager.RemoveAllSubscriptions():" +
264 $
"Error removing: {subscription.Configuration}");
276 lock (_subscriptionManagerSubscriptions)
282 _subscriptionDataConfigsEnumerator =
null;
292 return subscription.Configuration.IsInternalFeed;
297 throw new InvalidOperationException($
"{DataNormalizationMode.ScaledRaw} normalization mode only intended for history requests.");
303 subscription = _dataFeed.CreateSubscription(request);
305 if (subscription ==
null)
307 Log.
Trace($
"DataManager.AddSubscription(): Unable to add subscription for: {request.Configuration}");
315 OnSubscriptionAdded(subscription);
316 Log.
Trace($
"DataManager.AddSubscription(): Added {request.Configuration}." +
317 $
" Start: {request.StartTimeUtc}. End: {request.EndTimeUtc}");
323 Log.
Debug($
"DataManager.AddSubscription(): Added {request.Configuration}." +
324 $
" Start: {request.StartTimeUtc}. End: {request.EndTimeUtc}");
339 return RemoveSubscriptionInternal(configuration, universe, forceSubscriptionRemoval:
false);
358 if (subscription.RemoveSubscriptionRequest(universe))
362 Log.
Error($
"DataManager.RemoveSubscription(): Unable to remove {configuration}");
366 _dataFeed.RemoveSubscription(subscription);
370 OnSubscriptionRemoved(subscription);
373 subscription.Dispose();
375 RemoveSubscriptionDataConfig(subscription);
377 if (forceSubscriptionRemoval)
379 subscription.MarkAsRemovedFromUniverse();
384 Log.
Trace($
"DataManager.RemoveSubscription(): Removed {configuration}");
390 Log.
Debug($
"DataManager.RemoveSubscription(): Removed {configuration}");
395 else if (universe !=
null)
400 lock (_subscriptionManagerSubscriptions)
402 if (_subscriptionManagerSubscriptions.Remove(configuration))
404 _subscriptionDataConfigsEnumerator =
null;
415 private void OnSubscriptionAdded(Subscription subscription)
424 private void OnSubscriptionRemoved(Subscription subscription)
431 #region IAlgorithmSubscriptionManager
440 lock (_subscriptionManagerSubscriptions)
442 if(_subscriptionDataConfigsEnumerator ==
null)
444 _subscriptionDataConfigsEnumerator = _subscriptionManagerSubscriptions.Values.ToList();
446 return _subscriptionDataConfigsEnumerator;
458 lock (_subscriptionManagerSubscriptions)
460 if (!_subscriptionManagerSubscriptions.TryGetValue(newConfig, out config))
462 _subscriptionManagerSubscriptions[newConfig] = config = newConfig;
463 _subscriptionDataConfigsEnumerator =
null;
468 if (!ReferenceEquals(config, newConfig))
474 Log.
Debug(
"DataManager.SubscriptionManagerGetOrAdd(): subscription already added: " + config);
491 private void RemoveSubscriptionDataConfig(
Subscription subscription)
496 lock (_subscriptionManagerSubscriptions)
498 if (_subscriptionManagerSubscriptions.Remove(subscription.
Configuration))
500 _subscriptionDataConfigsEnumerator =
null;
511 lock (_subscriptionManagerSubscriptions)
513 return _subscriptionManagerSubscriptions.Count;
517 #region ISubscriptionDataConfigService
533 bool fillForward =
true,
534 bool extendedMarketHours =
false,
535 bool isFilteredSubscription =
true,
536 bool isInternalFeed =
false,
537 bool isCustomData =
false,
540 uint contractDepthOffset = 0
543 return Add(symbol, resolution, fillForward, extendedMarketHours, isFilteredSubscription, isInternalFeed, isCustomData,
545 dataNormalizationMode, dataMappingMode, contractDepthOffset)
554 public List<SubscriptionDataConfig>
Add(
557 bool fillForward =
true,
558 bool extendedMarketHours =
false,
559 bool isFilteredSubscription =
true,
560 bool isInternalFeed =
false,
561 bool isCustomData =
false,
562 List<Tuple<Type, TickType>> subscriptionDataTypes =
null,
565 uint contractDepthOffset = 0
568 var dataTypes = subscriptionDataTypes;
569 if(dataTypes ==
null)
574 dataTypes =
new List<Tuple<Type, TickType>> {
new Tuple<Type, TickType>(type,
TickType.Trade) };
582 if (!dataTypes.Any())
584 throw new ArgumentNullException(nameof(dataTypes),
"At least one type needed to create new subscriptions");
587 var resolutionWasProvided = resolution.HasValue;
588 foreach (var typeTuple
in dataTypes)
590 var baseInstance = typeTuple.Item1.GetBaseDataInstance();
591 baseInstance.Symbol = symbol;
592 if (!resolutionWasProvided)
594 var defaultResolution = baseInstance.DefaultResolution();
595 if (resolution.HasValue && resolution != defaultResolution)
599 throw new InvalidOperationException(
600 $
"Different data types ({string.Join(",
", dataTypes.Select(tuple => tuple.Item1))})" +
601 $
" provided different default resolutions {defaultResolution} and {resolution}, this is an unexpected invalid operation.");
603 resolution = defaultResolution;
611 var supportedResolutions = baseInstance.SupportedResolutions();
612 if (supportedResolutions.Contains(resolution.Value))
617 throw new ArgumentException($
"Sorry {resolution.ToStringInvariant()} is not a supported resolution for {typeTuple.Item1.Name}" +
618 $
" and SecurityType.{symbol.SecurityType.ToStringInvariant()}." +
619 $
" Please change your AddData to use one of the supported resolutions ({string.Join(",
", supportedResolutions)}).");
623 var marketHoursDbEntry = _marketHoursDatabase.GetEntry(symbol, dataTypes.Select(tuple => tuple.Item1));
625 var exchangeHours = marketHoursDbEntry.ExchangeHours;
626 if (symbol.ID.SecurityType.IsOption() ||
632 if (marketHoursDbEntry.DataTimeZone ==
null)
634 throw new ArgumentNullException(nameof(marketHoursDbEntry.DataTimeZone),
635 "DataTimeZone is a required parameter for new subscriptions. Set to the time zone the raw data is time stamped in.");
638 if (exchangeHours.TimeZone ==
null)
640 throw new ArgumentNullException(nameof(exchangeHours.TimeZone),
641 "ExchangeTimeZone is a required parameter for new subscriptions. Set to the time zone the security exchange resides in.");
644 var result = (from subscriptionDataType in dataTypes
645 let dataType = subscriptionDataType.Item1
646 let tickType = subscriptionDataType.Item2
651 marketHoursDbEntry.DataTimeZone,
652 exchangeHours.TimeZone,
656 subscriptionDataTypes ==
null && tickType ==
TickType.OpenInterest || isInternalFeed,
658 isFilteredSubscription: isFilteredSubscription,
660 dataNormalizationMode: dataNormalizationMode,
661 dataMappingMode: dataMappingMode,
662 contractDepthOffset: contractDepthOffset)).ToList();
664 for (
int i = 0; i < result.Count; i++)
669 _registeredTypesProvider.RegisterType(result[i].Type);
690 if (symbolSecurityType !=
SecurityType.FutureOption && symbolSecurityType.IsOption())
692 return new List<Tuple<Type, TickType>> {
new Tuple<Type, TickType>(typeof(
OptionUniverse),
TickType.Quote) };
695 return new List<Tuple<Type, TickType>> {
new Tuple<Type, TickType>(typeof(
ZipEntryName),
TickType.Quote) };
702 var result = availableDataType
703 .Select(tickType =>
new Tuple<Type, TickType>(
LeanData.
GetDataType(resolution, tickType), tickType)).ToList();
718 lock (_subscriptionManagerSubscriptions)
720 return _subscriptionManagerSubscriptions.Keys.Where(config => (includeInternalConfigs || !config.IsInternalFeed)
721 && (symbol ==
null || config.Symbol.ID == symbol.ID)).ToList();