Lean  $LEAN_TAG$
TextSubscriptionDataSourceReader.cs
1 /*
2  * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
3  * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14 */
15 
16 using System;
17 using System.Linq;
18 using QuantConnect.Data;
19 using QuantConnect.Logging;
21 using System.Collections.Generic;
24 
26 {
27  /// <summary>
/// Provides an implementation of <see cref="ISubscriptionDataSourceReader"/> that uses the
29  /// <see cref="BaseData.Reader(SubscriptionDataConfig,string,DateTime,bool)"/>
30  /// method to read lines of text from a <see cref="SubscriptionDataSource"/>
31  /// </summary>
33  {
private readonly bool _implementsStreamReader;
private readonly DateTime _date;
// created lazily by Read() once a non-null stream is available
private BaseData _factory;
private bool _shouldCacheDataPoints;

/// <summary>
/// Maximum number of entries kept in <see cref="BaseDataSourceCache"/>, see <see cref="SetCacheSize(int)"/>
/// </summary>
private static int CacheSize = 100;

/// <summary>
/// Parsed data points shared by all instances, keyed by source path + data type. A published dictionary instance
/// is never mutated: updates build a new dictionary and swap the reference, so lookups can be done without locking
/// </summary>
private static volatile Dictionary<string, List<BaseData>> BaseDataSourceCache = new Dictionary<string, List<BaseData>>(100);

/// <summary>
/// Insertion order of the cache keys, used for first-in first-out eviction. This instance is also the lock that
/// guards cache updates, so it is readonly: reassigning it would let two threads hold "the" lock at the same time
/// </summary>
private static readonly Queue<string> CacheKeys = new Queue<string>(100);

/// <summary>
/// The requested subscription configuration
/// </summary>
protected SubscriptionDataConfig Config { get; set; }

/// <summary>
/// Event fired when an exception is thrown during a call to
/// <see cref="BaseData.Reader(SubscriptionDataConfig,string,DateTime,bool)"/>
/// </summary>
public event EventHandler<ReaderErrorEventArgs> ReaderError;
53 
/// <summary>
/// Initializes a new instance of the <see cref="TextSubscriptionDataSourceReader"/> class
/// </summary>
/// <param name="dataCacheProvider">This provider caches files if needed</param>
/// <param name="config">The subscription's configuration</param>
/// <param name="date">The date this factory was produced to read data for</param>
/// <param name="isLiveMode">True if we're in live mode, false for backtesting</param>
/// <param name="objectStore">The object storage for data persistence.</param>
/// <remarks>
/// Decides up front whether this reader may use the static data point cache used by
/// <see cref="Read(SubscriptionDataSource)"/>: only non-custom data with a resolution at or above
/// <see cref="Resolution.Hour"/> in enum order (presumably hour and daily - confirm against the enum), excluding the
/// fundamental types and any <see cref="BaseDataCollection"/> (universe) type.
/// </remarks>
public TextSubscriptionDataSourceReader(IDataCacheProvider dataCacheProvider, SubscriptionDataConfig config, DateTime date, bool isLiveMode,
    IObjectStore objectStore)
    : base(dataCacheProvider, isLiveMode, objectStore)
{
    _date = date;
    Config = config;
    // caching eligibility; Read() additionally restricts caching to local files
    _shouldCacheDataPoints = !Config.IsCustomData && Config.Resolution >= Resolution.Hour
        && Config.Type != typeof(FineFundamental) && Config.Type != typeof(CoarseFundamental) && Config.Type != typeof(Fundamental)
        // don't cache universe data, doesn't make much sense and we don't want to change the symbol of the clone
        && !Config.Type.IsAssignableTo(typeof(BaseDataCollection))
    // NOTE(review): the expression above has no terminating ';' in the reviewed listing - a continuation line
    // (a further condition and/or the ';') appears to be missing here; confirm against the upstream file.

    // computed once so Read() can choose between the StreamReader-based and the line-based Reader overloads
    _implementsStreamReader = Config.Type.ImplementsStreamReader();
}
76 
/// <summary>
/// Reads the specified <paramref name="source"/>
/// </summary>
/// <remarks>
/// When caching is enabled for this reader and <paramref name="source"/> is a local file, the whole source is parsed
/// into memory and published to a cache shared by all instances, keyed by source path and data type. Reads that hit
/// the cache skip parsing and yield clones of the cached points, re-assigned to this configuration's symbol, starting
/// shortly before the requested date.
/// </remarks>
/// <param name="source">The source to be read</param>
/// <returns>An <see cref="IEnumerable{BaseData}"/> that contains the data in the source</returns>
public override IEnumerable<BaseData> Read(SubscriptionDataSource source)
{
    List<BaseData> cache = null;
    // only cache local files. This updates the instance field, so once a non-local source is read
    // caching stays disabled for this reader
    _shouldCacheDataPoints = _shouldCacheDataPoints &&
        source.TransportMedium == SubscriptionTransportMedium.LocalFile;

    string cacheKey = null;
    if (_shouldCacheDataPoints)
    {
        cacheKey = source.Source + Config.Type;
        // no lock needed: published cache dictionaries are never mutated, only replaced (see StoreInCache)
        BaseDataSourceCache.TryGetValue(cacheKey, out cache);
    }

    if (cache == null)
    {
        cache = _shouldCacheDataPoints ? new List<BaseData>(30000) : null;
        using (var reader = CreateStreamReader(source))
        {
            if (reader == null)
            {
                // if the reader doesn't have data then we're done with this subscription
                yield break;
            }

            if (_factory == null)
            {
                // only create a factory if the stream isn't null
                _factory = Config.GetBaseDataInstance();
            }

            // while the reader has data
            while (!reader.EndOfStream)
            {
                BaseData instance = null;
                string line = null;
                try
                {
                    if (reader.StreamReader != null && _implementsStreamReader)
                    {
                        instance = _factory.Reader(Config, reader.StreamReader, _date, IsLiveMode);
                    }
                    else
                    {
                        // read a line and pass it to the base data factory
                        line = reader.ReadLine();
                        instance = _factory.Reader(Config, line, _date, IsLiveMode);
                    }
                }
                catch (Exception err)
                {
                    // parse failures are reported through ReaderError and reading continues with the next item
                    OnReaderError(line ?? "StreamReader", err);
                }

                if (instance != null && instance.EndTime != default)
                {
                    if (_shouldCacheDataPoints)
                    {
                        // buffered: when caching, points are only yielded after the whole source was parsed (below)
                        cache.Add(instance);
                    }
                    else
                    {
                        yield return instance;
                    }
                }
                else if (reader.ShouldBeRateLimited)
                {
                    // NOTE(review): yields the null/invalid instance for rate limited readers, presumably so the
                    // consumer can pace itself even when no data point was produced - confirm intent
                    yield return instance;
                }
            }
        }

        if (!_shouldCacheDataPoints)
        {
            yield break;
        }

        StoreInCache(cacheKey, cache);
    }

    if (cache == null)
    {
        throw new InvalidOperationException($"Cache should not be null. Key: {cacheKey}");
    }

    // Find the first data point 10 days (just in case) before the desired date
    // and subtract one item (just in case there was a time gap and data.Time is after _date).
    // If no point is after the frontier FindIndex returns -1 and the whole cache is replayed
    var frontier = _date.AddDays(-10);
    var index = cache.FindIndex(data => data.Time > frontier);
    index = index > 0 ? (index - 1) : 0;
    foreach (var data in cache.Skip(index))
    {
        // cached instances are shared across readers, so hand out copies carrying this configuration's symbol
        var clone = data.Clone();
        clone.Symbol = Config.Symbol;
        yield return clone;
    }
}

/// <summary>
/// Publishes <paramref name="data"/> under <paramref name="cacheKey"/> in the shared cache, evicting the oldest
/// entries (first in, first out) whenever the cache would hold more than <see cref="CacheSize"/> entries
/// </summary>
/// <param name="cacheKey">The cache key (source path + data type)</param>
/// <param name="data">The fully parsed data points of the source</param>
private static void StoreInCache(string cacheKey, List<BaseData> data)
{
    lock (CacheKeys)
    {
        var current = BaseDataSourceCache;
        // two readers can miss on the same key concurrently; track the key only once so the
        // eviction queue stays in sync with the dictionary
        if (!current.ContainsKey(cacheKey))
        {
            CacheKeys.Enqueue(cacheKey);
        }

        // we create a new dictionary, so we don't have to take locks when reading, and add our new item
        var newCache = new Dictionary<string, List<BaseData>>(current) { [cacheKey] = data };

        if (newCache.Count > CacheSize)
        {
            // remove a quarter of the first in entries, and always at least one so that
            // small cache sizes (where CacheSize / 4 is 0) still stay bounded
            var removeCount = Math.Max(1, CacheSize / 4);
            for (var removed = 0; removed < removeCount && CacheKeys.Count > 0; removed++)
            {
                newCache.Remove(CacheKeys.Dequeue());
            }
        }

        // update the cache instance
        BaseDataSourceCache = newCache;
    }
}
198 
/// <summary>
/// Raises the <see cref="ReaderError"/> event, when it has subscribers, for a failed parse
/// </summary>
/// <param name="line">The text that failed to parse (or a placeholder supplied by the caller)</param>
/// <param name="exception">The exception raised while parsing</param>
private void OnReaderError(string line, Exception exception)
{
    ReaderError?.Invoke(this, new ReaderErrorEventArgs(line, exception));
}
209 
/// <summary>
/// Set the cache size to use
/// </summary>
/// <param name="megaBytesToUse">
/// Memory budget for the data point cache, in megabytes. Zero or negative values are ignored and leave the current size unchanged
/// </param>
/// <remarks>
/// How to size this cache: Take worst case scenario, BTCUSD hour, 60k QuoteBar entries, which are roughly 200 bytes in size,
/// that is about 11-12 MB per cache entry. We round up to 12 MB, so the resulting entry count is megaBytesToUse / 12
/// </remarks>
public static void SetCacheSize(int megaBytesToUse)
{
    // a negative budget would produce a negative entry count, which is meaningless for the eviction logic;
    // treat it like 0 and keep the current size
    if (megaBytesToUse <= 0)
    {
        return;
    }

    // we take worst case scenario, each entry is 12 MB
    CacheSize = megaBytesToUse / 12;
    Log.Trace($"TextSubscriptionDataSourceReader.SetCacheSize(): Setting cache size to {CacheSize} items");
}
223 
/// <summary>
/// Will clear the data cache.
/// Used for testing different time zones for the same data set and allow a clean fresh start for each backtest
/// </summary>
public static void ClearCache()
{
    // take the same lock used when publishing cache entries, so a concurrent insert cannot republish a copy of the
    // old contents after the clear, and reset the eviction queue so it does not keep tracking keys that are gone
    lock (CacheKeys)
    {
        CacheKeys.Clear();
        BaseDataSourceCache = new();
    }
}
232  }
233 }