18 using System.Threading;
22 using System.Threading.Tasks;
23 using System.Collections.Generic;
24 using System.Runtime.CompilerServices;
33 private readonly
string _webSocketUrl;
34 private readonly
int _maximumSymbolsPerWebSocket;
35 private readonly
int _maximumWebSocketConnections;
36 private readonly Func<WebSocketClientWrapper> _webSocketFactory;
37 private readonly Func<IWebSocket, Symbol, bool> _subscribeFunc;
38 private readonly Func<IWebSocket, Symbol, bool> _unsubscribeFunc;
39 private readonly Action<WebSocketMessage> _messageHandler;
40 private readonly
RateGate _connectionRateLimiter;
41 private readonly System.Timers.Timer _reconnectTimer;
43 private const int ConnectionTimeout = 30000;
45 private readonly
object _locker =
new();
46 private readonly List<BrokerageMultiWebSocketEntry> _webSocketEntries =
new();
63 int maximumSymbolsPerWebSocket,
64 int maximumWebSocketConnections,
65 Dictionary<Symbol, int> symbolWeights,
66 Func<WebSocketClientWrapper> webSocketFactory,
67 Func<IWebSocket, Symbol, bool> subscribeFunc,
68 Func<IWebSocket, Symbol, bool> unsubscribeFunc,
69 Action<WebSocketMessage> messageHandler,
70 TimeSpan webSocketConnectionDuration,
71 RateGate connectionRateLimiter =
null)
73 _webSocketUrl = webSocketUrl;
74 _maximumSymbolsPerWebSocket = maximumSymbolsPerWebSocket;
75 _maximumWebSocketConnections = maximumWebSocketConnections;
76 _webSocketFactory = webSocketFactory;
77 _subscribeFunc = subscribeFunc;
78 _unsubscribeFunc = unsubscribeFunc;
79 _messageHandler = messageHandler;
81 _connectionRateLimiter = connectionRateLimiter ??
new RateGate(5, TimeSpan.FromSeconds(12));
83 if (_maximumWebSocketConnections > 0)
86 for (var i = 0; i < _maximumWebSocketConnections; i++)
88 var webSocket = CreateWebSocket();
95 if (webSocketConnectionDuration != TimeSpan.Zero)
97 _reconnectTimer =
new System.Timers.Timer
99 Interval = webSocketConnectionDuration.TotalMilliseconds
101 _reconnectTimer.Elapsed += (_, _) =>
103 List<BrokerageMultiWebSocketEntry> webSocketEntries;
107 webSocketEntries = _webSocketEntries.ToList();
110 Log.
Trace($
"BrokerageMultiWebSocketSubscriptionManager(): Restarting {webSocketEntries.Count} websocket connections");
112 Parallel.ForEach(webSocketEntries,
new ParallelOptions { MaxDegreeOfParallelism = 4 }, entry =>
114 if (entry.WebSocket.IsOpen)
116 Log.Trace($
"BrokerageMultiWebSocketSubscriptionManager(): Websocket restart - disconnect: ({entry.WebSocket.GetHashCode()})");
117 Disconnect(entry.WebSocket);
119 Log.Trace($
"BrokerageMultiWebSocketSubscriptionManager(): Websocket restart - connect: ({entry.WebSocket.GetHashCode()})");
120 Connect(entry.WebSocket);
124 _reconnectTimer.Start();
126 Log.
Trace($
"BrokerageMultiWebSocketSubscriptionManager(): WebSocket connections will be restarted every: {webSocketConnectionDuration}");
137 Log.
Trace($
"BrokerageMultiWebSocketSubscriptionManager.Subscribe(): {string.Join(",
", symbols.Select(x => x.Value))}");
141 foreach (var symbol
in symbols)
143 var webSocket = GetWebSocketForSymbol(symbol);
145 success &= _subscribeFunc(webSocket, symbol);
158 Log.
Trace($
"BrokerageMultiWebSocketSubscriptionManager.Unsubscribe(): {string.Join(",
", symbols.Select(x => x.Value))}");
162 foreach (var symbol
in symbols)
164 var entry = GetWebSocketEntryBySymbol(symbol);
167 entry.RemoveSymbol(symbol);
169 success &= _unsubscribeFunc(entry.WebSocket, symbol);
181 _reconnectTimer?.Stop();
182 _reconnectTimer.DisposeSafely();
185 foreach (var entry
in _webSocketEntries)
189 entry.WebSocket.Open -= OnOpen;
190 entry.WebSocket.Message -= EventHandler;
191 entry.WebSocket.Close();
198 _webSocketEntries.Clear();
206 foreach (var entry
in _webSocketEntries.Where(entry => entry.Contains(symbol)))
218 private IWebSocket GetWebSocketForSymbol(Symbol symbol)
220 BrokerageMultiWebSocketEntry entry;
224 if (_webSocketEntries.All(x => x.SymbolCount >= _maximumSymbolsPerWebSocket))
226 if (_maximumWebSocketConnections > 0)
228 throw new NotSupportedException($
"Maximum symbol count reached for the current configuration [MaxSymbolsPerWebSocket={_maximumSymbolsPerWebSocket}, MaxWebSocketConnections:{_maximumWebSocketConnections}]");
232 var webSocket = CreateWebSocket();
234 _webSocketEntries.Add(
new BrokerageMultiWebSocketEntry(webSocket));
238 _webSocketEntries.Sort((x, y) =>
239 x.SymbolCount >= _maximumSymbolsPerWebSocket
241 : y.SymbolCount >= _maximumSymbolsPerWebSocket
243 : Math.Sign(x.TotalWeight - y.TotalWeight));
245 entry = _webSocketEntries.First();
248 if (!entry.WebSocket.IsOpen)
250 Connect(entry.WebSocket);
253 entry.AddSymbol(symbol);
255 Log.
Trace($
"BrokerageMultiWebSocketSubscriptionManager.GetWebSocketForSymbol(): added symbol: {symbol} to websocket: {entry.WebSocket.GetHashCode()} - Count: {entry.SymbolCount}");
257 return entry.WebSocket;
264 private IWebSocket CreateWebSocket()
266 var webSocket = _webSocketFactory();
267 webSocket.Open += OnOpen;
268 webSocket.Message += EventHandler;
269 webSocket.Initialize(_webSocketUrl);
274 [MethodImpl(MethodImplOptions.AggressiveInlining)]
275 private void EventHandler(
object _, WebSocketMessage message)
277 _messageHandler(message);
280 private void Connect(IWebSocket webSocket)
282 var connectedEvent =
new ManualResetEvent(
false);
283 EventHandler onOpenAction = (_, _) =>
285 connectedEvent.Set();
288 webSocket.Open += onOpenAction;
290 _connectionRateLimiter.WaitToProceed();
296 if (!connectedEvent.WaitOne(ConnectionTimeout))
298 throw new TimeoutException($
"BrokerageMultiWebSocketSubscriptionManager.Connect(): WebSocket connection timeout: {webSocket.GetHashCode()}");
303 webSocket.Open -= onOpenAction;
305 connectedEvent.DisposeSafely();
309 private void Disconnect(IWebSocket webSocket)
314 private void OnOpen(
object sender, EventArgs e)
316 var webSocket = (IWebSocket)sender;
320 foreach (var entry
in _webSocketEntries)
322 if (entry.WebSocket == webSocket && entry.Symbols.Count > 0)
324 Log.
Trace($
"BrokerageMultiWebSocketSubscriptionManager.Connect(): WebSocket opened: {webSocket.GetHashCode()} - Resubscribing existing symbols: {entry.Symbols.Count}");
326 Task.Factory.StartNew(() =>
328 foreach (var symbol
in entry.Symbols)
330 _subscribeFunc(webSocket, symbol);