Lean  $LEAN_TAG$
LiveTradingRealTimeHandler.cs
1 /*
2  * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
3  * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  *
15 */
16 
17 using System;
18 using System.Linq;
19 using System.Threading;
20 using QuantConnect.Util;
21 using QuantConnect.Logging;
22 using QuantConnect.Packets;
28 
30 {
31  /// <summary>
32  /// Live trading realtime event processing.
33  /// </summary>
35  {
// Thread that executes Run(); it is created and started from SetTime and stopped in Exit
36  private Thread _realTimeThread;
// Polled by Run() as its loop condition; handed to StopSafely and then disposed in Exit
37  private CancellationTokenSource _cancellationTokenSource = new();
// Value of the "force-exchange-always-open" configuration flag, read once when the handler is
// constructed; consulted by UpdateMarketHours when selecting the exchange hours
38  private readonly bool _forceExchangeAlwaysOpen = Config.GetBool("force-exchange-always-open");
39 
40  /// <summary>
41  /// Gets the current market hours database instance
42  /// </summary>
44 
45  /// <summary>
46  /// Gets the current symbol properties database instance
47  /// </summary>
49 
50  /// <summary>
51  /// Gets the time provider used to obtain the current UTC time
52  /// </summary>
53  /// <remarks>
54  /// Defaults to the shared RealTimeProvider instance; it is a protected virtual property only for testing purposes
55  /// </remarks>
56  protected virtual ITimeProvider TimeProvider { get; } = RealTimeProvider.Instance;
57 
58  /// <summary>
59  /// Boolean flag indicating thread state: set to true when Run() starts and back to false
/// once its loop has exited.
60  /// </summary>
61  public override bool IsActive { get; protected set; }
62 
/// <summary>
/// Initializes the real time handler for the specified algorithm and job, refreshes the market
/// hours and symbol properties once, and schedules a recurring event that refreshes them again.
/// </summary>
/// <param name="algorithm">Algorithm instance, forwarded to the base implementation</param>
/// <param name="job">Job packet, forwarded to the base implementation</param>
/// <param name="resultHandler">Result handler, forwarded to the base implementation</param>
/// <param name="api">Api instance, forwarded to the base implementation</param>
/// <param name="isolatorLimitProvider">Isolator limit provider, forwarded to the base implementation</param>
public override void Setup(IAlgorithm algorithm, AlgorithmNodePacket job, IResultHandler resultHandler, IApi api, IIsolatorLimitResultProvider isolatorLimitProvider)
{
    base.Setup(algorithm, job, resultHandler, api, isolatorLimitProvider);

    var utcNow = TimeProvider.GetUtcNow();
    var todayInAlgorithmTimeZone = utcNow.ConvertFromUtc(Algorithm.TimeZone).Date;

    // refresh the market hours and symbol properties for today explicitly
    RefreshMarketHours(todayInAlgorithmTimeZone);
    RefreshSymbolProperties();

    // set up a scheduled event to refresh market hours and symbol properties every certain period of time;
    // only trigger times strictly after "now" are kept so nothing fires for an instant already passed
    var times = Time.DateTimeRange(utcNow.Date, Time.EndOfTime, Algorithm.Settings.DatabasesRefreshPeriod).Where(date => date > utcNow);

    Add(new ScheduledEvent("RefreshMarketHoursAndSymbolProperties", times, (name, triggerTime) =>
    {
        // the trigger time is in UTC, the market hours are refreshed for the algorithm time zone date
        RefreshMarketHours(triggerTime.ConvertFromUtc(Algorithm.TimeZone).Date);
        RefreshSymbolProperties();
    }));
}
86 
/// <summary>
/// Gets the timeout the scheduled task time monitor should use
/// </summary>
/// <returns>Always 500 for this handler</returns>
protected override int GetTimeMonitorTimeout() => 500;
94 
/// <summary>
/// Body of the real time thread: waits for the next whole second and then offers every
/// scheduled event the chance to fire, repeating until cancellation is requested.
/// </summary>
private void Run()
{
    IsActive = true;

    // keep looping for as long as nobody has requested cancellation
    while (!_cancellationTokenSource.IsCancellationRequested)
    {
        var utcNow = TimeProvider.GetUtcNow();

        // sleep for whatever is left of the current second
        var untilNextSecond = utcNow.RoundUp(TimeSpan.FromSeconds(1)) - utcNow;
        var sleepMilliseconds = Convert.ToInt32(untilNextSecond.TotalMilliseconds);
        if (sleepMilliseconds < 0)
        {
            sleepMilliseconds = 1;
        }
        Thread.Sleep(sleepMilliseconds);

        // visit the events ordered by their unique id so the firing order is deterministic;
        // note the timestamp handed over is the one captured before sleeping
        var orderedEvents = ScheduledEvents.OrderBySafe(pair => pair.Value).Select(pair => pair.Key);
        foreach (var scheduledEvent in orderedEvents)
        {
            try
            {
                IsolatorLimitProvider.Consume(scheduledEvent, utcNow, TimeMonitor);
            }
            catch (Exception exception)
            {
                // report the failure on the algorithm and carry on with the remaining events
                Algorithm.SetRuntimeError(exception, $"Scheduled event: '{scheduledEvent.Name}' at {utcNow}");
            }
        }
    }

    IsActive = false;
    Log.Trace("LiveTradingRealTimeHandler.Run(): Exiting thread... Exit triggered: " + _cancellationTokenSource.IsCancellationRequested);
}
131 
/// <summary>
/// Refresh the market hours for each security in the given date
/// </summary>
/// <remarks>Each time this method is called, the MarketHoursDatabase is reset</remarks>
/// <param name="date">Date whose market hours are logged; only its date component is used</param>
protected virtual void RefreshMarketHours(DateTime date)
{
    date = date.Date;

    // reload the database first so the per-security update below picks up fresh entries,
    // mirroring what RefreshSymbolProperties does for the symbol properties database
    ResetMarketHoursDatabase();

    // update market hours for each security
    foreach (var kvp in Algorithm.Securities)
    {
        var security = kvp.Value;
        UpdateMarketHours(security);

        var localMarketHours = security.Exchange.Hours.GetMarketHours(date);

        // All future and option contracts sharing the same canonical symbol, share the same market
        // hours too. Thus, in order to reduce logs, we log the market hours using the canonical
        // symbol. See the optional parameter "overrideMessageFloodProtection" in Log.Trace()
        // method for further information
        var symbol = security.Symbol.HasCanonical() ? security.Symbol.Canonical : security.Symbol;
        Log.Trace($"LiveTradingRealTimeHandler.RefreshMarketHoursToday({security.Type}): Market hours set: Symbol: {symbol} {localMarketHours} ({security.Exchange.Hours.TimeZone})");
    }
}
157 
/// <summary>
/// Reloads the symbol properties database and re-applies its entries to every security of the algorithm
/// </summary>
/// <remarks>
/// - The SymbolPropertiesDatabase is reset on every call
/// - Protected virtual for testing purposes
/// </remarks>
protected virtual void RefreshSymbolProperties()
{
    ResetSymbolPropertiesDatabase();

    // update symbol properties for each security
    foreach (var entry in Algorithm.Securities)
    {
        var security = entry.Value;
        UpdateSymbolProperties(security);

        // Contracts that share a canonical symbol also share symbol properties, so the canonical
        // symbol is the one logged in order to reduce logs. See the optional parameter
        // "overrideMessageFloodProtection" in Log.Trace() for further information
        var loggedSymbol = security.Symbol;
        if (loggedSymbol.HasCanonical())
        {
            loggedSymbol = loggedSymbol.Canonical;
        }

        Log.Trace($"LiveTradingRealTimeHandler.RefreshSymbolPropertiesToday(): Symbol properties set: Symbol: {loggedSymbol} {security.SymbolProperties}");
    }
}
184 
185  /// <summary>
186  /// Set the current time. One branch forwards the time to the base implementation; otherwise the
/// real time thread is created and started the first time that branch is reached.
187  /// </summary>
188  /// <param name="time">The time forwarded to the base implementation</param>
189  public override void SetTime(DateTime time)
190  {
// NOTE(review): the `if (...)` line guarding the block below is absent from this excerpt
// (the embedded numbering skips 191), so the condition cannot be documented here --
// restore it from the upstream file before building
192  {
193  base.SetTime(time);
194  }
195  else if (_realTimeThread == null)
196  {
197  // in live mode we use current time for our time keeping
198  // this method is used by backtesting to set time based on the data
// the null check above means the thread is only created once; it runs Run() as a background thread
199  _realTimeThread = new Thread(Run) { IsBackground = true, Name = "RealTime Thread" };
200  _realTimeThread.Start(); // RealTime scan time for time based events
201  }
202  }
203 
204  /// <summary>
205  /// Scan for past events that didn't fire because there was no data at the scheduled time.
206  /// </summary>
/// <remarks>Delegates to the base implementation; nothing else is done by this override</remarks>
207  /// <param name="time">Current time.</param>
208  public override void ScanPastEvents(DateTime time)
209  {
// NOTE(review): a line is absent from this excerpt here (the embedded numbering skips 210);
// presumably it is an `if (...)` guard for the block below -- confirm against the upstream file
211  {
212  base.ScanPastEvents(time);
213  }
214  // in live mode we use current time for our time keeping
215  // this method is used by backtesting to scan for past events based on the data
216  }
217 
/// <summary>
/// Stops the real time thread, releases the cancellation token source and then runs the base exit logic
/// </summary>
public override void Exit()
{
    // the thread is given up to one minute to stop
    var stopTimeout = TimeSpan.FromMinutes(1);
    _realTimeThread.StopSafely(stopTimeout, _cancellationTokenSource);

    _cancellationTokenSource.DisposeSafely();

    base.Exit();
}
227 
228  /// <summary>
229  /// Updates the market hours for the specified security.
230  /// </summary>
231  /// <remarks>
232  /// - This is done after a MHDB refresh
233  /// - Made protected virtual for testing purposes
234  /// </remarks>
/// <param name="security">The security whose exchange hours are updated in place</param>
235  protected virtual void UpdateMarketHours(Security security)
236  {
237  var hours = _forceExchangeAlwaysOpen
// NOTE(review): the remainder of this initializer is absent from this excerpt (the embedded
// numbering skips 238-239); presumably a conditional choosing between two sources of
// exchange hours based on the flag -- restore it from the upstream file before building
240 
241  // Use Update method to avoid replacing the reference
242  security.Exchange.Hours.Update(hours);
243  }
244 
/// <summary>
/// Looks up the symbol properties of the given security in the database and applies them to it.
/// </summary>
/// <remarks>
/// - This is done after a SPDB refresh
/// - Protected virtual for testing purposes
/// </remarks>
/// <param name="security">The security whose symbol properties are updated</param>
protected virtual void UpdateSymbolProperties(Security security)
{
    var symbol = security.Symbol;
    var identifier = symbol.ID;

    // the lookup is keyed by market, symbol, security type and quote currency
    var refreshedProperties = SymbolPropertiesDatabase.GetSymbolProperties(
        identifier.Market,
        symbol,
        identifier.SecurityType,
        security.QuoteCurrency.Symbol);

    security.UpdateSymbolProperties(refreshedProperties);
}
258 
/// <summary>
/// Reloads the entries of the market hours database.
/// Tests that run several algorithms sequentially rely on this so that
/// every test starts with the same environment.
/// </summary>
protected virtual void ResetMarketHoursDatabase() => MarketHoursDatabase.ReloadEntries();
268 
/// <summary>
/// Reloads the entries of the symbol properties database.
/// </summary>
private void ResetSymbolPropertiesDatabase() => SymbolPropertiesDatabase.ReloadEntries();
276  }
277 }