Lean  $LEAN_TAG$
SubscriptionManager.cs
1 /*
2  * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
3  * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14 */
15 
16 using Python.Runtime;
17 using System;
18 using System.Collections.Generic;
19 using System.Linq;
20 using NodaTime;
24 using QuantConnect.Util;
25 using QuantConnect.Python;
26 
27 namespace QuantConnect.Data
28 {
29  /// <summary>
30  /// Enumerable Subscription Management Class
31  /// </summary>
32  public class SubscriptionManager
33  {
// Consolidator wrappers ordered by their scan priority; ScanPastConsolidators peeks and
// dequeues entries from here and re-enqueues them with their updated priority.
34  private readonly PriorityQueue<ConsolidatorWrapper, ConsolidatorScanPriority> _consolidatorsSortedByScanTime;
// Maps each registered consolidator to its wrapper; written by AddConsolidator and
// cleared entry-by-entry by RemoveConsolidator.
35  private readonly Dictionary<IDataConsolidator, ConsolidatorWrapper> _consolidators;
// Wrappers waiting to be moved into the priority queue. Created lazily by AddConsolidator
// and reset to null by ScanPastConsolidators once flushed, so null means "nothing pending".
36  private List<Tuple<ConsolidatorWrapper, ConsolidatorScanPriority>> _consolidatorsToAdd;
// Lock taken around every mutation of _consolidatorsToAdd.
37  private readonly object _threadSafeCollectionLock;
// Time keeper supplied to the constructor and passed on to each ConsolidatorWrapper.
38  private readonly ITimeKeeper _timeKeeper;
// Backing subscription manager; unset until SetDataManager is called.
39  private IAlgorithmSubscriptionManager _subscriptionManager;
40 
41  /// <summary>
42  /// Instance that implements <see cref="ISubscriptionDataConfigService" />
43  /// </summary>
45 
/// <summary>
/// Lazily enumerates the subscription configurations exposed by the underlying subscription manager
/// </summary>
/// <remarks>Configurations flagged as internal feeds are filtered out</remarks>
public IEnumerable<SubscriptionDataConfig> Subscriptions
{
    get
    {
        var allConfigs = _subscriptionManager.SubscriptionManagerSubscriptions;
        return allConfigs.Where(candidate => !candidate.IsInternalFeed);
    }
}
51 
/// <summary>
/// The <see cref="TickType" /> values supported by each <see cref="SecurityType" />,
/// as reported by the underlying subscription manager
/// </summary>
public Dictionary<SecurityType, List<TickType>> AvailableDataTypes
{
    get { return _subscriptionManager.AvailableDataTypes; }
}
56 
/// <summary>
/// Gets the subscription count reported by the underlying subscription manager
/// </summary>
public int Count
{
    get { return _subscriptionManager.SubscriptionManagerCount(); }
}
61 
/// <summary>
/// Initializes a new <see cref="SubscriptionManager" /> instance
/// </summary>
/// <param name="timeKeeper">Time keeper stored by the manager and later handed to every consolidator wrapper it creates</param>
public SubscriptionManager(ITimeKeeper timeKeeper)
{
    _timeKeeper = timeKeeper;
    _threadSafeCollectionLock = new object();
    _consolidators = new Dictionary<IDataConsolidator, ConsolidatorWrapper>();

    // 1000 is presumably the initial capacity of the queue - confirm against the PriorityQueue type in use
    _consolidatorsSortedByScanTime = new PriorityQueue<ConsolidatorWrapper, ConsolidatorScanPriority>(
        1000,
        ConsolidatorScanPriority.Comparer);
}
72 
73  /// <summary>
74  /// Adds a market data subscription (overload kept for backwards compatibility).
75  /// The data type is <c>Tick</c> for tick resolution and <c>TradeBar</c> for every other resolution.
75  /// </summary>
76  /// <param name="symbol">Symbol of the asset to subscribe to</param>
77  /// <param name="resolution">Resolution of the requested data</param>
78  /// <param name="timeZone">The time zone the subscription's data is time stamped in</param>
79  /// <param name="exchangeTimeZone">
80  /// Specifies the time zone of the exchange for the security this subscription is for. This
81  /// is the output time zone, that is, the time zone that will be used on BaseData instances
82  /// </param>
83  /// <param name="isCustomData">True if this is custom user supplied data, false for normal QC data</param>
84  /// <param name="fillForward">When there is no data, pass the last trade bar forward</param>
85  /// <param name="extendedMarketHours">Request premarket data as well when true</param>
86  /// <returns>
87  /// The newly created <see cref="SubscriptionDataConfig" /> or existing instance if it already existed
88  /// </returns>
// NOTE(review): the declaration line that should open this parameter list is absent from this
// listing (presumably "public SubscriptionDataConfig Add(") - confirm against the original file.
90  Symbol symbol,
91  Resolution resolution,
92  DateTimeZone timeZone,
93  DateTimeZone exchangeTimeZone,
94  bool isCustomData = false,
95  bool fillForward = true,
96  bool extendedMarketHours = false
97  )
98  {
99  // Pick the data type: market data only comes in two forms -- ticks (trade by trade) or trade bars (time summaries)
100  var dataType = typeof(TradeBar);
101  if (resolution == Resolution.Tick)
102  {
103  dataType = typeof(Tick);
104  }
105 
// derive the tick type from the chosen data type and the symbol's security type, then defer to the typed overload
106  var tickType = LeanData.GetCommonTickTypeForCommonDataTypes(dataType, symbol.SecurityType);
107  return Add(dataType, tickType, symbol, resolution, timeZone, exchangeTimeZone, isCustomData, fillForward,
108  extendedMarketHours);
109  }
110 
111  /// <summary>
112  /// Adds a market data subscription for an explicit data type and tick type by delegating to
112  /// <c>SubscriptionDataConfigService.Add</c> and returning the first configuration it yields.
113  /// </summary>
114  /// <param name="dataType">Set the type of the data we're subscribing to.</param>
115  /// <param name="tickType">Tick type for the subscription.</param>
116  /// <param name="symbol">Symbol of the asset to subscribe to</param>
117  /// <param name="resolution">Resolution of the requested data</param>
118  /// <param name="dataTimeZone">The time zone the subscription's data is time stamped in (not used by this method's body)</param>
119  /// <param name="exchangeTimeZone">
120  /// Specifies the time zone of the exchange for the security this subscription is for. This
121  /// is the output time zone, that is, the time zone that will be used on BaseData instances
121  /// (not used by this method's body)
122  /// </param>
123  /// <param name="isCustomData">True if this is custom user supplied data, false for normal QC data</param>
124  /// <param name="fillForward">When there is no data, pass the last trade bar forward</param>
125  /// <param name="extendedMarketHours">Request premarket data as well when true</param>
126  /// <param name="isInternalFeed">
127  /// Set to true to prevent data from this subscription from being sent into the algorithm's
128  /// OnData events
129  /// </param>
130  /// <param name="isFilteredSubscription">
131  /// True if this subscription should have filters applied to it (market hours/user
132  /// filters from security), false otherwise
133  /// </param>
134  /// <param name="dataNormalizationMode">Define how data is normalized</param>
135  /// <returns>
136  /// The newly created <see cref="SubscriptionDataConfig" /> or existing instance if it already existed
137  /// </returns>
// NOTE(review): the declaration line that should open this parameter list is absent from this
// listing (presumably "public SubscriptionDataConfig Add(") - confirm against the original file.
139  Type dataType,
140  TickType tickType,
141  Symbol symbol,
142  Resolution resolution,
143  DateTimeZone dataTimeZone,
144  DateTimeZone exchangeTimeZone,
145  bool isCustomData,
146  bool fillForward = true,
147  bool extendedMarketHours = false,
148  bool isInternalFeed = false,
149  bool isFilteredSubscription = true,
150  DataNormalizationMode dataNormalizationMode = DataNormalizationMode.Adjusted
151  )
152  {
// a single (dataType, tickType) pair is requested, and the first element of the service's result is returned;
// dataTimeZone and exchangeTimeZone are not forwarded
153  return SubscriptionDataConfigService.Add(symbol, resolution, fillForward,
154  extendedMarketHours, isFilteredSubscription, isInternalFeed, isCustomData,
155  new List<Tuple<Type, TickType>> { new Tuple<Type, TickType>(dataType, tickType) },
156  dataNormalizationMode).First();
157  }
158 
/// <summary>
/// Registers a consolidator on the first subscription of the symbol that is compatible with it
/// </summary>
/// <param name="symbol">Symbol of the asset to consolidate</param>
/// <param name="consolidator">The consolidator</param>
/// <param name="tickType">Desired tick type for the subscription</param>
/// <exception cref="ArgumentException">
/// Thrown when the symbol has no subscription, or when none of its subscriptions is valid for the consolidator
/// </exception>
public void AddConsolidator(Symbol symbol, IDataConsolidator consolidator, TickType? tickType = null)
{
    // collect the (non-internal) subscriptions registered for this symbol
    var candidates = Subscriptions.Where(config => config.Symbol == symbol).ToList();

    if (candidates.Count == 0)
    {
        // nothing is subscribed for the symbol, so there is no data to feed the consolidator
        throw new ArgumentException("Please subscribe to this symbol before adding a consolidator for it. Symbol: " +
            symbol.Value);
    }

    foreach (var candidate in candidates)
    {
        // data is piped straight from the data feed into the consolidator, so the subscription has to match it
        if (!IsSubscriptionValidForConsolidator(candidate, consolidator, tickType))
        {
            continue;
        }

        candidate.Consolidators.Add(consolidator);

        var scanWrapper = new ConsolidatorWrapper(
            consolidator,
            candidate.Increment,
            _timeKeeper,
            _timeKeeper.GetLocalTimeKeeper(candidate.ExchangeTimeZone));
        _consolidators[consolidator] = scanWrapper;

        // queue the wrapper; ScanPastConsolidators moves pending entries into the priority queue
        lock (_threadSafeCollectionLock)
        {
            _consolidatorsToAdd ??= new();
            _consolidatorsToAdd.Add(new(scanWrapper, scanWrapper.Priority));
        }

        // only the first compatible subscription receives the consolidator
        return;
    }

    // no compatible subscription: report a tick type problem first, otherwise a type mismatch
    if (tickType != null && candidates.All(config => config.TickType != tickType))
    {
        throw new ArgumentException(
            $"No subscription with the requested Tick Type {tickType} was found. Available Tick Types: {string.Join(", ", candidates.Select(x => x.TickType))}");
    }

    throw new ArgumentException("Type mismatch found between consolidator and symbol. " +
        $"Symbol: {symbol.Value} does not support input type: {consolidator.InputType.Name}. " +
        $"Supported types: {string.Join(",", candidates.Select(x => x.Type.Name))}.");
}
206 
/// <summary>
/// Registers a consolidator supplied as a python object for the symbol
/// </summary>
/// <param name="symbol">Symbol of the asset to consolidate</param>
/// <param name="pyConsolidator">The custom python consolidator</param>
public void AddConsolidator(Symbol symbol, PyObject pyConsolidator)
{
    // use the converted IDataConsolidator when TryConvert succeeds, otherwise wrap the python object
    var consolidator = pyConsolidator.TryConvert(out IDataConsolidator converted)
        ? converted
        : new DataConsolidatorPythonWrapper(pyConsolidator);

    AddConsolidator(symbol, consolidator);
}
221 
/// <summary>
/// Removes the specified consolidator for the symbol and disposes of it
/// </summary>
/// <remarks>
/// The scan wrapper tracked for the consolidator is always released, even when no subscription
/// configuration is found for the symbol, so a disposed consolidator is never left registered for scanning.
/// </remarks>
/// <param name="symbol">
/// The symbol the consolidator is receiving data from. When null, the symbol of the consolidator's
/// consolidated or working data is used if available
/// </param>
/// <param name="consolidator">The consolidator instance to be removed</param>
public void RemoveConsolidator(Symbol symbol, IDataConsolidator consolidator)
{
    // let's try to get associated symbol, not required but nice to have
    symbol ??= consolidator.Consolidated?.Symbol;
    symbol ??= consolidator.WorkingData?.Symbol;

    // detach the consolidator from each subscription of the symbol
    foreach (var subscription in _subscriptionManager.GetSubscriptionDataConfigs(symbol))
    {
        subscription.Consolidators.Remove(consolidator);
    }

    // release the scan wrapper exactly once, independently of how many subscriptions matched:
    // ScanPastConsolidators skips wrappers that have been disposed
    if (_consolidators.Remove(consolidator, out var scanWrapper))
    {
        scanWrapper.Dispose();
    }

    // dispose of the consolidator to remove any remaining event handlers
    consolidator.DisposeSafely();
}
247 
/// <summary>
/// Removes a consolidator supplied as a python object for the symbol
/// </summary>
/// <param name="symbol">The symbol the consolidator is receiving data from</param>
/// <param name="pyConsolidator">The python consolidator instance to be removed</param>
public void RemoveConsolidator(Symbol symbol, PyObject pyConsolidator)
{
    // use the converted IDataConsolidator when TryConvert succeeds, otherwise wrap the python object
    var consolidator = pyConsolidator.TryConvert(out IDataConsolidator converted)
        ? converted
        : new DataConsolidatorPythonWrapper(pyConsolidator);

    RemoveConsolidator(symbol, consolidator);
}
262 
/// <summary>
/// Scans every queued consolidator whose scan time is earlier than the new time
/// </summary>
/// <param name="newUtcTime">The new utc time</param>
/// <param name="algorithm">The algorithm instance, its time is moved to each scan time before scanning</param>
public void ScanPastConsolidators(DateTime newUtcTime, IAlgorithm algorithm)
{
    // first move the consolidators registered since the last call into the priority queue
    if (_consolidatorsToAdd != null)
    {
        lock (_threadSafeCollectionLock)
        {
            _consolidatorsToAdd.DoForEach(pending => _consolidatorsSortedByScanTime.Enqueue(pending.Item1, pending.Item2));
            _consolidatorsToAdd = null;
        }
    }

    while (_consolidatorsSortedByScanTime.TryPeek(out _, out var queuedPriority)
        && queuedPriority.UtcScanTime < newUtcTime)
    {
        var wrapper = _consolidatorsSortedByScanTime.Dequeue();
        if (wrapper.Disposed)
        {
            // the consolidator was removed, drop it from the queue for good
            continue;
        }

        var scanTime = queuedPriority.UtcScanTime;
        if (scanTime != algorithm.UtcTime)
        {
            // setting the algorithm time is not cheap (TZ conversions), skip it when already there
            algorithm.SetDateTime(scanTime);
        }

        // the wrapper's scan time may have moved since it was queued, only scan if it is still due
        if (wrapper.UtcScanTime <= scanTime)
        {
            wrapper.Scan();
        }

        // put it back using its current priority
        _consolidatorsSortedByScanTime.Enqueue(wrapper, wrapper.Priority);
    }
}
303 
/// <summary>
/// Builds the hard coded table of tick types available by default for each security type
/// </summary>
/// <returns>A new dictionary (with new lists) on every call</returns>
public static Dictionary<SecurityType, List<TickType>> DefaultDataTypes()
{
    return new Dictionary<SecurityType, List<TickType>>
    {
        [SecurityType.Base] = new List<TickType> { TickType.Trade },
        [SecurityType.Index] = new List<TickType> { TickType.Trade },
        [SecurityType.Forex] = new List<TickType> { TickType.Quote },
        [SecurityType.Equity] = new List<TickType> { TickType.Trade, TickType.Quote },
        [SecurityType.Option] = new List<TickType> { TickType.Quote, TickType.Trade, TickType.OpenInterest },
        [SecurityType.FutureOption] = new List<TickType> { TickType.Quote, TickType.Trade, TickType.OpenInterest },
        [SecurityType.IndexOption] = new List<TickType> { TickType.Quote, TickType.Trade, TickType.OpenInterest },
        [SecurityType.Cfd] = new List<TickType> { TickType.Quote },
        [SecurityType.Future] = new List<TickType> { TickType.Quote, TickType.Trade, TickType.OpenInterest },
        [SecurityType.Commodity] = new List<TickType> { TickType.Trade },
        [SecurityType.Crypto] = new List<TickType> { TickType.Trade, TickType.Quote },
        [SecurityType.CryptoFuture] = new List<TickType> { TickType.Trade, TickType.Quote }
    };
}
325 
/// <summary>
/// Looks up the tick types registered in <see cref="AvailableDataTypes" /> for a security type
/// </summary>
/// <param name="securityType">The security type used as the lookup key</param>
/// <returns>The tick types stored for the security type</returns>
public IReadOnlyList<TickType> GetDataTypesForSecurity(SecurityType securityType)
    => AvailableDataTypes[securityType];
333 
/// <summary>
/// Gets the data feed types for a given <see cref="SecurityType" /> and <see cref="Resolution" />
/// by forwarding the request to the underlying subscription manager
/// </summary>
/// <param name="symbolSecurityType">The <see cref="SecurityType" /> used to determine the types</param>
/// <param name="resolution">The resolution of the data requested</param>
/// <param name="isCanonical">Indicates whether the security is Canonical (future and options)</param>
/// <returns>Types that should be added to the <see cref="SubscriptionDataConfig" /></returns>
public List<Tuple<Type, TickType>> LookupSubscriptionConfigDataTypes(
    SecurityType symbolSecurityType,
    Resolution resolution,
    bool isCanonical
    )
    => _subscriptionManager.LookupSubscriptionConfigDataTypes(symbolSecurityType, resolution, isCanonical);
349 
/// <summary>
/// Sets the underlying subscription manager this instance forwards its queries to
/// </summary>
/// <param name="subscriptionManager">The subscription manager to use from now on</param>
public void SetDataManager(IAlgorithmSubscriptionManager subscriptionManager)
    => _subscriptionManager = subscriptionManager;
357 
358  /// <summary>
359  /// Checks if the subscription is valid for the consolidator: tick subscriptions are additionally
359  /// matched by tick type, then the consolidator's input type must be assignable from the subscription's data type
360  /// </summary>
361  /// <param name="subscription">The subscription configuration</param>
362  /// <param name="consolidator">The consolidator</param>
363  /// <param name="desiredTickType">The desired tick type for the subscription, or null when the caller has no preference</param>
364  /// <returns>true if the subscription is valid for the consolidator</returns>
365  public static bool IsSubscriptionValidForConsolidator(SubscriptionDataConfig subscription, IDataConsolidator consolidator, TickType? desiredTickType = null)
366  {
// NOTE(review): the right-hand operand of the '&&' below is missing from this listing - restore it from the original file
367  if (subscription.Type == typeof(Tick) &&
369  {
370  if (desiredTickType == null)
371  {
// NOTE(review): the line declaring 'tickType' is missing from this listing; the two arguments below presumably
// belong to a LeanData.GetCommonTickTypeForCommonDataTypes(...) call like the one used elsewhere in this file - confirm
373  consolidator.OutputType,
374  subscription.Symbol.SecurityType);
375 
// no tick type requested: the subscription is valid only if its tick type equals the computed one
376  return subscription.TickType == tickType;
377  }
// a tick type was requested: reject subscriptions of any other tick type, matching ones fall through to the type check
378  else if (subscription.TickType != desiredTickType)
379  {
380  return false;
381  }
382  }
383 
// generic rule: the consolidator must be able to consume the subscription's data type
384  return consolidator.InputType.IsAssignableFrom(subscription.Type);
385  }
386 
/// <summary>
/// Returns true if the provided data is the default data type associated with its <see cref="SecurityType"/>.
/// This is useful to determine if a data point should be used/cached in an environment where consumers will not provide a data type and we want to preserve
/// determinism and backwards compatibility when there are multiple data types available per <see cref="SecurityType"/> or new ones added.
/// </summary>
/// <remarks>Temporary until we have a dictionary for the default data type per security type see GH issue 4196.
/// Internal so it's only accessible from this assembly.</remarks>
/// <param name="data">The data point to check</param>
/// <returns>
/// False for equity quote bars and equity quote ticks, true for everything else
/// </returns>
internal static bool IsDefaultDataType(BaseData data)
{
    switch (data.Symbol.SecurityType)
    {
        case SecurityType.Equity:
        {
            var isQuoteBar = data.DataType == MarketDataType.QuoteBar;

            // pattern match instead of an unchecked 'as' cast: data flagged as a tick that is not
            // a Tick instance is simply not a quote tick, rather than a null dereference
            var isQuoteTick = data.DataType == MarketDataType.Tick
                && data is Tick { TickType: TickType.Quote };

            if (isQuoteBar || isQuoteTick)
            {
                return false;
            }
            break;
        }
    }

    return true;
}
407  }
408 }