Lean  $LEAN_TAG$
LocalObjectStore.cs
1 /*
2  * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
3  * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14 */
15 
16 using System;
17 using System.Collections;
18 using System.Collections.Concurrent;
19 using System.Collections.Generic;
20 using System.IO;
21 using System.Linq;
22 using System.Text.RegularExpressions;
23 using System.Threading;
26 using QuantConnect.Logging;
27 using QuantConnect.Packets;
28 using QuantConnect.Util;
29 
31 {
32  /// <summary>
33  /// A local disk implementation of <see cref="IObjectStore"/>.
34  /// </summary>
36  {
37  /// <summary>
38  /// No read permissions error message
39  /// </summary>
40  protected const string NoReadPermissionsError = "The current user does not have permission to read from the organization Object Store." +
41  " Please contact your organization administrator to request permission.";
42 
43  /// <summary>
44  /// No write permissions error message
45  /// </summary>
46  protected const string NoWritePermissionsError = "The current user does not have permission to write to the organization Object Store." +
47  " Please contact your organization administrator to request permission.";
48 
49  /// <summary>
50  /// Event raised each time there's an error
51  /// </summary>
52  public event EventHandler<ObjectStoreErrorRaisedEventArgs> ErrorRaised;
53 
54  /// <summary>
55  /// Gets the default object store location
56  /// </summary>
57  public static string DefaultObjectStore { get; set; } = Path.GetFullPath(Config.Get("object-store-root", "./storage"));
58 
59  /// <summary>
60  /// Flag indicating the state of this object storage has changed since the last <seealso cref="Persist"/> invocation
61  /// </summary>
62  private volatile bool _dirty;
63 
64  private Timer _persistenceTimer;
65  private Regex _pathRegex = new (@"^\.?[a-zA-Z0-9\\/_#\-\$= ]+\.?[a-zA-Z0-9]*$", RegexOptions.Compiled);
66  private readonly ConcurrentDictionary<string, ObjectStoreEntry> _storage = new();
67  private readonly object _persistLock = new object();
68 
69  /// <summary>
70  /// Provides access to the controls governing behavior of this instance, such as the persistence interval
71  /// </summary>
72  protected Controls Controls { get; private set; }
73 
74  /// <summary>
75  /// The root storage folder for the algorithm
76  /// </summary>
77  protected string AlgorithmStorageRoot { get; private set; }
78 
79  /// <summary>
80  /// The file handler instance to use
81  /// </summary>
82  protected FileHandler FileHandler { get; set; } = new ();
83 
        /// <summary>
        /// Initializes the object store: records the algorithm storage root and the job controls, and creates the
        /// periodic persistence timer
        /// </summary>
        /// <param name="userId">The user id</param>
        /// <param name="projectId">The project id</param>
        /// <param name="userToken">The user token</param>
        /// <param name="controls">The job controls instance (storage permissions, file count/size limits and persistence interval)</param>
        public virtual void Initialize(int userId, int projectId, string userToken, Controls controls)
        {

            // create the root path if it does not exist
            // NOTE(review): `directoryInfo` is not declared anywhere in this method as shown, so this does not compile;
            // the statement that creates the storage root directory (producing `directoryInfo`) appears to be
            // missing here — TODO restore it.
            // full name will return a normalized path which is later easier to compare
            AlgorithmStorageRoot = directoryInfo.FullName;

            Controls = controls;

            // if <= 0 we disable periodic persistence and make it synchronous
            // NOTE(review): as shown, the block below has no guarding condition, so the timer is always created,
            // which contradicts the comment above; a check on Controls.PersistenceIntervalSeconds appears to be
            // missing — TODO confirm and restore.
            {
                // one-shot timer (period Timeout.Infinite); Persist() re-arms it at the end of every run
                _persistenceTimer = new Timer(_ => Persist(), null, Controls.PersistenceIntervalSeconds * 1000, Timeout.Infinite);
            }

            Log.Trace($"LocalObjectStore.Initialize(): Storage Root: {directoryInfo.FullName}. StorageFileCount {controls.StorageFileCount}. StorageLimit {BytesToMb(controls.StorageLimit)}MB");
        }
110 
111  /// <summary>
112  /// Storage root path
113  /// </summary>
114  protected virtual string StorageRoot()
115  {
116  return DefaultObjectStore;
117  }
118 
119  /// <summary>
120  /// Loads objects from the AlgorithmStorageRoot into the ObjectStore
121  /// </summary>
122  private IEnumerable<ObjectStoreEntry> GetObjectStoreEntries(bool loadContent, bool takePersistLock = true)
123  {
124  if (Controls.StoragePermissions.HasFlag(FileAccess.Read))
125  {
126  // Acquire the persist lock to avoid yielding twice the same value, just in case
127  lock (takePersistLock ? _persistLock : new object())
128  {
129  foreach (var kvp in _storage)
130  {
131  if (!loadContent || kvp.Value.Data != null)
132  {
133  // let's first serve what we already have in memory because it might include files which are not on disk yet
134  yield return kvp.Value;
135  }
136  }
137 
138  foreach (var file in FileHandler.EnumerateFiles(AlgorithmStorageRoot, "*", SearchOption.AllDirectories, out var rootFolder))
139  {
140  var path = NormalizePath(file.FullName.RemoveFromStart(rootFolder));
141 
142  ObjectStoreEntry objectStoreEntry;
143  if (loadContent)
144  {
145  if (!_storage.TryGetValue(path, out objectStoreEntry) || objectStoreEntry.Data == null)
146  {
147  if(TryCreateObjectStoreEntry(file.FullName, path, out objectStoreEntry))
148  {
149  // load file if content is null or not present, we prioritize the version we have in memory
150  yield return _storage[path] = objectStoreEntry;
151  }
152  }
153  }
154  else
155  {
156  if (!_storage.ContainsKey(path))
157  {
158  // we do not read the file contents yet, just the name. We read the contents on demand
159  yield return _storage[path] = new ObjectStoreEntry(path, null);
160  }
161  }
162  }
163  }
164  }
165  }
166 
167  /// <summary>
168  /// Returns the file paths present in the object store. This is specially useful not to load the object store into memory
169  /// </summary>
170  public ICollection<string> Keys
171  {
172  get
173  {
174  return GetObjectStoreEntries(loadContent: false).Select(objectStoreEntry => objectStoreEntry.Path).ToList();
175  }
176  }
177 
178  /// <summary>
179  /// Will clear the object store state cache. This is useful when the object store is used concurrently by nodes which want to share information
180  /// </summary>
181  public void Clear()
182  {
183  // write to disk anything pending first
184  Persist();
185 
186  _storage.Clear();
187  }
188 
189  /// <summary>
190  /// Determines whether the store contains data for the specified path
191  /// </summary>
192  /// <param name="path">The object path</param>
193  /// <returns>True if the key was found</returns>
194  public bool ContainsKey(string path)
195  {
196  if (path == null)
197  {
198  throw new ArgumentNullException(nameof(path));
199  }
200  if (!Controls.StoragePermissions.HasFlag(FileAccess.Read))
201  {
202  throw new InvalidOperationException($"LocalObjectStore.ContainsKey(): {NoReadPermissionsError}");
203  }
204 
205  path = NormalizePath(path);
206  if (_storage.ContainsKey(path))
207  {
208  return true;
209  }
210 
211  // if we don't have the file but it exists, be friendly and register it
212  var filePath = PathForKey(path);
213  if (FileHandler.Exists(filePath))
214  {
215  _storage[path] = new ObjectStoreEntry(path, null);
216  return true;
217  }
218  return false;
219  }
220 
221  /// <summary>
222  /// Returns the object data for the specified path
223  /// </summary>
224  /// <param name="path">The object path</param>
225  /// <returns>A byte array containing the data</returns>
226  public byte[] ReadBytes(string path)
227  {
228  // Ensure we have the key, also takes care of null or improper access
229  if (!ContainsKey(path))
230  {
231  throw new KeyNotFoundException($"Object with path '{path}' was not found in the current project. " +
232  "Please use ObjectStore.ContainsKey(key) to check if an object exists before attempting to read."
233  );
234  }
235  path = NormalizePath(path);
236 
237  if(!_storage.TryGetValue(path, out var objectStoreEntry) || objectStoreEntry.Data == null)
238  {
239  var filePath = PathForKey(path);
240  if (TryCreateObjectStoreEntry(filePath, path, out objectStoreEntry))
241  {
242  // if there is no data in the cache and the file exists on disk let's load it
243  _storage[path] = objectStoreEntry;
244  }
245  }
246  return objectStoreEntry?.Data;
247  }
248 
        /// <summary>
        /// Saves the object data for the specified path: validates the path, stores the data in memory (subject to the
        /// storage limits) and flags the store as needing persistence
        /// </summary>
        /// <param name="path">The object path; must match the allowed path pattern</param>
        /// <param name="contents">The object data; null registers the key without data</param>
        /// <returns>True if non-null data was stored; false if the storage limits would be exceeded or
        /// <paramref name="contents"/> is null</returns>
        /// <exception cref="ArgumentNullException"><paramref name="path"/> is null</exception>
        /// <exception cref="InvalidOperationException">The store lacks write permission</exception>
        /// <exception cref="ArgumentException">The path does not match the allowed pattern or has more than 100 separators</exception>
        public bool SaveBytes(string path, byte[] contents)
        {
            if (path == null)
            {
                throw new ArgumentNullException(nameof(path));
            }
            else if (!Controls.StoragePermissions.HasFlag(FileAccess.Write))
            {
                throw new InvalidOperationException($"LocalObjectStore.SaveBytes(): {NoWritePermissionsError}");
            }
            else if (!_pathRegex.IsMatch(path))
            {
                throw new ArgumentException($"LocalObjectStore: path is not supported: '{path}'");
            }
            else if (path.Count(c => c == '/') > 100 || path.Count(c => c == '\\') > 100)
            {
                // just in case: reject excessively deep paths (more than 100 '/' or '\' separators)
                throw new ArgumentException($"LocalObjectStore: path is not supported: '{path}'");
            }

            // after we check the regex: normalize only once the raw input has been validated
            path = NormalizePath(path);

            // InternalSaveBytes stores the entry (even a null one) when within limits
            if (InternalSaveBytes(path, contents)
                // only persist if we actually stored some new data, else can skip
                && contents != null)
            {
                _dirty = true;
                // if <= 0 we disable periodic persistence and make it synchronous
                // NOTE(review): as shown, the block below has no guarding condition, so every successful save persists
                // synchronously regardless of the persistence interval; a check on Controls.PersistenceIntervalSeconds
                // appears to be missing — TODO confirm and restore.
                {
                    Persist();
                }
                return true;
            }

            return false;
        }
293 
294  /// <summary>
295  /// Won't trigger persist nor will check storage write permissions, useful on initialization since it allows read only permissions to load the object store
296  /// </summary>
297  protected bool InternalSaveBytes(string path, byte[] contents)
298  {
299  if(!IsWithinStorageLimit(path, contents, takePersistLock: true))
300  {
301  return false;
302  }
303 
304  // Add the dirty entry
305  var entry = _storage[path] = new ObjectStoreEntry(path, contents);
306  entry.SetDirty();
307  return true;
308  }
309 
        /// <summary>
        /// Validates storage limits are respected on a new save operation: the resulting file count must not exceed
        /// Controls.StorageFileCount and the resulting total size must not exceed Controls.StorageLimit
        /// </summary>
        /// <param name="path">The object path being saved; an existing entry with the same path (ordinal comparison)
        /// is treated as replaced and not counted twice</param>
        /// <param name="contents">The data being saved; null counts as zero bytes</param>
        /// <param name="takePersistLock">Whether enumerating the existing entries should hold the persist lock</param>
        /// <returns>True if the save fits within both limits, otherwise false (an error is logged)</returns>
        protected virtual bool IsWithinStorageLimit(string path, byte[] contents, bool takePersistLock)
        {
            // Before saving confirm we are abiding by the control rules
            // Start by counting our file and its length
            var fileCount = 1;
            var expectedStorageSizeBytes = contents?.Length ?? 0L;
            // NOTE(review): GetObjectStoreEntries yields nothing without read permission, in which case only the new
            // object is counted against the limits. `kvp` is an ObjectStoreEntry, not a KeyValuePair.
            foreach (var kvp in GetObjectStoreEntries(loadContent: false, takePersistLock: takePersistLock))
            {
                if (path.Equals(kvp.Path))
                {
                    // Skip we have already counted this above
                    // If this key was already in storage it will be replaced.
                }
                else
                {
                    fileCount++;
                    if(kvp.Data != null)
                    {
                        // if the data is in memory use it
                        expectedStorageSizeBytes += kvp.Data.Length;
                    }
                    else
                    {
                        // otherwise ask the file system; presumably TryGetFileLength yields 0 for a missing file — TODO confirm
                        expectedStorageSizeBytes += FileHandler.TryGetFileLength(PathForKey(kvp.Path));
                    }
                }
            }

            // Verify we are within FileCount limit
            if (fileCount > Controls.StorageFileCount)
            {
                var message = $"LocalObjectStore.InternalSaveBytes(): You have reached the ObjectStore limit for files it can save: {fileCount}. Unable to save the new file: '{path}'";
                Log.Error(message);
                // NOTE(review): as shown, exceeding a limit is only logged; unlike PersistData() this path does not call
                // OnErrorRaised, so ErrorRaised subscribers are not notified — confirm this is intended.
                return false;
            }

            // Verify we are within Storage limit
            if (expectedStorageSizeBytes > Controls.StorageLimit)
            {
                var message = $"LocalObjectStore.InternalSaveBytes(): at storage capacity: {BytesToMb(expectedStorageSizeBytes)}MB/{BytesToMb(Controls.StorageLimit)}MB. Unable to save: '{path}'";
                Log.Error(message);
                // NOTE(review): same as above — no ErrorRaised notification is raised here as shown.
                return false;
            }

            return true;
        }
361 
362  /// <summary>
363  /// Deletes the object data for the specified path
364  /// </summary>
365  /// <param name="path">The object path</param>
366  /// <returns>True if the delete operation was successful</returns>
367  public bool Delete(string path)
368  {
369  if (path == null)
370  {
371  throw new ArgumentNullException(nameof(path));
372  }
373  if (!Controls.StoragePermissions.HasFlag(FileAccess.Write))
374  {
375  throw new InvalidOperationException($"LocalObjectStore.Delete(): {NoWritePermissionsError}");
376  }
377 
378  path = NormalizePath(path);
379 
380  var wasInCache = _storage.TryRemove(path, out var _);
381 
382  var filePath = PathForKey(path);
383  if (FileHandler.Exists(filePath))
384  {
385  try
386  {
387  FileHandler.Delete(filePath);
388  return true;
389  }
390  catch
391  {
392  // This try sentence is to prevent a race condition with the Delete within the PersisData() method
393  }
394  }
395 
396  return wasInCache;
397  }
398 
399  /// <summary>
400  /// Returns the file path for the specified path
401  /// </summary>
402  /// <remarks>If the key is not already inserted it will just return a path associated with it
403  /// and add the key with null value</remarks>
404  /// <param name="path">The object path</param>
405  /// <returns>The path for the file</returns>
406  public virtual string GetFilePath(string path)
407  {
408  // Ensure we have an object for that key
409  if (!ContainsKey(path))
410  {
411  // Add a key with null value to tell Persist() not to delete the file created in the path associated
412  // with this key and not update it with the value associated with the key(null)
413  SaveBytes(path, null);
414  }
415  else
416  {
417  // Persist to ensure pur files are up to date
418  Persist();
419  }
420 
421  // Fetch the path to file and return it
422  var normalizedPathKey = PathForKey(path);
423 
424  var parent = Directory.GetParent(normalizedPathKey);
425  if (parent != null && parent.FullName != AlgorithmStorageRoot)
426  {
427  // let's create the parent folder if it's not the root storage and it does not exist
428  if (!FileHandler.DirectoryExists(parent.FullName))
429  {
430  FileHandler.CreateDirectory(parent.FullName);
431  }
432  }
433  return normalizedPathKey;
434  }
435 
436  /// <summary>
437  /// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
438  /// </summary>
439  public virtual void Dispose()
440  {
441  try
442  {
443  if (_persistenceTimer != null)
444  {
445  _persistenceTimer.Change(Timeout.Infinite, Timeout.Infinite);
446 
447  Persist();
448 
449  _persistenceTimer.DisposeSafely();
450  }
451  }
452  catch (Exception err)
453  {
454  Log.Error(err, "Error deleting storage directory.");
455  }
456  }
457 
458  /// <summary>Returns an enumerator that iterates through the collection.</summary>
459  /// <returns>A <see cref="T:System.Collections.Generic.IEnumerator`1" /> that can be used to iterate through the collection.</returns>
460  /// <filterpriority>1</filterpriority>
461  public IEnumerator<KeyValuePair<string, byte[]>> GetEnumerator()
462  {
463  return GetObjectStoreEntries(loadContent: true).Select(objectStore => new KeyValuePair<string, byte[]>(objectStore.Path, objectStore.Data)).GetEnumerator();
464  }
465 
466  /// <summary>Returns an enumerator that iterates through a collection.</summary>
467  /// <returns>An <see cref="T:System.Collections.IEnumerator" /> object that can be used to iterate through the collection.</returns>
468  /// <filterpriority>2</filterpriority>
469  IEnumerator IEnumerable.GetEnumerator()
470  {
471  return GetEnumerator();
472  }
473 
474  /// <summary>
475  /// Get's a file path for a given path.
476  /// Internal use only because it does not guarantee the existence of the file.
477  /// </summary>
478  protected string PathForKey(string path)
479  {
480  return Path.Combine(AlgorithmStorageRoot, NormalizePath(path));
481  }
482 
483  /// <summary>
484  /// Invoked periodically to persist the object store's contents
485  /// </summary>
486  private void Persist()
487  {
488  // Acquire the persist lock
489  lock (_persistLock)
490  {
491  try
492  {
493  // If there are no changes we are fine
494  if (!_dirty)
495  {
496  return;
497  }
498 
499  if (PersistData())
500  {
501  _dirty = false;
502  }
503  }
504  catch (Exception err)
505  {
506  Log.Error("LocalObjectStore.Persist()", err);
507  OnErrorRaised(err);
508  }
509  finally
510  {
511  try
512  {
513  if(_persistenceTimer != null)
514  {
515  // restart timer following end of persistence
516  _persistenceTimer.Change(Time.GetSecondUnevenWait(Controls.PersistenceIntervalSeconds * 1000), Timeout.Infinite);
517  }
518  }
519  catch (ObjectDisposedException)
520  {
521  // ignored disposed
522  }
523  }
524  }
525  }
526 
527  /// <summary>
528  /// Overridable persistence function
529  /// </summary>
530  /// <returns>True if persistence was successful, otherwise false</returns>
531  protected virtual bool PersistData()
532  {
533  try
534  {
535  // Write our store data to disk
536  // Skip the key associated with null values. They are not linked to a file yet or not loaded
537  // Also skip fails which are not flagged as dirty
538  foreach (var kvp in _storage)
539  {
540  if(kvp.Value.Data != null && kvp.Value.IsDirty)
541  {
542  var filePath = PathForKey(kvp.Key);
543  // directory might not exist for custom prefix
544  var parentDirectory = Path.GetDirectoryName(filePath);
545  if (!FileHandler.DirectoryExists(parentDirectory))
546  {
547  FileHandler.CreateDirectory(parentDirectory);
548  }
549  FileHandler.WriteAllBytes(filePath, kvp.Value.Data);
550 
551  // clear the dirty flag
552  kvp.Value.SetClean();
553 
554  // This kvp could have been deleted by the Delete() method
555  if (!_storage.Contains(kvp))
556  {
557  try
558  {
559  FileHandler.Delete(filePath);
560  }
561  catch
562  {
563  // This try sentence is to prevent a race condition with the Delete() method
564  }
565  }
566  }
567  }
568 
569  return true;
570  }
571  catch (Exception err)
572  {
573  Log.Error(err, "LocalObjectStore.PersistData()");
574  OnErrorRaised(err);
575  return false;
576  }
577  }
578 
579  /// <summary>
580  /// Event invocator for the <see cref="ErrorRaised"/> event
581  /// </summary>
582  protected virtual void OnErrorRaised(Exception error)
583  {
584  ErrorRaised?.Invoke(this, new ObjectStoreErrorRaisedEventArgs(error));
585  }
586 
587  /// <summary>
588  /// Converts a number of bytes to megabytes as it's more human legible
589  /// </summary>
590  private static double BytesToMb(long bytes)
591  {
592  return bytes / 1024.0 / 1024.0;
593  }
594 
595  private static string NormalizePath(string path)
596  {
597  if (string.IsNullOrEmpty(path))
598  {
599  return path;
600  }
601  return path.TrimStart('.').TrimStart('/', '\\').Replace('\\', '/');
602  }
603 
604  private bool TryCreateObjectStoreEntry(string filePath, string path, out ObjectStoreEntry objectStoreEntry)
605  {
606  var count = 0;
607  do
608  {
609  count++;
610  try
611  {
612  if (FileHandler.Exists(filePath))
613  {
614  objectStoreEntry = new ObjectStoreEntry(path, FileHandler.ReadAllBytes(filePath));
615  return true;
616  }
617  objectStoreEntry = null;
618  return false;
619  }
620  catch (Exception)
621  {
622  if (count > 3)
623  {
624  throw;
625  }
626  else
627  {
628  // let's be resilient and retry, avoid race conditions, someone updating it or just random io failure
629  Thread.Sleep(250);
630  }
631  }
632  } while (true);
633  }
634 
635  /// <summary>
636  /// Helper class to hold the state of an object store file
637  /// </summary>
638  private class ObjectStoreEntry
639  {
640  private long _isDirty;
641  public byte[] Data { get; }
642  public string Path { get; }
643  public bool IsDirty => Interlocked.Read(ref _isDirty) != 0;
644  public ObjectStoreEntry(string path, byte[] data)
645  {
646  Path = path;
647  Data = data;
648  }
649  public void SetDirty()
650  {
651  // flag as dirty
652  Interlocked.CompareExchange(ref _isDirty, 1, 0);
653  }
654  public void SetClean()
655  {
656  Interlocked.CompareExchange(ref _isDirty, 0, 1);
657  }
658  }
659  }
660 }