18 using System.Threading;
19 using System.Threading.Tasks;
37 private static readonly TimeSpan DefaultOpenThreshold = TimeSpan.FromMinutes(5);
38 private static readonly TimeSpan DefaultInitialDelay = TimeSpan.FromMinutes(15);
40 private volatile bool _connected;
43 private readonly TimeSpan _openThreshold;
44 private readonly TimeSpan _initialDelay;
45 private CancellationTokenSource _cancellationTokenSource;
54 : this(algorithm, null, null, initialDelay, openThreshold)
68 _algorithm = algorithm;
70 _openThreshold = openThreshold ?? DefaultOpenThreshold;
71 _initialDelay = initialDelay ?? DefaultInitialDelay;
93 _algorithm.SetRuntimeError(
new Exception(message.
Message),
102 var open = (from kvp in _algorithm.Securities
103 let security = kvp.Value
105 let exchange = security.Exchange
106 let localTime = _algorithm.UtcTime.ConvertFromUtc(exchange.TimeZone)
107 where exchange.IsOpenDuringBar(
109 localTime + _openThreshold,
110 _algorithm.SubscriptionManager.SubscriptionDataConfigService
111 .GetSubscriptionDataConfigs(security.Symbol)
112 .IsExtendedMarketHours())
113 select security).Any();
121 StartCheckReconnected(_initialDelay, message);
128 DateTime nextMarketOpenUtc;
129 if (_algorithm.Securities.Count != 0)
131 nextMarketOpenUtc = (from kvp in _algorithm.Securities
132 let security = kvp.Value
134 let exchange = security.Exchange
135 let localTime = _algorithm.UtcTime.ConvertFromUtc(exchange.TimeZone)
136 let marketOpen = exchange.Hours.GetNextMarketOpen(localTime,
137 _algorithm.SubscriptionManager.SubscriptionDataConfigService
138 .GetSubscriptionDataConfigs(security.Symbol)
139 .IsExtendedMarketHours())
140 let marketOpenUtc = marketOpen.ConvertToUtc(exchange.TimeZone)
141 select marketOpenUtc).Min();
146 nextMarketOpenUtc = DateTime.UtcNow.AddHours(1);
149 var timeUntilNextMarketOpen = nextMarketOpenUtc - DateTime.UtcNow - _openThreshold;
153 StartCheckReconnected(timeUntilNextMarketOpen, message);
161 if (_cancellationTokenSource !=
null && !_cancellationTokenSource.IsCancellationRequested)
163 _cancellationTokenSource.Cancel();
181 _cancellationTokenSource.DisposeSafely();
182 _cancellationTokenSource =
new CancellationTokenSource(delay);
186 while (!_cancellationTokenSource.IsCancellationRequested)
188 Thread.Sleep(TimeSpan.FromMinutes(1));
191 CheckReconnected(message);
193 }, _cancellationTokenSource.Token);
196 private void CheckReconnected(BrokerageMessageEvent message)
200 Log.
Error(Messages.DefaultBrokerageMessageHandler.StillDisconnected);
201 _algorithm.SetRuntimeError(
new Exception(message.Message),
202 Messages.DefaultBrokerageMessageHandler.BrokerageDisconnectedShutDownContext);