21 using System.Collections.Generic;
34 private readonly
bool _implementsStreamReader;
35 private readonly DateTime _date;
37 private bool _shouldCacheDataPoints;
39 private static int CacheSize = 100;
40 private static volatile Dictionary<string, List<BaseData>> BaseDataSourceCache =
new Dictionary<string, List<BaseData>>(100);
41 private static Queue<string> CacheKeys =
new Queue<string>(100);
64 : base(dataCacheProvider, isLiveMode, objectStore)
74 _implementsStreamReader =
Config.
Type.ImplementsStreamReader();
84 List<BaseData> cache =
null;
85 _shouldCacheDataPoints = _shouldCacheDataPoints &&
89 string cacheKey =
null;
90 if (_shouldCacheDataPoints)
93 BaseDataSourceCache.TryGetValue(cacheKey, out cache);
97 cache = _shouldCacheDataPoints ?
new List<BaseData>(30000) :
null;
106 if (_factory ==
null)
109 _factory =
Config.GetBaseDataInstance();
112 while (!reader.EndOfStream)
118 if (reader.StreamReader !=
null && _implementsStreamReader)
125 line = reader.ReadLine();
129 catch (Exception err)
131 OnReaderError(line ??
"StreamReader", err);
134 if (instance !=
null && instance.
EndTime !=
default)
136 if (_shouldCacheDataPoints)
142 yield
return instance;
145 else if (reader.ShouldBeRateLimited)
147 yield
return instance;
152 if (!_shouldCacheDataPoints)
159 CacheKeys.Enqueue(cacheKey);
161 var newCache =
new Dictionary<string, List<BaseData>>(BaseDataSourceCache) { [cacheKey] = cache };
163 if (BaseDataSourceCache.Count > CacheSize)
167 while (++removeCount < (CacheSize / 4))
169 newCache.Remove(CacheKeys.Dequeue());
172 BaseDataSourceCache = newCache;
177 BaseDataSourceCache = newCache;
184 throw new InvalidOperationException($
"Cache should not be null. Key: {cacheKey}");
188 var frontier = _date.AddDays(-10);
189 var index = cache.FindIndex(data => data.Time > frontier);
190 index = index > 0 ? (index - 1) : 0;
191 foreach (var data
in cache.Skip(index))
193 var clone = data.Clone();
204 private void OnReaderError(
string line, Exception exception)
216 if (megaBytesToUse != 0)
219 CacheSize = megaBytesToUse / 12;
220 Log.
Trace($
"TextSubscriptionDataSourceReader.SetCacheSize(): Setting cache size to {CacheSize} items");
230 BaseDataSourceCache =
new();