Lean  $LEAN_TAG$
SubscriptionDataReaderHistoryProvider.cs
1 /*
2  * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
3  * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  *
15 */
16 
17 using System;
18 using System.Collections.Generic;
19 using NodaTime;
20 using QuantConnect.Data;
28 using QuantConnect.Util;
30 
32 {
33  /// <summary>
34  /// Provides an implementation of <see cref="IHistoryProvider"/> that uses <see cref="BaseData"/>
35  /// instances to retrieve historical data
36  /// </summary>
38  {
39  private SymbolProperties _nullSymbolProperties; // SymbolProperties.GetDefault(Currencies.NullCurrency); built in Initialize
40  private SecurityCache _nullCache; // single SecurityCache instance created in Initialize and passed to every internal Security
41  private Cash _nullCash; // Cash for Currencies.NullCurrency built in Initialize with arguments (0, 1m)
42 
43  private IDataProvider _dataProvider; // this and the following providers are copied from the initialization parameters
44  private IMapFileProvider _mapFileProvider;
45  private IFactorFileProvider _factorFileProvider;
46  private IDataCacheProvider _dataCacheProvider;
47  private IObjectStore _objectStore;
48  private bool _parallelHistoryRequestsEnabled; // selects SubscriptionUtils.CreateAndScheduleWorker over SubscriptionUtils.Create
49  private bool _initialized; // set by the first Initialize call; a second call throws
50 
51  /// <summary>
52  /// Manager used to allow or deny access to a requested datasource for specific users
53  /// </summary>
55 
56  /// <summary>
57  /// Initializes this history provider to work for the specified job. Initialization is
     /// one-shot: a second call throws <see cref="InvalidOperationException"/>
58  /// </summary>
59  /// <param name="parameters">The initialization parameters; the providers, object store and parallel flag are copied from it</param>
60  public override void Initialize(HistoryProviderInitializeParameters parameters)
61  {
62  if (_initialized)
63  {
64  // let's make sure no one tries to change our parameter values
65  throw new InvalidOperationException("SubscriptionDataReaderHistoryProvider can only be initialized once");
66  }
67  _initialized = true; // flipped before the copies below, so any repeated call hits the guard above
68  _dataProvider = parameters.DataProvider;
69  _mapFileProvider = parameters.MapFileProvider;
70  _dataCacheProvider = parameters.DataCacheProvider;
71  _factorFileProvider = parameters.FactorFileProvider;
72  _objectStore = parameters.ObjectStore;
     // NOTE(review): the embedded listing numbers jump from 72 to 75 here, so statements may have been lost in extraction - confirm against the original file
75  _parallelHistoryRequestsEnabled = parameters.ParallelHistoryRequestsEnabled;
76 
     // placeholder objects handed to the internal Security that CreateSubscription builds for each request
77  _nullCache = new SecurityCache();
78  _nullCash = new Cash(Currencies.NullCurrency, 0, 1m);
79  _nullSymbolProperties = SymbolProperties.GetDefault(Currencies.NullCurrency);
80  }
81 
82  /// <summary>
83  /// Gets the history for the requested securities
84  /// </summary>
85  /// <param name="requests">The historical data requests</param>
86  /// <param name="sliceTimeZone">The time zone used when time stamping the slice instances</param>
87  /// <returns>An enumerable of the slices of data covering the span specified in each request</returns>
88  public override IEnumerable<Slice> GetHistory(IEnumerable<HistoryRequest> requests, DateTimeZone sliceTimeZone)
89  {
90  // create subscription objects from the configs
91  var subscriptions = new List<Subscription>();
92  foreach (var request in requests)
93  {
94  var subscription = CreateSubscription(request);
95  subscriptions.Add(subscription);
96  }
97 
98  return CreateSliceEnumerableFromSubscriptions(subscriptions, sliceTimeZone);
99  }
100 
101  /// <summary>
102  /// Creates a subscription to process the request. A <see cref="SubscriptionDataReader"/> is wrapped
     /// in a chain of enumerators (optional intraday data, optional strict daily end times, optional
     /// fill forward, subscription and time based filters) before being turned into a subscription
103  /// </summary>
     /// <param name="request">The history request to build the subscription for</param>
     /// <returns>The subscription, created through the scheduled worker path when parallel history requests are enabled</returns>
104  private Subscription CreateSubscription(HistoryRequest request)
105  {
106  var config = request.ToSubscriptionDataConfig();
107 
108  // this security is internal only, so we do not need to worry about a few of its properties
109  // TODO: we don't need fee/fill/BPM/etc either. Even better we should refactor & remove the need for the security
110  var security = new Security(
111  request.ExchangeHours,
112  config,
113  _nullCash,
114  _nullSymbolProperties,
     // NOTE(review): the embedded listing numbers jump from 114 to 117 here, so constructor arguments may have been lost in extraction - confirm against the original file
117  _nullCache
118  );
119 
120  var dataReader = new SubscriptionDataReader(config,
121  request,
122  _mapFileProvider,
123  _factorFileProvider,
124  _dataCacheProvider,
125  _dataProvider,
126  _objectStore);
127 
     // forward every event raised by the reader to this provider's matching On* method
128  dataReader.InvalidConfigurationDetected += (sender, args) => { OnInvalidConfigurationDetected(args); };
129  dataReader.NumericalPrecisionLimited += (sender, args) => { OnNumericalPrecisionLimited(args); };
130  dataReader.StartDateLimited += (sender, args) => { OnStartDateLimited(args); };
131  dataReader.DownloadFailed += (sender, args) => { OnDownloadFailed(args); };
132  dataReader.ReaderErrorDetected += (sender, args) => { OnReaderErrorDetected(args); };
133 
134  IEnumerator<BaseData> reader = dataReader;
135  var intraday = GetIntradayDataEnumerator(dataReader, request); // null means there is no intraday data to append
136  if (intraday != null)
137  {
138  // we optionally concatenate the intraday data enumerator
139  reader = new ConcatEnumerator(true, reader, intraday);
140  }
141 
142  var useDailyStrictEndTimes = LeanData.UseDailyStrictEndTimes(AlgorithmSettings, request, config.Symbol, config.Increment);
143  if (useDailyStrictEndTimes)
144  {
145  // applied before the corporate events stage, which might itself yield data, so that both feeds stay synchronized
146  reader = new StrictDailyEndTimesEnumerator(reader, request.ExchangeHours, request.StartTimeLocal);
147  }
148 
     // NOTE(review): the embedded listing numbers jump from 148 to 150 here; the opening of the call that receives the arguments below appears to have been lost in extraction - confirm against the original file
150  reader,
151  config,
152  _factorFileProvider,
153  dataReader,
154  _mapFileProvider,
155  request.StartTimeLocal,
156  request.EndTimeLocal);
157 
158  // optionally apply fill forward behavior
159  if (request.FillForwardResolution.HasValue)
160  {
161  // copy forward Bid/Ask bars for QuoteBars
162  if (request.DataType == typeof(QuoteBar))
163  {
164  reader = new QuoteBarFillForwardEnumerator(reader);
165  }
166 
     // the fill forward span is exposed through a read only reference that evaluates the request's resolution
167  var readOnlyRef = Ref.CreateReadOnly(() => request.FillForwardResolution.Value.ToTimeSpan());
168  reader = new FillForwardEnumerator(reader, security.Exchange, readOnlyRef, request.IncludeExtendedMarketHours, request.EndTimeLocal, config.Increment, config.DataTimeZone, useDailyStrictEndTimes, request.DataType);
169  }
170 
171  // since the SubscriptionDataReader performs an any-overlap condition on the trade bar's entire
172  // range (time->end time) we can end up passing incorrect data (too far past, possibly future),
173  // so to combat this we deliberately filter the results from the data reader to fix these cases,
174  // which only apply to non-tick data
175 
176  reader = new SubscriptionFilterEnumerator(reader, security, request.EndTimeLocal, config.ExtendedMarketHours, false, request.ExchangeHours);
177 
178  // allow all ticks: the time based filter is only applied to non-tick resolutions
179  if (config.Resolution != Resolution.Tick)
180  {
181  var timeBasedFilter = new TimeBasedFilter(request);
182  reader = new FilterEnumerator<BaseData>(reader, timeBasedFilter.Filter);
183  }
184 
185  var subscriptionRequest = new SubscriptionRequest(false, null, security, config, request.StartTimeUtc, request.EndTimeUtc);
186  if (_parallelHistoryRequestsEnabled)
187  {
     // parallel mode: the subscription is created together with a scheduled worker
188  return SubscriptionUtils.CreateAndScheduleWorker(subscriptionRequest, reader, _factorFileProvider, false, AlgorithmSettings.DailyPreciseEndTime);
189  }
190  return SubscriptionUtils.Create(subscriptionRequest, reader, AlgorithmSettings.DailyPreciseEndTime);
191  }
192 
193  /// <summary>
194  /// Gets the intraday data enumerator if any
195  /// </summary>
196  protected virtual IEnumerator<BaseData> GetIntradayDataEnumerator(IEnumerator<BaseData> rawData, HistoryRequest request)
197  {
198  return null;
199  }
200 
201  /// <summary>
202  /// Internal helper class to filter data based on requested times
203  /// </summary>
204  private class TimeBasedFilter
205  {
206  public Type RequestedType { get; set; }
207  public DateTime EndTimeLocal { get; set; }
208  public DateTime StartTimeLocal { get; set; }
209  public TimeBasedFilter(HistoryRequest request)
210  {
211  RequestedType = request.DataType;
212  EndTimeLocal = request.EndTimeLocal;
213  StartTimeLocal = request.StartTimeLocal;
214  }
215  public bool Filter(BaseData data)
216  {
217  // filter out all aux data, unless if we are asking for aux data
218  if (data.DataType == MarketDataType.Auxiliary && data.GetType() != RequestedType) return false;
219  // filter out future data
220  if (data.EndTime > EndTimeLocal) return false;
221  // filter out data before the start
222  return data.EndTime > StartTimeLocal;
223  }
224  }
225  }
226 }