21 using System.Threading;
25 using System.Collections.Generic;
26 using System.Collections.Concurrent;
// Cache lifetime in seconds; CleanCache subtracts it from the current UTC time to get the eviction cutoff.
36 private readonly
double _cacheSeconds;
// Cached zip files, keyed by the zip file name used for lookups and inserts.
39 private readonly ConcurrentDictionary<string, CachedZipFile> _zipFileCache =
new ConcurrentDictionary<string, CachedZipFile>();
// Timer whose callback is CleanCache; CleanCache re-arms it with Change() after each run.
41 private readonly Timer _cacheCleaner;
// NOTE(review): constructor header is not part of this listing; these appear to be its body lines.
// A NaN cacheTimer falls back to the "zip-data-cache-provider" configuration value (default 10).
54 _cacheSeconds =
double.IsNaN(cacheTimer) ?
Config.
GetDouble(
"zip-data-cache-provider", 10) : cacheTimer;
55 _dataProvider = dataProvider;
// One-shot timer: first callback after _cacheSeconds, no periodic interval (Timeout.InfiniteTimeSpan).
56 _cacheCleaner =
new Timer(state => CleanCache(),
null, TimeSpan.FromSeconds(_cacheSeconds), Timeout.InfiniteTimeSpan);
// NOTE(review): method header and braces are not part of this listing; the nesting described
// below is inferred from statement order and should be confirmed against the full file.
// Only names ending in ".zip" are looked up in the zip cache.
67 if (filename.EndsWith(
".zip", StringComparison.InvariantCulture))
73 CachedZipFile existingZip;
// Cache miss: fetch the zip from the wrapped provider, cache it and open the entry.
74 if (!_zipFileCache.TryGetValue(filename, out existingZip))
76 stream = CacheAndCreateEntryStream(filename, entryName);
// Cached instance reports Disposed: fetch and cache the zip again.
84 if (existingZip.Disposed)
89 stream = CacheAndCreateEntryStream(filename, entryName);
// Cached instance is still usable: call Refresh() (body not visible here; presumably it
// renews the cache timestamp - confirm), then read the entry from it.
93 existingZip.Refresh();
94 stream = CreateEntryStream(existingZip, entryName, filename);
98 catch (Exception exception)
// Zip/zlib exceptions are reported as a corrupt file or entry.
100 if (exception is ZipException || exception is ZlibException)
102 Log.
Error(
"ZipDataCacheProvider.Fetch(): Corrupt zip file/entry: " + filename +
"#" + entryName +
" Error: " + exception);
110 catch (Exception err)
// Release the stream if one was created before the failure.
113 stream?.DisposeSafely();
// Delegates directly to the wrapped provider; presumably the path for non-zip names - confirm.
120 return _dataProvider.Fetch(filename);
/// <summary>
/// Writes <paramref name="data"/> into an entry of a cached zip file via <c>CachedZipFile.WriteEntry</c>.
/// </summary>
/// <param name="key">Source of the zip file name and entry name; the lines that derive them are not part of this listing (TODO confirm format).</param>
/// <param name="data">Bytes passed to <c>WriteEntry</c> as the entry content.</param>
/// <exception cref="InvalidOperationException">Thrown with a "Failed to store data" message, or when a disposed zip is still the instance held by the cache.</exception>
129 public void Store(
string key,
byte[] data)
// Guard on names that do not end in ".zip" or that carry no entry name; the guarded statement is not visible here.
135 if (!fileName.EndsWith(
".zip", StringComparison.InvariantCulture) || entryName ==
null)
// True only when the zip is neither cached already nor loadable through Cache().
144 if (!_zipFileCache.TryGetValue(fileName, out var cachedZip) && !Cache(fileName, out cachedZip))
// NOTE(review): the statements between the check above and this second Cache() call are missing from the listing.
149 Cache(fileName, out _);
153 throw new InvalidOperationException($
"Failed to store data {fileName}#{entryName}");
161 if (cachedZip.Disposed)
// The dictionary still maps fileName to this exact instance (reference comparison).
166 if (_zipFileCache.TryGetValue(fileName, out var existing) && ReferenceEquals(existing, cachedZip))
168 Log.
Error($
"ZipDataCacheProvider.Store(): unexpected cache state for {fileName}");
169 throw new InvalidOperationException(
170 "LEAN entered an unexpected state. Please contact support@quantconnect.com so we may debug this further.");
// Add or replace the entry in the cached zip.
176 cachedZip.WriteEntry(entryName, data);
// NOTE(review): method header is not part of this listing; the name of this member is not visible.
// On a cache miss, load the zip through Cache(); failure to do so is reported as an ArgumentException.
187 if (!_zipFileCache.TryGetValue(zipFile, out var cachedZip))
189 if (!Cache(zipFile, out cachedZip))
191 throw new ArgumentException($
"Failed to get zip entries from {zipFile}");
// Entry names are the keys of the per-zip entry dictionary, copied into a new list.
198 return cachedZip.EntryCache.Keys.ToList();
// NOTE(review): method header is not part of this listing; these appear to be disposal lines.
// Dispose the cleanup timer before touching the cache.
209 _cacheCleaner.DisposeSafely();
// Remove every cached zip; what is done with each removed instance is not visible here.
211 foreach (var zipFile
in _zipFileCache)
213 if (_zipFileCache.TryRemove(zipFile.Key, out zip))
/// <summary>
/// Timer callback: removes zips whose <c>Uncache</c> check passes for the cutoff
/// (now minus <c>_cacheSeconds</c>) and then schedules the next run.
/// </summary>
223 private void CleanCache()
225 var utcNow = DateTime.UtcNow;
// Cutoff time handed to Uncache() for each cached zip.
228 var clearCacheIfOlderThan = utcNow.AddSeconds(-_cacheSeconds);
230 foreach (var zip
in _zipFileCache)
232 if (zip.Value.Uncache(clearCacheIfOlderThan))
// Non-blocking lock attempt on the cached zip instance itself.
235 if (Monitor.TryEnter(zip.Value))
244 _zipFileCache.TryRemove(zip.Key, out _);
// NOTE(review): the try/finally around this Exit is not visible in the listing.
248 Monitor.Exit(zip.Value);
// Next delay is derived from the cache lifetime in milliseconds, rounded up.
258 var nextDueTime = Time.GetSecondUnevenWait((
int)Math.Ceiling(_cacheSeconds * 1000));
// Re-arm the timer for a single run (Timeout.Infinite period).
259 _cacheCleaner.Change(nextDueTime, Timeout.Infinite);
// Handler body is not visible; presumably covers Change() on an already disposed timer - confirm.
261 catch (ObjectDisposedException)
/// <summary>
/// Fetches <paramref name="filename"/> from the wrapped data provider, wraps it in a
/// <c>CachedZipFile</c>, opens the requested entry and tries to add the zip to the cache.
/// </summary>
/// <param name="filename">Zip file name; also used as the cache key.</param>
/// <param name="entryName">Entry to open; passed unchanged to <c>CreateEntryStream</c>.</param>
268 private Stream CacheAndCreateEntryStream(
string filename,
string entryName)
270 Stream stream =
null;
271 var dataStream = _dataProvider.Fetch(filename);
// Everything below is skipped when the wrapped provider returns null.
273 if (dataStream !=
null)
277 var newItem =
new CachedZipFile(dataStream, DateTime.UtcNow, filename);
281 stream = CreateEntryStream(newItem, entryName, filename);
// TryAdd returns false when the key is already present; the handling of that case is not visible here.
283 if (!_zipFileCache.TryAdd(filename, newItem))
289 catch (Exception exception)
// Release the fetched stream on failure.
292 dataStream.DisposeSafely();
293 if (exception is ZipException || exception is ZlibException)
295 Log.
Error(
"ZipDataCacheProvider.Fetch(): Corrupt zip file/entry: " + filename +
"#" + entryName +
" Error: " + exception);
/// <summary>
/// Extracts one entry of <paramref name="zipFile"/> into a <c>MemoryStream</c>.
/// </summary>
/// <param name="zipFile">Cached zip to read from.</param>
/// <param name="entryName">Entry to extract; <c>null</c> selects the first entry in the entry cache.</param>
/// <param name="fileName">Zip file name, used as cache key and as the argument of the <c>ZipInputStream</c> fallback.</param>
310 private Stream CreateEntryStream(CachedZipFile zipFile,
string entryName,
string fileName)
312 ZipEntryCache entryCache;
313 if (entryName ==
null)
// No name given: take the first cached entry (Value is null when the dictionary is empty).
315 entryCache = zipFile.EntryCache.FirstOrDefault().Value;
// Lookup by name; entryCache stays null when the entry does not exist.
319 zipFile.EntryCache.TryGetValue(entryName, out entryCache);
// Entry flagged Modified: drop the cached zip and go through the fetch-and-cache path again.
322 if (entryCache is { Modified:
true })
327 _zipFileCache.Remove(fileName, out _);
329 return CacheAndCreateEntryStream(fileName, entryName);
332 var entry = entryCache?.Entry;
336 var stream =
new MemoryStream();
// Size the stream to the entry's uncompressed size before extraction.
340 stream.SetLength(entry.UncompressedSize);
// NOTE(review): braces are missing from the listing; the ZipInputStream scan below appears
// to be the fallback taken when SetLength rejects the size - confirm against the full file.
342 catch (ArgumentOutOfRangeException)
351 var zipStream =
new ZipInputStream(fileName);
353 var zipEntry = zipStream.GetNextEntry();
356 if (zipEntry ==
null)
362 if (entryName ==
null)
// Walk the entries until one matches entryName, ignoring case.
368 while (zipEntry !=
null)
370 if (
string.Compare(zipEntry.FileName, entryName, StringComparison.OrdinalIgnoreCase) == 0)
375 zipEntry = zipStream.GetNextEntry();
// Decompress the entry into the memory stream.
380 entry.Extract(stream);
/// <summary>
/// Fetches <paramref name="filename"/> from the wrapped data provider and adds it to the zip cache.
/// </summary>
/// <param name="filename">Zip file name; also used as the cache key.</param>
/// <param name="cachedZip">Receives the new <c>CachedZipFile</c>, or the instance already in the cache when the add fails.</param>
394 private bool Cache(
string filename, out CachedZipFile cachedZip)
397 var dataStream = _dataProvider.Fetch(filename);
398 if (dataStream !=
null)
402 cachedZip =
new CachedZipFile(dataStream, DateTime.UtcNow, filename);
// Key already present: hand back whatever instance the dictionary holds now.
// NOTE(review): disposal of the instance created above is not visible in this listing.
404 if (!_zipFileCache.TryAdd(filename, cachedZip))
408 return _zipFileCache.TryGetValue(filename, out cachedZip);
413 catch (Exception exception)
415 if (exception is ZipException || exception is ZlibException)
// NOTE(review): the message names Fetch() although it is logged from Cache().
417 Log.
Error(
"ZipDataCacheProvider.Fetch(): Corrupt zip file/entry: " + filename +
" Error: " + exception);
422 dataStream.Dispose();
/// <summary>
/// A zip archive opened from a stream, together with a name-indexed dictionary of its entries.
/// </summary>
432 private class CachedZipFile : IDisposable
// Stream the archive was read from; disposed together with the archive.
435 private readonly Stream _dataStream;
// Destination path used when a modified archive is written back.
436 private readonly
string _filePath;
// Flag stored as long so it can be accessed through Interlocked; non-zero means disposed.
437 private long _disposed;
// Incremented by WriteEntry; non-zero means the archive has been written to.
438 private long _modified;
443 private readonly ZipFile _zipFile;
// Entries keyed by entry file name, compared case-insensitively (ordinal).
448 public readonly Dictionary<string, ZipEntryCache> EntryCache =
new (StringComparer.OrdinalIgnoreCase);
/// <summary>True once the disposed flag has been set.</summary>
453 public bool Disposed => Interlocked.Read(ref _disposed) != 0;
/// <summary>Reads the archive from <paramref name="dataStream"/> and indexes its entries.</summary>
/// <param name="dataStream">Stream holding the zip data.</param>
/// <param name="utcNow">Not used in the visible lines; presumably stored as the cache timestamp (TODO confirm).</param>
/// <param name="filePath">Path remembered for writing a modified archive back.</param>
461 public CachedZipFile(Stream dataStream, DateTime utcNow,
string filePath)
463 _dataStream = dataStream;
464 _zipFile = ZipFile.Read(dataStream);
465 _zipFile.UseZip64WhenSaving = Zip64Option.Always;
466 foreach (var entry
in _zipFile.Entries)
468 EntryCache[entry.FileName] =
new ZipEntryCache{ Entry = entry };
471 _filePath = filePath;
/// <summary>Returns true when the stored cache time is earlier than <paramref name="date"/>.</summary>
479 public bool Uncache(DateTime date)
// NOTE(review): the declaration of _dateCached is not part of this listing.
481 return _dateCached.
Value < date;
/// <summary>Adds <paramref name="content"/> as entry <paramref name="entryName"/>, replacing any existing entry with that name.</summary>
490 public void WriteEntry(
string entryName,
byte[] content)
// Record that the archive now differs from what was read.
492 Interlocked.Increment(ref _modified);
// Remove an existing entry of the same name from both the archive and the entry dictionary.
496 if (_zipFile.ContainsEntry(entryName))
498 _zipFile.RemoveEntry(entryName);
499 EntryCache.Remove(entryName);
503 var newEntry = _zipFile.AddEntry(entryName, content);
// The new entry is flagged Modified.
504 EntryCache.Add(entryName,
new ZipEntryCache { Entry = newEntry, Modified =
true });
// NOTE(review): the body of Refresh() and the header of the member that follows are missing
// from this listing; the lines below appear to be the dispose logic - confirm.
510 public void Refresh()
// Atomically set the disposed flag; a previous value of 1 means another call already did so.
520 if (Interlocked.CompareExchange(ref _disposed, 1, 0) == 1)
527 string tempFileName =
null;
528 var modified = Interlocked.Read(ref _modified) != 0;
// Save the archive to a temporary file; presumably only when modified - the guard is not visible here.
532 tempFileName = Path.GetTempFileName();
533 _zipFile.Save(tempFileName);
536 _zipFile?.DisposeSafely();
537 _dataStream?.DisposeSafely();
// Move the saved copy over the original path, overwriting it.
540 if (modified && tempFileName !=
null)
542 File.Move(tempFileName, _filePath,
true);
/// <summary>
/// Pairs a zip entry with a flag telling whether it was written through the cache.
/// </summary>
550 private class ZipEntryCache
// The underlying zip entry.
552 public ZipEntry Entry {
get;
set; }
// Left at its default (false) for entries indexed on load; set to true for entries added by WriteEntry.
553 public bool Modified {
get;
set; }