Lean  $LEAN_TAG$
DataManager.cs
1 /*
2  * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
3  * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  *
15 */
16 
17 using System;
18 using System.Collections.Generic;
19 using System.Collections.Specialized;
20 using System.Linq;
21 using QuantConnect.Data;
26 using QuantConnect.Logging;
28 using QuantConnect.Util;
29 
31 {
32  /// <summary>
33  /// DataManager will manage the subscriptions for both the DataFeeds and the SubscriptionManager
34  /// </summary>
36  {
37  private readonly IDataFeed _dataFeed;
38  private readonly MarketHoursDatabase _marketHoursDatabase;
39  private readonly ITimeKeeper _timeKeeper;
40  private readonly bool _liveMode;
41  private bool _sentUniverseScheduleWarning;
42  private readonly IRegisteredSecurityDataTypesProvider _registeredTypesProvider;
43  private readonly IDataPermissionManager _dataPermissionManager;
44  private List<SubscriptionDataConfig> _subscriptionDataConfigsEnumerator;
45 
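46  /// Maps every registered <see cref="SubscriptionDataConfig"/> to the instance stored for it, so that an equal
47  /// configuration can be resolved to the already registered instance. Accesses are guarded by a lock on the dictionary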
48  private readonly Dictionary<SubscriptionDataConfig, SubscriptionDataConfig> _subscriptionManagerSubscriptions = new();
49 
50  /// <summary>
51  /// Event fired when a new subscription is added
52  /// </summary>
53  public event EventHandler<Subscription> SubscriptionAdded;
54 
55  /// <summary>
56  /// Event fired when an existing subscription is removed
57  /// </summary>
58  public event EventHandler<Subscription> SubscriptionRemoved;
59 
60  /// <summary>
61  /// Creates a new instance of the DataManager
62  /// </summary>
63  public DataManager(
64  IDataFeed dataFeed,
65  UniverseSelection universeSelection,
66  IAlgorithm algorithm,
67  ITimeKeeper timeKeeper,
68  MarketHoursDatabase marketHoursDatabase,
69  bool liveMode,
70  IRegisteredSecurityDataTypesProvider registeredTypesProvider,
71  IDataPermissionManager dataPermissionManager)
72  {
73  _dataFeed = dataFeed;
74  UniverseSelection = universeSelection;
77  _timeKeeper = timeKeeper;
78  _marketHoursDatabase = marketHoursDatabase;
79  _liveMode = liveMode;
80  _registeredTypesProvider = registeredTypesProvider;
81  _dataPermissionManager = dataPermissionManager;
82 
83  // wire ourselves up to receive notifications when universes are added/removed
84  algorithm.UniverseManager.CollectionChanged += (sender, args) =>
85  {
86  switch (args.Action)
87  {
88  case NotifyCollectionChangedAction.Add:
89  foreach (var universe in args.NewItems.OfType<Universe>())
90  {
91  var config = universe.Configuration;
92  var start = algorithm.UtcTime;
93 
94  var end = algorithm.LiveMode ? Time.EndOfTime
95  : algorithm.EndDate.ConvertToUtc(algorithm.TimeZone);
96 
97  Security security;
98  if (!algorithm.Securities.TryGetValue(config.Symbol, out security))
99  {
100  // create a canonical security object if it doesn't exist
101  security = new Security(
102  _marketHoursDatabase.GetExchangeHours(config),
103  config,
104  algorithm.Portfolio.CashBook[algorithm.AccountCurrency],
106  algorithm.Portfolio.CashBook,
108  new SecurityCache()
109  );
110  }
111 
112  // Let's adjust the start time to the previous tradable date
113  // so universe selection always happens right away at the start of the algorithm.
114  var universeType = universe.GetType();
115  if (
116  // We exclude the OptionChainUniverse because their selection in live trading is based on having a full bar
117  // of the underlying. In the future, option chain universe file-based selection will be improved
118  // in order to avoid this.
119  (universeType != typeof(OptionChainUniverse) || config.Symbol.SecurityType != SecurityType.FutureOption) &&
120  // We exclude the UserDefinedUniverse because their selection already happens at the algorithm start time.
121  // For instance, ETFs universe selection depends its first trigger time to be before the equity universe
122  // (the UserDefinedUniverse), because the ETFs are EndTime-indexed and that would make their first selection
123  // time to be before the algorithm start time, with the EndTime being the algorithms's start date,
124  // and both the Equity and the ETFs constituents first selection to happen together.
125  !universeType.IsAssignableTo(typeof(UserDefinedUniverse)) &&
126  // We exclude the ScheduledUniverse because it's already scheduled to run at a specific time.
127  // Adjusting the start time would cause the first selection trigger time to be before the algorithm start time,
128  // making the selection to be triggered at the first algorithm time, which would be the exact StartDate.
129  universeType != typeof(ScheduledUniverse))
130  {
131  const int maximumLookback = 60;
132  var loopCount = 0;
133  var startLocalTime = start.ConvertFromUtc(security.Exchange.TimeZone);
134  if (universe.UniverseSettings.Schedule.Initialized)
135  {
136  do
137  {
138  // determine if there's a scheduled selection time at the current start local time date, note that next
139  // we get the previous day of the first scheduled date we find, so we are sure the data is available to trigger selection
140  if (universe.UniverseSettings.Schedule.Get(startLocalTime.Date, startLocalTime.Date).Any())
141  {
142  break;
143  }
144  startLocalTime = startLocalTime.AddDays(-1);
145  if (++loopCount >= maximumLookback)
146  {
147  // fallback to the original, we found none
148  startLocalTime = algorithm.UtcTime.ConvertFromUtc(security.Exchange.TimeZone);
149  if (!_sentUniverseScheduleWarning)
150  {
151  // just in case
152  _sentUniverseScheduleWarning = true;
153  algorithm.Debug($"Warning: Found no valid start time for scheduled universe, will use default");
154  }
155  }
156  } while (loopCount < maximumLookback);
157  }
158 
159  startLocalTime = Time.GetStartTimeForTradeBars(security.Exchange.Hours, startLocalTime,
160  // disable universe selection on extended market hours, for example futures/index options have a sunday pre market we are not interested on
161  Time.OneDay, 1, extendedMarketHours: false, config.DataTimeZone,
162  LeanData.UseDailyStrictEndTimes(algorithm.Settings, config.Type, security.Symbol, Time.OneDay, security.Exchange.Hours));
163  start = startLocalTime.ConvertToUtc(security.Exchange.TimeZone);
164  }
165 
167  new SubscriptionRequest(true,
168  universe,
169  security,
170  config,
171  start,
172  end));
173  }
174  break;
175 
176  case NotifyCollectionChangedAction.Remove:
177  foreach (var universe in args.OldItems.OfType<Universe>())
178  {
179  // removing the subscription will be handled by the SubscriptionSynchronizer
180  // in the next loop as well as executing a UniverseSelection one last time.
181  if (!universe.DisposeRequested)
182  {
183  universe.Dispose();
184  }
185  }
186  break;
187 
188  default:
189  throw new NotImplementedException("The specified action is not implemented: " + args.Action);
190  }
191  };
192 
194  if (!_liveMode)
195  {
197  {
198  var requests = DataFeedSubscriptions
199  // we don't fill forward tick resolution so we don't need to touch their subscriptions
200  .Where(subscription => subscription.Configuration.FillDataForward && subscription.Configuration.Resolution != Resolution.Tick)
201  .SelectMany(subscription => subscription.SubscriptionRequests)
202  .ToList();
203 
204  if(requests.Count > 0)
205  {
206  Log.Trace($"DataManager(): Fill forward resolution has changed from {changedEvent.Old} to {changedEvent.New} at utc: {algorithm.UtcTime}. " +
207  $"Restarting {requests.Count} subscriptions...");
208 
209  // disable reentry while we remove and re add
211 
212  // remove
213  foreach (var request in requests)
214  {
215  // force because we want them actually removed even if still a member of the universe, because the FF res changed
216  // which means we will drop any data points that could be in the next potential slice being created
217  RemoveSubscriptionInternal(request.Configuration, universe: request.Universe, forceSubscriptionRemoval: true);
218  }
219 
220  // re add
221  foreach (var request in requests)
222  {
223  // If it is an add we will set time 1 tick ahead to properly sync data
224  // with next timeslice, avoid emitting now twice.
225  // We do the same in the 'TimeTriggeredUniverseSubscriptionEnumeratorFactory' when handling changes
226  var startUtc = algorithm.UtcTime;
227  // If the algorithm is not initialized (locked) the request start time can be even before the algorithm start time,
228  // like in the case of universe requests that are scheduled to run at a specific time in the past for immediate selection.
229  if (!algorithm.GetLocked() && request.StartTimeUtc < startUtc)
230  {
231  startUtc = request.StartTimeUtc;
232  }
233  AddSubscription(new SubscriptionRequest(request, startTimeUtc: startUtc.AddTicks(1)));
234  }
235 
237  }
238  };
239  }
240  }
241 
242  #region IDataFeedSubscriptionManager
243 
244  /// <summary>
245  /// Gets the data feed subscription collection
246  /// </summary>
248 
249  /// <summary>
250  /// Will remove all current <see cref="Subscription"/>
251  /// </summary>
253  {
254  // remove each subscription from our collection
255  foreach (var subscription in DataFeedSubscriptions)
256  {
257  try
258  {
259  RemoveSubscription(subscription.Configuration);
260  }
261  catch (Exception err)
262  {
263  Log.Error(err, "DataManager.RemoveAllSubscriptions():" +
264  $"Error removing: {subscription.Configuration}");
265  }
266  }
267  }
268 
269  /// <summary>
270  /// Adds a new <see cref="Subscription"/> to provide data for the specified security.
271  /// </summary>
272  /// <param name="request">Defines the <see cref="SubscriptionRequest"/> to be added</param>
273  /// <returns>True if the subscription was created and added successfully, false otherwise</returns>
275  {
276  lock (_subscriptionManagerSubscriptions)
277  {
278  // guarantee the configuration is present in our config collection
279  // this is related to GH issue 3877: where we added a configuration which we also removed
280  if(_subscriptionManagerSubscriptions.TryAdd(request.Configuration, request.Configuration))
281  {
282  _subscriptionDataConfigsEnumerator = null;
283  }
284  }
285 
286  Subscription subscription;
287  if (DataFeedSubscriptions.TryGetValue(request.Configuration, out subscription))
288  {
289  // duplicate subscription request
290  subscription.AddSubscriptionRequest(request);
291  // only result true if the existing subscription is internal, we actually added something from the users perspective
292  return subscription.Configuration.IsInternalFeed;
293  }
294 
296  {
297  throw new InvalidOperationException($"{DataNormalizationMode.ScaledRaw} normalization mode only intended for history requests.");
298  }
299 
300  // before adding the configuration to the data feed let's assert it's valid
301  _dataPermissionManager.AssertConfiguration(request.Configuration, request.StartTimeLocal, request.EndTimeLocal);
302 
303  subscription = _dataFeed.CreateSubscription(request);
304 
305  if (subscription == null)
306  {
307  Log.Trace($"DataManager.AddSubscription(): Unable to add subscription for: {request.Configuration}");
308  // subscription will be null when there's no tradeable dates for the security between the requested times, so
309  // don't even try to load the data
310  return false;
311  }
312 
313  if (_liveMode)
314  {
315  OnSubscriptionAdded(subscription);
316  Log.Trace($"DataManager.AddSubscription(): Added {request.Configuration}." +
317  $" Start: {request.StartTimeUtc}. End: {request.EndTimeUtc}");
318  }
319  else if(Log.DebuggingEnabled)
320  {
321  // for performance lets not create the message string if debugging is not enabled
322  // this can be executed many times and its in the algorithm thread
323  Log.Debug($"DataManager.AddSubscription(): Added {request.Configuration}." +
324  $" Start: {request.StartTimeUtc}. End: {request.EndTimeUtc}");
325  }
326 
327  return DataFeedSubscriptions.TryAdd(subscription);
328  }
329 
/// <summary>
/// Public entry point to drop the <see cref="Subscription"/> associated with a configuration, if there is one.
/// Forwards to the internal removal routine without forcing the removal
/// </summary>
/// <param name="configuration">The <see cref="SubscriptionDataConfig"/> identifying the subscription to drop</param>
/// <param name="universe">The universe asking for the removal.
/// When left as null, the default, the request applies to all universes</param>
/// <returns>True when the subscription ended up removed, false in any other case</returns>
public bool RemoveSubscription(SubscriptionDataConfig configuration, Universe universe = null)
    => RemoveSubscriptionInternal(configuration, universe, forceSubscriptionRemoval: false);
341 
/// <summary>
/// Shared removal routine: withdraws the request of the given universe from the matching <see cref="Subscription"/>
/// and, when the subscription reports that nothing else keeps it alive, removes it from the data feed and disposes it
/// </summary>
/// <param name="configuration">The <see cref="SubscriptionDataConfig"/> identifying the subscription to drop</param>
/// <param name="universe">The universe asking for the removal.
/// When null the request applies to all universes</param>
/// <param name="forceSubscriptionRemoval">When true the subscription is flagged as removed from its universe
/// once it has been torn down, so that all of its data is dropped</param>
/// <returns>True when the subscription ended up removed, false in any other case</returns>
private bool RemoveSubscriptionInternal(SubscriptionDataConfig configuration, Universe universe, bool forceSubscriptionRemoval)
{
    Subscription target;
    if (!DataFeedSubscriptions.TryGetValue(configuration, out target))
    {
        if (universe != null)
        {
            // A universe asked to remove a subscription the data feed no longer holds. This happens when the
            // subscription already ended: it left the feed collection but its configuration is kept until the
            // universe lets go of it (the original author notes fill models rely on these configurations).
            lock (_subscriptionManagerSubscriptions)
            {
                var dropped = _subscriptionManagerSubscriptions.Remove(configuration);
                if (dropped)
                {
                    // invalidate the cached snapshot of configurations
                    _subscriptionDataConfigsEnumerator = null;
                }
            }
        }
        return false;
    }

    // the subscription is only taken down once there are no other requests left
    if (!target.RemoveSubscriptionRequest(universe))
    {
        return false;
    }

    if (!DataFeedSubscriptions.TryRemove(configuration, out target))
    {
        Log.Error($"DataManager.RemoveSubscription(): Unable to remove {configuration}");
        return false;
    }

    _dataFeed.RemoveSubscription(target);

    // listeners are only notified in live mode
    if (_liveMode)
    {
        OnSubscriptionRemoved(target);
    }

    target.Dispose();

    // the configuration cleanup runs before the forced flag below is applied
    RemoveSubscriptionDataConfig(target);

    if (forceSubscriptionRemoval)
    {
        target.MarkAsRemovedFromUniverse();
    }

    if (_liveMode)
    {
        Log.Trace($"DataManager.RemoveSubscription(): Removed {configuration}");
    }
    else if (Log.DebuggingEnabled)
    {
        // this runs often and on the algorithm thread, so the message is only built when debugging is enabled
        Log.Debug($"DataManager.RemoveSubscription(): Removed {configuration}");
    }

    return true;
}
410 
/// <summary>
/// Raises the <see cref="SubscriptionAdded"/> event, if anyone is listening
/// </summary>
/// <param name="subscription">The subscription that was just added</param>
private void OnSubscriptionAdded(Subscription subscription) => SubscriptionAdded?.Invoke(this, subscription);
419 
/// <summary>
/// Raises the <see cref="SubscriptionRemoved"/> event, if anyone is listening
/// </summary>
/// <param name="subscription">The subscription that was just removed</param>
private void OnSubscriptionRemoved(Subscription subscription) => SubscriptionRemoved?.Invoke(this, subscription);
428 
429  #endregion
430 
431  #region IAlgorithmSubscriptionManager
432 
/// <summary>
/// Gets the data configurations currently registered for the SubscriptionManager.
/// The returned list is a cached copy: it is built on first access and reused until a registration change
/// resets the cache to null
/// </summary>
public IEnumerable<SubscriptionDataConfig> SubscriptionManagerSubscriptions
{
    get
    {
        lock (_subscriptionManagerSubscriptions)
        {
            // rebuild the copy only when a writer invalidated it
            return _subscriptionDataConfigsEnumerator ??= _subscriptionManagerSubscriptions.Values.ToList();
        }
    }
}
450 
451  /// <summary>
452  /// Gets existing or adds new <see cref="SubscriptionDataConfig" />
453  /// </summary>
454  /// <returns>Returns the SubscriptionDataConfig instance used</returns>
456  {
457  SubscriptionDataConfig config;
458  lock (_subscriptionManagerSubscriptions)
459  {
460  if (!_subscriptionManagerSubscriptions.TryGetValue(newConfig, out config))
461  {
462  _subscriptionManagerSubscriptions[newConfig] = config = newConfig;
463  _subscriptionDataConfigsEnumerator = null;
464  }
465  }
466 
467  // if the reference is not the same, means it was already there and we did not add anything new
468  if (!ReferenceEquals(config, newConfig))
469  {
470  // for performance lets not create the message string if debugging is not enabled
471  // this can be executed many times and its in the algorithm thread
472  if (Log.DebuggingEnabled)
473  {
474  Log.Debug("DataManager.SubscriptionManagerGetOrAdd(): subscription already added: " + config);
475  }
476  }
477  else
478  {
479  // add the time zone to our time keeper
480  _timeKeeper.AddTimeZone(newConfig.ExchangeTimeZone);
481  }
482 
483  return config;
484  }
485 
/// <summary>
/// Unregisters the <see cref="SubscriptionDataConfig"/> of a subscription, but only when that subscription
/// has been flagged as removed from its universe, and invalidates the cached configuration list when it does
/// </summary>
/// <param name="subscription">The <see cref="Subscription"/> whose configuration should be unregistered</param>
private void RemoveSubscriptionDataConfig(Subscription subscription)
{
    // a subscription may have ended while still being a universe member: keep its configuration in that case
    if (!subscription.RemovedFromUniverse.Value)
    {
        return;
    }

    lock (_subscriptionManagerSubscriptions)
    {
        var dropped = _subscriptionManagerSubscriptions.Remove(subscription.Configuration);
        if (dropped)
        {
            _subscriptionDataConfigsEnumerator = null;
        }
    }
}
505 
506  /// <summary>
507  /// Returns the amount of data config subscriptions processed for the SubscriptionManager
508  /// </summary>
510  {
511  lock (_subscriptionManagerSubscriptions)
512  {
513  return _subscriptionManagerSubscriptions.Count;
514  }
515  }
516 
517  #region ISubscriptionDataConfigService
518 
519  /// <summary>
520  /// The different <see cref="TickType" /> each <see cref="SecurityType" /> supports
521  /// </summary>
522  public Dictionary<SecurityType, List<TickType>> AvailableDataTypes { get; }
523 
524  /// <summary>
525  /// Creates and adds a list of <see cref="SubscriptionDataConfig" /> for a given symbol and configuration.
526  /// Can optionally pass in desired subscription data type to use.
527  /// If the config already existed will return existing instance instead
528  /// </summary>
530  Type dataType,
531  Symbol symbol,
532  Resolution? resolution = null,
533  bool fillForward = true,
534  bool extendedMarketHours = false,
535  bool isFilteredSubscription = true,
536  bool isInternalFeed = false,
537  bool isCustomData = false,
538  DataNormalizationMode dataNormalizationMode = DataNormalizationMode.Adjusted,
539  DataMappingMode dataMappingMode = DataMappingMode.OpenInterest,
540  uint contractDepthOffset = 0
541  )
542  {
543  return Add(symbol, resolution, fillForward, extendedMarketHours, isFilteredSubscription, isInternalFeed, isCustomData,
544  new List<Tuple<Type, TickType>> { new Tuple<Type, TickType>(dataType, LeanData.GetCommonTickTypeForCommonDataTypes(dataType, symbol.SecurityType))},
545  dataNormalizationMode, dataMappingMode, contractDepthOffset)
546  .First();
547  }
548 
/// <summary>
/// Builds one <see cref="SubscriptionDataConfig" /> per data type for the given symbol and registers each of them.
/// The data types can be passed in explicitly; when they are not, they are derived from the symbol.
/// A configuration that was already registered is returned as the existing instance instead of the new one
/// </summary>
/// <returns>The registered configurations, one per data type</returns>
/// <exception cref="ArgumentNullException">No data type is available, or the market hours entry lacks a data or exchange time zone</exception>
/// <exception cref="InvalidOperationException">No resolution was given and the data types disagree on their default resolution</exception>
/// <exception cref="ArgumentException">Not in live mode and the given resolution is not supported by one of the data types</exception>
public List<SubscriptionDataConfig> Add(
    Symbol symbol,
    Resolution? resolution = null,
    bool fillForward = true,
    bool extendedMarketHours = false,
    bool isFilteredSubscription = true,
    bool isInternalFeed = false,
    bool isCustomData = false,
    List<Tuple<Type, TickType>> subscriptionDataTypes = null,
    DataNormalizationMode dataNormalizationMode = DataNormalizationMode.Adjusted,
    DataMappingMode dataMappingMode = DataMappingMode.OpenInterest,
    uint contractDepthOffset = 0
    )
{
    var dataTypes = subscriptionDataTypes;
    if (dataTypes == null)
    {
        if (symbol.SecurityType != SecurityType.Base
            || !SecurityIdentifier.TryGetCustomDataTypeInstance(symbol.ID.Symbol, out var customType))
        {
            dataTypes = LookupSubscriptionConfigDataTypes(symbol.SecurityType, resolution ?? Resolution.Minute, symbol.IsCanonical());
        }
        else
        {
            // a custom data request for which a type could be resolved: use that type
            dataTypes = new List<Tuple<Type, TickType>> { new Tuple<Type, TickType>(customType, TickType.Trade) };
        }
    }

    if (dataTypes.Count == 0)
    {
        throw new ArgumentNullException(nameof(dataTypes), "At least one type needed to create new subscriptions");
    }

    // remember the caller's choice: 'resolution' itself may get assigned inside the loop
    var resolutionWasProvided = resolution.HasValue;
    foreach (var typeTuple in dataTypes)
    {
        var instance = typeTuple.Item1.GetBaseDataInstance();
        instance.Symbol = symbol;

        if (resolutionWasProvided)
        {
            // the resolution is only validated in backtesting, live can use another data source,
            // for example daily data for options
            if (_liveMode)
            {
                continue;
            }

            var supportedResolutions = instance.SupportedResolutions();
            if (!supportedResolutions.Contains(resolution.Value))
            {
                throw new ArgumentException($"Sorry {resolution.ToStringInvariant()} is not a supported resolution for {typeTuple.Item1.Name}" +
                    $" and SecurityType.{symbol.SecurityType.ToStringInvariant()}." +
                    $" Please change your AddData to use one of the supported resolutions ({string.Join(",", supportedResolutions)}).");
            }
            continue;
        }

        var defaultResolution = instance.DefaultResolution();
        if (resolution.HasValue && resolution != defaultResolution)
        {
            // only reachable with several data types: a previous one already set a different default, which is unexpected
            throw new InvalidOperationException(
                $"Different data types ({string.Join(",", dataTypes.Select(tuple => tuple.Item1))})" +
                $" provided different default resolutions {defaultResolution} and {resolution}, this is an unexpected invalid operation.");
        }
        resolution = defaultResolution;
    }

    var marketHoursDbEntry = _marketHoursDatabase.GetEntry(symbol, dataTypes.Select(tuple => tuple.Item1));
    var exchangeHours = marketHoursDbEntry.ExchangeHours;

    // options and indexes are always served with raw prices
    var securityType = symbol.ID.SecurityType;
    if (securityType.IsOption() || securityType == SecurityType.Index)
    {
        dataNormalizationMode = DataNormalizationMode.Raw;
    }

    if (marketHoursDbEntry.DataTimeZone == null)
    {
        throw new ArgumentNullException(nameof(marketHoursDbEntry.DataTimeZone),
            "DataTimeZone is a required parameter for new subscriptions. Set to the time zone the raw data is time stamped in.");
    }

    if (exchangeHours.TimeZone == null)
    {
        throw new ArgumentNullException(nameof(exchangeHours.TimeZone),
            "ExchangeTimeZone is a required parameter for new subscriptions. Set to the time zone the security exchange resides in.");
    }

    var result = dataTypes
        .Select(typeTuple => new SubscriptionDataConfig(
            typeTuple.Item1,
            symbol,
            resolution.Value,
            marketHoursDbEntry.DataTimeZone,
            exchangeHours.TimeZone,
            fillForward,
            extendedMarketHours,
            // open interest becomes an internal feed when the caller did not choose the data types explicitly
            (subscriptionDataTypes == null && typeTuple.Item2 == TickType.OpenInterest) || isInternalFeed,
            isCustomData,
            isFilteredSubscription: isFilteredSubscription,
            tickType: typeTuple.Item2,
            dataNormalizationMode: dataNormalizationMode,
            dataMappingMode: dataMappingMode,
            contractDepthOffset: contractDepthOffset))
        .ToList();

    for (var index = 0; index < result.Count; index++)
    {
        // swap in the instance that is actually registered, which may be a pre-existing one
        var registered = SubscriptionManagerGetOrAdd(result[index]);
        result[index] = registered;

        // keep track of every data type that gets registered
        _registeredTypesProvider.RegisterType(registered.Type);
    }

    return result;
}
673 
/// <summary>
/// Resolves which data types, and their tick types, should be subscribed for a <see cref="SecurityType" />
/// at a given <see cref="Resolution" />
/// </summary>
/// <param name="symbolSecurityType">The <see cref="SecurityType" /> the types are resolved for</param>
/// <param name="resolution">The resolution of the requested data</param>
/// <param name="isCanonical">True when the security is canonical (futures and options)</param>
/// <returns>The type and tick type pairs to use when creating <see cref="SubscriptionDataConfig" /> instances</returns>
/// <remarks>TODO: data type additions are closely tied to the tick type, they should be more generic and independent of each other</remarks>
public List<Tuple<Type, TickType>> LookupSubscriptionConfigDataTypes(
    SecurityType symbolSecurityType,
    Resolution resolution,
    bool isCanonical
    )
{
    if (isCanonical)
    {
        // canonical options other than future options use the option universe type, anything else the zip entry name
        var canonicalType = symbolSecurityType != SecurityType.FutureOption && symbolSecurityType.IsOption()
            ? typeof(OptionUniverse)
            : typeof(ZipEntryName);

        return new List<Tuple<Type, TickType>> { new Tuple<Type, TickType>(canonicalType, TickType.Quote) };
    }

    var result = new List<Tuple<Type, TickType>>();
    foreach (var tickType in AvailableDataTypes[symbolSecurityType])
    {
        // tick types that are not a valid combination for this security type and resolution are left out
        // (the original note: equities will only look for trades in case of low resolutions)
        if (!LeanData.IsValidConfiguration(symbolSecurityType, resolution, tickType))
        {
            continue;
        }

        result.Add(new Tuple<Type, TickType>(LeanData.GetDataType(resolution, tickType), tickType));
    }

    if (symbolSecurityType == SecurityType.CryptoFuture)
    {
        // crypto futures additionally get the margin interest rate data
        result.Add(new Tuple<Type, TickType>(typeof(MarginInterestRate), TickType.Quote));
    }

    return result;
}
711 
/// <summary>
/// Lists the registered <see cref="SubscriptionDataConfig"/> instances, optionally restricted to a single <see cref="Symbol"/>
/// </summary>
/// <param name="symbol">When not null only configurations whose symbol identifier matches are returned</param>
/// <param name="includeInternalConfigs">True to also return internal feed configurations</param>
/// <returns>A new list holding the matching configurations</returns>
/// <remarks>Internal subscriptions are left out by default</remarks>
public List<SubscriptionDataConfig> GetSubscriptionDataConfigs(Symbol symbol = null, bool includeInternalConfigs = false)
{
    var matches = new List<SubscriptionDataConfig>();
    lock (_subscriptionManagerSubscriptions)
    {
        foreach (var config in _subscriptionManagerSubscriptions.Keys)
        {
            if (!includeInternalConfigs && config.IsInternalFeed)
            {
                continue;
            }

            if (symbol == null || config.Symbol.ID == symbol.ID)
            {
                matches.Add(config);
            }
        }
    }
    return matches;
}
724 
725  #endregion
726 
727  #endregion
728 
729  #region IDataManager
730 
731  /// <summary>
732  /// Get the universe selection instance
733  /// </summary>
735 
736  #endregion
737  }
738 }