Lean  $LEAN_TAG$
DataQueueHandlerManager.cs
1 /*
2  * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
3  * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14 */
15 
16 using System;
17 using System.Linq;
18 using QuantConnect.Util;
19 using QuantConnect.Data;
20 using QuantConnect.Packets;
21 using QuantConnect.Logging;
25 using System.Collections.Generic;
27 
29 {
30  /// <summary>
31  /// This is an implementation of <see cref="IDataQueueHandler"/> used to handle multiple live datafeeds
32  /// </summary>
34  {
// Frontier time source assigned in SetJob from InitializeFrontierTimeProvider(); it stays null when none of the
// data handlers implements ITimeProvider, in which case Subscribe emits every data point immediately
35  private ITimeProvider _frontierTimeProvider;
// Algorithm settings assigned in the constructor body; Subscribe reads DailyPreciseEndTime from it
36  private readonly IAlgorithmSettings _algorithmSettings;
// For each configuration being served, the data queue handlers that accepted it, in subscription order,
// so that Unsubscribe can forward the removal to the handler that is actually serving it
37  private readonly Dictionary<SubscriptionDataConfig, Queue<IDataQueueHandler>> _dataConfigAndDataHandler = new();
38 
39  /// <summary>
40  /// Creates a new instance
41  /// </summary>
43  {
44  _algorithmSettings = settings;
45  }
46 
47  /// <summary>
48  /// Collection of data queue handlers being used, in the order SetJob added them
49  /// </summary>
50  /// <remarks>Protected for testing purposes</remarks>
51  protected List<IDataQueueHandler> DataHandlers { get; set; } = new();
52 
53  /// <summary>
54  /// True if the composite queue handler has any <see cref="IDataQueueUniverseProvider"/> instance
55  /// </summary>
57 
58  /// <summary>
59  /// Event triggered when an unsupported configuration is detected, that is when Subscribe gets no enumerator
60  /// from any data queue handler (universe, delisting and canonical symbol requests are not reported)
60  /// </summary>
61  public event EventHandler<SubscriptionDataConfig> UnsupportedConfiguration;
62 
/// <summary>
/// Subscribe to the specified configuration
/// </summary>
/// <remarks>
/// The data queue handlers are tried in order and the first one returning an enumerator serves the request.
/// When none does, the last exception thrown by a handler (if any) is rethrown with its original stack trace;
/// otherwise <see cref="UnsupportedConfiguration"/> is raised for non expected cases and null is returned.
/// </remarks>
/// <param name="dataConfig">defines the parameters to subscribe to a data feed</param>
/// <param name="newDataAvailableHandler">handler to be fired on new data available</param>
/// <returns>The new enumerator for this subscription request, null if no data queue handler served it</returns>
public IEnumerator<BaseData> Subscribe(SubscriptionDataConfig dataConfig, EventHandler newDataAvailableHandler)
{
    // Emit ticks & custom data as soon as we get them, they don't need any kind of batching behavior applied to them
    // only use the frontier time provider if we need to.
    // None of this depends on the data queue handler being tried, so it is resolved once up front
    var immediateEmission = dataConfig.Resolution == Resolution.Tick || dataConfig.IsCustomData || _frontierTimeProvider == null;
    var exchangeTimeZone = dataConfig.ExchangeTimeZone;

    var dataAvailableHandler = newDataAvailableHandler;
    if (!immediateEmission)
    {
        dataAvailableHandler = (sender, eventArgs) =>
        {
            // let's only wake up the main thread if the data point is allowed to be emitted, else we could fill forward previous bar and not let this one through
            var dataAvailable = eventArgs as NewDataAvailableEventArgs;
            if (dataAvailable == null || dataAvailable.DataPoint == null
                || dataAvailable.DataPoint.EndTime.ConvertToUtc(exchangeTimeZone) <= _frontierTimeProvider.GetUtcNow())
            {
                newDataAvailableHandler?.Invoke(sender, eventArgs);
            }
        };
    }

    Exception failureException = null;
    foreach (var dataHandler in DataHandlers)
    {
        IEnumerator<BaseData> enumerator;
        try
        {
            enumerator = dataHandler.Subscribe(dataConfig, dataAvailableHandler);
        }
        catch (Exception exception)
        {
            // we will try the next DQH if any, if it handles the request correctly we ignore the error
            failureException = exception;
            continue;
        }

        if (enumerator == null)
        {
            // this DQH gave us no enumerator for the request, try the next one
            continue;
        }

        if (!_dataConfigAndDataHandler.TryGetValue(dataConfig, out var dataQueueHandlers))
        {
            // we can get the same subscription request multiple times, the aggregator manager handles updating each enumerator
            // but we need to keep track so we can call unsubscribe later to the target data queue handler
            _dataConfigAndDataHandler[dataConfig] = dataQueueHandlers = new Queue<IDataQueueHandler>();
        }
        dataQueueHandlers.Enqueue(dataHandler);

        if (immediateEmission)
        {
            return enumerator;
        }

        var utcStartTime = _frontierTimeProvider.GetUtcNow();

        var exchangeHours = MarketHoursDatabase.FromDataFolder().GetExchangeHours(dataConfig.Symbol.ID.Market, dataConfig.Symbol, dataConfig.Symbol.SecurityType);
        if (LeanData.UseStrictEndTime(_algorithmSettings.DailyPreciseEndTime, dataConfig.Symbol, dataConfig.Increment, exchangeHours))
        {
            // before the first frontier enumerator we adjust the endtimes if required
            enumerator = new StrictDailyEndTimesEnumerator(enumerator, exchangeHours, utcStartTime.ConvertFromUtc(exchangeTimeZone));
        }

        return new FrontierAwareEnumerator(enumerator, _frontierTimeProvider,
            new TimeZoneOffsetProvider(exchangeTimeZone, utcStartTime, Time.EndOfTime)
        );
    }

    if (failureException != null)
    {
        // we were not able to serve the request with any DQH and we got an exception, let's bubble it up.
        // A plain 'throw failureException;' here would reset the stack trace and hide where the DQH failed
        System.Runtime.ExceptionServices.ExceptionDispatchInfo.Capture(failureException).Throw();
    }

    // filter out warning for expected cases to reduce noise
    if (!dataConfig.Symbol.Value.Contains("-UNIVERSE-", StringComparison.InvariantCultureIgnoreCase)
        && dataConfig.Type != typeof(Delisting)
        && !dataConfig.Symbol.IsCanonical())
    {
        UnsupportedConfiguration?.Invoke(this, dataConfig);
    }
    return null;
}
146 
/// <summary>
/// Removes the specified configuration
/// </summary>
/// <remarks>
/// Forwards the removal to the data queue handler that accepted the oldest still tracked subscription
/// for this configuration. Configurations that were never served are ignored.
/// </remarks>
/// <param name="dataConfig">Subscription config to be removed</param>
public virtual void Unsubscribe(SubscriptionDataConfig dataConfig)
{
    if (!_dataConfigAndDataHandler.TryGetValue(dataConfig, out var dataHandlers))
    {
        // no data queue handler is tracked for this configuration
        return;
    }

    var dataHandler = dataHandlers.Dequeue();
    try
    {
        dataHandler.Unsubscribe(dataConfig);
    }
    finally
    {
        // clean up even if the handler throws: an empty queue left behind would make
        // the next Unsubscribe for this configuration fail on Dequeue()
        if (dataHandlers.Count == 0)
        {
            // nothing left
            _dataConfigAndDataHandler.Remove(dataConfig);
        }
    }
}
165 
/// <summary>
/// Sets the job we're subscribing for
/// </summary>
/// <remarks>
/// Resolves every data queue handler named in the job's DataQueueHandler value, hands the job over to
/// each of them, registers them in <see cref="DataHandlers"/> and finally creates the frontier time provider.
/// </remarks>
/// <param name="job">Job we're subscribing for</param>
public void SetJob(LiveNodePacket job)
{
    var dataHandlersConfig = job.DataQueueHandler;
    // log prefix kept in sync with the one used by InitializeFrontierTimeProvider()
    Log.Trace($"DataQueueHandlerManager.SetJob(): will use {dataHandlersConfig}");
    foreach (var dataHandlerName in dataHandlersConfig.DeserializeList())
    {
        var dataHandler = Composer.Instance.GetExportedValueByTypeName<IDataQueueHandler>(dataHandlerName);
        dataHandler.SetJob(job);
        DataHandlers.Add(dataHandler);
    }

    // has to happen after all handlers are registered since it inspects DataHandlers
    _frontierTimeProvider = InitializeFrontierTimeProvider();
}
183 
/// <summary>
/// Gets whether the data provider is connected
/// </summary>
/// <value>Always true for this implementation</value>
public bool IsConnected
{
    get { return true; }
}
189 
/// <summary>
/// Releases the resources held by this instance by disposing, in order, every registered data queue handler
/// </summary>
public void Dispose()
{
    DataHandlers.ForEach(registeredHandler => registeredHandler.Dispose());
}
200 
/// <summary>
/// Looks up the Symbols available at the data source for the requested Symbol
/// </summary>
/// <remarks>
/// Universe providers are asked one at a time and the first non empty answer wins,
/// the remaining providers are not queried.
/// </remarks>
/// <param name="symbol">Symbol to lookup</param>
/// <param name="includeExpired">Include expired contracts</param>
/// <param name="securityCurrency">Expected security currency(if any)</param>
/// <returns>The Symbols associated with the provided Symbol, empty if no universe provider returned any</returns>
public IEnumerable<Symbol> LookupSymbols(Symbol symbol, bool includeExpired, string securityCurrency = null)
{
    // the whole pipeline is lazy, so it stops querying providers as soon as one yields at least one symbol
    IEnumerable<Symbol> firstMatch = GetUniverseProviders()
        .Select(universeProvider => universeProvider.LookupSymbols(symbol, includeExpired, securityCurrency))
        // a null answer means the universe provider does not support it
        .Where(lookup => lookup != null)
        .Select(lookup => lookup.ToList())
        .FirstOrDefault(found => found.Count > 0);

    return firstMatch ?? Enumerable.Empty<Symbol>();
}
227 
/// <summary>
/// Tells whether selection can take place right now
/// </summary>
/// <remarks>This is useful to avoid a selection taking place during invalid times, for example IB reset times or when not connected,
/// because if allowed selection would fail since IB isn't running and would kill the algorithm</remarks>
/// <returns>True as soon as one universe provider reports selection can take place, false if none does</returns>
public bool CanPerformSelection()
{
    foreach (var universeProvider in GetUniverseProviders())
    {
        if (universeProvider.CanPerformSelection())
        {
            // one provider able to select is enough, no need to ask the rest
            return true;
        }
    }

    return false;
}
238 
239  /// <summary>
240  /// Creates the frontier time provider instance
241  /// </summary>
242  /// <remarks>Protected for testing purposes</remarks>
244  {
245  var timeProviders = DataHandlers.OfType<ITimeProvider>().ToList();
246  if (timeProviders.Any())
247  {
248  Log.Trace($"DataQueueHandlerManager.InitializeFrontierTimeProvider(): will use the following IDQH frontier time providers: [{string.Join(",", timeProviders.Select(x => x.GetType()))}]");
249  return new CompositeTimeProvider(timeProviders);
250  }
251  return null;
252  }
253 
/// <summary>
/// Lazily enumerates the registered data handlers that implement <see cref="IDataQueueUniverseProvider"/>
/// </summary>
/// <returns>The universe providers, in the order of <see cref="DataHandlers"/></returns>
/// <exception cref="NotSupportedException">Raised once the enumeration runs to its end without having produced any provider</exception>
private IEnumerable<IDataQueueUniverseProvider> GetUniverseProviders()
{
    var providerCount = 0;
    foreach (var dataHandler in DataHandlers)
    {
        if (dataHandler is IDataQueueUniverseProvider universeProvider)
        {
            providerCount++;
            yield return universeProvider;
        }
    }

    // only reached when the caller consumed the whole sequence
    if (providerCount == 0)
    {
        throw new NotSupportedException("The DataQueueHandler does not support Options and Futures.");
    }
}
268  }
269 }