Lean  $LEAN_TAG$
LiveTradingRealTimeHandler.cs
1 /*
2  * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
3  * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  *
15 */
16 
17 using System;
18 using System.Linq;
19 using System.Threading;
20 using QuantConnect.Util;
21 using QuantConnect.Logging;
22 using QuantConnect.Packets;
28 
30 {
31  /// <summary>
32  /// Live trading realtime event processing.
33  /// </summary>
35  {
// Thread that executes Run(); it is created and started by SetTime the first time that branch is taken
36  private Thread _realTimeThread;
// Polled by the Run() loop to know when to stop; handed to StopSafely and then disposed in Exit()
37  private CancellationTokenSource _cancellationTokenSource = new();
// Value of the "force-exchange-always-open" configuration key; read by UpdateMarketHours when selecting the exchange hours
38  private readonly bool _forceExchangeAlwaysOpen = Config.GetBool("force-exchange-always-open");
39 
40  /// <summary>
41  /// Gets the current market hours database instance
42  /// </summary>
44 
45  /// <summary>
46  /// Gets the current symbol properties database instance
47  /// </summary>
49 
50  /// <summary>
51  /// Gets the time provider this handler queries for the current UTC time
52  /// </summary>
53  /// <remarks>
54  /// Initialized to RealTimeProvider.Instance; it is a protected virtual property only so that tests can replace it
55  /// </remarks>
56  protected virtual ITimeProvider TimeProvider { get; } = RealTimeProvider.Instance;
57 
58  /// <summary>
59  /// Boolean flag indicating thread state: Run() sets it to true when it starts and back to false once its loop exits.
60  /// </summary>
61  public override bool IsActive { get; protected set; }
62 
/// <summary>
/// Initializes the real time handler for the specified algorithm and job, refreshes the market hours and
/// symbol properties once for the current date and schedules a recurring refresh of both.
/// </summary>
/// <param name="algorithm">The algorithm instance, forwarded to the base setup</param>
/// <param name="job">The job packet, forwarded to the base setup</param>
/// <param name="resultHandler">The result handler, forwarded to the base setup</param>
/// <param name="api">The api instance, forwarded to the base setup</param>
/// <param name="isolatorLimitProvider">The isolator limit provider, forwarded to the base setup</param>
public override void Setup(IAlgorithm algorithm, AlgorithmNodePacket job, IResultHandler resultHandler, IApi api, IIsolatorLimitResultProvider isolatorLimitProvider)
{
    base.Setup(algorithm, job, resultHandler, api, isolatorLimitProvider);

    var utcNow = TimeProvider.GetUtcNow();
    var todayInAlgorithmTimeZone = utcNow.ConvertFromUtc(Algorithm.TimeZone).Date;

    // refresh the market hours and symbol properties for today explicitly
    RefreshMarketHours(todayInAlgorithmTimeZone);
    RefreshSymbolProperties();

    // set up a scheduled event to refresh market hours and symbol properties every certain period of time;
    // only trigger times strictly after 'utcNow' are kept, today's refresh was just performed above
    var times = Time.DateTimeRange(utcNow.Date, Time.EndOfTime, Algorithm.Settings.DatabasesRefreshPeriod).Where(date => date > utcNow);

    Add(new ScheduledEvent("RefreshMarketHoursAndSymbolProperties", times, (name, triggerTime) =>
    {
        // the trigger time is converted to the algorithm time zone before taking its date
        RefreshMarketHours(triggerTime.ConvertFromUtc(Algorithm.TimeZone).Date);
        RefreshSymbolProperties();
    }));
}
86 
/// <summary>
/// Gets the timeout the scheduled task time monitor should use
/// </summary>
/// <returns>The constant value 500</returns>
protected override int GetTimeMonitorTimeout() => 500;
94 
/// <summary>
/// Body of the live real time thread: monitors the scheduled events until cancellation is requested.
/// Each pass samples the current UTC time, sleeps until the next whole second and then offers
/// every scheduled event that sampled time.
/// </summary>
private void Run()
{
    IsActive = true;

    var oneSecond = TimeSpan.FromSeconds(1);

    // keep scanning for as long as nobody requested cancellation
    while (!_cancellationTokenSource.IsCancellationRequested)
    {
        var utcNow = TimeProvider.GetUtcNow();

        // wait out the remainder of the current second
        var millisecondsToWait = Convert.ToInt32((utcNow.RoundUp(oneSecond) - utcNow).TotalMilliseconds);
        if (millisecondsToWait < 0)
        {
            millisecondsToWait = 1;
        }
        Thread.Sleep(millisecondsToWait);

        // give every event the chance to fire; entries are sorted by their value (the unique id) to be deterministic.
        // Note that the events receive the time sampled before the sleep above.
        foreach (var entry in ScheduledEvents.OrderBySafe(pair => pair.Value))
        {
            var candidate = entry.Key;
            try
            {
                IsolatorLimitProvider.Consume(candidate, utcNow, TimeMonitor);
            }
            catch (Exception error)
            {
                // a failing event is reported as an algorithm runtime error, the loop itself keeps going
                Algorithm.SetRuntimeError(error, $"Scheduled event: '{candidate.Name}' at {utcNow}");
            }
        }
    }

    IsActive = false;
    Log.Trace("LiveTradingRealTimeHandler.Run(): Exiting thread... Exit triggered: " + _cancellationTokenSource.IsCancellationRequested);
}
131 
/// <summary>
/// Refresh the market hours for each security in the given date
/// </summary>
/// <remarks>Each time this method is called, the MarketHoursDatabase is reset</remarks>
/// <param name="date">The date whose market hours are logged; only its date component is used</param>
protected virtual void RefreshMarketHours(DateTime date)
{
    date = date.Date;

    // reload the database first so the securities below pick up fresh entries
    ResetMarketHoursDatabase();

    // update market hours for each security
    foreach (var kvp in Algorithm.Securities)
    {
        var security = kvp.Value;
        UpdateMarketHours(security);

        var localMarketHours = security.Exchange.Hours.GetMarketHours(date);
        Log.Trace($"LiveTradingRealTimeHandler.RefreshMarketHoursToday({security.Type}): Market hours set: Symbol: {security.Symbol} {localMarketHours} ({security.Exchange.Hours.TimeZone})");
    }
}
151 
/// <summary>
/// Reloads the symbol properties database and re-applies the symbol properties to every security of the algorithm
/// </summary>
/// <remarks>
/// - Each time this method is called, the SymbolPropertiesDatabase is reset
/// - Made protected virtual for testing purposes
/// </remarks>
protected virtual void RefreshSymbolProperties()
{
    ResetSymbolPropertiesDatabase();

    // update symbol properties for each security and log the outcome
    foreach (var pair in Algorithm.Securities)
    {
        var security = pair.Value;

        UpdateSymbolProperties(security);

        Log.Trace($"LiveTradingRealTimeHandler.RefreshSymbolPropertiesToday(): Symbol properties set: " +
            $"Symbol: {security.Symbol} {security.SymbolProperties}");
    }
}
173 
174  /// <summary>
175  /// Set the current time. One branch forwards the time to the base implementation; otherwise, the first time it is reached, the real time thread running Run() is created and started.
176  /// </summary>
177  /// <param name="time">The time handed to the base implementation</param>
178  public override void SetTime(DateTime time)
179  {
// NOTE(review): the 'if (...)' line guarding the block below is missing from this listing (the embedded numbering skips 180); restore it from the original file, it cannot be inferred from here
181  {
182  base.SetTime(time);
183  }
184  else if (_realTimeThread == null)
185  {
186  // in live mode we use current time for our time keeping
187  // this method is used by backtesting to set time based on the data
// the null check above makes sure the thread is only created once
188  _realTimeThread = new Thread(Run) { IsBackground = true, Name = "RealTime Thread" };
189  _realTimeThread.Start(); // RealTime scan time for time based events
190  }
191  }
192 
193  /// <summary>
194  /// Scan for past events that didn't fire because there was no data at the scheduled time.
195  /// The only work visible here is delegating to the base implementation inside a guarded block.
196  /// </summary>
197  /// <param name="time">Current time.</param>
198  public override void ScanPastEvents(DateTime time)
199  {
// NOTE(review): the 'if (...)' line guarding the block below is missing from this listing (the embedded numbering skips 199); restore it from the original file, it cannot be inferred from here
200  {
201  base.ScanPastEvents(time);
202  }
203  // in live mode we use current time for our time keeping
204  // this method is used by backtesting to scan for past events based on the data
205  }
206 
/// <summary>
/// Stops the real time thread, disposes the cancellation token source and finally runs the base exit logic
/// </summary>
public override void Exit()
{
    // one minute timeout handed to StopSafely together with the cancellation token source
    var stopTimeout = TimeSpan.FromMinutes(1);
    _realTimeThread.StopSafely(stopTimeout, _cancellationTokenSource);

    _cancellationTokenSource.DisposeSafely();

    base.Exit();
}
216 
217  /// <summary>
218  /// Updates the market hours for the specified security.
219  /// </summary>
220  /// <remarks>
221  /// - This is done after a MHDB refresh
222  /// - Made protected virtual for testing purposes
223  /// </remarks>
/// <param name="security">The security whose exchange hours are updated in place</param>
224  protected virtual void UpdateMarketHours(Security security)
225  {
226  var hours = _forceExchangeAlwaysOpen
// NOTE(review): the rest of this expression (presumably the two branches of a conditional) is missing from this listing (the embedded numbering skips 227-228); restore it from the original file
229 
230  // Use Update method to avoid replacing the reference
231  security.Exchange.Hours.Update(hours);
232  }
233 
/// <summary>
/// Looks up the symbol properties of the given security in the symbol properties database and applies them to it.
/// </summary>
/// <remarks>
/// - This is done after a SPDB refresh
/// - Made protected virtual for testing purposes
/// </remarks>
/// <param name="security">The security whose symbol properties are updated</param>
protected virtual void UpdateSymbolProperties(Security security)
{
    var symbol = security.Symbol;
    var identifier = symbol.ID;

    // the lookup is keyed by market, symbol, security type and quote currency
    var refreshed = SymbolPropertiesDatabase.GetSymbolProperties(
        identifier.Market,
        symbol,
        identifier.SecurityType,
        security.QuoteCurrency.Symbol);

    security.UpdateSymbolProperties(refreshed);
}
247 
/// <summary>
/// Resets the market hours database by reloading its entries.
/// Tests running several algorithms one after another rely on this
/// to guarantee that each of them starts from the same environment.
/// </summary>
protected virtual void ResetMarketHoursDatabase() => MarketHoursDatabase.ReloadEntries();
257 
/// <summary>
/// Resets the symbol properties database by reloading its entries.
/// </summary>
private void ResetSymbolPropertiesDatabase() => SymbolPropertiesDatabase.ReloadEntries();
265  }
266 }