25 using System.Collections.Generic;
36 private readonly Dictionary<SubscriptionDataConfig, Queue<IDataQueueHandler>> _dataConfigAndDataHandler =
new();
43 _algorithmSettings = settings;
56 protected List<IDataQueueHandler>
DataHandlers {
get;
set; } =
new();
76 Exception failureException =
null;
82 var exchangeTimeZone = dataConfig.ExchangeTimeZone;
84 IEnumerator<BaseData> enumerator;
87 enumerator = dataHandler.Subscribe(dataConfig, immediateEmission ? newDataAvailableHandler
88 : (sender, eventArgs) => {
91 if (dataAvailable ==
null || dataAvailable.DataPoint ==
null
94 newDataAvailableHandler?.Invoke(sender, eventArgs);
98 catch (Exception exception)
101 failureException = exception;
106 if (enumerator !=
null)
108 if (!_dataConfigAndDataHandler.TryGetValue(dataConfig, out var dataQueueHandlers))
112 _dataConfigAndDataHandler[dataConfig] = dataQueueHandlers =
new Queue<IDataQueueHandler>();
114 dataQueueHandlers.Enqueue(dataHandler);
116 if (immediateEmission)
124 if (
LeanData.
UseStrictEndTime(_algorithmSettings.DailyPreciseEndTime, dataConfig.Symbol, dataConfig.Increment, exchangeHours))
136 if (failureException !=
null)
139 throw failureException;
143 if (!dataConfig.Symbol.Value.Contains(
"-UNIVERSE-", StringComparison.InvariantCultureIgnoreCase)
145 && !dataConfig.Symbol.IsCanonical())
158 if (_dataConfigAndDataHandler.TryGetValue(dataConfig, out var dataHandlers))
160 var dataHandler = dataHandlers.Dequeue();
161 dataHandler.Unsubscribe(dataConfig);
163 if (dataHandlers.Count == 0)
166 _dataConfigAndDataHandler.Remove(dataConfig);
178 Log.
Trace($
"CompositeDataQueueHandler.SetJob(): will use {dataHandlersConfig}");
179 foreach (var dataHandlerName
in dataHandlersConfig.DeserializeList())
202 dataHandler.Dispose();
213 public IEnumerable<Symbol>
LookupSymbols(
Symbol symbol,
bool includeExpired,
string securityCurrency =
null)
215 foreach (var dataHandler
in GetUniverseProviders())
217 var symbols = dataHandler.LookupSymbols(symbol, includeExpired, securityCurrency);
224 var result = symbols.ToList();
230 return Enumerable.Empty<
Symbol>();
241 return GetUniverseProviders().Any(provider => provider.CanPerformSelection());
251 if (timeProviders.Any())
253 Log.
Trace($
"DataQueueHandlerManager.InitializeFrontierTimeProvider(): will use the following IDQH frontier time providers: [{string.Join(",
", timeProviders.Select(x => x.GetType()))}]");
259 private IEnumerable<IDataQueueUniverseProvider> GetUniverseProviders()
265 yield
return universeProvider;
270 throw new NotSupportedException(
"The DataQueueHandler does not support Options and Futures.");