Lean  $LEAN_TAG$
HistoryProviderManager.cs
1 /*
2  * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
3  * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14 */
15 
16 using NodaTime;
18 using QuantConnect.Data;
22 using QuantConnect.Logging;
23 using QuantConnect.Util;
24 using System;
25 using System.Collections.Generic;
26 using System.Linq;
28 
30 {
31  /// <summary>
32  /// Provides an implementation of <see cref="IHistoryProvider"/> which
33  /// acts as a wrapper to use multiple history providers together
34  /// </summary>
36  {
/// <summary>
/// Permission manager taken from the initialization parameters; when set, it is asked to
/// assert the configuration of every history request before providers are queried
/// </summary>
private IDataPermissionManager _dataPermissionManager;

/// <summary>
/// Brokerage supplied through <see cref="SetBrokerage"/>; handed to a
/// <see cref="BrokerageHistoryProvider"/> that was created directly by type name
/// </summary>
private IBrokerage _brokerage;

/// <summary>
/// Guards against <see cref="Initialize"/> being executed more than once
/// </summary>
private bool _initialized;

/// <summary>
/// Collection of history providers being used
/// </summary>
/// <remarks>Populated once during <see cref="Initialize"/>; the list instance itself is never replaced</remarks>
private readonly List<IHistoryProvider> _historyProviders = new();
46 
/// <summary>
/// Gets the total number of data points emitted across all wrapped history providers
/// </summary>
public override int DataPointCount
{
    get
    {
        return GetDataPointCount();
    }
}
51 
/// <summary>
/// Stores the brokerage instance that will serve historical requests
/// </summary>
/// <param name="brokerage">The brokerage instance to keep a reference to</param>
public void SetBrokerage(IBrokerage brokerage) => _brokerage = brokerage;
60 
/// <summary>
/// Initializes this history provider to work for the specified job
/// </summary>
/// <remarks>
/// The provider names come from the job's <c>HistoryProvider</c> value; when that yields no names the
/// <c>history-provider</c> configuration entry is used instead. Each name is resolved to a provider
/// instance, initialized with the same <paramref name="parameters"/>, and has its events forwarded
/// through this instance.
/// </remarks>
/// <param name="parameters">The initialization parameters</param>
/// <exception cref="InvalidOperationException">Thrown when this method is called more than once</exception>
public override void Initialize(HistoryProviderInitializeParameters parameters)
{
    if (_initialized)
    {
        // let's make sure no one tries to change our parameters values
        throw new InvalidOperationException("HistoryProviderManager can only be initialized once");
    }
    _initialized = true;

    var dataProvidersList = parameters.Job?.HistoryProvider.DeserializeList() ?? new List<string>();
    if (dataProvidersList.IsNullOrEmpty())
    {
        // nothing requested by the job, fall back to the configured provider(s)
        dataProvidersList.AddRange(Config.Get("history-provider", "SubscriptionDataReaderHistoryProvider").DeserializeList());
    }

    _dataPermissionManager = parameters.DataPermissionManager;
    foreach (var historyProviderName in dataProvidersList)
    {
        IHistoryProvider historyProvider;
        if (HistoryExtensions.TryGetBrokerageName(historyProviderName, out var brokerageName))
        {
            // we get the data queue handler if it already exists
            var dataQueueHandler = Composer.Instance.GetPart<IDataQueueHandler>((x) => x.GetType().Name == brokerageName);
            if (dataQueueHandler == null)
            {
                // we need to create the brokerage/data queue handler
                dataQueueHandler = Composer.Instance.GetExportedValueByTypeName<IDataQueueHandler>(brokerageName);
                // initialize it
                dataQueueHandler.SetJob((Packets.LiveNodePacket)parameters.Job);
                Log.Trace($"HistoryProviderManager.Initialize(): Created and wrapped '{brokerageName}' as '{nameof(BrokerageHistoryProvider)}'");
            }
            else
            {
                Log.Trace($"HistoryProviderManager.Initialize(): Wrapping '{brokerageName}' instance as '{nameof(BrokerageHistoryProvider)}'");
            }

            // wrap it
            var brokerageHistoryProvider = new BrokerageHistoryProvider();
            brokerageHistoryProvider.SetBrokerage((IBrokerage)dataQueueHandler);
            historyProvider = brokerageHistoryProvider;
        }
        else
        {
            historyProvider = Composer.Instance.GetExportedValueByTypeName<IHistoryProvider>(historyProviderName);
            if (historyProvider is BrokerageHistoryProvider brokerageProvider)
            {
                // a brokerage provider requested by its own type name uses the brokerage given to this manager
                brokerageProvider.SetBrokerage(_brokerage);
            }
        }

        historyProvider.Initialize(parameters);

        // forward every event of the wrapped provider through this instance
        historyProvider.InvalidConfigurationDetected += (sender, args) => { OnInvalidConfigurationDetected(args); };
        historyProvider.NumericalPrecisionLimited += (sender, args) => { OnNumericalPrecisionLimited(args); };
        historyProvider.StartDateLimited += (sender, args) => { OnStartDateLimited(args); };
        historyProvider.DownloadFailed += (sender, args) => { OnDownloadFailed(args); };
        historyProvider.ReaderErrorDetected += (sender, args) => { OnReaderErrorDetected(args); };
        _historyProviders.Add(historyProvider);
    }

    Log.Trace($"HistoryProviderManager.Initialize(): history providers [{string.Join(",", _historyProviders.Select(x => x.GetType().Name))}]");
}
125 
/// <summary>
/// Gets the history for the requested securities
/// </summary>
/// <remarks>
/// This is an iterator method, so none of the work below (including the permission assertions)
/// runs until the returned enumerable is enumerated. A provider that returns <c>null</c> or throws
/// is skipped; the failure is logged and the remaining providers are still used.
/// </remarks>
/// <param name="requests">The historical data requests</param>
/// <param name="sliceTimeZone">The time zone used when time stamping the slice instances</param>
/// <returns>An enumerable of the slices of data covering the span specified in each request</returns>
public override IEnumerable<Slice> GetHistory(IEnumerable<HistoryRequest> requests, DateTimeZone sliceTimeZone)
{
    List<IEnumerator<Slice>> historyEnumerators = new(_historyProviders.Count);

    // enumerate the requests once: assert permissions and keep a materialized copy for every provider
    var historyRequests = new List<HistoryRequest>();
    foreach (var request in requests)
    {
        var config = request.ToSubscriptionDataConfig();
        _dataPermissionManager?.AssertConfiguration(config, request.StartTimeLocal, request.EndTimeLocal);
        historyRequests.Add(request);
    }

    foreach (var historyProvider in _historyProviders)
    {
        try
        {
            var history = historyProvider.GetHistory(historyRequests, sliceTimeZone);
            if (history == null)
            {
                // doesn't support this history request, that's okay
                continue;
            }
            historyEnumerators.Add(history.GetEnumerator());
        }
        catch (Exception e)
        {
            // best effort: one failing provider must not prevent the others from serving the request,
            // but the failure should be visible instead of silently dropped
            Log.Error(e, $"HistoryProviderManager.GetHistory(): '{historyProvider.GetType().Name}' failed to provide history, skipping it");
        }
    }

    // NOTE(review): the merge below assumes the synchronizer yields slices in non-decreasing UtcTime order - confirm
    using var synchronizer = new SynchronizingSliceEnumerator(historyEnumerators);
    Slice latestMergeSlice = null;
    while (synchronizer.MoveNext())
    {
        if (synchronizer.Current == null)
        {
            continue;
        }
        if (latestMergeSlice == null)
        {
            // first slice seen, hold on to it until we know whether later ones share its time
            latestMergeSlice = synchronizer.Current;
            continue;
        }
        if (synchronizer.Current.UtcTime > latestMergeSlice.UtcTime)
        {
            // a newer slice we emit the old and keep a reference of the new
            // so in the next loop we merge if required
            yield return latestMergeSlice;
            latestMergeSlice = synchronizer.Current;
        }
        else
        {
            // a new slice with same time we merge them into 'latestMergeSlice'
            latestMergeSlice.MergeSlice(synchronizer.Current);
        }
    }

    // emit the slice still being held once the synchronizer is exhausted
    if (latestMergeSlice != null)
    {
        yield return latestMergeSlice;
    }
}
192 
/// <summary>
/// Adds up the data point counts reported by every wrapped history provider
/// </summary>
/// <returns>The combined data point count, zero when there are no providers</returns>
private int GetDataPointCount()
{
    // fold with plain addition so the arithmetic matches a simple running total
    return _historyProviders.Aggregate(0, (runningTotal, provider) => runningTotal + provider.DataPointCount);
}
202  }
203 }