25 using System.Collections.Generic;
/// <summary>
/// For each subscription configuration, the queue of data queue handlers recorded for it.
/// Elsewhere in this file a handler is enqueued after a subscribe call and later dequeued
/// (oldest first) and unsubscribed; the entry is removed once its queue is empty.
/// </summary>
37 private readonly Dictionary<SubscriptionDataConfig, Queue<IDataQueueHandler>> _dataConfigAndDataHandler =
new();
// Keep the supplied settings; DailyPreciseEndTime is read from them later in this file.
// NOTE(review): the enclosing member (presumably a constructor) is not visible in this view.
44 _algorithmSettings = settings;
/// <summary>
/// The list of <see cref="IDataQueueHandler"/> instances held by this type; initialized to an empty list.
/// </summary>
/// <remarks>
/// NOTE(review): the setter is accessible to derived types, so the whole list can be replaced,
/// not just mutated - confirm this is intended.
/// </remarks>
51 protected List<IDataQueueHandler>
DataHandlers {
get;
set; } =
new();
// NOTE(review): fragment of a method body - the enclosing signature and several
// intermediate lines (braces, loop/try headers, some declarations) are not visible here.
// Holds an exception caught further down so it can be thrown afterwards if it was set.
71 Exception failureException =
null;
// Emit new-data notifications directly for tick resolution, custom data, or when there is
// no frontier time provider to compare against.
76 var immediateEmission = dataConfig.Resolution ==
Resolution.Tick || dataConfig.IsCustomData || _frontierTimeProvider ==
null;
// Used below to convert a data point's end time to UTC.
77 var exchangeTimeZone = dataConfig.ExchangeTimeZone;
79 IEnumerator<BaseData> enumerator;
// Subscribe on the handler. With immediate emission the caller's handler is passed through
// unchanged; otherwise it is wrapped in a lambda that filters notifications (see below).
82 enumerator = dataHandler.Subscribe(dataConfig, immediateEmission ? newDataAvailableHandler
83 : (sender, eventArgs) => {
// Forward the notification when no data point information is available, or when the data
// point's end time (converted to UTC) is at or before the current frontier time.
// NOTE(review): the declaration of 'dataAvailable' is not visible in this view.
86 if (dataAvailable ==
null || dataAvailable.DataPoint ==
null
87 || dataAvailable.DataPoint.EndTime.ConvertToUtc(exchangeTimeZone) <= _frontierTimeProvider.
GetUtcNow())
89 newDataAvailableHandler?.Invoke(sender, eventArgs);
// Remember the failure instead of throwing right away; it is thrown further down if still set.
93 catch (Exception exception)
96 failureException = exception;
// A non-null enumerator appears to be treated as a successful subscription.
101 if (enumerator !=
null)
// First handler recorded for this configuration: create its queue.
103 if (!_dataConfigAndDataHandler.TryGetValue(dataConfig, out var dataQueueHandlers))
107 _dataConfigAndDataHandler[dataConfig] = dataQueueHandlers =
new Queue<IDataQueueHandler>();
// Record the handler so it can be dequeued and unsubscribed later.
109 dataQueueHandlers.Enqueue(dataHandler);
// NOTE(review): the body of this branch is not visible; presumably the immediate-emission
// path finishes here, since the next visible line dereferences _frontierTimeProvider - confirm.
111 if (immediateEmission)
116 var utcStartTime = _frontierTimeProvider.
GetUtcNow();
// Strict end-time check driven by the DailyPreciseEndTime setting; the guarded statements
// and the origin of 'exchangeHours' are not visible in this view.
119 if (
LeanData.
UseStrictEndTime(_algorithmSettings.DailyPreciseEndTime, dataConfig.Symbol, dataConfig.Increment, exchangeHours))
// Surface the captured subscription failure, if any.
// NOTE(review): 'throw failureException;' on a previously caught exception resets its stack
// trace to this point; ExceptionDispatchInfo.Capture(...).Throw() would preserve the original
// trace - confirm whether that matters to callers.
131 if (failureException !=
null)
134 throw failureException;
// Applies only to symbols whose value does not contain "-UNIVERSE-" (case-insensitive,
// invariant culture) and that are not canonical; the guarded statement is not visible here.
138 if (!dataConfig.Symbol.Value.Contains(
"-UNIVERSE-", StringComparison.InvariantCultureIgnoreCase)
140 && !dataConfig.Symbol.IsCanonical())
// NOTE(review): fragment - the enclosing method header and braces are not visible in this view.
// If handlers were recorded for this configuration, take the oldest one (Queue<T> is FIFO)
// and unsubscribe the configuration from it.
153 if (_dataConfigAndDataHandler.TryGetValue(dataConfig, out var dataHandlers))
155 var dataHandler = dataHandlers.Dequeue();
156 dataHandler.Unsubscribe(dataConfig);
// Drop the dictionary entry once no handlers remain for this configuration.
158 if (dataHandlers.Count == 0)
161 _dataConfigAndDataHandler.Remove(dataConfig);
// Trace which handler configuration string will be used.
// NOTE(review): this message is prefixed "CompositeDataQueueHandler" while another trace in
// this file is prefixed "DataQueueHandlerManager" - confirm which type name is current.
173 Log.
Trace($
"CompositeDataQueueHandler.SetJob(): will use {dataHandlersConfig}");
// Iterate the handler names obtained by deserializing the configuration string as a list;
// the loop body is not visible in this view.
174 foreach (var dataHandlerName
in dataHandlersConfig.DeserializeList())
// Dispose the handler. NOTE(review): the enclosing method/loop is not visible in this view.
197 dataHandler.Dispose();
/// <summary>
/// Looks up symbols by querying the universe providers returned by GetUniverseProviders().
/// </summary>
/// <param name="symbol">Symbol passed through to each provider's LookupSymbols call.</param>
/// <param name="includeExpired">Passed through to each provider's LookupSymbols call.</param>
/// <param name="securityCurrency">Optional value (defaults to null) passed through to each provider's LookupSymbols call.</param>
/// <returns>
/// An empty sequence on the fallback path. NOTE(review): the conditions under which a
/// provider's materialized result is returned are not visible in this view.
/// </returns>
208 public IEnumerable<Symbol>
LookupSymbols(
Symbol symbol,
bool includeExpired,
string securityCurrency =
null)
210 foreach (var dataHandler
in GetUniverseProviders())
212 var symbols = dataHandler.LookupSymbols(symbol, includeExpired, securityCurrency);
// Materialize the provider's sequence into a list.
219 var result = symbols.ToList();
// Fallback: an empty sequence.
225 return Enumerable.Empty<
Symbol>();
// True when at least one universe provider reports that it can perform selection.
// NOTE(review): the enclosing method header is not visible in this view.
236 return GetUniverseProviders().Any(provider => provider.CanPerformSelection());
// When at least one time provider is present, trace the runtime types of all of them.
// NOTE(review): how 'timeProviders' is built and what follows the trace are not visible here.
246 if (timeProviders.Any())
248 Log.
Trace($
"DataQueueHandlerManager.InitializeFrontierTimeProvider(): will use the following IDQH frontier time providers: [{string.Join(",
", timeProviders.Select(x => x.GetType()))}]");
/// <summary>
/// Yields <see cref="IDataQueueUniverseProvider"/> instances one at a time.
/// </summary>
/// <remarks>
/// NOTE(review): the source being iterated and the condition guarding the exception below are
/// not visible in this view. If the throw is part of this iterator, it is raised when the
/// sequence is enumerated rather than when the method is called - confirm.
/// </remarks>
/// <exception cref="NotSupportedException">Thrown with a message stating that Options and Futures are not supported.</exception>
254 private IEnumerable<IDataQueueUniverseProvider> GetUniverseProviders()
260 yield
return universeProvider;
265 throw new NotSupportedException(
"The DataQueueHandler does not support Options and Futures.");