20 using System.Net.WebSockets;
22 using System.Threading;
23 using System.Threading.Tasks;
// Length, in bytes, of the byte array HandleConnection allocates for receiving.
32 private const int ReceiveBufferSize = 8192;
// Optional token stored by Initialize; when non-null HandleConnection sends it as the "x-session-token" request header.
35 private string _sessionToken;
// Cancellation source whose token is passed to sends, the close call and the connection task.
36 private CancellationTokenSource _cts;
// Current socket instance; HandleConnection disposes and recreates it for every connection attempt.
37 private ClientWebSocket _client;
// Background task started with Task.Factory.StartNew; the close path waits on it for up to 5 seconds.
38 private Task _taskConnect;
// NOTE(review): where this lock object is used is not visible in this excerpt.
// NOTE(review): unlike _locker below it is not readonly - consider making it readonly if it is never reassigned.
39 private object _connectLock =
new object();
// NOTE(review): where this lock object is used is not visible in this excerpt.
40 private readonly
object _locker =
new object();
/// <summary>
/// Stores the connection settings for this wrapper.
/// </summary>
/// <param name="url">Endpoint address. NOTE(review): the statement that consumes it is not visible in this excerpt; presumably it is saved to _url - confirm.</param>
/// <param name="sessionToken">Optional session token, saved to _sessionToken; defaults to null.</param>
47 public void Initialize(
string url,
string sessionToken =
null)
50 _sessionToken = sessionToken;
/// <summary>
/// Sends a string to the server as one complete text message.
/// </summary>
/// <param name="data">Text to send; it is encoded as UTF-8 before being written to the socket.</param>
// NOTE(review): _client and _cts are dereferenced without a null check in the visible lines;
// calling this before both are created would throw - confirm whether a guard exists in lines not shown here.
57 public void Send(
string data)
// Encode the whole payload up front and wrap it as a segment for SendAsync.
61 var buffer =
new ArraySegment<byte>(Encoding.UTF8.GetBytes(data));
// endOfMessage is true, so the payload goes out as a single Text message; the send observes _cts.Token.
// SynchronouslyAwaitTask is a helper defined elsewhere - presumably it blocks until the send finishes.
62 _client.SendAsync(buffer, WebSocketMessageType.Text,
true, _cts.Token).SynchronouslyAwaitTask();
// NOTE(review): the enclosing method signature and several statements are not visible in this excerpt;
// the comments below describe only the lines that are shown.
// Create the cancellation source whose token is handed to the background task below.
77 _cts =
new CancellationTokenSource();
// Start the connection work as a LongRunning task on the default scheduler, cancellable through _cts.Token.
81 _taskConnect = Task.Factory.StartNew(
// Trace emitted when the connection task starts.
84 Log.
Trace($
"WebSocketClientWrapper connection task started: {_url}");
// An exception e is logged together with the url (the surrounding try/catch is not visible here).
92 Log.
Error(e, $
"Error in WebSocketClientWrapper connection task: {_url}: ");
// Trace emitted when the connection task ends.
95 Log.
Trace($
"WebSocketClientWrapper connection task ended: {_url}");
97 _cts.Token, TaskCreationOptions.LongRunning, TaskScheduler.Default);
// True once the client instance exists, or when the token's wait handle is signalled within 50 ms.
// The action taken when this holds is not visible here - presumably it leaves the polling loop.
105 if (_client !=
null || _cts.Token.WaitHandle.WaitOne(50))
// Repeat while the pre-incremented counter stays below 100; combined with the 50 ms wait above this
// bounds the polling at roughly 5 seconds (assuming count starts at 0 - its declaration is not visible).
110 while (++count < 100);
// NOTE(review): the enclosing method signature and its try/catch structure are not visible in this excerpt.
// If a client exists, send a NormalClosure close frame with an empty description, observing _cts.Token.
125 _client?.CloseOutputAsync(WebSocketCloseStatus.NormalClosure,
"", _cts.Token).SynchronouslyAwaitTask();
// Give the connection task up to 5 seconds to finish; the boolean result of Wait is ignored.
134 _taskConnect?.Wait(TimeSpan.FromSeconds(5));
// Release the cancellation source (DisposeSafely is a helper defined elsewhere).
136 _cts.DisposeSafely();
// Failure is logged with the url and the exception formatted into the message text.
// NOTE(review): the connection task logs via Log.Error(e, message); this call interpolates the exception instead - confirm the difference is intended.
140 Log.
Error($
"WebSocketClientWrapper.Close({_url}): {e}");
/// <summary>
/// True only when a client instance exists and its state is <see cref="WebSocketState.Open"/>;
/// false when no client has been created (the null-conditional yields null, which never equals Open).
/// </summary>
155 public bool IsOpen => _client?.State == WebSocketState.Open;
/// <summary>
/// Event whose handlers receive a <see cref="WebSocketMessage"/>.
/// </summary>
// NOTE(review): the raise site is not visible in this excerpt; presumably OnMessage fires it for each message read in HandleConnection - confirm.
160 public event EventHandler<WebSocketMessage>
Message;
/// <summary>
/// Event raised with a <see cref="WebSocketError"/> after the error has been logged; the sender is this wrapper.
/// </summary>
165 public event EventHandler<WebSocketError>
Error;
/// <summary>
/// Event raised with <see cref="EventArgs.Empty"/> after the "Connection opened" trace is written; the sender is this wrapper.
/// </summary>
170 public event EventHandler
Open;
/// <summary>
/// Event whose handlers receive a <see cref="WebSocketCloseData"/>.
/// </summary>
// NOTE(review): the raise site is not visible in this excerpt; presumably fired when the connection is closed - confirm.
175 public event EventHandler<WebSocketCloseData>
Closed;
// NOTE(review): the enclosing method signature is not visible in this excerpt; `e` is the error argument being reported.
// Log the wrapped exception together with the open flag, socket state, url and message.
// NOTE(review): _client.State is read without the null-conditional that IsOpen uses, so building this
// message would throw if an error were reported while _client is still null - confirm that cannot happen.
191 Log.
Error(e.
Exception, $
"WebSocketClientWrapper.OnError(): (IsOpen:{IsOpen}, State:{_client.State}): {_url}: {e.Message}");
// Notify subscribers, if any, passing this wrapper as the sender.
192 Error?.Invoke(
this, e);
// NOTE(review): the enclosing method signature is not visible in this excerpt.
// Trace the new connection, including the open flag, socket state and url.
// _client.State is dereferenced directly here, so _client must be non-null when this runs.
200 Log.
Trace($
"WebSocketClientWrapper.OnOpen(): Connection opened (IsOpen:{IsOpen}, State:{_client.State}): {_url}");
// Notify subscribers, if any; no payload is supplied beyond EventArgs.Empty.
201 Open?.Invoke(
this, EventArgs.Empty);
// NOTE(review): the enclosing method signature and any statements after this trace are not visible in this excerpt.
// Trace the closed connection, including the open flag, socket state and url.
// _client.State is dereferenced directly here, so _client must be non-null when this runs.
209 Log.
Trace($
"WebSocketClientWrapper.OnClose(): Connection closed (IsOpen:{IsOpen}, State:{_client.State}): {_url}");
/// <summary>
/// Connection loop: while the wrapper's cancellation source exists and is not cancelled, it creates a
/// fresh socket, connects, forwards every received message to OnMessage and reports failures through OnError.
/// </summary>
// NOTE(review): brace lines and some statements (try, lock, break, one catch header) are not visible in this
// excerpt; the comments below describe only the lines that are shown.
213 private void HandleConnection()
// One buffer is allocated up front and handed to every ReceiveMessage call.
215 var receiveBuffer =
new byte[ReceiveBufferSize];
// The property pattern is false when _cts is null, so the loop also ends once _cts is cleared.
217 while (_cts is { IsCancellationRequested:
false })
219 Log.
Trace($
"WebSocketClientWrapper.HandleConnection({_url}): Connecting...");
// Wait bounds in milliseconds: 120 seconds maximum, 2 seconds minimum.
221 const int maximumWaitTimeOnError = 120 * 1000;
222 const int minimumWaitTimeOnError = 2 * 1000;
// NOTE(review): this declaration follows the while header and the per-attempt "Connecting..." trace, so it
// appears to sit inside the loop body; if so the wait is reset to the minimum on every attempt and the
// increase made in the WebSocketException handler never carries over - confirm and hoist above the loop if unintended.
223 var waitTimeOnError = minimumWaitTimeOnError;
// Per-attempt cancellation source linked to _cts: cancelling _cts cancels this attempt as well.
224 using (var connectionCts = CancellationTokenSource.CreateLinkedTokenSource(_cts.Token))
// Drop any previous socket and start from a new instance.
230 _client.DisposeSafely();
231 _client =
new ClientWebSocket();
// Attach the session token as a request header only when one was supplied.
232 if (_sessionToken !=
null)
234 _client.Options.SetRequestHeader(
"x-session-token", _sessionToken);
// Connect to _url, observing the per-attempt token (SynchronouslyAwaitTask is a helper defined elsewhere).
236 _client.ConnectAsync(
new Uri(_url), connectionCts.Token).SynchronouslyAwaitTask();
// Keep receiving while the socket is Open or CloseSent and this attempt has not been cancelled.
240 while ((_client.State == WebSocketState.Open || _client.State == WebSocketState.CloseSent) &&
241 !connectionCts.IsCancellationRequested)
243 var messageData = ReceiveMessage(_client, connectionCts.Token, receiveBuffer);
// A null result is handled specially; the branch body is not visible here - presumably it leaves the receive loop.
245 if (messageData ==
null)
// A message arrived, so the error wait goes back to its minimum.
251 waitTimeOnError = minimumWaitTimeOnError;
// Hand the message to OnMessage with this wrapper as its source.
252 OnMessage(
new WebSocketMessage(
this, messageData));
// Cancellation is deliberately ignored here.
255 catch (OperationCanceledException) { }
256 catch (WebSocketException ex)
// Report the failure, then wait; the wait ends early if the per-attempt token is cancelled.
258 OnError(
new WebSocketError(ex.Message, ex));
259 connectionCts.Token.WaitHandle.WaitOne(waitTimeOnError);
// NOTE(review): this adds Math.Min(max, current) to the current value rather than clamping it: the wait
// doubles while below the maximum and afterwards keeps growing by the maximum, so it is never capped at
// maximumWaitTimeOnError. `waitTimeOnError = Math.Min(maximumWaitTimeOnError, waitTimeOnError * 2)` looks
// like the intent - confirm.
262 waitTimeOnError += Math.Min(maximumWaitTimeOnError, waitTimeOnError);
// Another failure is reported the same way (the catch header for this handler is not visible in this excerpt).
266 OnError(
new WebSocketError(ex.Message, ex));
// Cancel the per-attempt source at the end of the attempt.
268 connectionCts.Cancel();
/// <summary>
/// Reads from the socket until a receive reports end-of-message, accumulating the bytes, and converts the
/// result according to the message type of the last receive.
/// </summary>
/// <param name="ct">Token passed to every ReceiveAsync call.</param>
/// <param name="receiveBuffer">Caller-supplied buffer that each receive writes into.</param>
/// <param name="maxSize">Upper bound on the accumulated length; defaults to long.MaxValue.</param>
/// <exception cref="InvalidOperationException">Thrown when the accumulated length exceeds <paramref name="maxSize"/>.</exception>
// NOTE(review): the first parameter (used below as `webSocket`), the remaining initializer members and the
// method's final return path are not visible in this excerpt.
273 private MessageData ReceiveMessage(
275 CancellationToken ct,
276 byte[] receiveBuffer,
277 long maxSize =
long.MaxValue)
// Wrap the caller's array so it can be passed to ReceiveAsync.
279 var buffer =
new ArraySegment<byte>(receiveBuffer);
// Stream that accumulates the bytes of every receive for this message.
281 using (var ms =
new MemoryStream())
283 WebSocketReceiveResult result;
// Receive into the buffer, then append exactly the number of bytes reported by this receive.
287 result = webSocket.ReceiveAsync(buffer, ct).SynchronouslyAwaitTask();
288 ms.Write(buffer.Array, buffer.Offset, result.Count);
// Enforce the size limit on the accumulated total, not on the single receive.
289 if (ms.Length > maxSize)
291 throw new InvalidOperationException($
"Maximum size of the message was exceeded: {_url}");
// Keep receiving until a result flags the end of the message.
294 while (!result.EndOfMessage);
296 if (result.MessageType == WebSocketMessageType.Binary)
298 return new BinaryMessage
// NOTE(review): result.Count is the byte count of the last receive only; for a message that needed several
// receives it differs from the accumulated ms.Length - confirm which value consumers expect.
301 Count = result.Count,
304 else if (result.MessageType == WebSocketMessageType.Text)
306 return new TextMessage
// Decode straight from the stream's internal buffer, limited to the bytes actually written (ms.Length).
308 Message = Encoding.UTF8.GetString(ms.GetBuffer(), 0, (
int)ms.Length),
311 else if (result.MessageType == WebSocketMessageType.Close)
// A close message is traced together with whatever payload was received, decoded as UTF-8.
313 Log.
Trace($
"WebSocketClientWrapper.HandleConnection({_url}): WebSocketMessageType.Close - Data: {Encoding.UTF8.GetString(ms.GetBuffer(), 0, (int)ms.Length)}");
/// <summary>
/// Byte-array payload with a public getter and setter.
/// </summary>
// NOTE(review): the enclosing type is not visible in this excerpt - presumably the binary message type; confirm.
358 public byte[]
Data {
get;
set; }