Lean  $LEAN_TAG$
FileSystemDataFeed.cs
1 /*
2  * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
3  * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  *
15 */
16 
17 using System;
18 using System.Collections.Generic;
19 using System.Linq;
20 using QuantConnect.Data;
29 using QuantConnect.Logging;
30 using QuantConnect.Packets;
32 using QuantConnect.Util;
33 
35 {
36  /// <summary>
37  /// Historical datafeed stream reader for processing files on a local disk.
38  /// </summary>
39  /// <remarks>Filesystem datafeeds are incredibly fast</remarks>
41  {
42  private IAlgorithm _algorithm; // assigned in Initialize; read for settings, warmup state and error reporting
43  private ITimeProvider _timeProvider; // dataFeedTimeProvider.FrontierTimeProvider, assigned in Initialize
44  private IResultHandler _resultHandler; // assigned in Initialize; passed to the subscription factory and the subscription filter wrapper
45  private IMapFileProvider _mapFileProvider; // assigned in Initialize; passed to the subscription factory
46  private IFactorFileProvider _factorFileProvider; // assigned in Initialize; passed to the subscription factory and to SubscriptionUtils.CreateAndScheduleWorker
47  private IDataProvider _dataProvider; // assigned in Initialize; passed to the enumerator factories' CreateEnumerator calls
48  private IDataCacheProvider _cacheProvider; // ZipDataCacheProvider built in Initialize, disposed in Exit
49  private SubscriptionCollection _subscriptions; // subscriptionManager.DataFeedSubscriptions; queried for the fill forward resolution
50  private MarketHoursDatabase _marketHoursDatabase; // loaded through MarketHoursDatabase.FromDataFolder() in Initialize
51  private SubscriptionDataReaderSubscriptionEnumeratorFactory _subscriptionFactory; // built in Initialize, disposed in Exit
52 
53  /// <summary>
54  /// Flag indicating whether the data feed is active: it is set to true by Initialize and back to false by Exit.
55  /// </summary>
56  public bool IsActive { get; private set; }
57 
58  /// <summary>
59  /// Initializes the data feed for the specified job and algorithm: stores the supplied providers,
    /// builds the zip data cache provider and the subscription enumerator factory, marks the feed as active
    /// and loads the market hours database from the data folder.
60  /// </summary>
    /// <param name="algorithm">The algorithm instance; stored and also handed to the subscription enumerator factory</param>
    /// <param name="resultHandler">The result handler; stored and handed to the subscription enumerator factory</param>
    /// <param name="mapFileProvider">The map file provider; stored and handed to the subscription enumerator factory</param>
    /// <param name="factorFileProvider">The factor file provider; stored and handed to the subscription enumerator factory</param>
    /// <param name="dataProvider">The data provider; stored and used as the source of the zip data cache provider</param>
    /// <param name="subscriptionManager">Supplies the data feed subscription collection kept by this feed</param>
    /// <param name="dataFeedTimeProvider">Supplies the frontier time provider kept by this feed</param>
    /// <param name="dataChannelProvider">Not referenced by this method's body</param>
61  public virtual void Initialize(IAlgorithm algorithm,
    // NOTE(review): the embedded listing numbers jump from 61 to 63, so one parameter line appears to be
    // missing from this extract - confirm the full signature against the original file
63  IResultHandler resultHandler,
64  IMapFileProvider mapFileProvider,
65  IFactorFileProvider factorFileProvider,
66  IDataProvider dataProvider,
67  IDataFeedSubscriptionManager subscriptionManager,
68  IDataFeedTimeProvider dataFeedTimeProvider,
69  IDataChannelProvider dataChannelProvider)
70  {
71  _algorithm = algorithm;
72  _resultHandler = resultHandler;
73  _mapFileProvider = mapFileProvider;
74  _factorFileProvider = factorFileProvider;
75  _dataProvider = dataProvider;
76  _timeProvider = dataFeedTimeProvider.FrontierTimeProvider;
77  _subscriptions = subscriptionManager.DataFeedSubscriptions;
    // the cache provider wraps the data provider and is disposed again in Exit
78  _cacheProvider = new ZipDataCacheProvider(dataProvider, isDataEphemeral: false);
    // the factory reads through the cache provider and is created with price scaling disabled
79  _subscriptionFactory = new SubscriptionDataReaderSubscriptionEnumeratorFactory(
80  _resultHandler,
81  _mapFileProvider,
82  _factorFileProvider,
83  _cacheProvider,
84  algorithm,
85  enablePriceScaling: false);
86 
    // Exit only releases resources while this flag is set
87  IsActive = true;
88  _marketHoursDatabase = MarketHoursDatabase.FromDataFolder();
89  }
90 
/// <summary>
/// Builds the file based data enumerator serving the given subscription request. Universe subscriptions
/// are routed to the universe enumerator, every other request gets a plain data enumerator
/// </summary>
/// <param name="request">The subscription request to create an enumerator for</param>
/// <param name="fillForwardResolution">Optional fill forward resolution handed on to the created enumerator</param>
/// <returns>The enumerator for the request; the plain data path returns null when the request has no tradable days</returns>
/// <remarks>Protected so it can be used by the <see cref="LiveTradingDataFeed"/> to warmup requests</remarks>
protected IEnumerator<BaseData> CreateEnumerator(SubscriptionRequest request, Resolution? fillForwardResolution = null)
{
    if (request.IsUniverseSubscription)
    {
        // universe requests may need the plain data enumerator for their underlying, so it is passed as a delegate
        return CreateUniverseEnumerator(request, CreateDataEnumerator, fillForwardResolution);
    }

    return CreateDataEnumerator(request, fillForwardResolution);
}
99 
/// <summary>
/// Creates the data enumerator for the given request using the subscription factory and configures it
/// without aggregation
/// </summary>
/// <param name="request">The subscription request to read data for</param>
/// <param name="fillForwardResolution">Optional fill forward resolution handed on to the enumerator configuration</param>
/// <returns>The configured enumerator, or null (after reporting an algorithm error) when the request has no tradable days</returns>
private IEnumerator<BaseData> CreateDataEnumerator(SubscriptionRequest request, Resolution? fillForwardResolution)
{
    // ReSharper disable once PossibleMultipleEnumeration
    var hasTradableDays = request.TradableDaysInDataTimeZone.Any();
    if (hasTradableDays)
    {
        // ReSharper disable once PossibleMultipleEnumeration
        var rawEnumerator = _subscriptionFactory.CreateEnumerator(request, _dataProvider);
        return ConfigureEnumerator(request, false, rawEnumerator, fillForwardResolution);
    }

    // nothing to read for this security: tell the algorithm and hand back no enumerator
    _algorithm.Error(
        $"No data loaded for {request.Security.Symbol} because there were no tradeable dates for this security."
    );
    return null;
}
117 
118  /// <summary>
119  /// Creates a new subscription to provide data for the specified security. While the algorithm is warming up
     /// the data comes from a warmup enumerator (at the warmup resolution) concatenated with the regular one.
120  /// </summary>
121  /// <param name="request">Defines the subscription to be added, including start/end times, the universe and the security</param>
122  /// <returns>The created <see cref="Subscription"/> if successful, null otherwise</returns>
     // NOTE(review): the method declaration (embedded listing line 123) is missing from this extract;
     // the body below only shows that it receives a 'request' - confirm the signature against the original file
124  {
125  IEnumerator<BaseData> enumerator;
126  if(_algorithm.IsWarmingUp)
127  {
     // the request is split at the pivot time, which starts out as the algorithm start date in UTC
128  var pivotTimeUtc = _algorithm.StartDate.ConvertToUtc(_algorithm.TimeZone);
129 
     // the warmup part ends at the pivot and uses the configured warmup resolution
130  var warmupRequest = new SubscriptionRequest(request, endTimeUtc: pivotTimeUtc,
131  configuration: new SubscriptionDataConfig(request.Configuration, resolution: _algorithm.Settings.WarmupResolution));
     // stays null when the warmup request has no tradable days or an invalid configuration; it is still handed to ConcatEnumerator below
132  IEnumerator<BaseData> warmupEnumerator = null;
133  if (warmupRequest.TradableDaysInDataTimeZone.Any()
134  // since we change the resolution, let's validate it's still a valid configuration (example: daily equity quotes are not!)
135  && LeanData.IsValidConfiguration(warmupRequest.Configuration.SecurityType, warmupRequest.Configuration.Resolution, warmupRequest.Configuration.TickType))
136  {
137  // let them overlap a day if possible to avoid data gaps, since each request will fill forward its own data because they are different resolutions
138  pivotTimeUtc = Time.GetStartTimeForTradeBars(request.Security.Exchange.Hours,
139  _algorithm.StartDate.ConvertTo(_algorithm.TimeZone, request.Security.Exchange.TimeZone),
140  Time.OneDay,
141  1,
142  false,
143  warmupRequest.Configuration.DataTimeZone,
144  LeanData.UseDailyStrictEndTimes(_algorithm.Settings, request, request.Security.Symbol, Time.OneDay))
145  .ConvertToUtc(request.Security.Exchange.TimeZone);
     // never move the pivot before the start of the warmup request itself
146  if (pivotTimeUtc < warmupRequest.StartTimeUtc)
147  {
148  pivotTimeUtc = warmupRequest.StartTimeUtc;
149  }
150 
151  warmupEnumerator = CreateEnumerator(warmupRequest, _algorithm.Settings.WarmupResolution);
152  // don't let future data pass (null data points are let through)
153  warmupEnumerator = new FilterEnumerator<BaseData>(warmupEnumerator, data => data == null || data.EndTime <= warmupRequest.EndTimeLocal);
154  }
155 
     // the regular part starts at the (possibly moved back) pivot time
156  var normalEnumerator = CreateEnumerator(new SubscriptionRequest(request, startTimeUtc: pivotTimeUtc));
157  // don't let pre start data pass: since we adjust the start so they overlap 1 day, we just want that data for fill forwarding after the target start
158  // this is also useful to drop any initial selection point which was already emitted during warmup
159  normalEnumerator = new FilterEnumerator<BaseData>(normalEnumerator, data => data == null || data.EndTime >= warmupRequest.EndTimeLocal);
160 
161  // after the warmup enumerator we concatenate the 'normal' one
162  enumerator = new ConcatEnumerator(true, warmupEnumerator, normalEnumerator);
163  }
164  else
165  {
166  enumerator = CreateEnumerator(request);
167  }
168 
     // no time provider is supplied to the schedule wrapper here
169  enumerator = AddScheduleWrapper(request, enumerator, null);
170 
171  if (request.IsUniverseSubscription && request.Universe is UserDefinedUniverse)
172  {
173  // for user defined universe we do not use a worker task, since calls to AddData can happen at any moment
174  // and we have to be able to inject selection data points into the enumerator
175  return SubscriptionUtils.Create(request, enumerator, _algorithm.Settings.DailyPreciseEndTime);
176  }
     // every other subscription is created together with a scheduled worker
177  return SubscriptionUtils.CreateAndScheduleWorker(request, enumerator, _factorFileProvider, true, _algorithm.Settings.DailyPreciseEndTime);
178  }
179 
/// <summary>
/// Removes the given subscription from the data feed, if it exists.
/// This implementation takes no action; the method is virtual so derived feeds can override it
/// </summary>
/// <param name="subscription">The subscription to remove</param>
public virtual void RemoveSubscription(Subscription subscription)
{
    // no work is performed by this implementation
}
187 
188  /// <summary>
189  /// Creates a universe enumerator from the Subscription request, the underlying enumerator func and the fill forward resolution (in some cases)
190  /// </summary>
     /// <param name="request">The universe subscription request</param>
     /// <param name="createUnderlyingEnumerator">Delegate used to create the enumerator of the underlying symbol; only used by the ZipEntryName branch</param>
     /// <param name="fillForwardResolution">Optional fill forward resolution; only used by the ZipEntryName branch</param>
     /// <returns>The enumerator providing the universe data</returns>
191  protected IEnumerator<BaseData> CreateUniverseEnumerator(SubscriptionRequest request, Func<SubscriptionRequest, Resolution?, IEnumerator<BaseData>> createUnderlyingEnumerator, Resolution? fillForwardResolution = null)
192  {
     // default to the regular subscription factory unless one of the branches below applies
193  ISubscriptionEnumeratorFactory factory = _subscriptionFactory;
194  if (request.Universe is ITimeTriggeredUniverse)
195  {
     // NOTE(review): embedded listing line 196 is missing from this extract; the two lines below look like the
     // trailing arguments of a factory construction - confirm against the original file
197  _marketHoursDatabase,
198  _timeProvider);
199  }
200  else if (request.Configuration.Type == typeof(FundamentalUniverse))
201  {
     // NOTE(review): embedded listing line 202 (the body of this branch) is missing from this extract
203  }
204  else if (request.Configuration.Type == typeof(ZipEntryName))
205  {
206  // TODO: subscription should already come in correctly built
     // tick resolution is replaced by second resolution for this request
207  var resolution = request.Configuration.Resolution == Resolution.Tick ? Resolution.Second : request.Configuration.Resolution;
208 
209  // TODO: subscription should already come in as fill forward true
210  request = new SubscriptionRequest(request, configuration: new SubscriptionDataConfig(request.Configuration, fillForward: true, resolution: resolution));
211 
212  var result = new BaseDataSubscriptionEnumeratorFactory(_algorithm.OptionChainProvider, _algorithm.FutureChainProvider)
213  .CreateEnumerator(request, _dataProvider);
214 
215  if (LeanData.UseDailyStrictEndTimes(_algorithm.Settings, request, request.Configuration.Symbol, request.Configuration.Increment))
216  {
217  result = new StrictDailyEndTimesEnumerator(result, request.ExchangeHours, request.StartTimeLocal);
218  }
     // aggregate the data points into collections, then optionally attach the underlying symbol's data
219  result = ConfigureEnumerator(request, true, result, fillForwardResolution);
220  return TryAppendUnderlyingEnumerator(request, result, createUnderlyingEnumerator, fillForwardResolution);
221  }
222 
223  // define our data enumerator
224  var enumerator = factory.CreateEnumerator(request, _dataProvider);
225  return enumerator;
226  }
227 
228  /// <summary>
229  /// Returns a scheduled enumerator from the given arguments. It can also return the given underlying enumerator
230  /// </summary>
     /// <param name="request">The subscription request whose universe settings provide the schedule</param>
     /// <param name="underlying">The enumerator to wrap</param>
     /// <param name="timeProvider">The time provider handed to the scheduled enumerator</param>
     /// <returns>A <see cref="ScheduledEnumerator"/> wrapping the underlying enumerator when a schedule is available, the underlying enumerator otherwise</returns>
231  protected IEnumerator<BaseData> AddScheduleWrapper(SubscriptionRequest request, IEnumerator<BaseData> underlying, ITimeProvider timeProvider)
232  {
     // NOTE(review): embedded listing line 233 (the condition guarding this early return) is missing from this extract
234  {
235  return underlying;
236  }
237 
     // ask the universe settings for the schedule covering the request's local start/end times
238  var schedule = request.Universe.UniverseSettings.Schedule.Get(request.StartTimeLocal, request.EndTimeLocal);
239  if (schedule != null)
240  {
241  return new ScheduledEnumerator(underlying, schedule, timeProvider, request.Configuration.ExchangeTimeZone, request.StartTimeLocal);
242  }
     // no schedule available: the underlying enumerator is returned unchanged
243  return underlying;
244  }
245 
/// <summary>
/// If required will add a new enumerator for the underlying symbol. This only happens for option requests
/// whose symbol has an underlying; every other request gets the parent enumerator back unchanged
/// </summary>
/// <param name="request">The subscription request being served</param>
/// <param name="parent">The enumerator created so far for the request</param>
/// <param name="createEnumerator">Delegate used to create the enumerator for the underlying symbol</param>
/// <param name="fillForwardResolution">Optional fill forward resolution handed on to the created enumerators</param>
/// <returns>The parent enumerator, combined with the underlying data when required</returns>
protected IEnumerator<BaseData> TryAppendUnderlyingEnumerator(SubscriptionRequest request, IEnumerator<BaseData> parent, Func<SubscriptionRequest, Resolution?, IEnumerator<BaseData>> createEnumerator, Resolution? fillForwardResolution)
{
    if (request.Configuration.Symbol.SecurityType.IsOption() && request.Configuration.Symbol.HasUnderlying)
    {
        var underlyingSymbol = request.Configuration.Symbol.Underlying;
        var underlyingMarketHours = _marketHoursDatabase.GetEntry(underlyingSymbol.ID.Market, underlyingSymbol, underlyingSymbol.SecurityType);

        // TODO: creating this subscription request/config is bad
        var underlyingRequests = new SubscriptionRequest(request,
            isUniverseSubscription: false,
            configuration: new SubscriptionDataConfig(request.Configuration, symbol: underlyingSymbol, objectType: typeof(TradeBar), tickType: TickType.Trade,
                // there's no guarantee the TZ are the same, specially the data timezone (index & index options)
                dataTimeZone: underlyingMarketHours.DataTimeZone,
                exchangeTimeZone: underlyingMarketHours.ExchangeHours.TimeZone));

        var underlying = createEnumerator(underlyingRequests, fillForwardResolution);
        // auxiliary data points of the underlying are dropped
        underlying = new FilterEnumerator<BaseData>(underlying, data => data.DataType != MarketDataType.Auxiliary);

        parent = new SynchronizingBaseDataEnumerator(parent, underlying);
        // we aggregate both underlying and chain data
        parent = new BaseDataCollectionAggregatorEnumerator(parent, request.Configuration.Symbol);
        // only let through if underlying and chain data present; the null-conditional access makes a point that
        // is null or not a BaseDataCollection get filtered out instead of raising a NullReferenceException
        parent = new FilterEnumerator<BaseData>(parent, data => (data as BaseDataCollection)?.Underlying != null);
        parent = ConfigureEnumerator(request, false, parent, fillForwardResolution);
    }

    return parent;
}
277 
/// <summary>
/// Shuts the data feed down: clears the active flag and disposes the subscription factory and the
/// data cache provider. Does nothing when the feed is not active
/// </summary>
public virtual void Exit()
{
    if (!IsActive)
    {
        // the flag is only raised by Initialize and cleared below, so there is nothing to release
        return;
    }

    IsActive = false;
    Log.Trace("FileSystemDataFeed.Exit(): Start. Setting cancellation token...");
    _subscriptionFactory?.DisposeSafely();
    _cacheProvider.DisposeSafely();
    Log.Trace("FileSystemDataFeed.Exit(): Exit Finished.");
}
292 
293  /// <summary>
294  /// Configure the enumerator with aggregation/fill-forward/filter behaviors. Returns new instance if re-configured
295  /// </summary>
     /// <param name="request">The subscription request the enumerator belongs to</param>
     /// <param name="aggregate">True to wrap the enumerator in a <see cref="BaseDataCollectionAggregatorEnumerator"/> for the request's symbol</param>
     /// <param name="enumerator">The enumerator to configure</param>
     /// <param name="fillForwardResolution">Optional fill forward resolution handed on to the fill forward step</param>
     /// <returns>The configured enumerator</returns>
296  protected IEnumerator<BaseData> ConfigureEnumerator(SubscriptionRequest request, bool aggregate, IEnumerator<BaseData> enumerator, Resolution? fillForwardResolution)
297  {
298  if (aggregate)
299  {
300  enumerator = new BaseDataCollectionAggregatorEnumerator(enumerator, request.Configuration.Symbol);
301  }
302 
     // fill forward is only added when the request's configuration asks for it
303  enumerator = TryAddFillForwardEnumerator(request, enumerator, request.Configuration.FillDataForward, fillForwardResolution);
304 
305  // optionally apply exchange/user filters
     // NOTE(review): embedded listing line 306 (the condition guarding this block) is missing from this extract
307  {
308  enumerator = SubscriptionFilterEnumerator.WrapForDataFeed(_resultHandler, enumerator, request.Security,
309  request.EndTimeLocal, request.Configuration.ExtendedMarketHours, false, request.ExchangeHours);
310  }
311 
312  return enumerator;
313  }
314 
315  /// <summary>
316  /// Will add a fill forward enumerator if requested
317  /// </summary>
     /// <param name="request">The subscription request the enumerator belongs to</param>
     /// <param name="enumerator">The enumerator to wrap</param>
     /// <param name="fillForward">True to request fill forward; ignored for tick resolution</param>
     /// <param name="fillForwardResolution">Optional resolution overriding the fill forward span taken from the subscription collection</param>
     /// <returns>The fill forward wrapped enumerator, or the given enumerator when fill forward does not apply</returns>
318  protected IEnumerator<BaseData> TryAddFillForwardEnumerator(SubscriptionRequest request, IEnumerator<BaseData> enumerator, bool fillForward, Resolution? fillForwardResolution)
319  {
320  // optionally apply fill forward logic, but never for tick data
321  if (fillForward && request.Configuration.Resolution != Resolution.Tick)
322  {
323  // copy forward Bid/Ask bars for QuoteBars
324  if (request.Configuration.Type == typeof(QuoteBar))
325  {
326  enumerator = new QuoteBarFillForwardEnumerator(enumerator);
327  }
328 
     // by default the span comes from the subscription collection
329  var fillForwardSpan = _subscriptions.UpdateAndGetFillForwardResolution(request.Configuration);
330  if (fillForwardResolution != null && fillForwardResolution != Resolution.Tick)
331  {
332  // if we are given a fill forward resolution we use it instead of the collection based one. This is useful during warmup when the warmup resolution has been set
333  fillForwardSpan = Ref.Create(fillForwardResolution.Value.ToTimeSpan());
334  }
335 
336  // Pass the security exchange hours explicitly to avoid using the ones in the request, since
337  // those could be different. e.g. when requests are created for open interest data the exchange
338  // hours are set to always open to avoid OI data being filtered out due to the exchange being closed.
339  // This way we allow OI data to be fill-forwarded to the market close time when strict end times is enabled,
340  // so that OI data is available at the same time as trades and quotes.
341  var useDailyStrictEndTimes = LeanData.UseDailyStrictEndTimes(_algorithm.Settings, request, request.Configuration.Symbol,
342  request.Configuration.Increment, request.Security.Exchange.Hours);
343  enumerator = new FillForwardEnumerator(enumerator, request.Security.Exchange, fillForwardSpan,
     // NOTE(review): embedded listing line 344 (one line of constructor arguments) is missing from this extract
345  request.Configuration.DataTimeZone, useDailyStrictEndTimes, request.Configuration.Type);
346  }
347 
348  return enumerator;
349  }
350  }
351 }