Lean  $LEAN_TAG$
AggregationManager.cs
1 /*
2  * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
3  * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14 */
15 
16 using QuantConnect.Data;
21 using QuantConnect.Logging;
22 using System;
23 using System.Collections.Concurrent;
24 using System.Collections.Generic;
25 using System.Linq;
26 
28 {
29  /// <summary>
30  /// Aggregates ticks and bars based on given subscriptions.
31  /// The current implementation is based on <see cref="IDataConsolidator"/>, which consolidates ticks and puts them into an enumerator.
32  /// </summary>
34  {
35  private readonly ConcurrentDictionary<SecurityIdentifier, List<KeyValuePair<SubscriptionDataConfig, ScannableEnumerator<BaseData>>>> _enumerators
36  = new ConcurrentDictionary<SecurityIdentifier, List<KeyValuePair<SubscriptionDataConfig, ScannableEnumerator<BaseData>>>>();
37  private bool _dailyStrictEndTimeEnabled;
38 
39  /// <summary>
40  /// Continuous UTC time provider
41  /// </summary>
43 
44  /// <summary>
45  /// Initialize this instance
46  /// </summary>
47  /// <param name="parameters">The parameters dto instance</param>
49  {
50  _dailyStrictEndTimeEnabled = parameters.AlgorithmSettings.DailyPreciseEndTime;
51  Log.Trace($"AggregationManager.Initialize(): daily strict end times: {_dailyStrictEndTimeEnabled}");
52  }
53 
/// <summary>
/// Registers a subscription with this <see cref="IDataAggregator"/> instance and creates the enumerator that will emit its data
/// </summary>
/// <param name="dataConfig">The subscription configuration describing the data feed to aggregate</param>
/// <param name="newDataAvailableHandler">Handler handed to the new enumerator, to be fired on new data available</param>
/// <returns>The enumerator created for this subscription request</returns>
public IEnumerator<BaseData> Add(SubscriptionDataConfig dataConfig, EventHandler newDataAvailableHandler)
{
    var aggregator = GetConsolidator(dataConfig);

    // bar-like types are period based, unless they were requested at tick resolution
    var typeName = dataConfig.Type.Name;
    var isBarType = typeName == nameof(QuoteBar)
        || typeName == nameof(TradeBar)
        || typeName == nameof(OpenInterest);
    var isPeriodBased = isBarType && dataConfig.Resolution != Resolution.Tick;

    var enumerator = new ScannableEnumerator<BaseData>(
        aggregator,
        dataConfig.ExchangeTimeZone,
        TimeProvider,
        newDataAvailableHandler,
        isPeriodBased);

    var entry = new KeyValuePair<SubscriptionDataConfig, ScannableEnumerator<BaseData>>(dataConfig, enumerator);

    // when the symbol is already known a brand new list is stored; the previous list instance is left untouched
    _enumerators.AddOrUpdate(
        dataConfig.Symbol.ID,
        new List<KeyValuePair<SubscriptionDataConfig, ScannableEnumerator<BaseData>>> { entry },
        (symbolId, existing) => new List<KeyValuePair<SubscriptionDataConfig, ScannableEnumerator<BaseData>>>(existing) { entry });

    return enumerator;
}
76 
/// <summary>
/// Removes the enumerator registered for the given subscription configuration
/// </summary>
/// <remarks>
/// Only entries whose configuration matches <paramref name="dataConfig"/> are removed; other subscriptions
/// registered for the same symbol are kept. The symbol entry itself is dropped once no subscription is left.
/// </remarks>
/// <param name="dataConfig">Subscription data configuration to be removed</param>
/// <returns>True if enumerators were registered for the symbol of the given configuration, false otherwise</returns>
public bool Remove(SubscriptionDataConfig dataConfig)
{
    List<KeyValuePair<SubscriptionDataConfig, ScannableEnumerator<BaseData>>> enumerators;
    if (!_enumerators.TryGetValue(dataConfig.Symbol.ID, out enumerators))
    {
        Log.Debug($"AggregationManager.Remove(): IDataConsolidator for symbol ({dataConfig.Symbol.Value}) was not found.");
        return false;
    }

    // build a new list instead of mutating the stored one, so a list instance already
    // handed out by the dictionary is never modified
    var remaining = enumerators.Where(pair => pair.Key != dataConfig).ToList();
    if (remaining.Count > 0)
    {
        _enumerators[dataConfig.Symbol.ID] = remaining;
        return true;
    }

    // no subscription left for this symbol: remove the entry rather than keeping an empty list around
    List<KeyValuePair<SubscriptionDataConfig, ScannableEnumerator<BaseData>>> removed;
    return _enumerators.TryRemove(dataConfig.Symbol.ID, out removed);
}
103 
/// <summary>
/// Pushes a new data point to every enumerator registered for its symbol
/// </summary>
/// <remarks>Any exception raised while processing the data point is logged and not rethrown</remarks>
/// <param name="input">The new data</param>
public void Update(BaseData input)
{
    try
    {
        List<KeyValuePair<SubscriptionDataConfig, ScannableEnumerator<BaseData>>> subscriptions;
        if (!_enumerators.TryGetValue(input.Symbol.ID, out subscriptions))
        {
            // nobody is subscribed to this symbol
            return;
        }

        // null when the incoming data point is not a tick
        var tick = input as Tick;

        foreach (var subscription in subscriptions)
        {
            // suspicious ticks are only forwarded to tick resolution subscriptions
            var dropSuspiciousTick = subscription.Key.Resolution != Resolution.Tick
                && tick != null
                && tick.Suspicious;

            if (!dropSuspiciousTick)
            {
                subscription.Value.Update(input);
            }
        }
    }
    catch (Exception exception)
    {
        Log.Error(exception);
    }
}
138 
/// <summary>
/// Disposes of the aggregation manager. The method body is empty: it performs no work.
/// </summary>
public void Dispose()
{
}
143 
144  /// <summary>
145  /// Gets the consolidator to aggregate data for the given config
146  /// </summary>
148  {
149  var period = config.Resolution.ToTimeSpan();
150  if (config.Resolution == Resolution.Daily && (config.Type == typeof(QuoteBar) || config.Type == typeof(TradeBar)))
151  {
152  // in backtesting, daily resolution data does not have extended market hours even if requested, so let's respect the same behavior for live
153  // also this allows us to enable the daily strict end times if required. See 'SetStrictEndTimes'
154  return new MarketHourAwareConsolidator(_dailyStrictEndTimeEnabled, config.Resolution, typeof(Tick), config.TickType, extendedMarketHours: false);
155  }
156  if (config.Type == typeof(QuoteBar))
157  {
158  return new TickQuoteBarConsolidator(period);
159  }
160  if (config.Type == typeof(TradeBar))
161  {
162  return new TickConsolidator(period);
163  }
164  if (config.Type == typeof(OpenInterest))
165  {
166  return new OpenInterestConsolidator(period);
167  }
168  if (config.Type == typeof(Tick))
169  {
171  }
172  if (config.Type == typeof(Split))
173  {
174  return new IdentityDataConsolidator<Split>();
175  }
176  if (config.Type == typeof(Dividend))
177  {
179  }
180 
181  // streaming custom data subscriptions can pass right through
182  return new FilteredIdentityDataConsolidator<BaseData>(data => data.GetType() == config.Type);
183  }
184  }
185 }