Lean  $LEAN_TAG$
RealTimeScheduleEventService.cs
1 /*
2  * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
3  * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14 */
15 
16 using System;
17 using System.Threading;
18 using QuantConnect.Util;
19 using System.Collections.Generic;
20 
22 {
23  /// <summary>
24  /// Allows to setup a real time scheduled event, internally using a <see cref="Thread"/>,
25  /// that is guaranteed to trigger at or after the requested time, never before.
26  /// </summary>
27  /// <remarks>This class is of value because <see cref="Timer"/> could fire the
28  /// event before time.</remarks>
29  public class RealTimeScheduleEventService : IDisposable
30  {
31  private readonly Thread _pulseThread;
32  private readonly Queue<DateTime> _work;
33  private readonly ManualResetEvent _event;
34  private readonly CancellationTokenSource _tokenSource;
35 
36  /// <summary>
37  /// Event fired when the scheduled time is past
38  /// </summary>
39  public event EventHandler NewEvent;
40 
41  /// <summary>
42  /// Creates a new instance
43  /// </summary>
44  /// <param name="timeProvider">The time provider to use</param>
46  {
47  _tokenSource = new CancellationTokenSource();
48  _event = new ManualResetEvent(false);
49  _work = new Queue<DateTime>();
50  _pulseThread = new Thread(() =>
51  {
52  while (!_tokenSource.Token.IsCancellationRequested)
53  {
54  DateTime nextUtcScheduledEvent;
55  lock (_work)
56  {
57  _work.TryDequeue(out nextUtcScheduledEvent);
58  }
59 
60  if (nextUtcScheduledEvent == default)
61  {
62  _event.WaitOne(_tokenSource.Token);
63  _event.Reset();
64  if (_tokenSource.Token.IsCancellationRequested)
65  {
66  return;
67  }
68  continue;
69  }
70 
71  // testing has shown that it sometimes requires more than one loop
72  var diff = nextUtcScheduledEvent - timeProvider.GetUtcNow();
73  while (diff.Ticks > 0)
74  {
75  _tokenSource.Token.WaitHandle.WaitOne(diff);
76 
77  diff = nextUtcScheduledEvent - timeProvider.GetUtcNow();
78 
79  if (_tokenSource.Token.IsCancellationRequested)
80  {
81  return;
82  }
83  }
84 
85  NewEvent?.Invoke(this, EventArgs.Empty);
86  }
87  }) { IsBackground = true, Name = "RealTimeScheduleEventService" };
88  _pulseThread.Start();
89  }
90 
91  /// <summary>
92  /// Schedules a new event
93  /// </summary>
94  /// <param name="dueTime">The desired due time</param>
95  /// <param name="utcNow">Current utc time</param>
96  /// <remarks>Scheduling a new event will try to disable previous scheduled event,
97  /// but it is not guaranteed.</remarks>
98  public void ScheduleEvent(TimeSpan dueTime, DateTime utcNow)
99  {
100  lock (_work)
101  {
102  _work.Enqueue(utcNow + dueTime);
103  _event.Set();
104  }
105  }
106 
107  /// <summary>
108  /// Disposes of the underlying <see cref="Timer"/> instance
109  /// </summary>
110  public void Dispose()
111  {
112  _pulseThread.StopSafely(TimeSpan.FromSeconds(1), _tokenSource);
113  _tokenSource.DisposeSafely();
114  _event.DisposeSafely();
115  }
116  }
117 }