Lean  $LEAN_TAG$
DownloaderDataProvider.cs
1 /*
2  * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
3  * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  *
15 */
16 
17 using System;
18 using NodaTime;
19 using System.IO;
20 using System.Linq;
21 using QuantConnect.Util;
22 using QuantConnect.Data;
23 using QuantConnect.Logging;
26 using System.Collections.Generic;
28 using System.Collections.Concurrent;
29 
31 {
32  /// <summary>
33  /// Data provider which downloads data using an <see cref="IDataDownloader"/> or <see cref="IBrokerage"/> implementation
34  /// </summary>
36  {
/// <summary>
/// Synchronizer in charge of guaranteeing a single operation per file path
/// </summary>
private static readonly KeyStringSynchronizer DiskSynchronizer = new KeyStringSynchronizer();

// flipped to true the first time a custom data request is rejected, so the message is traced only once
private bool _customDataDownloadError;

// symbols already reported as having no market hours entry, so each one is traced only once
private readonly ConcurrentDictionary<Symbol, Symbol> _marketHoursWarning = new ConcurrentDictionary<Symbol, Symbol>();

// source of the data and exchange time zones used when building download requests
private readonly MarketHoursDatabase _marketHoursDatabase = MarketHoursDatabase.FromDataFolder();

// downloader every data request is delegated to
private readonly IDataDownloader _dataDownloader;

// cache provider handed to the data writer; it shares the disk synchronizer declared above
private readonly IDataCacheProvider _dataCacheProvider = new DiskDataCacheProvider(DiskSynchronizer);

// used to expand a download request into requests for all mapped symbols
private readonly IMapFileProvider _mapFileProvider = Composer.Instance.GetPart<IMapFileProvider>();
48 
/// <summary>
/// Creates a new instance whose data downloader is resolved from the
/// 'data-downloader' configuration value
/// </summary>
/// <exception cref="ArgumentException">Thrown when 'data-downloader' is null or empty</exception>
public DownloaderDataProvider()
{
    var dataDownloaderConfig = Config.Get("data-downloader");
    if (string.IsNullOrEmpty(dataDownloaderConfig))
    {
        // without a downloader type name there is nothing this provider could delegate to
        throw new ArgumentException("DownloaderDataProvider(): requires 'data-downloader' to be set with a valid type name");
    }

    _dataDownloader = Composer.Instance.GetExportedValueByTypeName<IDataDownloader>(dataDownloaderConfig);
}
64 
/// <summary>
/// Creates a new instance around an already constructed data downloader, used for testing
/// </summary>
/// <param name="dataDownloader">The downloader instance this provider will use</param>
public DownloaderDataProvider(IDataDownloader dataDownloader) => _dataDownloader = dataDownloader;
72 
/// <summary>
/// Determines if it should download new data and retrieves data from disk
/// </summary>
/// <remarks>
/// Custom data (<see cref="SecurityType.Base"/>) and symbols without a market hours entry are skipped.
/// Download failures are logged and swallowed.
/// </remarks>
/// <param name="key">A string representing where the data is stored</param>
/// <returns>A <see cref="Stream"/> of the data requested</returns>
public override Stream Fetch(string key)
{
    return DownloadOnce(key, s =>
    {
        if (!LeanData.TryParsePath(key, out var symbol, out var date, out var resolution, out var tickType, out var dataType))
        {
            // the path could not be parsed, there is nothing to download
            return;
        }

        if (symbol.SecurityType == SecurityType.Base)
        {
            if (!_customDataDownloadError)
            {
                _customDataDownloadError = true;
                // lean data writer doesn't support it, trace only the first time
                Log.Trace($"DownloaderDataProvider.Get(): custom data is not supported, requested: {symbol}");
            }
            return;
        }

        // declared outside the try block so the entry is still in scope once the lookup succeeded
        MarketHoursDatabase.Entry entry;
        try
        {
            entry = _marketHoursDatabase.GetEntry(symbol.ID.Market, symbol, symbol.SecurityType);
        }
        catch
        {
            // this could happen for some sources using the data provider but with no market hours database entry, like interest rates
            if (_marketHoursWarning.TryAdd(symbol, symbol))
            {
                // log once per symbol
                Log.Trace($"DownloaderDataProvider.Get(): failed to find market hours for {symbol}, skipping");
            }
            // this shouldn't happen for data we can download
            return;
        }

        var dataTimeZone = entry.DataTimeZone;
        var exchangeTimeZone = entry.ExchangeHours.TimeZone;
        DateTime startTimeUtc;
        DateTime endTimeUtc;
        // we will download until yesterday so we are sure we don't get partial data
        var endTimeUtcLimit = DateTime.UtcNow.Date.AddDays(-1);
        if (resolution < Resolution.Hour)
        {
            // we can get the date from the path
            startTimeUtc = date.ConvertToUtc(dataTimeZone);
            // let's get the whole day
            endTimeUtc = date.AddDays(1).ConvertToUtc(dataTimeZone);
            if (endTimeUtc > endTimeUtcLimit)
            {
                // we are at the limit, avoid getting partial data
                return;
            }
        }
        else
        {
            // since hourly & daily are a single file we fetch the whole file
            endTimeUtc = endTimeUtcLimit;
            try
            {
                // we don't really know the start date of Futures, FutureOptions, Cryptos, etc, so let's give it a good guess
                if (symbol.SecurityType == SecurityType.Crypto)
                {
                    // bitcoin start
                    startTimeUtc = new DateTime(2009, 1, 1);
                }
                else if (symbol.SecurityType.IsOption() && symbol.SecurityType != SecurityType.FutureOption)
                {
                    // For options, an hourly or daily file contains a year of data, so we need to get the year of the date
                    startTimeUtc = new DateTime(date.Year, 1, 1);
                    endTimeUtc = startTimeUtc.AddYears(1);
                }
                else
                {
                    startTimeUtc = symbol.ID.Date;
                }
            }
            catch (InvalidOperationException)
            {
                // presumably raised by symbol.ID.Date when the identifier carries no date; fall back to the earliest supported time
                startTimeUtc = Time.Start;
            }

            // clamp the request to the [Time.Start, yesterday] window
            if (startTimeUtc < Time.Start)
            {
                startTimeUtc = Time.Start;
            }

            if (endTimeUtc > endTimeUtcLimit)
            {
                endTimeUtc = endTimeUtcLimit;
            }
        }

        try
        {
            LeanDataWriter writer = null;
            var getParams = new DataDownloaderGetParameters(symbol, resolution, startTimeUtc, endTimeUtc, tickType);

            var downloaderDataParameters = getParams.GetDataDownloaderParameterForAllMappedSymbols(_mapFileProvider, exchangeTimeZone);

            var downloadedData = GetDownloadedData(downloaderDataParameters, symbol, exchangeTimeZone, dataTimeZone, dataType);

            foreach (var dataPerSymbol in downloadedData)
            {
                // the writer is only created once there is at least one group to save
                writer ??= new LeanDataWriter(resolution, symbol, Globals.DataFolder, tickType, mapSymbol: true, dataCacheProvider: _dataCacheProvider);

                // Save the data
                writer.Write(dataPerSymbol);
            }
        }
        catch (Exception e)
        {
            // a failed download must not take the caller down, log it and move on
            Log.Error(e);
        }
    });
}
195 
/// <summary>
/// Retrieves downloaded data grouped by symbol using the configured <see cref="IDataDownloader"/>.
/// </summary>
/// <remarks>
/// The arguments are validated when the method is called; the downloads themselves only run
/// while the returned sequence is being enumerated.
/// </remarks>
/// <param name="downloaderDataParameters">Parameters specifying the data to be retrieved.</param>
/// <param name="symbol">The symbol the data was requested for.</param>
/// <param name="exchangeTimeZone">The time zone of the exchange where the symbol is traded.</param>
/// <param name="dataTimeZone">The time zone in which the data is represented.</param>
/// <param name="dataType">The type of data to be retrieved. (e.g. <see cref="Data.Market.TradeBar"/>)</param>
/// <returns>An IEnumerable containing groups of data grouped by symbol. Each group contains data related to a specific symbol.</returns>
/// <exception cref="ArgumentException">Thrown when the downloaderDataParameters collection is null or empty.</exception>
public IEnumerable<IGrouping<Symbol, BaseData>> GetDownloadedData(
    IEnumerable<DataDownloaderGetParameters> downloaderDataParameters,
    Symbol symbol,
    DateTimeZone exchangeTimeZone,
    DateTimeZone dataTimeZone,
    Type dataType)
{
    // validate here rather than inside the iterator: an iterator body does not run until the first
    // MoveNext(), which would delay the exception until the result is enumerated
    if (downloaderDataParameters.IsNullOrEmpty())
    {
        throw new ArgumentException($"{nameof(DownloaderDataProvider)}.{nameof(GetDownloadedData)}: DataDownloaderGetParameters are empty or equal to null.");
    }

    return DownloadAndGroup();

    // Lazily downloads each request and yields its filtered groups
    IEnumerable<IGrouping<Symbol, BaseData>> DownloadAndGroup()
    {
        foreach (var downloaderDataParameter in downloaderDataParameters)
        {
            var downloadedData = _dataDownloader.Get(downloaderDataParameter);

            if (downloadedData == null)
            {
                // doesn't support this download request, that's okay
                continue;
            }

            var groupedData = FilterAndGroupDownloadDataBySymbol(
                downloadedData,
                symbol,
                dataType,
                exchangeTimeZone,
                dataTimeZone,
                downloaderDataParameter.StartUtc,
                downloaderDataParameter.EndUtc);

            foreach (var data in groupedData)
            {
                yield return data;
            }
        }
    }
}
243 
/// <summary>
/// Gets the stream for a given file path
/// </summary>
/// <remarks>
/// For resolutions above minute on symbols that require mapping, the file content is copied into
/// memory under the disk synchronizer instead of being streamed from disk.
/// </remarks>
/// <param name="key">The file path to get the stream for</param>
/// <returns>The stream for the path, or null when the base provider returns no stream for a buffered file</returns>
protected override Stream GetStream(string key)
{
    if (LeanData.TryParsePath(key, out var symbol, out _, out var resolution) && resolution > Resolution.Minute && symbol.RequiresMapping())
    {
        // because the file could be updated even after it's created because of symbol mapping we can't stream from disk
        return DiskSynchronizer.Execute(key, () =>
        {
            // the using declaration releases the disk stream even if the copy below throws
            using var baseStream = base.Fetch(key);
            if (baseStream == null)
            {
                return null;
            }

            var result = new MemoryStream();
            baseStream.CopyTo(result);
            // move position back to the start
            result.Position = 0;

            return result;
        });
    }

    return base.Fetch(key);
}
271 
/// <summary>
/// Main filter deciding whether the given file has to be downloaded
/// </summary>
/// <param name="filePath">File we are looking at</param>
/// <returns>True if should download</returns>
protected override bool NeedToDownload(string filePath)
{
    // Ignore null requests
    if (filePath == null)
    {
        return false;
    }

    bool Mentions(string fragment) => filePath.Contains(fragment, StringComparison.InvariantCultureIgnoreCase);

    // Ignore invalid data requests: fine fundamentals, map files, factor files and future margins
    if ((Mentions("fine") && Mentions("fundamental"))
        || Mentions("map_files")
        || Mentions("factor_files")
        || (Mentions("margins") && Mentions("future")))
    {
        return false;
    }

    // Only download if it doesn't exist or is out of date.
    // Files are only "out of date" for non date based files (hour, daily, margins, etc.) because this data is stored all in one file
    if (!File.Exists(filePath))
    {
        return true;
    }

    return filePath.IsOutOfDate();
}
293 
/// <summary>
/// Drops the data points that fall outside of the requested window or are not of the requested type,
/// converts the remaining ones to the data time zone and groups them by symbol.
/// </summary>
/// <param name="downloadData">The collection of download data to process.</param>
/// <param name="symbol">The symbol to filter the data for.</param>
/// <param name="dataType">The type of data to filter for.</param>
/// <param name="exchangeTimeZone">The time zone of the exchange.</param>
/// <param name="dataTimeZone">The time zone the data is stored in.</param>
/// <param name="downloaderStartTimeUtc">The start time of data downloading in UTC.</param>
/// <param name="downloaderEndTimeUtc">The end time of data downloading in UTC.</param>
/// <returns>
/// An enumerable collection of groupings of download data, grouped by symbol.
/// </returns>
public static IEnumerable<IGrouping<Symbol, BaseData>> FilterAndGroupDownloadDataBySymbol(
    IEnumerable<BaseData> downloadData,
    Symbol symbol,
    Type dataType,
    DateTimeZone exchangeTimeZone,
    DateTimeZone dataTimeZone,
    DateTime downloaderStartTimeUtc,
    DateTime downloaderEndTimeUtc)
{
    // the window limits are compared against the data times in the exchange time zone
    var windowStart = downloaderStartTimeUtc.ConvertFromUtc(exchangeTimeZone);
    var windowEnd = downloaderEndTimeUtc.ConvertFromUtc(exchangeTimeZone);

    // Sometimes, external Downloader provider returns excess data
    bool IsInsideWindow(BaseData point) => point.Time >= windowStart && point.Time <= windowEnd;

    bool IsRequestedType(BaseData point) => symbol.SecurityType == SecurityType.Base || point.GetType() == dataType;

    BaseData ToDataTimeZone(BaseData point)
    {
        // we need to store the data in data time zone
        point.Time = point.Time.ConvertTo(exchangeTimeZone, dataTimeZone);
        point.EndTime = point.EndTime.ConvertTo(exchangeTimeZone, dataTimeZone);
        return point;
    }

    var accepted = downloadData.Where(point => IsInsideWindow(point) && IsRequestedType(point));
    var converted = accepted.Select(ToDataTimeZone);

    // for canonical symbols, downloader will return data for all of the chain
    return converted.GroupBy(point => point.Symbol);
}
340  }
341 }