Lean  $LEAN_TAG$
DataQueueHandlerManager.cs
1 /*
2  * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
3  * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14 */
15 
16 using System;
17 using System.Linq;
18 using QuantConnect.Util;
19 using QuantConnect.Data;
20 using QuantConnect.Packets;
21 using QuantConnect.Logging;
25 using System.Collections.Generic;
27 
29 {
30  /// <summary>
31  /// This is an implementation of <see cref="IDataQueueHandler"/> used to handle multiple live datafeeds
32  /// </summary>
34  {
35  private readonly IAlgorithmSettings _algorithmSettings;
36  private readonly Dictionary<SubscriptionDataConfig, Queue<IDataQueueHandler>> _dataConfigAndDataHandler = new();
37 
38  /// <summary>
39  /// Creates a new instance
40  /// </summary>
42  {
43  _algorithmSettings = settings;
44  }
45 
46  /// <summary>
47  /// Frontier time provider to use
48  /// </summary>
49  /// <remarks>Protected for testing purposes</remarks>
50  protected ITimeProvider FrontierTimeProvider { get; set; }
51 
52  /// <summary>
53  /// Collection of data queue handles being used
54  /// </summary>
55  /// <remarks>Protected for testing purposes</remarks>
56  protected List<IDataQueueHandler> DataHandlers { get; set; } = new();
57 
58  /// <summary>
59  /// True if the composite queue handler has any <see cref="IDataQueueUniverseProvider"/> instance
60  /// </summary>
62 
63  /// <summary>
64  /// Event triggered when an unsupported configuration is detected
65  /// </summary>
66  public event EventHandler<SubscriptionDataConfig> UnsupportedConfiguration;
67 
68  /// <summary>
69  /// Subscribe to the specified configuration
70  /// </summary>
71  /// <param name="dataConfig">defines the parameters to subscribe to a data feed</param>
72  /// <param name="newDataAvailableHandler">handler to be fired on new data available</param>
73  /// <returns>The new enumerator for this subscription request</returns>
74  public IEnumerator<BaseData> Subscribe(SubscriptionDataConfig dataConfig, EventHandler newDataAvailableHandler)
75  {
76  Exception failureException = null;
77  foreach (var dataHandler in DataHandlers)
78  {
79  // Emit ticks & custom data as soon as we get them, they don't need any kind of batching behavior applied to them
80  // only use the frontier time provider if we need to
81  var immediateEmission = dataConfig.Resolution == Resolution.Tick || dataConfig.IsCustomData || FrontierTimeProvider == null;
82  var exchangeTimeZone = dataConfig.ExchangeTimeZone;
83 
84  IEnumerator<BaseData> enumerator;
85  try
86  {
87  enumerator = dataHandler.Subscribe(dataConfig, immediateEmission ? newDataAvailableHandler
88  : (sender, eventArgs) => {
89  // let's only wake up the main thread if the data point is allowed to be emitted, else we could fill forward previous bar and not let this one through
90  var dataAvailable = eventArgs as NewDataAvailableEventArgs;
91  if (dataAvailable == null || dataAvailable.DataPoint == null
92  || dataAvailable.DataPoint.EndTime.ConvertToUtc(exchangeTimeZone) <= FrontierTimeProvider.GetUtcNow())
93  {
94  newDataAvailableHandler?.Invoke(sender, eventArgs);
95  }
96  });
97  }
98  catch (Exception exception)
99  {
100  // we will try the next DQH if any, if it handles the request correctly we ignore the error
101  failureException = exception;
102  continue;
103  }
104 
105  // Check if the enumerator is not empty
106  if (enumerator != null)
107  {
108  if (!_dataConfigAndDataHandler.TryGetValue(dataConfig, out var dataQueueHandlers))
109  {
110  // we can get the same subscription request multiple times, the aggregator manager handles updating each enumerator
111  // but we need to keep track so we can call unsubscribe later to the target data queue handler
112  _dataConfigAndDataHandler[dataConfig] = dataQueueHandlers = new Queue<IDataQueueHandler>();
113  }
114  dataQueueHandlers.Enqueue(dataHandler);
115 
116  if (immediateEmission)
117  {
118  return enumerator;
119  }
120 
121  var utcStartTime = FrontierTimeProvider.GetUtcNow();
122 
123  var exchangeHours = MarketHoursDatabase.FromDataFolder().GetExchangeHours(dataConfig.Symbol.ID.Market, dataConfig.Symbol, dataConfig.Symbol.SecurityType);
124  if (LeanData.UseStrictEndTime(_algorithmSettings.DailyPreciseEndTime, dataConfig.Symbol, dataConfig.Increment, exchangeHours))
125  {
126  // before the first frontier enumerator we adjust the endtimes if required
127  enumerator = new StrictDailyEndTimesEnumerator(enumerator, exchangeHours, utcStartTime.ConvertFromUtc(exchangeTimeZone));
128  }
129 
130  return new FrontierAwareEnumerator(enumerator, FrontierTimeProvider,
131  new TimeZoneOffsetProvider(exchangeTimeZone, utcStartTime, Time.EndOfTime)
132  );
133  }
134  }
135 
136  if (failureException != null)
137  {
138  // we were not able to serve the request with any DQH and we got an exception, let's bubble it up
139  throw failureException;
140  }
141 
142  // filter out warning for expected cases to reduce noise
143  if (!dataConfig.Symbol.Value.Contains("-UNIVERSE-", StringComparison.InvariantCultureIgnoreCase)
144  && dataConfig.Type != typeof(Delisting)
145  && !dataConfig.Symbol.IsCanonical())
146  {
147  UnsupportedConfiguration?.Invoke(this, dataConfig);
148  }
149  return null;
150  }
151 
152  /// <summary>
153  /// Removes the specified configuration
154  /// </summary>
155  /// <param name="dataConfig">Subscription config to be removed</param>
156  public virtual void Unsubscribe(SubscriptionDataConfig dataConfig)
157  {
158  if (_dataConfigAndDataHandler.TryGetValue(dataConfig, out var dataHandlers))
159  {
160  var dataHandler = dataHandlers.Dequeue();
161  dataHandler.Unsubscribe(dataConfig);
162 
163  if (dataHandlers.Count == 0)
164  {
165  // nothing left
166  _dataConfigAndDataHandler.Remove(dataConfig);
167  }
168  }
169  }
170 
171  /// <summary>
172  /// Sets the job we're subscribing for
173  /// </summary>
174  /// <param name="job">Job we're subscribing for</param>
175  public void SetJob(LiveNodePacket job)
176  {
177  var dataHandlersConfig = job.DataQueueHandler;
178  Log.Trace($"CompositeDataQueueHandler.SetJob(): will use {dataHandlersConfig}");
179  foreach (var dataHandlerName in dataHandlersConfig.DeserializeList())
180  {
181  var dataHandler = Composer.Instance.GetExportedValueByTypeName<IDataQueueHandler>(dataHandlerName);
182  dataHandler.SetJob(job);
183  DataHandlers.Add(dataHandler);
184  }
185 
187  }
188 
189  /// <summary>
190  /// Returns whether the data provider is connected
191  /// </summary>
192  /// <returns>true if the data provider is connected</returns>
193  public bool IsConnected => true;
194 
195  /// <summary>
196  /// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
197  /// </summary>
198  public void Dispose()
199  {
200  foreach (var dataHandler in DataHandlers)
201  {
202  dataHandler.Dispose();
203  }
204  }
205 
206  /// <summary>
207  /// Method returns a collection of Symbols that are available at the data source.
208  /// </summary>
209  /// <param name="symbol">Symbol to lookup</param>
210  /// <param name="includeExpired">Include expired contracts</param>
211  /// <param name="securityCurrency">Expected security currency(if any)</param>
212  /// <returns>Enumerable of Symbols, that are associated with the provided Symbol</returns>
213  public IEnumerable<Symbol> LookupSymbols(Symbol symbol, bool includeExpired, string securityCurrency = null)
214  {
215  foreach (var dataHandler in GetUniverseProviders())
216  {
217  var symbols = dataHandler.LookupSymbols(symbol, includeExpired, securityCurrency);
218  if (symbols == null)
219  {
220  // the universe provider does not support it
221  continue;
222  }
223 
224  var result = symbols.ToList();
225  if (result.Any())
226  {
227  return result;
228  }
229  }
230  return Enumerable.Empty<Symbol>();
231  }
232 
233  /// <summary>
234  /// Returns whether selection can take place or not.
235  /// </summary>
236  /// <remarks>This is useful to avoid a selection taking place during invalid times, for example IB reset times or when not connected,
237  /// because if allowed selection would fail since IB isn't running and would kill the algorithm</remarks>
238  /// <returns>True if selection can take place</returns>
239  public bool CanPerformSelection()
240  {
241  return GetUniverseProviders().Any(provider => provider.CanPerformSelection());
242  }
243 
244  /// <summary>
245  /// Creates the frontier time provider instance
246  /// </summary>
247  /// <remarks>Protected for testing purposes</remarks>
249  {
250  var timeProviders = DataHandlers.OfType<ITimeProvider>().ToList();
251  if (timeProviders.Any())
252  {
253  Log.Trace($"DataQueueHandlerManager.InitializeFrontierTimeProvider(): will use the following IDQH frontier time providers: [{string.Join(",", timeProviders.Select(x => x.GetType()))}]");
254  return new CompositeTimeProvider(timeProviders);
255  }
256  return null;
257  }
258 
259  private IEnumerable<IDataQueueUniverseProvider> GetUniverseProviders()
260  {
261  var yielded = false;
262  foreach (var universeProvider in DataHandlers.OfType<IDataQueueUniverseProvider>())
263  {
264  yielded = true;
265  yield return universeProvider;
266  }
267 
268  if (!yielded)
269  {
270  throw new NotSupportedException("The DataQueueHandler does not support Options and Futures.");
271  }
272  }
273  }
274 }