Lean  $LEAN_TAG$
BaseDataExchange.cs
1 /*
2  * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
3  * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  *
15 */
16 
17 using System;
18 using System.Threading;
19 using QuantConnect.Util;
20 using QuantConnect.Data;
21 using QuantConnect.Logging;
23 using System.Collections.Generic;
24 using System.Collections.Concurrent;
25 
27 {
28  /// <summary>
29  /// Provides a means of distributing output from enumerators from a dedicated separate thread
30  /// </summary>
31  public class BaseDataExchange
32  {
33  private Thread _thread;
34  private uint _sleepInterval = 1;
35  private Func<Exception, bool> _isFatalError;
36  private readonly CancellationTokenSource _cancellationTokenSource;
37 
38  private readonly string _name;
39  private ManualResetEventSlim _manualResetEventSlim;
40  private ConcurrentDictionary<Symbol, EnumeratorHandler> _enumerators;
41 
42  /// <summary>
43  /// Gets or sets how long this thread will sleep when no data is available
44  /// </summary>
45  public uint SleepInterval
46  {
47  get => _sleepInterval;
48  set
49  {
50  if (value == 0)
51  {
52  throw new ArgumentException("Sleep interval should be bigger than 0");
53  }
54  _sleepInterval = value;
55  }
56  }
57 
58  /// <summary>
59  /// Gets a name for this exchange
60  /// </summary>
61  public string Name
62  {
63  get { return _name; }
64  }
65 
66  /// <summary>
67  /// Initializes a new instance of the <see cref="BaseDataExchange"/>
68  /// </summary>
69  /// <param name="name">A name for this exchange</param>
70  public BaseDataExchange(string name)
71  {
72  _name = name;
73  _isFatalError = x => false;
74  _cancellationTokenSource = new CancellationTokenSource();
75  _manualResetEventSlim = new ManualResetEventSlim(false);
76  _enumerators = new ConcurrentDictionary<Symbol, EnumeratorHandler>();
77  }
78 
79  /// <summary>
80  /// Adds the enumerator to this exchange. If it has already been added
81  /// then it will remain registered in the exchange only once
82  /// </summary>
83  /// <param name="handler">The handler to use when this symbol's data is encountered</param>
84  public void AddEnumerator(EnumeratorHandler handler)
85  {
86  _enumerators[handler.Symbol] = handler;
87  _manualResetEventSlim.Set();
88  }
89 
90  /// <summary>
91  /// Adds the enumerator to this exchange. If it has already been added
92  /// then it will remain registered in the exchange only once
93  /// </summary>
94  /// <param name="symbol">A unique symbol used to identify this enumerator</param>
95  /// <param name="enumerator">The enumerator to be added</param>
96  /// <param name="shouldMoveNext">Function used to determine if move next should be called on this
97  /// enumerator, defaults to always returning true</param>
98  /// <param name="enumeratorFinished">Delegate called when the enumerator move next returns false</param>
99  /// <param name="handleData">Handler for data if HandlesData=true</param>
100  public void AddEnumerator(Symbol symbol, IEnumerator<BaseData> enumerator, Func<bool> shouldMoveNext = null, Action<EnumeratorHandler> enumeratorFinished = null, Action<BaseData> handleData = null)
101  {
102  var enumeratorHandler = new EnumeratorHandler(symbol, enumerator, shouldMoveNext, handleData);
103  if (enumeratorFinished != null)
104  {
105  enumeratorHandler.EnumeratorFinished += (sender, args) => enumeratorFinished(args);
106  }
107  AddEnumerator(enumeratorHandler);
108  }
109 
110  /// <summary>
111  /// Sets the specified function as the error handler. This function
112  /// returns true if it is a fatal error and queue consumption should
113  /// cease.
114  /// </summary>
115  /// <param name="isFatalError">The error handling function to use when an
116  /// error is encountered during queue consumption. Returns true if queue
117  /// consumption should be stopped, returns false if queue consumption should
118  /// continue</param>
119  public void SetErrorHandler(Func<Exception, bool> isFatalError)
120  {
121  // default to false;
122  _isFatalError = isFatalError ?? (x => false);
123  }
124 
125  /// <summary>
126  /// Removes and returns enumerator handler with the specified symbol.
127  /// The removed handler is returned, null if not found
128  /// </summary>
130  {
131  EnumeratorHandler handler;
132  if (_enumerators.TryRemove(symbol, out handler))
133  {
134  handler.OnEnumeratorFinished();
135  handler.Enumerator.Dispose();
136  }
137  return handler;
138  }
139 
140  /// <summary>
141  /// Begins consumption of the wrapped <see cref="IDataQueueHandler"/> on
142  /// a separate thread
143  /// </summary>
144  public void Start()
145  {
146  var manualEvent = new ManualResetEventSlim(false);
147  _thread = new Thread(() =>
148  {
149  manualEvent.Set();
150  Log.Trace($"BaseDataExchange({Name}) Starting...");
151  ConsumeEnumerators();
152  }) { IsBackground = true, Name = Name };
153  _thread.Start();
154 
155  manualEvent.Wait();
156  manualEvent.DisposeSafely();
157  }
158 
159  /// <summary>
160  /// Ends consumption of the wrapped <see cref="IDataQueueHandler"/>
161  /// </summary>
162  public void Stop()
163  {
164  _thread.StopSafely(TimeSpan.FromSeconds(5), _cancellationTokenSource);
165  }
166 
167  /// <summary> Entry point for queue consumption </summary>
168  /// <remarks> This function only returns after <see cref="Stop"/> is called or the token is cancelled</remarks>
169  private void ConsumeEnumerators()
170  {
171  while (!_cancellationTokenSource.Token.IsCancellationRequested)
172  {
173  try
174  {
175  // call move next each enumerator and invoke the appropriate handlers
176  _manualResetEventSlim.Reset();
177  var handled = false;
178  foreach (var kvp in _enumerators)
179  {
180  if (_cancellationTokenSource.Token.IsCancellationRequested)
181  {
182  Log.Trace($"BaseDataExchange({Name}).ConsumeQueue(): Exiting...");
183  return;
184  }
185  var enumeratorHandler = kvp.Value;
186  var enumerator = enumeratorHandler.Enumerator;
187 
188  // check to see if we should advance this enumerator
189  if (!enumeratorHandler.ShouldMoveNext()) continue;
190 
191  if (!enumerator.MoveNext())
192  {
193  enumeratorHandler.OnEnumeratorFinished();
194  enumeratorHandler.Enumerator.Dispose();
195  _enumerators.TryRemove(enumeratorHandler.Symbol, out enumeratorHandler);
196  continue;
197  }
198 
199  if (enumerator.Current == null) continue;
200 
201  handled = true;
202  enumeratorHandler.HandleData(enumerator.Current);
203  }
204 
205  if (!handled)
206  {
207  // if we didn't handle anything on this past iteration, take a nap
208  // wait until we timeout, we are cancelled or there is a new enumerator added
209  _manualResetEventSlim.Wait(Time.GetSecondUnevenWait((int)_sleepInterval), _cancellationTokenSource.Token);
210  }
211  }
212  catch (OperationCanceledException)
213  {
214  // thrown by the event watcher
215  }
216  catch (Exception err)
217  {
218  Log.Error(err);
219  if (_isFatalError(err))
220  {
221  Log.Trace($"BaseDataExchange({Name}).ConsumeQueue(): Fatal error encountered. Exiting...");
222  return;
223  }
224  }
225  }
226 
227  Log.Trace($"BaseDataExchange({Name}).ConsumeQueue(): Exiting...");
228  }
229 
230  /// <summary>
231  /// Handler used to manage a single enumerator's move next/end of stream behavior
232  /// </summary>
233  public class EnumeratorHandler
234  {
235  private readonly Func<bool> _shouldMoveNext;
236  private readonly Action<BaseData> _handleData;
237 
238  /// <summary>
239  /// Event fired when MoveNext returns false
240  /// </summary>
241  public event EventHandler<EnumeratorHandler> EnumeratorFinished;
242 
243  /// <summary>
244  /// A unique symbol used to identify this enumerator
245  /// </summary>
246  public Symbol Symbol { get; init; }
247 
248  /// <summary>
249  /// The enumerator this handler handles
250  /// </summary>
251  public IEnumerator<BaseData> Enumerator { get; init; }
252 
253  /// <summary>
254  /// Initializes a new instance of the <see cref="EnumeratorHandler"/> class
255  /// </summary>
256  /// <param name="symbol">The symbol to identify this enumerator</param>
257  /// <param name="enumerator">The enumeator this handler handles</param>
258  /// <param name="shouldMoveNext">Predicate function used to determine if we should call move next
259  /// on the symbol's enumerator</param>
260  /// <param name="handleData">Handler for data if HandlesData=true</param>
261  public EnumeratorHandler(Symbol symbol, IEnumerator<BaseData> enumerator, Func<bool> shouldMoveNext = null, Action<BaseData> handleData = null)
262  {
263  Symbol = symbol;
264  Enumerator = enumerator;
265 
266  _handleData = handleData;
267  _shouldMoveNext = shouldMoveNext ?? (() => true);
268  }
269 
270  /// <summary>
271  /// Event invocator for the <see cref="EnumeratorFinished"/> event
272  /// </summary>
273  public void OnEnumeratorFinished()
274  {
275  EnumeratorFinished?.Invoke(this, this);
276  }
277 
278  /// <summary>
279  /// Returns true if this enumerator should move next
280  /// </summary>
281  public bool ShouldMoveNext()
282  {
283  return _shouldMoveNext();
284  }
285 
286  /// <summary>
287  /// Handles the specified data.
288  /// </summary>
289  /// <param name="data">The data to be handled</param>
290  public void HandleData(BaseData data)
291  {
292  _handleData?.Invoke(data);
293  }
294  }
295  }
296 }