Lean  $LEAN_TAG$
ZipDataCacheProvider.cs
1 /*
2  * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
3  * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14 */
15 
16 using System;
17 using System.IO;
18 using Ionic.Zip;
19 using Ionic.Zlib;
20 using System.Linq;
21 using System.Threading;
22 using QuantConnect.Util;
23 using QuantConnect.Logging;
25 using System.Collections.Generic;
26 using System.Collections.Concurrent;
28 
30 {
31  /// <summary>
32  /// File provider implements optimized zip archives caching facility. Cache is thread safe.
33  /// </summary>
35  {
// Opened zip archives keyed by the zip file path handed to Fetch/Store/GetZipEntries
private readonly ConcurrentDictionary<string, CachedZipFile> _zipFileCache = new();

// Source of the raw zip and text file streams
private readonly IDataProvider _dataProvider;

// Seconds an archive may stay unused before the cleaner evicts it; also drives the cleaner's schedule
private readonly double _cacheSeconds;

// One-shot timer running CleanCache, re-armed after every run
private readonly Timer _cacheCleaner;

/// <summary>
/// Property indicating the data is temporary in nature and should not be cached.
/// </summary>
public bool IsDataEphemeral { get; }
47 
/// <summary>
/// Creates a zip data cache provider on top of the given <see cref="IDataProvider"/>
/// </summary>
/// <param name="dataProvider">Provider used to fetch the underlying zip and text file streams</param>
/// <param name="isDataEphemeral">Value exposed through <see cref="IsDataEphemeral"/>, true by default</param>
/// <param name="cacheTimer">Seconds an unused zip stays cached. When NaN (the default) the
/// 'zip-data-cache-provider' configuration value is used, falling back to 10 seconds</param>
public ZipDataCacheProvider(IDataProvider dataProvider, bool isDataEphemeral = true, double cacheTimer = double.NaN)
{
    IsDataEphemeral = isDataEphemeral;

    var cacheSeconds = cacheTimer;
    if (double.IsNaN(cacheSeconds))
    {
        // no explicit value was given, defer to configuration
        cacheSeconds = Config.GetDouble("zip-data-cache-provider", 10);
    }
    _cacheSeconds = cacheSeconds;

    _dataProvider = dataProvider;

    // one-shot timer: CleanCache re-arms it at the end of every run
    var firstRun = TimeSpan.FromSeconds(_cacheSeconds);
    _cacheCleaner = new Timer(_ => CleanCache(), null, firstRun, Timeout.InfiniteTimeSpan);
}
58 
/// <summary>
/// Retrieves a stream for the requested data key. For zip files the archive is fetched once,
/// cached, and the requested entry is extracted from the cached copy; any other file is fetched
/// directly from the underlying <see cref="IDataProvider"/> without caching
/// </summary>
/// <param name="key">The file path, optionally followed by '#' and the zip entry name</param>
/// <returns>The entry or file stream, null if it could not be found or read</returns>
public Stream Fetch(string key)
{
    LeanData.ParseKey(key, out var filename, out var entryName);

    if (!filename.EndsWith(".zip", StringComparison.InvariantCulture))
    {
        // handles text files
        return _dataProvider.Fetch(filename);
    }

    Stream stream = null;
    try
    {
        if (!_zipFileCache.TryGetValue(filename, out var existingZip))
        {
            stream = CacheAndCreateEntryStream(filename, entryName);
        }
        else
        {
            try
            {
                lock (existingZip)
                {
                    if (existingZip.Disposed)
                    {
                        // bad luck, thread race condition: it was disposed and removed
                        // after we got it, so lets create it again and add it
                        stream = CacheAndCreateEntryStream(filename, entryName);
                    }
                    else
                    {
                        // mark it as used so the cache cleaner does not evict it
                        existingZip.Refresh();
                        stream = CreateEntryStream(existingZip, entryName, filename);
                    }
                }
            }
            catch (Exception exception) when (exception is ZipException || exception is ZlibException)
            {
                Log.Error("ZipDataCacheProvider.Fetch(): Corrupt zip file/entry: " + filename + "#" + entryName + " Error: " + exception);
            }
        }

        return stream;
    }
    catch (Exception err)
    {
        // any other failure is logged with the key that caused it and reported as 'no data'
        Log.Error(err, $"ZipDataCacheProvider.Fetch(): failed to fetch {key}");
        stream?.DisposeSafely();
        return null;
    }
}
123 
/// <summary>
/// Store the data in the cache. Only zip entries are supported. If the zip can be cached, the
/// entry is written into the cached instance, which saves it to disk once that instance is disposed.
/// Otherwise a new zip holding the entry is created on disk and then cached
/// </summary>
/// <param name="key">The source of the data, used as a key to retrieve data in the cache</param>
/// <param name="data">The data as a byte array</param>
/// <exception cref="InvalidOperationException">Creating the zip failed or the cache is in an unexpected state</exception>
public void Store(string key, byte[] data)
{
    LeanData.ParseKey(key, out var fileName, out var entryName);

    // We only support writing to zips with this provider, we also need an entryName to write.
    // Return silently because RemoteFileSubscriptionStreamReader depends on this function not throwing.
    var isZipEntry = fileName.EndsWith(".zip", StringComparison.InvariantCulture) && entryName != null;
    if (!isZipEntry)
    {
        return;
    }

    // Only allow one thread at a time to modify our cache
    lock (_zipFileCache)
    {
        var available = _zipFileCache.TryGetValue(fileName, out var cachedZip) || Cache(fileName, out cachedZip);
        if (!available)
        {
            // it could not be cached: create the zip holding this entry, then cache it for later use
            if (!Compression.ZipCreateAppendData(fileName, entryName, data))
            {
                throw new InvalidOperationException($"Failed to store data {fileName}#{entryName}");
            }
            Cache(fileName, out _);
            return;
        }

        lock (cachedZip)
        {
            if (!cachedZip.Disposed)
            {
                cachedZip.WriteEntry(entryName, data);
                return;
            }

            // Disposed while we hold the cache lock means it is no longer in the dictionary, let's assert it.
            // Another thread may have added a new/different instance in the meantime, which is okay:
            // the retry below picks it up
            if (_zipFileCache.TryGetValue(fileName, out var current) && ReferenceEquals(current, cachedZip))
            {
                Log.Error($"ZipDataCacheProvider.Store(): unexpected cache state for {fileName}");
                throw new InvalidOperationException(
                    "LEAN entered an unexpected state. Please contact support@quantconnect.com so we may debug this further.");
            }
            Store(key, data);
        }
    }
}
181 
/// <summary>
/// Lists the entry names of the given zip file, caching the zip first if needed
/// </summary>
/// <param name="zipFile">Path of the zip file</param>
/// <returns>The names of all the entries in the zip</returns>
/// <exception cref="ArgumentException">The zip could not be fetched or read</exception>
public List<string> GetZipEntries(string zipFile)
{
    var found = _zipFileCache.TryGetValue(zipFile, out var cachedZip) || Cache(zipFile, out cachedZip);
    if (!found)
    {
        throw new ArgumentException($"Failed to get zip entries from {zipFile}");
    }

    lock (cachedZip)
    {
        // mark it as used so the cache cleaner does not evict it
        cachedZip.Refresh();
        return new List<string>(cachedZip.EntryCache.Keys);
    }
}
201 
/// <summary>
/// Stops scheduling cache cleaning and disposes every cached zip file. Disposing a cached zip
/// saves any entries written through <see cref="Store"/> to disk
/// </summary>
/// <filterpriority>2</filterpriority>
public void Dispose()
{
    // stop the cache cleaner timer
    _cacheCleaner.DisposeSafely();

    foreach (var pair in _zipFileCache)
    {
        // only the caller that actually removes the entry disposes it
        if (!_zipFileCache.TryRemove(pair.Key, out var removed))
        {
            continue;
        }
        removed.DisposeSafely();
    }
}
219 
/// <summary>
/// Remove items in the cache that have not been used within the last cache period.
/// Runs as the cache cleaner timer callback and re-arms the timer when done
/// </summary>
private void CleanCache()
{
    try
    {
        var clearCacheIfOlderThan = DateTime.UtcNow.AddSeconds(-_cacheSeconds);
        // clean all items that were last used more than _cacheSeconds before the current date
        foreach (var zip in _zipFileCache)
        {
            // only clear items that are stale and not being used
            if (!zip.Value.Uncache(clearCacheIfOlderThan) || !Monitor.TryEnter(zip.Value))
            {
                continue;
            }

            try
            {
                try
                {
                    // we first dispose it since if written it will refresh the file on disk and we don't
                    // want anyone reading it directly which should be covered by the entry being in the cache
                    // and us holding the instance lock
                    zip.Value.Dispose();
                }
                catch (Exception exception)
                {
                    // this runs on a timer callback where an unhandled exception would terminate the process.
                    // The instance is flagged as disposed regardless, so it still has to leave the cache
                    Log.Error(exception, $"ZipDataCacheProvider.CleanCache(): failed to dispose {zip.Key}");
                }

                // remove the entry only if it still maps to this very instance: another thread may have
                // disposed/removed it and cached a new instance under the same key, which we must not evict
                _zipFileCache.TryRemove(zip);
            }
            finally
            {
                Monitor.Exit(zip.Value);
            }
        }
    }
    finally
    {
        try
        {
            // re-arm the one-shot timer for the next run
            var nextDueTime = Time.GetSecondUnevenWait((int)Math.Ceiling(_cacheSeconds * 1000));
            _cacheCleaner.Change(nextDueTime, Timeout.Infinite);
        }
        catch (ObjectDisposedException)
        {
            // ignored disposed
        }
    }
}
267 
/// <summary>
/// Fetches the zip from the data provider, creates the requested entry stream from it and
/// tries to add the newly opened zip to the cache
/// </summary>
/// <param name="filename">Path of the zip file</param>
/// <param name="entryName">Entry to open, null for the first entry</param>
/// <returns>The entry stream, or null if the zip could not be fetched, is corrupt or lacks the entry</returns>
private Stream CacheAndCreateEntryStream(string filename, string entryName)
{
    var dataStream = _dataProvider.Fetch(filename);
    if (dataStream == null)
    {
        return null;
    }

    try
    {
        var freshZip = new CachedZipFile(dataStream, DateTime.UtcNow, filename);

        // no need to lock: this instance is not in the cache yet so no other thread can see it
        var entryStream = CreateEntryStream(freshZip, entryName, filename);

        if (!_zipFileCache.TryAdd(filename, freshZip))
        {
            // another thread cached this zip first, keep theirs and drop ours
            freshZip.Dispose();
        }
        return entryStream;
    }
    catch (Exception exception)
    {
        // don't leak the file stream!
        dataStream.DisposeSafely();
        if (!(exception is ZipException || exception is ZlibException))
        {
            throw;
        }
        Log.Error("ZipDataCacheProvider.Fetch(): Corrupt zip file/entry: " + filename + "#" + entryName + " Error: " + exception);
        return null;
    }
}
302 
/// <summary>
/// Create a stream of a specific ZipEntry
/// </summary>
/// <param name="zipFile">The zipFile containing the zipEntry; callers either hold its lock or own it exclusively</param>
/// <param name="entryName">The name of the entry, null for the first entry</param>
/// <param name="fileName">The name of the zip file on disk</param>
/// <returns>A <see cref="Stream"/> of the appropriate zip entry, or null if the entry does not exist</returns>
private Stream CreateEntryStream(CachedZipFile zipFile, string entryName, string fileName)
{
    ZipEntryCache entryCache;
    if (entryName == null)
    {
        entryCache = zipFile.EntryCache.FirstOrDefault().Value;
    }
    else
    {
        zipFile.EntryCache.TryGetValue(entryName, out entryCache);
    }

    if (entryCache is { Modified: true })
    {
        // we want to read an entry in the zip that has been edited, we need to start over
        // because of the zip library else it blows up: disposing saves the changes to disk
        zipFile.Dispose();
        // only evict the cache slot if it still holds this instance, never a newer one
        _zipFileCache.TryRemove(new KeyValuePair<string, CachedZipFile>(fileName, zipFile));

        return CacheAndCreateEntryStream(fileName, entryName);
    }

    var entry = entryCache?.Entry;
    if (entry == null)
    {
        return null;
    }

    var stream = new MemoryStream();
    try
    {
        stream.SetLength(entry.UncompressedSize);
    }
    catch (ArgumentOutOfRangeException)
    {
        // The needed size of the MemoryStream is longer than allowed,
        // just read the data directly from the file.
        // Note that we cannot use entry.OpenReader() because only one OpenReader
        // can be open at a time without causing corruption.
        return OpenEntryFromDisk(fileName, entryName);
    }

    // extract directly into the stream
    entry.Extract(stream);
    stream.Position = 0;
    return stream;
}

/// <summary>
/// Opens an entry by streaming the zip file from disk, used when the entry is too large to be
/// extracted into memory. We must use the file name instead of the cached zip because that one
/// is initialized from a stream and not a file
/// </summary>
/// <param name="fileName">The name of the zip file on disk</param>
/// <param name="entryName">The entry to position on (case insensitive), null for the first entry</param>
/// <returns>The zip input stream positioned on the entry, or null if the zip has no such entry</returns>
private static Stream OpenEntryFromDisk(string fileName, string entryName)
{
    var zipStream = new ZipInputStream(fileName);
    try
    {
        for (var zipEntry = zipStream.GetNextEntry(); zipEntry != null; zipEntry = zipStream.GetNextEntry())
        {
            if (entryName == null
                || string.Equals(zipEntry.FileName, entryName, StringComparison.OrdinalIgnoreCase))
            {
                return zipStream;
            }
        }
    }
    catch (Exception)
    {
        // don't leak the file handle on a read failure
        zipStream.Dispose();
        throw;
    }

    // empty zip or no matching entry: release the file handle instead of leaking it
    zipStream.Dispose();
    return null;
}
387 
/// <summary>
/// Fetches a zip from the data provider and adds it to the cache
/// </summary>
/// <param name="filename">Zip to cache</param>
/// <param name="cachedZip">The resulting CachedZipFile: ours, or the one another thread cached first</param>
/// <returns>True if <paramref name="cachedZip"/> holds a cached zip, false if the zip could not be fetched or is corrupt</returns>
private bool Cache(string filename, out CachedZipFile cachedZip)
{
    cachedZip = null;
    var dataStream = _dataProvider.Fetch(filename);
    if (dataStream == null)
    {
        return false;
    }

    try
    {
        cachedZip = new CachedZipFile(dataStream, DateTime.UtcNow, filename);

        if (!_zipFileCache.TryAdd(filename, cachedZip))
        {
            // some other thread could of added it already, lets dispose ours and use theirs
            cachedZip.Dispose();
            return _zipFileCache.TryGetValue(filename, out cachedZip);
        }

        return true;
    }
    catch (Exception exception)
    {
        // don't leak the file stream, whatever the failure (previously leaked on rethrown exceptions)
        dataStream.DisposeSafely();
        cachedZip = null;

        if (exception is ZipException || exception is ZlibException)
        {
            Log.Error("ZipDataCacheProvider.Fetch(): Corrupt zip file/entry: " + filename + " Error: " + exception);
            return false;
        }
        throw;
    }
}
427 
428 
/// <summary>
/// Type for storing zipfile in cache
/// </summary>
private class CachedZipFile : IDisposable
{
    // last time this instance was created or used; kept behind a reference, presumably so
    // updates are atomic reference swaps rather than DateTime struct copies
    private ReferenceWrapper<DateTime> _dateCached;
    private readonly Stream _dataStream;
    private readonly string _filePath;
    // set to 1 once Dispose starts, accessed through Interlocked
    private long _disposed;
    // non zero once WriteEntry was called, meaning the zip must be saved on dispose
    private long _modified;

    /// <summary>
    /// The ZipFile this object represents
    /// </summary>
    private readonly ZipFile _zipFile;

    /// <summary>
    /// Contains all entries of the zip file by filename (case insensitive)
    /// </summary>
    public readonly Dictionary<string, ZipEntryCache> EntryCache = new (StringComparer.OrdinalIgnoreCase);

    /// <summary>
    /// Returns if this cached zip file is disposed
    /// </summary>
    public bool Disposed => Interlocked.Read(ref _disposed) != 0;

    /// <summary>
    /// Initializes a new instance of the <see cref="CachedZipFile"/>
    /// </summary>
    /// <param name="dataStream">Stream containing the zip file</param>
    /// <param name="utcNow">Current utc time</param>
    /// <param name="filePath">Path of the zip file, where modifications are saved on dispose</param>
    public CachedZipFile(Stream dataStream, DateTime utcNow, string filePath)
    {
        _dataStream = dataStream;
        _zipFile = ZipFile.Read(dataStream);
        _zipFile.UseZip64WhenSaving = Zip64Option.Always;
        foreach (var entry in _zipFile.Entries)
        {
            EntryCache[entry.FileName] = new ZipEntryCache{ Entry = entry };
        }
        _dateCached = new ReferenceWrapper<DateTime>(utcNow);
        _filePath = filePath;
    }

    /// <summary>
    /// Method used to check if this object was last used (created or refreshed) before a certain time
    /// </summary>
    /// <param name="date">DateTime which is compared to the last time this object was used</param>
    /// <returns>Bool indicating whether this object was last used before the specified time</returns>
    public bool Uncache(DateTime date)
    {
        return _dateCached.Value < date;
    }

    /// <summary>
    /// Write to this entry, will be updated on disk when uncached
    /// Meaning either when timer finishes or on dispose
    /// </summary>
    /// <param name="entryName">Entry to write this as</param>
    /// <param name="content">Content of the entry</param>
    public void WriteEntry(string entryName, byte[] content)
    {
        Interlocked.Increment(ref _modified);
        Refresh();

        // If the entry already exists remove it
        if (_zipFile.ContainsEntry(entryName))
        {
            _zipFile.RemoveEntry(entryName);
            EntryCache.Remove(entryName);
        }

        // Write this entry to zip file
        var newEntry = _zipFile.AddEntry(entryName, content);
        EntryCache.Add(entryName, new ZipEntryCache { Entry = newEntry, Modified = true });
    }

    /// <summary>
    /// We refresh our cache time when used to avoid it being clean up
    /// </summary>
    public void Refresh()
    {
        _dateCached = new ReferenceWrapper<DateTime>(DateTime.UtcNow);
    }

    /// <summary>
    /// Dispose of the ZipFile, saving it to <see cref="_filePath"/> first if it was modified.
    /// The zip and its source stream are always released, even if saving fails
    /// </summary>
    public void Dispose()
    {
        if (Interlocked.CompareExchange(ref _disposed, 1, 0) == 1)
        {
            // compare will return the original value, if it's already 1 means already being disposed off
            return;
        }

        // If we changed this zip we need to save, we write our changes to a temp file first
        string tempFileName = null;
        try
        {
            if (Interlocked.Read(ref _modified) != 0)
            {
                tempFileName = Path.GetTempFileName();
                _zipFile.Save(tempFileName);
            }
        }
        catch (Exception)
        {
            // the save failed: don't leave the temp file behind, the file at _filePath is left untouched
            if (tempFileName != null)
            {
                try
                {
                    File.Delete(tempFileName);
                }
                catch (Exception)
                {
                    // best effort cleanup, the original failure is rethrown below
                }
            }
            throw;
        }
        finally
        {
            // always release the zip and its source stream, previously leaked when Save threw
            _zipFile?.DisposeSafely();
            _dataStream?.DisposeSafely();
        }

        // After disposal we will move it to the final location
        // (presumably because _dataStream may still hold the target file open until disposed)
        if (tempFileName != null)
        {
            File.Move(tempFileName, _filePath, true);
        }
    }
}
546 
/// <summary>
/// Wraps a <see cref="ZipEntry"/> together with a flag telling whether it was added in memory
/// by <see cref="CachedZipFile.WriteEntry"/> rather than read from the archive
/// </summary>
private sealed class ZipEntryCache
{
    /// <summary>
    /// True when the entry was written through <see cref="CachedZipFile.WriteEntry"/>, false when read from the zip
    /// </summary>
    public bool Modified { get; set; }

    /// <summary>
    /// The underlying zip library entry
    /// </summary>
    public ZipEntry Entry { get; set; }
}
555  }
556 }