Lean  $LEAN_TAG$
DataMonitor.cs
/*
 * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
 * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
*/
15 
using System;
using System.Collections.Generic;
using System.IO;
using System.Threading;
using Newtonsoft.Json;
using QuantConnect.Interfaces;
using QuantConnect.Util;
24 
namespace QuantConnect.Data
{
    /// <summary>
    /// Monitors data requests and reports on missing data
    /// </summary>
    /// <remarks>
    /// Every request path is appended to a "succeeded" or a "failed" text file in the results
    /// destination folder. The files and the background thread that samples the request rate
    /// are created on demand by the first request. <see cref="Exit"/> stops the sampling,
    /// closes the files and stores a JSON summary report.
    /// </remarks>
    public class DataMonitor : IDataMonitor
    {
        /// <summary>
        /// Time, in milliseconds, the sampling thread waits between two request rate samples
        /// </summary>
        private const int RequestRateSamplingIntervalMilliseconds = 3000;

        // volatile: read without a lock on every data request, written by Exit()
        private volatile bool _exited;

        private TextWriter _succeededDataRequestsWriter;
        private TextWriter _failedDataRequestsWriter;

        // counters are only modified through Interlocked, requests can arrive from several threads
        private long _succeededDataRequestsCount;
        private long _failedDataRequestsCount;

        private long _succeededUniverseDataRequestsCount;
        private long _failedUniverseDataRequestsCount;

        // request rate samples (requests per second); only the sampling thread adds to this list
        private readonly List<double> _requestRates = new();
        private long _prevRequestsCount;
        private DateTime _lastRequestRateCalculationTime;

        // volatile: doubles as the "initialized" flag, it is checked outside the lock in Initialize()
        private volatile Thread _requestRateCalculationThread;
        private CancellationTokenSource _cancellationTokenSource;

        private readonly string _succeededDataRequestsFileName;
        private readonly string _failedDataRequestsFileName;
        private readonly string _resultsDestinationFolder;

        private readonly object _threadLock = new();

        /// <summary>
        /// Initializes a new instance of the <see cref="DataMonitor"/> class
        /// </summary>
        /// <remarks>
        /// Only the output file names are resolved here; no file is created until the first request arrives.
        /// </remarks>
        public DataMonitor()
        {
            // the destination folder has to be set first, GetFilePath() combines it with the file name
            _resultsDestinationFolder = Globals.ResultsDestinationFolder;
            _succeededDataRequestsFileName = GetFilePath("succeeded-data-requests.txt");
            _failedDataRequestsFileName = GetFilePath("failed-data-requests.txt");
        }

        /// <summary>
        /// Event handler for the <see cref="IDataProvider.NewDataRequest"/> event
        /// </summary>
        /// <param name="sender">The event source, not used</param>
        /// <param name="e">The request details: requested path, whether it succeeded and the error message if any</param>
        public void OnNewDataRequest(object sender, DataProviderNewDataRequestEventArgs e)
        {
            if (_exited)
            {
                return;
            }

            Initialize();

            // map and factor file requests are neither logged nor counted
            if (e.Path.Contains("map_files", StringComparison.OrdinalIgnoreCase) ||
                e.Path.Contains("factor_files", StringComparison.OrdinalIgnoreCase))
            {
                return;
            }

            var path = StripDataFolder(e.Path);
            var isUniverseData = path.Contains("coarse", StringComparison.OrdinalIgnoreCase) ||
                path.Contains("universe", StringComparison.OrdinalIgnoreCase);

            if (e.Succeeded)
            {
                WriteLineToFile(_succeededDataRequestsWriter, path, _succeededDataRequestsFileName);
                Interlocked.Increment(ref _succeededDataRequestsCount);
                if (isUniverseData)
                {
                    Interlocked.Increment(ref _succeededUniverseDataRequestsCount);
                }
            }
            else
            {
                WriteLineToFile(_failedDataRequestsWriter, path, _failedDataRequestsFileName);
                Interlocked.Increment(ref _failedDataRequestsCount);
                if (isUniverseData)
                {
                    Interlocked.Increment(ref _failedUniverseDataRequestsCount);
                }

                if (Logging.Log.DebuggingEnabled)
                {
                    Logging.Log.Debug($"DataMonitor.OnNewDataRequest(): Data from {path} could not be fetched, error: {e.ErrorMessage}");
                }
            }
        }

        /// <summary>
        /// Terminates the data monitor generating a final report
        /// </summary>
        /// <remarks>
        /// Does nothing when the monitor already exited or when it was never initialized,
        /// that is, when no data request was received.
        /// </remarks>
        public void Exit()
        {
            // check-and-set under the lock so that concurrent callers cannot both run the shutdown
            lock (_threadLock)
            {
                if (_exited || _requestRateCalculationThread == null)
                {
                    return;
                }
                _exited = true;
            }

            try
            {
                _requestRateCalculationThread.StopSafely(TimeSpan.FromSeconds(5), _cancellationTokenSource);
                _succeededDataRequestsWriter?.Close();
                _failedDataRequestsWriter?.Close();

                StoreDataMonitorReport(GenerateReport());
            }
            finally
            {
                // release everything even if stopping the thread or storing the report failed
                _succeededDataRequestsWriter.DisposeSafely();
                _failedDataRequestsWriter.DisposeSafely();
                _cancellationTokenSource.DisposeSafely();
            }
        }

        /// <summary>
        /// Disposes this object
        /// </summary>
        public void Dispose()
        {
            Exit();
        }

        /// <summary>
        /// Strips the given data folder path
        /// </summary>
        /// <param name="path">The requested data path</param>
        /// <returns>
        /// The path without the leading <see cref="Globals.DataFolder"/> when it starts with it
        /// (case insensitive), the unmodified path otherwise
        /// </returns>
        protected virtual string StripDataFolder(string path)
        {
            if (path.StartsWith(Globals.DataFolder, StringComparison.OrdinalIgnoreCase))
            {
                return path.Substring(Globals.DataFolder.Length);
            }

            return path;
        }

        /// <summary>
        /// Initializes the <see cref="DataMonitor"/> instance
        /// </summary>
        /// <remarks>
        /// Opens both request files and starts the request rate sampling thread the first time it is called;
        /// later calls return immediately. If any step fails, whatever was already opened is disposed
        /// and the exception is rethrown, so nothing is leaked when a later request retries.
        /// </remarks>
        private void Initialize()
        {
            if (_requestRateCalculationThread != null)
            {
                return;
            }

            lock (_threadLock)
            {
                if (_requestRateCalculationThread != null)
                {
                    return;
                }

                TextWriter succeededWriter = null;
                TextWriter failedWriter = null;
                CancellationTokenSource cancellationTokenSource = null;
                try
                {
                    // we create the files on demand
                    succeededWriter = OpenStream(_succeededDataRequestsFileName);
                    failedWriter = OpenStream(_failedDataRequestsFileName);

                    cancellationTokenSource = new CancellationTokenSource();
                    var cancellationToken = cancellationTokenSource.Token;

                    var thread = new Thread(() =>
                    {
                        // WaitOne() returns false on timeout (take a sample) and true once cancelled (leave)
                        while (!cancellationToken.WaitHandle.WaitOne(RequestRateSamplingIntervalMilliseconds))
                        {
                            ComputeFileRequestFrequency();
                        }
                    })
                    { IsBackground = true };
                    thread.Start();

                    _succeededDataRequestsWriter = succeededWriter;
                    _failedDataRequestsWriter = failedWriter;
                    _cancellationTokenSource = cancellationTokenSource;

                    // published last: a non null thread tells the other threads that all the fields above are ready
                    _requestRateCalculationThread = thread;
                }
                catch
                {
                    succeededWriter?.Dispose();
                    failedWriter?.Dispose();
                    cancellationTokenSource?.Dispose();
                    throw;
                }
            }
        }

        /// <summary>
        /// Builds the final report from the current counters and request rate samples and logs a summary of it
        /// </summary>
        /// <returns>The generated report</returns>
        private DataMonitorReport GenerateReport()
        {
            var report = new DataMonitorReport(Interlocked.Read(ref _succeededDataRequestsCount),
                Interlocked.Read(ref _failedDataRequestsCount),
                Interlocked.Read(ref _succeededUniverseDataRequestsCount),
                Interlocked.Read(ref _failedUniverseDataRequestsCount),
                _requestRates);

            Logging.Log.Trace($"DataMonitor.GenerateReport():{Environment.NewLine}" +
                $"DATA USAGE:: Total data requests {report.TotalRequestsCount}{Environment.NewLine}" +
                $"DATA USAGE:: Succeeded data requests {report.SucceededDataRequestsCount}{Environment.NewLine}" +
                $"DATA USAGE:: Failed data requests {report.FailedDataRequestsCount}{Environment.NewLine}" +
                $"DATA USAGE:: Failed data requests percentage {report.FailedDataRequestsPercentage}%{Environment.NewLine}" +
                $"DATA USAGE:: Total universe data requests {report.TotalUniverseDataRequestsCount}{Environment.NewLine}" +
                $"DATA USAGE:: Succeeded universe data requests {report.SucceededUniverseDataRequestsCount}{Environment.NewLine}" +
                $"DATA USAGE:: Failed universe data requests {report.FailedUniverseDataRequestsCount}{Environment.NewLine}" +
                $"DATA USAGE:: Failed universe data requests percentage {report.FailedUniverseDataRequestsPercentage}%");

            return report;
        }

        /// <summary>
        /// Takes a request rate sample: the number of requests received since the previous sample
        /// divided by the elapsed seconds, rounded to the nearest integral value
        /// </summary>
        private void ComputeFileRequestFrequency()
        {
            // Interlocked.Read() keeps the 64 bit reads atomic on 32 bit platforms
            var requestsCount = Interlocked.Read(ref _succeededDataRequestsCount) + Interlocked.Read(ref _failedDataRequestsCount);
            var now = DateTime.UtcNow;

            if (_lastRequestRateCalculationTime == default)
            {
                // First time we calculate the request rate.
                // We don't have a previous value to compare to so we just store the current value.
                _lastRequestRateCalculationTime = now;
                _prevRequestsCount = requestsCount;
                return;
            }

            var elapsedSeconds = (now - _lastRequestRateCalculationTime).TotalSeconds;
            if (elapsedSeconds > 0)
            {
                _requestRates.Add(Math.Round((requestsCount - _prevRequestsCount) / elapsedSeconds));
            }
            // else: the system clock was moved backwards, the sample would be infinite, NaN or negative.
            // We skip it and just restart from the current values.

            _prevRequestsCount = requestsCount;
            _lastRequestRateCalculationTime = now;
        }

        /// <summary>
        /// Stores the data monitor report
        /// </summary>
        /// <remarks>
        /// The report is serialized as non indented JSON into a time stamped
        /// "data-monitor-report" file in the results destination folder.
        /// </remarks>
        /// <param name="report">The data monitor report to be stored</param>
        private void StoreDataMonitorReport(DataMonitorReport report)
        {
            if (report == null)
            {
                return;
            }

            var path = GetFilePath("data-monitor-report.json");
            var data = JsonConvert.SerializeObject(report, Formatting.None);
            File.WriteAllText(path, data);
        }

        /// <summary>
        /// Builds the full path of an output file, inserting the current UTC time between the name and the extension
        /// </summary>
        /// <param name="filename">The file name, including its extension</param>
        /// <returns>The path "{results destination folder}/{name}-{yyyyMMddHHmmssfff}{extension}"</returns>
        private string GetFilePath(string filename)
        {
            var baseFilename = Path.GetFileNameWithoutExtension(filename);
            var timestamp = DateTime.UtcNow.ToStringInvariant("yyyyMMddHHmmssfff");
            var extension = Path.GetExtension(filename);
            return Path.Combine(_resultsDestinationFolder, $"{baseFilename}-{timestamp}{extension}");
        }

        /// <summary>
        /// Opens the given file for writing
        /// </summary>
        /// <param name="filename">The path of the file to write to</param>
        /// <returns>A synchronized writer, it can be shared by the threads raising data requests</returns>
        private static TextWriter OpenStream(string filename)
        {
            var writer = new StreamWriter(filename);
            return TextWriter.Synchronized(writer);
        }

        /// <summary>
        /// Writes a line to the given writer, logging the failure instead of throwing if the write is not possible
        /// </summary>
        /// <param name="writer">The writer to use</param>
        /// <param name="line">The line to write</param>
        /// <param name="filename">The name of the file behind the writer, only used for the error message</param>
        private static void WriteLineToFile(TextWriter writer, string line, string filename)
        {
            try
            {
                writer.WriteLine(line);
            }
            // ObjectDisposedException: Exit() closed the writer while this request was still being handled
            catch (Exception exception) when (exception is IOException or ObjectDisposedException)
            {
                Logging.Log.Error($"DataMonitor.OnNewDataRequest(): Failed to write to file {filename}: {exception.Message}");
            }
        }
    }
}