18 using System.Threading;
22 using System.Threading.Tasks;
23 using System.Collections.Generic;
24 using System.Runtime.CompilerServices;
// Endpoint handed to every websocket created by CreateWebSocket() through Initialize().
33 private readonly
string _webSocketUrl;
// Per-socket symbol capacity: an entry whose SymbolCount reaches this value is treated as full.
34 private readonly
int _maximumSymbolsPerWebSocket;
// When greater than zero, the constructor creates this many sockets up front and
// GetWebSocketForSymbol() throws NotSupportedException once every entry is full.
35 private readonly
int _maximumWebSocketConnections;
// Produces a new, not yet initialized websocket client; invoked by CreateWebSocket().
36 private readonly Func<WebSocketClientWrapper> _webSocketFactory;
// Caller-supplied callbacks that (un)subscribe a single symbol on a given websocket; the bool result is
// AND-ed into the overall success flag by the Subscribe/Unsubscribe code.
37 private readonly Func<IWebSocket, Symbol, bool> _subscribeFunc;
38 private readonly Func<IWebSocket, Symbol, bool> _unsubscribeFunc;
// Receives every message raised by any managed websocket (see EventHandler()).
39 private readonly Action<WebSocketMessage> _messageHandler;
// Optional (constructor default is null) limiter consulted by Connect() before a connection attempt.
40 private readonly
RateGate _connectionRateLimiter;
// Periodic reconnect timer. In the visible code it is only assigned when the constructor receives a
// non-zero webSocketConnectionDuration, so it may be null.
41 private readonly System.Timers.Timer _reconnectTimer;
// Upper bound, in milliseconds, that Connect() waits for the socket's Open event.
43 private const int ConnectionTimeout = 30000;
// Synchronization object. NOTE(review): no use of it is visible in this extract; confirm which
// accesses to _webSocketEntries it is meant to guard.
45 private readonly
object _locker =
new();
// One entry per managed websocket, tracking the symbols assigned to it.
46 private readonly List<BrokerageMultiWebSocketEntry> _webSocketEntries =
new();
// Constructor parameter list and body. The opening of the signature (including the webSocketUrl
// parameter used below) is not visible in this extract.
63 int maximumSymbolsPerWebSocket,
64 int maximumWebSocketConnections,
65 Dictionary<Symbol, int> symbolWeights,
66 Func<WebSocketClientWrapper> webSocketFactory,
67 Func<IWebSocket, Symbol, bool> subscribeFunc,
68 Func<IWebSocket, Symbol, bool> unsubscribeFunc,
69 Action<WebSocketMessage> messageHandler,
70 TimeSpan webSocketConnectionDuration,
71 RateGate connectionRateLimiter =
null)
// Capture the configuration values and callbacks in the readonly fields.
73 _webSocketUrl = webSocketUrl;
74 _maximumSymbolsPerWebSocket = maximumSymbolsPerWebSocket;
75 _maximumWebSocketConnections = maximumWebSocketConnections;
76 _webSocketFactory = webSocketFactory;
77 _subscribeFunc = subscribeFunc;
78 _unsubscribeFunc = unsubscribeFunc;
79 _messageHandler = messageHandler;
80 _connectionRateLimiter = connectionRateLimiter;
// A positive connection limit means the sockets are created eagerly, one per allowed connection.
// NOTE(review): what is done with each created socket is not visible in this extract.
82 if (_maximumWebSocketConnections > 0)
85 for (var i = 0; i < _maximumWebSocketConnections; i++)
87 var webSocket = CreateWebSocket();
// A zero duration skips the timer setup entirely; otherwise a timer is created whose interval is the
// requested connection duration in milliseconds.
94 if (webSocketConnectionDuration != TimeSpan.Zero)
96 _reconnectTimer =
new System.Timers.Timer
98 Interval = webSocketConnectionDuration.TotalMilliseconds
// On every tick, each entry whose socket is currently open is restarted on its own background task.
100 _reconnectTimer.Elapsed += (_, _) =>
102 Log.
Trace(
"BrokerageMultiWebSocketSubscriptionManager(): Restarting websocket connections");
106 foreach (var entry
in _webSocketEntries)
108 if (entry.WebSocket.IsOpen)
110 Task.Factory.StartNew(() =>
// Restart = disconnect followed by a fresh connect of the same socket instance.
112 Log.
Trace($
"BrokerageMultiWebSocketSubscriptionManager(): Websocket restart - disconnect: ({entry.WebSocket.GetHashCode()})");
113 Disconnect(entry.WebSocket);
115 Log.
Trace($
"BrokerageMultiWebSocketSubscriptionManager(): Websocket restart - connect: ({entry.WebSocket.GetHashCode()})");
116 Connect(entry.WebSocket);
// Arm the timer and log the configured restart period.
122 _reconnectTimer.Start();
124 Log.
Trace($
"BrokerageMultiWebSocketSubscriptionManager(): WebSocket connections will be restarted every: {webSocketConnectionDuration}");
// Body of the Subscribe routine (named in the log text); its signature is not visible in this extract.
// Logs the Value of every requested symbol joined into a single string.
135 Log.
Trace($
"BrokerageMultiWebSocketSubscriptionManager.Subscribe(): {string.Join(",
", symbols.Select(x => x.Value))}");
// Each symbol is first assigned to a websocket and then subscribed on it.
139 foreach (var symbol
in symbols)
141 var webSocket = GetWebSocketForSymbol(symbol);
// A single failing callback turns the aggregate result false; the loop still continues.
143 success &= _subscribeFunc(webSocket, symbol);
// Body of the Unsubscribe routine (named in the log text); its signature is not visible in this extract.
// Logs the Value of every symbol being removed joined into a single string.
156 Log.
Trace($
"BrokerageMultiWebSocketSubscriptionManager.Unsubscribe(): {string.Join(",
", symbols.Select(x => x.Value))}");
160 foreach (var symbol
in symbols)
// Find the entry that currently hosts the symbol.
// NOTE(review): any guard for a symbol that is not hosted by any entry is not visible here; confirm.
162 var entry = GetWebSocketEntryBySymbol(symbol);
// The symbol is removed from the entry's bookkeeping before the unsubscribe callback is invoked.
165 entry.RemoveSymbol(symbol);
// A single failing callback turns the aggregate result false.
167 success &= _unsubscribeFunc(entry.WebSocket, symbol);
// Teardown code; the enclosing method signature is not visible in this extract.
// The timer may never have been created, hence the null-conditional Stop().
179 _reconnectTimer?.Stop();
// NOTE(review): DisposeSafely is assumed to be a null-tolerant extension method; confirm, since it is
// called without a null-conditional.
180 _reconnectTimer.DisposeSafely();
// Detach this manager's handlers from every socket before closing it.
183 foreach (var entry
in _webSocketEntries)
187 entry.WebSocket.Open -= OnOpen;
188 entry.WebSocket.Message -= EventHandler;
189 entry.WebSocket.Close();
// Forget all entries once their sockets have been closed.
196 _webSocketEntries.Clear();
// Iterates only over the entries that contain the given symbol. The enclosing signature and the loop
// body are not visible in this extract; presumably this is GetWebSocketEntryBySymbol (TODO confirm).
204 foreach (var entry
in _webSocketEntries.Where(entry => entry.Contains(symbol)))
/// <summary>
/// Chooses the websocket entry that will host <paramref name="symbol"/>, connects its socket if it is
/// not open, records the symbol on the entry and returns the entry's websocket.
/// </summary>
/// <param name="symbol">The symbol to assign to a websocket.</param>
/// <returns>The websocket of the entry the symbol was added to.</returns>
/// <exception cref="NotSupportedException">
/// Thrown when every entry is at capacity and a fixed connection limit is configured.
/// </exception>
216 private IWebSocket GetWebSocketForSymbol(Symbol symbol)
218 BrokerageMultiWebSocketEntry entry;
// True when every existing entry is full; also true for an empty list, since All() on an empty
// sequence returns true.
222 if (_webSocketEntries.All(x => x.SymbolCount >= _maximumSymbolsPerWebSocket))
// With a fixed connection limit, a full set of sockets is reported as an error.
224 if (_maximumWebSocketConnections > 0)
226 throw new NotSupportedException($
"Maximum symbol count reached for the current configuration [MaxSymbolsPerWebSocket={_maximumSymbolsPerWebSocket}, MaxWebSocketConnections:{_maximumWebSocketConnections}]");
// The following lines create one more socket and register a new entry for it.
230 var webSocket = CreateWebSocket();
232 _webSocketEntries.Add(
new BrokerageMultiWebSocketEntry(webSocket));
// Reorder the entries so the preferred host comes first. Entries at capacity are special-cased by the
// comparator (the branch result values are not visible in this extract); the remaining entries are
// ordered by ascending TotalWeight.
// NOTE(review): confirm the comparator is consistent when both x and y are at capacity.
236 _webSocketEntries.Sort((x, y) =>
237 x.SymbolCount >= _maximumSymbolsPerWebSocket
239 : y.SymbolCount >= _maximumSymbolsPerWebSocket
241 : Math.Sign(x.TotalWeight - y.TotalWeight));
// The first entry after sorting is the one selected.
243 entry = _webSocketEntries.First();
// Sockets are connected on demand: only when the chosen socket is not already open.
246 if (!entry.WebSocket.IsOpen)
248 Connect(entry.WebSocket);
251 entry.AddSymbol(symbol);
253 Log.
Trace($
"BrokerageMultiWebSocketSubscriptionManager.GetWebSocketForSymbol(): added symbol: {symbol} to websocket: {entry.WebSocket.GetHashCode()} - Count: {entry.SymbolCount}");
255 return entry.WebSocket;
/// <summary>
/// Builds a websocket with the configured factory, attaches this manager's Open and Message handlers
/// and initializes it with the configured URL.
/// </summary>
262 private IWebSocket CreateWebSocket()
264 var webSocket = _webSocketFactory();
// OnOpen resubscribes the symbols already assigned to the socket; EventHandler forwards its messages.
265 webSocket.Open += OnOpen;
266 webSocket.Message += EventHandler;
// Only Initialize is called in the visible lines; connecting happens separately in Connect().
267 webSocket.Initialize(_webSocketUrl);
/// <summary>
/// Message event handler shared by all managed websockets: forwards the message to the handler
/// supplied at construction. The sender argument is discarded.
/// </summary>
272 [MethodImpl(MethodImplOptions.AggressiveInlining)]
273 private void EventHandler(
object _, WebSocketMessage message)
275 _messageHandler(message);
/// <summary>
/// Waits for <paramref name="webSocket"/> to raise its Open event, for at most
/// <see cref="ConnectionTimeout"/> milliseconds.
/// </summary>
/// <param name="webSocket">The websocket to connect.</param>
/// <exception cref="TimeoutException">Thrown when the Open event is not observed in time.</exception>
278 private void Connect(IWebSocket webSocket)
// Starts unsignaled; set by the temporary Open handler below.
280 var connectedEvent =
new ManualResetEvent(
false);
281 EventHandler onOpenAction = (_, _) =>
283 connectedEvent.Set();
// Attach the handler before the wait so the Open event cannot be missed.
286 webSocket.Open += onOpenAction;
// Only when a limiter was supplied and its IsRateLimited property is false.
288 if (_connectionRateLimiter is { IsRateLimited:
false })
290 _connectionRateLimiter.WaitToProceed();
// NOTE(review): the call that actually starts the connection is not visible in this extract.
297 if (!connectedEvent.WaitOne(ConnectionTimeout))
299 throw new TimeoutException($
"BrokerageMultiWebSocketSubscriptionManager.Connect(): WebSocket connection timeout: {webSocket.GetHashCode()}");
// Cleanup: detach the temporary handler and release the wait handle.
// NOTE(review): confirm this runs on the timeout path as well (e.g. inside a finally block).
304 webSocket.Open -= onOpenAction;
306 connectedEvent.DisposeSafely();
// Counterpart of Connect(); invoked by the reconnect timer callback right before Connect() on the same
// socket. The method body is not visible in this extract.
310 private void Disconnect(IWebSocket webSocket)
/// <summary>
/// Open event handler attached to every managed websocket: when the opened socket already has symbols
/// assigned, they are subscribed again on a background task.
/// </summary>
/// <param name="sender">The websocket that was opened; cast to IWebSocket.</param>
/// <param name="e">Unused event arguments.</param>
315 private void OnOpen(
object sender, EventArgs e)
317 var webSocket = (IWebSocket)sender;
// Find the entry (or entries) bound to this socket that have at least one symbol recorded.
321 foreach (var entry
in _webSocketEntries)
323 if (entry.WebSocket == webSocket && entry.Symbols.Count > 0)
325 Log.
Trace($
"BrokerageMultiWebSocketSubscriptionManager.Connect(): WebSocket opened: {webSocket.GetHashCode()} - Resubscribing existing symbols: {entry.Symbols.Count}");
// The resubscription runs on a separate task; the callback results are ignored here.
327 Task.Factory.StartNew(() =>
329 foreach (var symbol
in entry.Symbols)
331 _subscribeFunc(webSocket, symbol);