Lean  $LEAN_TAG$
BrokerageMultiWebSocketSubscriptionManager.cs
1 /*
2  * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
3  * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 using System;
17 using System.Linq;
18 using System.Threading;
19 using QuantConnect.Data;
20 using QuantConnect.Util;
21 using QuantConnect.Logging;
22 using System.Threading.Tasks;
23 using System.Collections.Generic;
24 using System.Runtime.CompilerServices;
25 
27 {
28  /// <summary>
29  /// Handles brokerage data subscriptions with multiple websocket connections, with optional symbol weighting
30  /// </summary>
32  {
33  private readonly string _webSocketUrl;
34  private readonly int _maximumSymbolsPerWebSocket;
35  private readonly int _maximumWebSocketConnections;
36  private readonly Func<WebSocketClientWrapper> _webSocketFactory;
37  private readonly Func<IWebSocket, Symbol, bool> _subscribeFunc;
38  private readonly Func<IWebSocket, Symbol, bool> _unsubscribeFunc;
39  private readonly Action<WebSocketMessage> _messageHandler;
40  private readonly RateGate _connectionRateLimiter;
41  private readonly System.Timers.Timer _reconnectTimer;
42 
43  private const int ConnectionTimeout = 30000;
44 
45  private readonly object _locker = new();
46  private readonly List<BrokerageMultiWebSocketEntry> _webSocketEntries = new();
47 
48  /// <summary>
49  /// Initializes a new instance of the <see cref="BrokerageMultiWebSocketSubscriptionManager"/> class
50  /// </summary>
51  /// <param name="webSocketUrl">The URL for websocket connections</param>
52  /// <param name="maximumSymbolsPerWebSocket">The maximum number of symbols per websocket connection</param>
53  /// <param name="maximumWebSocketConnections">The maximum number of websocket connections allowed (if zero, symbol weighting is disabled)</param>
54  /// <param name="symbolWeights">A dictionary for the symbol weights</param>
55  /// <param name="webSocketFactory">A function which returns a new websocket instance</param>
56  /// <param name="subscribeFunc">A function which subscribes a symbol</param>
57  /// <param name="unsubscribeFunc">A function which unsubscribes a symbol</param>
58  /// <param name="messageHandler">The websocket message handler</param>
59  /// <param name="webSocketConnectionDuration">The maximum duration of the websocket connection, TimeSpan.Zero for no duration limit</param>
60  /// <param name="connectionRateLimiter">The rate limiter for creating new websocket connections</param>
62  string webSocketUrl,
63  int maximumSymbolsPerWebSocket,
64  int maximumWebSocketConnections,
65  Dictionary<Symbol, int> symbolWeights,
66  Func<WebSocketClientWrapper> webSocketFactory,
67  Func<IWebSocket, Symbol, bool> subscribeFunc,
68  Func<IWebSocket, Symbol, bool> unsubscribeFunc,
69  Action<WebSocketMessage> messageHandler,
70  TimeSpan webSocketConnectionDuration,
71  RateGate connectionRateLimiter = null)
72  {
73  _webSocketUrl = webSocketUrl;
74  _maximumSymbolsPerWebSocket = maximumSymbolsPerWebSocket;
75  _maximumWebSocketConnections = maximumWebSocketConnections;
76  _webSocketFactory = webSocketFactory;
77  _subscribeFunc = subscribeFunc;
78  _unsubscribeFunc = unsubscribeFunc;
79  _messageHandler = messageHandler;
80  // let's use a reasonable default, no API will like to get DOS on reconnections. 50 WS will take 120s
81  _connectionRateLimiter = connectionRateLimiter ?? new RateGate(5, TimeSpan.FromSeconds(12));
82 
83  if (_maximumWebSocketConnections > 0)
84  {
85  // symbol weighting enabled, create all websocket instances
86  for (var i = 0; i < _maximumWebSocketConnections; i++)
87  {
88  var webSocket = CreateWebSocket();
89 
90  _webSocketEntries.Add(new BrokerageMultiWebSocketEntry(symbolWeights, webSocket));
91  }
92  }
93 
94  // Some exchanges (e.g. Binance) require a daily restart for websocket connections
95  if (webSocketConnectionDuration != TimeSpan.Zero)
96  {
97  _reconnectTimer = new System.Timers.Timer
98  {
99  Interval = webSocketConnectionDuration.TotalMilliseconds
100  };
101  _reconnectTimer.Elapsed += (_, _) =>
102  {
103  List<BrokerageMultiWebSocketEntry> webSocketEntries;
104  lock (_locker)
105  {
106  // let's make a copy so we don't hold the lock
107  webSocketEntries = _webSocketEntries.ToList();
108  }
109 
110  Log.Trace($"BrokerageMultiWebSocketSubscriptionManager(): Restarting {webSocketEntries.Count} websocket connections");
111 
112  Parallel.ForEach(webSocketEntries, new ParallelOptions { MaxDegreeOfParallelism = 4 }, entry =>
113  {
114  if (entry.WebSocket.IsOpen)
115  {
116  Log.Trace($"BrokerageMultiWebSocketSubscriptionManager(): Websocket restart - disconnect: ({entry.WebSocket.GetHashCode()})");
117  Disconnect(entry.WebSocket);
118 
119  Log.Trace($"BrokerageMultiWebSocketSubscriptionManager(): Websocket restart - connect: ({entry.WebSocket.GetHashCode()})");
120  Connect(entry.WebSocket);
121  }
122  });
123  };
124  _reconnectTimer.Start();
125 
126  Log.Trace($"BrokerageMultiWebSocketSubscriptionManager(): WebSocket connections will be restarted every: {webSocketConnectionDuration}");
127  }
128  }
129 
130  /// <summary>
131  /// Subscribes to the symbols
132  /// </summary>
133  /// <param name="symbols">Symbols to subscribe</param>
134  /// <param name="tickType">Type of tick data</param>
135  protected override bool Subscribe(IEnumerable<Symbol> symbols, TickType tickType)
136  {
137  Log.Trace($"BrokerageMultiWebSocketSubscriptionManager.Subscribe(): {string.Join(",", symbols.Select(x => x.Value))}");
138 
139  var success = true;
140 
141  foreach (var symbol in symbols)
142  {
143  var webSocket = GetWebSocketForSymbol(symbol);
144 
145  success &= _subscribeFunc(webSocket, symbol);
146  }
147 
148  return success;
149  }
150 
151  /// <summary>
152  /// Unsubscribes from the symbols
153  /// </summary>
154  /// <param name="symbols">Symbols to subscribe</param>
155  /// <param name="tickType">Type of tick data</param>
156  protected override bool Unsubscribe(IEnumerable<Symbol> symbols, TickType tickType)
157  {
158  Log.Trace($"BrokerageMultiWebSocketSubscriptionManager.Unsubscribe(): {string.Join(",", symbols.Select(x => x.Value))}");
159 
160  var success = true;
161 
162  foreach (var symbol in symbols)
163  {
164  var entry = GetWebSocketEntryBySymbol(symbol);
165  if (entry != null)
166  {
167  entry.RemoveSymbol(symbol);
168 
169  success &= _unsubscribeFunc(entry.WebSocket, symbol);
170  }
171  }
172 
173  return success;
174  }
175 
176  /// <summary>
177  /// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
178  /// </summary>
179  public override void Dispose()
180  {
181  _reconnectTimer?.Stop();
182  _reconnectTimer.DisposeSafely();
183  lock (_locker)
184  {
185  foreach (var entry in _webSocketEntries)
186  {
187  try
188  {
189  entry.WebSocket.Open -= OnOpen;
190  entry.WebSocket.Message -= EventHandler;
191  entry.WebSocket.Close();
192  }
193  catch (Exception ex)
194  {
195  Log.Error(ex);
196  }
197  }
198  _webSocketEntries.Clear();
199  }
200  }
201 
202  private BrokerageMultiWebSocketEntry GetWebSocketEntryBySymbol(Symbol symbol)
203  {
204  lock (_locker)
205  {
206  foreach (var entry in _webSocketEntries.Where(entry => entry.Contains(symbol)))
207  {
208  return entry;
209  }
210  }
211 
212  return null;
213  }
214 
215  /// <summary>
216  /// Adds a symbol to an existing or new websocket connection
217  /// </summary>
218  private IWebSocket GetWebSocketForSymbol(Symbol symbol)
219  {
220  BrokerageMultiWebSocketEntry entry;
221 
222  lock (_locker)
223  {
224  if (_webSocketEntries.All(x => x.SymbolCount >= _maximumSymbolsPerWebSocket))
225  {
226  if (_maximumWebSocketConnections > 0)
227  {
228  throw new NotSupportedException($"Maximum symbol count reached for the current configuration [MaxSymbolsPerWebSocket={_maximumSymbolsPerWebSocket}, MaxWebSocketConnections:{_maximumWebSocketConnections}]");
229  }
230 
231  // symbol limit reached on all, create new websocket instance
232  var webSocket = CreateWebSocket();
233 
234  _webSocketEntries.Add(new BrokerageMultiWebSocketEntry(webSocket));
235  }
236 
237  // sort by weight ascending, taking into account the symbol limit per websocket
238  _webSocketEntries.Sort((x, y) =>
239  x.SymbolCount >= _maximumSymbolsPerWebSocket
240  ? 1
241  : y.SymbolCount >= _maximumSymbolsPerWebSocket
242  ? -1
243  : Math.Sign(x.TotalWeight - y.TotalWeight));
244 
245  entry = _webSocketEntries.First();
246  }
247 
248  if (!entry.WebSocket.IsOpen)
249  {
250  Connect(entry.WebSocket);
251  }
252 
253  entry.AddSymbol(symbol);
254 
255  Log.Trace($"BrokerageMultiWebSocketSubscriptionManager.GetWebSocketForSymbol(): added symbol: {symbol} to websocket: {entry.WebSocket.GetHashCode()} - Count: {entry.SymbolCount}");
256 
257  return entry.WebSocket;
258  }
259 
260  /// <summary>
261  /// When we create a websocket we will subscribe to it's events once and initialize it
262  /// </summary>
263  /// <remarks>Note that the websocket is no connected yet <see cref="Connect(IWebSocket)"/></remarks>
264  private IWebSocket CreateWebSocket()
265  {
266  var webSocket = _webSocketFactory();
267  webSocket.Open += OnOpen;
268  webSocket.Message += EventHandler;
269  webSocket.Initialize(_webSocketUrl);
270 
271  return webSocket;
272  }
273 
274  [MethodImpl(MethodImplOptions.AggressiveInlining)]
275  private void EventHandler(object _, WebSocketMessage message)
276  {
277  _messageHandler(message);
278  }
279 
280  private void Connect(IWebSocket webSocket)
281  {
282  var connectedEvent = new ManualResetEvent(false);
283  EventHandler onOpenAction = (_, _) =>
284  {
285  connectedEvent.Set();
286  };
287 
288  webSocket.Open += onOpenAction;
289 
290  _connectionRateLimiter.WaitToProceed();
291 
292  try
293  {
294  webSocket.Connect();
295 
296  if (!connectedEvent.WaitOne(ConnectionTimeout))
297  {
298  throw new TimeoutException($"BrokerageMultiWebSocketSubscriptionManager.Connect(): WebSocket connection timeout: {webSocket.GetHashCode()}");
299  }
300  }
301  finally
302  {
303  webSocket.Open -= onOpenAction;
304 
305  connectedEvent.DisposeSafely();
306  }
307  }
308 
309  private void Disconnect(IWebSocket webSocket)
310  {
311  webSocket.Close();
312  }
313 
314  private void OnOpen(object sender, EventArgs e)
315  {
316  var webSocket = (IWebSocket)sender;
317 
318  lock (_locker)
319  {
320  foreach (var entry in _webSocketEntries)
321  {
322  if (entry.WebSocket == webSocket && entry.Symbols.Count > 0)
323  {
324  Log.Trace($"BrokerageMultiWebSocketSubscriptionManager.Connect(): WebSocket opened: {webSocket.GetHashCode()} - Resubscribing existing symbols: {entry.Symbols.Count}");
325 
326  Task.Factory.StartNew(() =>
327  {
328  foreach (var symbol in entry.Symbols)
329  {
330  _subscribeFunc(webSocket, symbol);
331  }
332  });
333  break;
334  }
335  }
336  }
337  }
338  }
339 }