20 using System.Net.WebSockets;
21 using System.Runtime.CompilerServices;
23 using System.Threading;
24 using System.Threading.Tasks;
33 private const int ReceiveBufferSize = 8192;
36 private string _sessionToken;
37 private CancellationTokenSource _cts;
38 private ClientWebSocket _client;
39 private Task _taskConnect;
40 private object _connectLock =
new object();
41 private readonly
object _locker =
new object();
48 public void Initialize(
string url,
string sessionToken =
null)
51 _sessionToken = sessionToken;
58 public void Send(
string data)
62 var buffer =
new ArraySegment<byte>(Encoding.UTF8.GetBytes(data));
63 _client.SendAsync(buffer, WebSocketMessageType.Text,
true, _cts.Token).SynchronouslyAwaitTask();
78 _cts =
new CancellationTokenSource();
82 _taskConnect = Task.Factory.StartNew(
85 Log.
Trace($
"WebSocketClientWrapper connection task started: {_url}");
93 Log.
Error(e, $
"Error in WebSocketClientWrapper connection task: {_url}: ");
96 Log.
Trace($
"WebSocketClientWrapper connection task ended: {_url}");
98 _cts.Token, TaskCreationOptions.LongRunning, TaskScheduler.Default);
106 if (_client !=
null || _cts.Token.WaitHandle.WaitOne(50))
111 while (++count < 100);
128 _client?.CloseOutputAsync(WebSocketCloseStatus.NormalClosure,
"", _cts.Token).SynchronouslyAwaitTask();
135 _taskConnect?.Wait(TimeSpan.FromSeconds(5));
137 _cts.DisposeSafely();
141 Log.
Error($
"WebSocketClientWrapper.Close({_url}): {e}");
156 public bool IsOpen => _client?.State == WebSocketState.Open;
161 public event EventHandler<WebSocketMessage>
Message;
166 public event EventHandler<WebSocketError>
Error;
171 public event EventHandler
Open;
176 public event EventHandler<WebSocketCloseData>
Closed;
192 Log.
Error(e.
Exception, $
"WebSocketClientWrapper.OnError(): (IsOpen:{IsOpen}, State:{_client.State}): {_url}: {e.Message}");
193 Error?.Invoke(
this, e);
201 Log.
Trace($
"WebSocketClientWrapper.OnOpen(): Connection opened (IsOpen:{IsOpen}, State:{_client.State}): {_url}");
202 Open?.Invoke(
this, EventArgs.Empty);
210 Log.
Trace($
"WebSocketClientWrapper.OnClose(): Connection closed (IsOpen:{IsOpen}, State:{_client.State}): {_url}");
214 private void HandleConnection()
216 var receiveBuffer =
new byte[ReceiveBufferSize];
218 while (_cts is { IsCancellationRequested:
false })
220 Log.
Trace($
"WebSocketClientWrapper.HandleConnection({_url}): Connecting...");
222 const int maximumWaitTimeOnError = 120 * 1000;
223 const int minimumWaitTimeOnError = 2 * 1000;
224 var waitTimeOnError = minimumWaitTimeOnError;
225 using (var connectionCts = CancellationTokenSource.CreateLinkedTokenSource(_cts.Token))
231 _client.DisposeSafely();
232 _client =
new ClientWebSocket();
233 if (_sessionToken !=
null)
235 _client.Options.SetRequestHeader(
"x-session-token", _sessionToken);
237 _client.ConnectAsync(
new Uri(_url), connectionCts.Token).SynchronouslyAwaitTask();
241 while ((_client.State == WebSocketState.Open || _client.State == WebSocketState.CloseSent) &&
242 !connectionCts.IsCancellationRequested)
244 var messageData = ReceiveMessage(_client, connectionCts.Token, receiveBuffer);
246 if (messageData ==
null)
252 waitTimeOnError = minimumWaitTimeOnError;
253 OnMessage(
new WebSocketMessage(
this, messageData));
256 catch (OperationCanceledException) { }
257 catch (ObjectDisposedException) { }
258 catch (WebSocketException ex)
260 if (!connectionCts.IsCancellationRequested)
262 OnError(
new WebSocketError(ex.Message, ex));
263 connectionCts.Token.WaitHandle.WaitOne(waitTimeOnError);
266 waitTimeOnError += Math.Min(maximumWaitTimeOnError, waitTimeOnError);
271 if (!connectionCts.IsCancellationRequested)
273 OnError(
new WebSocketError(ex.Message, ex));
277 if (!connectionCts.IsCancellationRequested)
279 connectionCts.Cancel();
285 [MethodImpl(MethodImplOptions.AggressiveInlining)]
286 private MessageData ReceiveMessage(
287 ClientWebSocket webSocket,
288 CancellationToken ct,
289 byte[] receiveBuffer)
291 var buffer =
new ArraySegment<byte>(receiveBuffer);
293 using (var ms =
new MemoryStream())
295 WebSocketReceiveResult result;
299 result = webSocket.ReceiveAsync(buffer, ct).SynchronouslyAwaitTask();
300 ms.Write(buffer.Array, buffer.Offset, result.Count);
302 while (!result.EndOfMessage);
304 if (result.MessageType == WebSocketMessageType.Binary)
306 return new BinaryMessage
309 Count = result.Count,
312 else if (result.MessageType == WebSocketMessageType.Text)
314 return new TextMessage
316 Message = Encoding.UTF8.GetString(ms.GetBuffer(), 0, (int)ms.Length),
319 else if (result.MessageType == WebSocketMessageType.Close)
321 Log.
Trace($
"WebSocketClientWrapper.HandleConnection({_url}): WebSocketMessageType.Close - Data: {Encoding.UTF8.GetString(ms.GetBuffer(), 0, (int)ms.Length)}");
366 public byte[]
Data {
get;
set; }