Lean  $LEAN_TAG$
FileSystemDataFeed.cs
1 /*
2  * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
3  * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  *
15 */
16 
17 using System;
18 using System.Collections.Generic;
19 using System.Linq;
20 using QuantConnect.Data;
29 using QuantConnect.Logging;
30 using QuantConnect.Packets;
32 using QuantConnect.Util;
33 
35 {
36  /// <summary>
37  /// Historical datafeed stream reader for processing files on a local disk.
38  /// </summary>
39  /// <remarks>Filesystem datafeeds are incredibly fast</remarks>
41  {
42  private IAlgorithm _algorithm; // algorithm instance handed to Initialize
43  private ITimeProvider _timeProvider; // dataFeedTimeProvider.FrontierTimeProvider, captured in Initialize
44  private IResultHandler _resultHandler; // passed on to the subscription factory and to the subscription filter wrapper
45  private IMapFileProvider _mapFileProvider; // passed on to the subscription factory
46  private IFactorFileProvider _factorFileProvider; // passed on to the subscription factory and to the subscription worker
47  private IDataProvider _dataProvider; // handed to every CreateEnumerator call made on the enumerator factories
48  private IDataCacheProvider _cacheProvider; // ZipDataCacheProvider created in Initialize, disposed in Exit
49  private SubscriptionCollection _subscriptions; // subscriptionManager.DataFeedSubscriptions; queried for the fill forward resolution
50  private MarketHoursDatabase _marketHoursDatabase; // loaded through MarketHoursDatabase.FromDataFolder() in Initialize
51  private SubscriptionDataReaderSubscriptionEnumeratorFactory _subscriptionFactory; // default enumerator factory, disposed in Exit
52 
53  /// <summary>
54  /// Flag indicating whether the data feed is active: it is set to true by Initialize and reset to false by Exit.
55  /// </summary>
56  public bool IsActive { get; private set; }
57 
58  /// <summary>
59  /// Initializes the data feed for the specified job and algorithm
60  /// </summary>
     /// <param name="algorithm">The algorithm instance; stored and also handed to the subscription enumerator factory</param>
     /// <param name="resultHandler">Result handler forwarded to the subscription enumerator factory</param>
     /// <param name="mapFileProvider">Map file provider forwarded to the subscription enumerator factory</param>
     /// <param name="factorFileProvider">Factor file provider forwarded to the subscription enumerator factory</param>
     /// <param name="dataProvider">Data provider wrapped by the zip cache provider and used when creating enumerators</param>
     /// <param name="subscriptionManager">Source of the data feed subscription collection</param>
     /// <param name="dataFeedTimeProvider">Source of the frontier time provider</param>
     /// <param name="dataChannelProvider">Not referenced in the visible body of this method</param>
     // NOTE(review): the original numbering skips a line between the first and second parameter below,
     // and the summary mentions a "job", so a parameter appears to be missing from this view - confirm against the full file.
61  public virtual void Initialize(IAlgorithm algorithm,
63  IResultHandler resultHandler,
64  IMapFileProvider mapFileProvider,
65  IFactorFileProvider factorFileProvider,
66  IDataProvider dataProvider,
67  IDataFeedSubscriptionManager subscriptionManager,
68  IDataFeedTimeProvider dataFeedTimeProvider,
69  IDataChannelProvider dataChannelProvider)
70  {
71  _algorithm = algorithm;
72  _resultHandler = resultHandler;
73  _mapFileProvider = mapFileProvider;
74  _factorFileProvider = factorFileProvider;
75  _dataProvider = dataProvider;
     // only the frontier time provider of the data feed time provider is kept
76  _timeProvider = dataFeedTimeProvider.FrontierTimeProvider;
77  _subscriptions = subscriptionManager.DataFeedSubscriptions;
     // zip cache layered over the data provider; it is disposed in Exit
78  _cacheProvider = new ZipDataCacheProvider(dataProvider, isDataEphemeral: false);
     // default factory used for data subscriptions; price scaling is switched off here
79  _subscriptionFactory = new SubscriptionDataReaderSubscriptionEnumeratorFactory(
80  _resultHandler,
81  _mapFileProvider,
82  _factorFileProvider,
83  _cacheProvider,
84  algorithm,
85  enablePriceScaling: false);
86 
     // the feed is flagged active before the market hours database is loaded
87  IsActive = true;
88  _marketHoursDatabase = MarketHoursDatabase.FromDataFolder();
89  }
90 
/// <summary>
/// Builds the file based data enumerator that serves the given subscription request
/// </summary>
/// <remarks>Protected so it can be used by the <see cref="LiveTradingDataFeed"/> to warmup requests</remarks>
/// <param name="request">The subscription request to create an enumerator for</param>
/// <param name="fillForwardResolution">Optional fill forward resolution, forwarded unchanged to the enumerator builders</param>
/// <returns>A universe enumerator for universe subscriptions, a plain data enumerator otherwise</returns>
protected IEnumerator<BaseData> CreateEnumerator(SubscriptionRequest request, Resolution? fillForwardResolution = null)
{
    if (request.IsUniverseSubscription)
    {
        // universe requests receive CreateDataEnumerator as the callback used to build underlying enumerators
        return CreateUniverseEnumerator(request, CreateDataEnumerator, fillForwardResolution);
    }

    return CreateDataEnumerator(request, fillForwardResolution);
}
99 
/// <summary>
/// Creates the enumerator for a regular data request: the subscription factory output
/// run through <see cref="ConfigureEnumerator"/> without aggregation
/// </summary>
/// <param name="request">The subscription request to read data for</param>
/// <param name="fillForwardResolution">Optional fill forward resolution handed to the configuration step</param>
/// <returns>The configured data enumerator</returns>
private IEnumerator<BaseData> CreateDataEnumerator(SubscriptionRequest request, Resolution? fillForwardResolution)
{
    // ReSharper disable once PossibleMultipleEnumeration
    var rawData = _subscriptionFactory.CreateEnumerator(request, _dataProvider);

    return ConfigureEnumerator(request, false, rawData, fillForwardResolution);
}
108 
109  /// <summary>
110  /// Creates a new subscription to provide data for the specified security.
111  /// </summary>
112  /// <param name="request">Defines the subscription to be added, including start/end times the universe and security</param>
113  /// <returns>The created <see cref="Subscription"/> if successful, null otherwise</returns>
     // NOTE(review): the original numbering skips the line between the doc comment and the opening brace,
     // so the method signature is not present in this view - confirm against the full file.
115  {
116  IEnumerator<BaseData> enumerator;
     // while the algorithm is warming up the request is served by two enumerators: a warm-up one
     // (at the warm-up resolution, ending at the pivot) followed by the normal one (starting at the pivot)
117  if(_algorithm.IsWarmingUp)
118  {
     // the pivot starts out as the algorithm start date converted to UTC
119  var pivotTimeUtc = _algorithm.StartDate.ConvertToUtc(_algorithm.TimeZone);
120 
121  var warmupRequest = new SubscriptionRequest(request, endTimeUtc: pivotTimeUtc,
122  configuration: new SubscriptionDataConfig(request.Configuration, resolution: _algorithm.Settings.WarmupResolution));
     // stays null when the warm-up request has no tradable days or its configuration is not valid
123  IEnumerator<BaseData> warmupEnumerator = null;
124  if (warmupRequest.TradableDaysInDataTimeZone.Any()
125  // since we change the resolution, let's validate it's still valid configuration (example daily equity quotes are not!)
126  && LeanData.IsValidConfiguration(warmupRequest.Configuration.SecurityType, warmupRequest.Configuration.Resolution, warmupRequest.Configuration.TickType))
127  {
128  // let them overlap a day if possible to avoid data gaps, since each request will fill forward on its own because they use different resolutions
129  pivotTimeUtc = Time.GetStartTimeForTradeBars(request.Security.Exchange.Hours,
130  _algorithm.StartDate.ConvertTo(_algorithm.TimeZone, request.Security.Exchange.TimeZone),
131  Time.OneDay,
132  1,
133  false,
134  warmupRequest.Configuration.DataTimeZone,
135  LeanData.UseDailyStrictEndTimes(_algorithm.Settings, request, request.Security.Symbol, Time.OneDay))
136  .ConvertToUtc(request.Security.Exchange.TimeZone);
     // never move the pivot before the start of the warm-up request
137  if (pivotTimeUtc < warmupRequest.StartTimeUtc)
138  {
139  pivotTimeUtc = warmupRequest.StartTimeUtc;
140  }
141 
142  warmupEnumerator = CreateEnumerator(warmupRequest, _algorithm.Settings.WarmupResolution);
143  // don't let future data pass; null data points are let through
144  warmupEnumerator = new FilterEnumerator<BaseData>(warmupEnumerator, data => data == null || data.EndTime <= warmupRequest.EndTimeLocal);
145  }
146 
     // the normal request starts at the (possibly adjusted) pivot
147  var normalEnumerator = CreateEnumerator(new SubscriptionRequest(request, startTimeUtc: pivotTimeUtc));
148  // don't let pre start data pass, since we adjust start so they overlap 1 day let's not let this data pass, we just want it for fill forwarding after the target start
149  // this is also useful to drop any initial selection point which was already emitted during warmup
     // NOTE(review): a data point whose EndTime equals warmupRequest.EndTimeLocal satisfies both this filter (>=)
     // and the warm-up filter above (<=) - confirm whether that boundary point can be emitted twice.
150  normalEnumerator = new FilterEnumerator<BaseData>(normalEnumerator, data => data == null || data.EndTime >= warmupRequest.EndTimeLocal);
151 
152  // after the warmup enumerator we concatenate the 'normal' one
     // NOTE(review): warmupEnumerator may still be null here; presumably ConcatEnumerator tolerates null entries - confirm.
153  enumerator = new ConcatEnumerator(true, warmupEnumerator, normalEnumerator);
154  }
155  else
156  {
157  enumerator = CreateEnumerator(request);
158  }
159 
     // the schedule wrapper is requested with a null time provider
160  enumerator = AddScheduleWrapper(request, enumerator, null);
161 
162  if (request.IsUniverseSubscription && request.Universe is UserDefinedUniverse)
163  {
164  // for user defined universe we do not use a worker task, since calls to AddData can happen in any moment
165  // and we have to be able to inject selection data points into the enumerator
166  return SubscriptionUtils.Create(request, enumerator, _algorithm.Settings.DailyPreciseEndTime);
167  }
168  return SubscriptionUtils.CreateAndScheduleWorker(request, enumerator, _factorFileProvider, true, _algorithm.Settings.DailyPreciseEndTime);
169  }
170 
/// <summary>
/// Removes the subscription from the data feed, if it exists
/// </summary>
/// <remarks>This implementation has an empty body; the method is virtual so derived feeds can override it</remarks>
/// <param name="subscription">The subscription to remove</param>
public virtual void RemoveSubscription(Subscription subscription)
{
    // intentionally empty
}
178 
179  /// <summary>
180  /// Creates a universe enumerator from the Subscription request, the underlying enumerator func and the fill forward resolution (in some cases)
181  /// </summary>
     /// <param name="request">The universe subscription request</param>
     /// <param name="createUnderlyingEnumerator">Callback used to create the enumerator of an underlying symbol; only used on the ZipEntryName path</param>
     /// <param name="fillForwardResolution">Optional fill forward resolution; only used on the ZipEntryName path</param>
     /// <returns>The enumerator created for the universe request</returns>
182  protected IEnumerator<BaseData> CreateUniverseEnumerator(SubscriptionRequest request, Func<SubscriptionRequest, Resolution?, IEnumerator<BaseData>> createUnderlyingEnumerator, Resolution? fillForwardResolution = null)
183  {
     // start from the default subscription factory; the branches below are expected to swap it per universe kind
184  ISubscriptionEnumeratorFactory factory = _subscriptionFactory;
185  if (request.Universe is ITimeTriggeredUniverse)
186  {
     // NOTE(review): the original numbering skips a line here; the start of the statement that ends with the
     // two arguments below is not present in this view - confirm against the full file.
188  _marketHoursDatabase,
189  _timeProvider);
190  }
191  else if (request.Configuration.Type == typeof(FundamentalUniverse))
192  {
     // NOTE(review): the original numbering skips the only line of this branch - confirm against the full file.
194  }
195  else if (request.Configuration.Type == typeof(ZipEntryName))
196  {
197  // TODO: subscription should already come in correctly built
     // tick resolution is replaced by second resolution for this request
198  var resolution = request.Configuration.Resolution == Resolution.Tick ? Resolution.Second : request.Configuration.Resolution;
199 
200  // TODO: subscription should already come in as fill forward true
201  request = new SubscriptionRequest(request, configuration: new SubscriptionDataConfig(request.Configuration, fillForward: true, resolution: resolution));
202 
203  var result = new BaseDataSubscriptionEnumeratorFactory(_algorithm.OptionChainProvider, _algorithm.FutureChainProvider)
204  .CreateEnumerator(request, _dataProvider);
205 
206  if (LeanData.UseDailyStrictEndTimes(_algorithm.Settings, request, request.Configuration.Symbol, request.Configuration.Increment))
207  {
208  result = new StrictDailyEndTimesEnumerator(result, request.ExchangeHours, request.StartTimeLocal);
209  }
     // aggregate the data points into collections, then optionally pair them with the underlying's data;
     // this path returns directly and does not use the factory selected above
210  result = ConfigureEnumerator(request, true, result, fillForwardResolution);
211  return TryAppendUnderlyingEnumerator(request, result, createUnderlyingEnumerator, fillForwardResolution);
212  }
213 
214  // define our data enumerator
215  var enumerator = factory.CreateEnumerator(request, _dataProvider);
216  return enumerator;
217  }
218 
219  /// <summary>
220  /// Returns a scheduled enumerator from the given arguments. It can also return the given underlying enumerator
221  /// </summary>
     /// <param name="request">The subscription request; its universe settings provide the schedule</param>
     /// <param name="underlying">The enumerator to wrap</param>
     /// <param name="timeProvider">Time provider handed to the scheduled enumerator</param>
     /// <returns>A <see cref="ScheduledEnumerator"/> when a schedule is obtained, otherwise <paramref name="underlying"/></returns>
222  protected IEnumerator<BaseData> AddScheduleWrapper(SubscriptionRequest request, IEnumerator<BaseData> underlying, ITimeProvider timeProvider)
223  {
     // NOTE(review): the original numbering skips a line here; the condition guarding the early return below
     // is not present in this view - confirm against the full file.
225  {
226  return underlying;
227  }
228 
     // schedule restricted to the local start/end window of the request
229  var schedule = request.Universe.UniverseSettings.Schedule.Get(request.StartTimeLocal, request.EndTimeLocal);
230  if (schedule != null)
231  {
232  return new ScheduledEnumerator(underlying, schedule, timeProvider, request.Configuration.ExchangeTimeZone, request.StartTimeLocal);
233  }
234  return underlying;
235  }
236 
/// <summary>
/// Pairs the enumerator of an option request with the data of its underlying symbol, when the request calls for it
/// </summary>
/// <param name="request">The subscription request whose symbol is inspected</param>
/// <param name="parent">The enumerator already built for the request</param>
/// <param name="createEnumerator">Callback used to build the enumerator of the underlying symbol</param>
/// <param name="fillForwardResolution">Optional fill forward resolution handed to the callback and to the final configuration step</param>
/// <returns>The combined enumerator, or <paramref name="parent"/> untouched when the symbol is not an option with an underlying</returns>
protected IEnumerator<BaseData> TryAppendUnderlyingEnumerator(SubscriptionRequest request, IEnumerator<BaseData> parent, Func<SubscriptionRequest, Resolution?, IEnumerator<BaseData>> createEnumerator, Resolution? fillForwardResolution)
{
    var symbol = request.Configuration.Symbol;
    if (!symbol.SecurityType.IsOption() || !symbol.HasUnderlying)
    {
        // nothing to append
        return parent;
    }

    var underlyingSymbol = symbol.Underlying;
    var marketHoursEntry = _marketHoursDatabase.GetEntry(underlyingSymbol.ID.Market, underlyingSymbol, underlyingSymbol.SecurityType);

    // TODO: creating this subscription request/config is bad
    // there's no guarantee the TZ are the same, specially the data timezone (index & index options),
    // so both time zones are taken from the market hours entry of the underlying
    var underlyingConfiguration = new SubscriptionDataConfig(request.Configuration,
        symbol: underlyingSymbol,
        objectType: typeof(TradeBar),
        tickType: TickType.Trade,
        dataTimeZone: marketHoursEntry.DataTimeZone,
        exchangeTimeZone: marketHoursEntry.ExchangeHours.TimeZone);
    var underlyingRequest = new SubscriptionRequest(request,
        isUniverseSubscription: false,
        configuration: underlyingConfiguration);

    // underlying trade bars, with auxiliary data points dropped
    IEnumerator<BaseData> underlyingData = createEnumerator(underlyingRequest, fillForwardResolution);
    underlyingData = new FilterEnumerator<BaseData>(underlyingData, point => point.DataType != MarketDataType.Auxiliary);

    IEnumerator<BaseData> combined = new SynchronizingBaseDataEnumerator(parent, underlyingData);
    // we aggregate both underlying and chain data
    combined = new BaseDataCollectionAggregatorEnumerator(combined, symbol);
    // only let through if underlying and chain data present
    combined = new FilterEnumerator<BaseData>(combined, point => (point as BaseDataCollection).Underlying != null);

    return ConfigureEnumerator(request, false, combined, fillForwardResolution);
}
268 
/// <summary>
/// Shuts the data feed down: clears <see cref="IsActive"/> and disposes the subscription
/// factory and the data cache provider. Does nothing when the feed is not active.
/// </summary>
public virtual void Exit()
{
    if (!IsActive)
    {
        return;
    }

    IsActive = false;
    Log.Trace("FileSystemDataFeed.Exit(): Start. Setting cancellation token...");

    _subscriptionFactory?.DisposeSafely();
    _cacheProvider.DisposeSafely();

    Log.Trace("FileSystemDataFeed.Exit(): Exit Finished.");
}
283 
284  /// <summary>
285  /// Configure the enumerator with aggregation/fill-forward/filter behaviors. Returns new instance if re-configured
286  /// </summary>
     /// <param name="request">The subscription request the enumerator was created for</param>
     /// <param name="aggregate">True to wrap the enumerator in a <see cref="BaseDataCollectionAggregatorEnumerator"/></param>
     /// <param name="enumerator">The enumerator to configure</param>
     /// <param name="fillForwardResolution">Optional fill forward resolution handed to <see cref="TryAddFillForwardEnumerator"/></param>
     /// <returns>The wrapped enumerator, or the given one when no wrapper applies</returns>
287  protected IEnumerator<BaseData> ConfigureEnumerator(SubscriptionRequest request, bool aggregate, IEnumerator<BaseData> enumerator, Resolution? fillForwardResolution)
288  {
289  if (aggregate)
290  {
291  enumerator = new BaseDataCollectionAggregatorEnumerator(enumerator, request.Configuration.Symbol);
292  }
293 
     // fill forward is driven by the FillDataForward flag of the request configuration
294  enumerator = TryAddFillForwardEnumerator(request, enumerator, request.Configuration.FillDataForward, fillForwardResolution);
295 
296  // optionally apply exchange/user filters
     // NOTE(review): the original numbering skips a line here; the condition guarding the block below
     // is not present in this view - confirm against the full file.
298  {
299  enumerator = SubscriptionFilterEnumerator.WrapForDataFeed(_resultHandler, enumerator, request.Security,
300  request.EndTimeLocal, request.Configuration.ExtendedMarketHours, false, request.ExchangeHours);
301  }
302 
303  return enumerator;
304  }
305 
306  /// <summary>
307  /// Will add a fill forward enumerator if requested
308  /// </summary>
     /// <param name="request">The subscription request the enumerator was created for</param>
     /// <param name="enumerator">The enumerator to wrap</param>
     /// <param name="fillForward">True to request fill forward behavior</param>
     /// <param name="fillForwardResolution">Optional resolution that, when set and not Tick, overrides the collection based fill forward span</param>
     /// <returns>The fill forward wrapped enumerator, or the given one for tick data or when fill forward is not requested</returns>
309  protected IEnumerator<BaseData> TryAddFillForwardEnumerator(SubscriptionRequest request, IEnumerator<BaseData> enumerator, bool fillForward, Resolution? fillForwardResolution)
310  {
311  // optionally apply fill forward logic, but never for tick data
312  if (fillForward && request.Configuration.Resolution != Resolution.Tick)
313  {
314  // copy forward Bid/Ask bars for QuoteBars
315  if (request.Configuration.Type == typeof(QuoteBar))
316  {
317  enumerator = new QuoteBarFillForwardEnumerator(enumerator);
318  }
319 
     // this call is made even when the span is overridden below
320  var fillForwardSpan = _subscriptions.UpdateAndGetFillForwardResolution(request.Configuration);
321  if (fillForwardResolution != null && fillForwardResolution != Resolution.Tick)
322  {
323  // if we are given a fill forward resolution we use it instead of the collection based one. This is useful during warmup when the warmup resolution has been set
324  fillForwardSpan = Ref.Create(fillForwardResolution.Value.ToTimeSpan());
325  }
326 
327  // Pass the security exchange hours explicitly to avoid using the ones in the request, since
328  // those could be different. e.g. when requests are created for open interest data the exchange
329  // hours are set to always open to avoid OI data being filtered out due to the exchange being closed.
330  // This way we allow OI data to be fill-forwarded to the market close time when strict end times is enabled,
331  // so that OI data is available at the same time as trades and quotes.
332  var useDailyStrictEndTimes = LeanData.UseDailyStrictEndTimes(_algorithm.Settings, request, request.Configuration.Symbol,
333  request.Configuration.Increment, request.Security.Exchange.Hours);
334  enumerator = new FillForwardEnumerator(enumerator, request.Security.Exchange, fillForwardSpan,
     // NOTE(review): the original numbering skips a line here; part of the constructor argument list
     // is not present in this view - confirm against the full file.
336  request.Configuration.DataTimeZone, useDailyStrictEndTimes, request.Configuration.Type);
337  }
338 
339  return enumerator;
340  }
341  }
342 }