Lean  $LEAN_TAG$
DefaultBrokerageMessageHandler.cs
1 /*
2  * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
3  * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14 */
15 
16 using System;
17 using System.Linq;
18 using System.Threading;
19 using System.Threading.Tasks;
20 using QuantConnect.Data;
22 using QuantConnect.Logging;
23 using QuantConnect.Packets;
24 using QuantConnect.Util;
25 
27 {
28  /// <summary>
29  /// Provides a default implementation o <see cref="IBrokerageMessageHandler"/> that will forward
30  /// messages as follows:
31  /// Information -> IResultHandler.Debug
32  /// Warning -> IResultHandler.Error &amp;&amp; IApi.SendUserEmail
33  /// Error -> IResultHandler.Error &amp;&amp; IAlgorithm.RunTimeError
34  /// </summary>
36  {
37  private static readonly TimeSpan DefaultOpenThreshold = TimeSpan.FromMinutes(5);
38  private static readonly TimeSpan DefaultInitialDelay = TimeSpan.FromMinutes(15);
39 
40  private volatile bool _connected;
41 
42  private readonly IAlgorithm _algorithm;
43  private readonly TimeSpan _openThreshold;
44  private readonly TimeSpan _initialDelay;
45  private CancellationTokenSource _cancellationTokenSource;
46 
47  /// <summary>
48  /// Initializes a new instance of the <see cref="DefaultBrokerageMessageHandler"/> class
49  /// </summary>
50  /// <param name="algorithm">The running algorithm</param>
51  /// <param name="initialDelay"></param>
52  /// <param name="openThreshold">Defines how long before market open to re-check for brokerage reconnect message</param>
53  public DefaultBrokerageMessageHandler(IAlgorithm algorithm, TimeSpan? initialDelay = null, TimeSpan? openThreshold = null)
54  : this(algorithm, null, null, initialDelay, openThreshold)
55  {
56  }
57 
58  /// <summary>
59  /// Initializes a new instance of the <see cref="DefaultBrokerageMessageHandler"/> class
60  /// </summary>
61  /// <param name="algorithm">The running algorithm</param>
62  /// <param name="job">The job that produced the algorithm</param>
63  /// <param name="api">The api for the algorithm</param>
64  /// <param name="initialDelay"></param>
65  /// <param name="openThreshold">Defines how long before market open to re-check for brokerage reconnect message</param>
66  public DefaultBrokerageMessageHandler(IAlgorithm algorithm, AlgorithmNodePacket job, IApi api, TimeSpan? initialDelay = null, TimeSpan? openThreshold = null)
67  {
68  _algorithm = algorithm;
69  _connected = true;
70  _openThreshold = openThreshold ?? DefaultOpenThreshold;
71  _initialDelay = initialDelay ?? DefaultInitialDelay;
72  }
73 
74  /// <summary>
75  /// Handles the message
76  /// </summary>
77  /// <param name="message">The message to be handled</param>
78  public void HandleMessage(BrokerageMessageEvent message)
79  {
80  // based on message type dispatch to result handler
81  switch (message.Type)
82  {
83  case BrokerageMessageType.Information:
84  _algorithm.Debug(Messages.DefaultBrokerageMessageHandler.BrokerageInfo(message));
85  break;
86 
87  case BrokerageMessageType.Warning:
88  _algorithm.Error(Messages.DefaultBrokerageMessageHandler.BrokerageWarning(message));
89  break;
90 
91  case BrokerageMessageType.Error:
92  // unexpected error, we need to close down shop
93  _algorithm.SetRuntimeError(new Exception(message.Message),
95  break;
96 
97  case BrokerageMessageType.Disconnect:
98  _connected = false;
100 
101  // check to see if any non-custom security exchanges are open within the next x minutes
102  var open = (from kvp in _algorithm.Securities
103  let security = kvp.Value
104  where security.Type != SecurityType.Base
105  let exchange = security.Exchange
106  let localTime = _algorithm.UtcTime.ConvertFromUtc(exchange.TimeZone)
107  where exchange.IsOpenDuringBar(
108  localTime,
109  localTime + _openThreshold,
110  _algorithm.SubscriptionManager.SubscriptionDataConfigService
111  .GetSubscriptionDataConfigs(security.Symbol)
112  .IsExtendedMarketHours())
113  select security).Any();
114 
115  // if any are open then we need to kill the algorithm
116  if (open)
117  {
119 
120  // wait 15 minutes before killing algorithm
121  StartCheckReconnected(_initialDelay, message);
122  }
123  else
124  {
126 
127  // if they aren't open, we'll need to check again a little bit before markets open
128  DateTime nextMarketOpenUtc;
129  if (_algorithm.Securities.Count != 0)
130  {
131  nextMarketOpenUtc = (from kvp in _algorithm.Securities
132  let security = kvp.Value
133  where security.Type != SecurityType.Base
134  let exchange = security.Exchange
135  let localTime = _algorithm.UtcTime.ConvertFromUtc(exchange.TimeZone)
136  let marketOpen = exchange.Hours.GetNextMarketOpen(localTime,
137  _algorithm.SubscriptionManager.SubscriptionDataConfigService
138  .GetSubscriptionDataConfigs(security.Symbol)
139  .IsExtendedMarketHours())
140  let marketOpenUtc = marketOpen.ConvertToUtc(exchange.TimeZone)
141  select marketOpenUtc).Min();
142  }
143  else
144  {
145  // if we have no securities just make next market open an hour from now
146  nextMarketOpenUtc = DateTime.UtcNow.AddHours(1);
147  }
148 
149  var timeUntilNextMarketOpen = nextMarketOpenUtc - DateTime.UtcNow - _openThreshold;
151 
152  // wake up 5 minutes before market open and check if we've reconnected
153  StartCheckReconnected(timeUntilNextMarketOpen, message);
154  }
155  break;
156 
157  case BrokerageMessageType.Reconnect:
158  _connected = true;
160 
161  if (_cancellationTokenSource != null && !_cancellationTokenSource.IsCancellationRequested)
162  {
163  _cancellationTokenSource.Cancel();
164  }
165  break;
166  }
167  }
168 
169  /// <summary>
170  /// Handles a new order placed manually in the brokerage side
171  /// </summary>
172  /// <param name="eventArgs">The new order event</param>
173  /// <returns>Whether the order should be added to the transaction handler</returns>
175  {
176  return false;
177  }
178 
179  private void StartCheckReconnected(TimeSpan delay, BrokerageMessageEvent message)
180  {
181  _cancellationTokenSource.DisposeSafely();
182  _cancellationTokenSource = new CancellationTokenSource(delay);
183 
184  Task.Run(() =>
185  {
186  while (!_cancellationTokenSource.IsCancellationRequested)
187  {
188  Thread.Sleep(TimeSpan.FromMinutes(1));
189  }
190 
191  CheckReconnected(message);
192 
193  }, _cancellationTokenSource.Token);
194  }
195 
196  private void CheckReconnected(BrokerageMessageEvent message)
197  {
198  if (!_connected)
199  {
200  Log.Error(Messages.DefaultBrokerageMessageHandler.StillDisconnected);
201  _algorithm.SetRuntimeError(new Exception(message.Message),
202  Messages.DefaultBrokerageMessageHandler.BrokerageDisconnectedShutDownContext);
203  }
204  }
205  }
206 }