Lean  $LEAN_TAG$
WebSocketClientWrapper.cs
1 /*
2  * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
3  * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14 */
15 
16 using QuantConnect.Logging;
17 using QuantConnect.Util;
18 using System;
19 using System.IO;
20 using System.Net.WebSockets;
21 using System.Runtime.CompilerServices;
22 using System.Text;
23 using System.Threading;
24 using System.Threading.Tasks;
25 
27 {
28  /// <summary>
29  /// Wrapper for System.Net.WebSockets.ClientWebSocket to enhance testability
30  /// </summary>
32  {
// Size in bytes of the reusable buffer handed to each ReceiveAsync call
private const int ReceiveBufferSize = 8192;

// Target endpoint and optional session token, both assigned by Initialize
private string _url;
private string _sessionToken;

// Created by Connect; cancelled, disposed and reset to null by Close
private CancellationTokenSource _cts;

// Socket of the current connection attempt; HandleConnection replaces it on every (re)connect
private ClientWebSocket _client;

// Long-running task that executes HandleConnection
private Task _taskConnect;

// Taken by Connect for its whole duration; readonly so the monitor object can never be swapped
private readonly object _connectLock = new object();

// Taken by Send, Connect, Close and HandleConnection around their use of the socket and _cts
private readonly object _locker = new object();
42 
/// <summary>
/// Stores the connection settings that the connection task will use
/// </summary>
/// <param name="url">The target websocket url</param>
/// <param name="sessionToken">Optional session token; when not null it is sent as the "x-session-token" request header</param>
public void Initialize(string url, string sessionToken = null)
{
    // only records the values, no connection is attempted here
    (_url, _sessionToken) = (url, sessionToken);
}
53 
/// <summary>
/// Sends the given string as a single UTF-8 encoded text message
/// </summary>
/// <param name="data">Text payload to send</param>
public void Send(string data)
{
    lock (_locker)
    {
        var payload = Encoding.UTF8.GetBytes(data);

        // 'true' marks this frame as the end of the message: the whole payload goes out in one frame
        var sendTask = _client.SendAsync(
            new ArraySegment<byte>(payload),
            WebSocketMessageType.Text,
            true,
            _cts.Token);

        sendTask.SynchronouslyAwaitTask();
    }
}
66 
/// <summary>
/// Starts the background connection task when none is active and then waits a bounded
/// amount of time for the socket instance to be created
/// </summary>
/// <remarks>
/// The wait ends as soon as <c>_client</c> is assigned, which happens before the connection
/// attempt itself completes; use <see cref="IsOpen"/> or the <see cref="Open"/> event to
/// know when the connection is actually established.
/// </remarks>
public void Connect()
{
    lock (_connectLock)
    {
        CancellationToken token;

        lock (_locker)
        {
            if (_cts == null)
            {
                _cts = new CancellationTokenSource();

                // cleared so the wait below observes the socket created by the new connection task
                _client = null;

                _taskConnect = Task.Factory.StartNew(
                    () =>
                    {
                        Log.Trace($"WebSocketClientWrapper connection task started: {_url}");

                        try
                        {
                            HandleConnection();
                        }
                        catch (Exception e)
                        {
                            Log.Error(e, $"Error in WebSocketClientWrapper connection task: {_url}: ");
                        }

                        Log.Trace($"WebSocketClientWrapper connection task ended: {_url}");
                    },
                    _cts.Token, TaskCreationOptions.LongRunning, TaskScheduler.Default);
            }

            // captured while holding '_locker': Close() disposes '_cts' and sets it to null under this
            // same lock, so reading the field again after releasing the lock could hit a null reference
            token = _cts.Token;
        }

        // wait for _client to be not null, we need to release the '_locker' lock used by 'HandleConnection'
        const int maximumPolls = 100;
        const int pollIntervalMilliseconds = 50;
        try
        {
            for (var count = 0; count < maximumPolls; count++)
            {
                if (_client != null
                    || token.IsCancellationRequested
                    || token.WaitHandle.WaitOne(pollIntervalMilliseconds))
                {
                    break;
                }
            }
        }
        catch (ObjectDisposedException)
        {
            // a concurrent Close() disposed the token source while we were waiting:
            // the connection is being torn down, so there is nothing left to wait for
        }
    }
}
114 
115  /// <summary>
116  /// Cancels the connection task, attempts to close the socket output, waits up to five seconds
  /// for the connection task to finish and raises <see cref="Closed"/> if a socket instance exists
117  /// </summary>
118  public void Close()
119  {
120  lock (_locker)
121  {
122  try
123  {
  // request cancellation first so the loops in HandleConnection stop reconnecting and receiving
124  _cts?.Cancel();
125 
126  try
127  {
  // NOTE(review): the token passed here was cancelled by the line above, so the close handshake may be
  // aborted right away - confirm this is intended. Also, when '_client' is not null but '_cts' is null
  // (e.g. Close called twice) '_cts.Token' throws and the exception is swallowed by the catch below.
128  _client?.CloseOutputAsync(WebSocketCloseStatus.NormalClosure, "", _cts.Token).SynchronouslyAwaitTask();
129  }
130  catch
131  {
132  // best effort: any failure while closing the socket output is deliberately ignored
133  }
134 
  // bounded wait: we are holding '_locker', which HandleConnection also needs, so never block forever
135  _taskConnect?.Wait(TimeSpan.FromSeconds(5));
136 
137  _cts.DisposeSafely();
138  }
139  catch (Exception e)
140  {
141  Log.Error($"WebSocketClientWrapper.Close({_url}): {e}");
142  }
143 
  // a null '_cts' is what lets the next Connect() call start a fresh connection task
144  _cts = null;
145  }
146 
  // evaluated after releasing the lock; '_client' is not cleared by this method, only by Connect()
147  if (_client != null)
148  {
149  OnClose(new WebSocketCloseData(0, string.Empty, true));
150  }
151  }
152 
/// <summary>
/// True when a socket instance exists and its state is <see cref="WebSocketState.Open"/>
/// </summary>
public bool IsOpen
{
    get
    {
        // single read of the field so the null check and the state check see the same instance
        var socket = _client;
        return socket != null && socket.State == WebSocketState.Open;
    }
}
157 
158  /// <summary>
159  /// Raised for every complete text or binary message received by the connection task
160  /// </summary>
161  public event EventHandler<WebSocketMessage> Message;
162 
163  /// <summary>
164  /// Raised when the connection task hits an error that was not caused by cancellation
165  /// </summary>
166  public event EventHandler<WebSocketError> Error;
167 
168  /// <summary>
169  /// Raised after a connection attempt to the configured url succeeds
170  /// </summary>
171  public event EventHandler Open;
172 
173  /// <summary>
174  /// Raised at the end of the Close method when a socket instance exists
175  /// </summary>
176  public event EventHandler<WebSocketCloseData> Closed;
177 
/// <summary>
/// Event invocator for the <see cref="Message"/> event
/// </summary>
/// <param name="e">The received message to hand to the subscribers</param>
protected virtual void OnMessage(WebSocketMessage e)
{
    // copy the delegate first so a concurrent unsubscribe cannot null it between check and call
    var handler = Message;
    if (handler != null)
    {
        handler(this, e);
    }
}
185 
/// <summary>
/// Event invocator for the <see cref="Error"/> event; logs the error before notifying subscribers
/// </summary>
/// <param name="e">The error details, including the originating exception</param>
protected virtual void OnError(WebSocketError e)
{
    // '_client?.State' mirrors the null handling of IsOpen: logging must not throw when no socket exists yet
    Log.Error(e.Exception, $"WebSocketClientWrapper.OnError(): (IsOpen:{IsOpen}, State:{_client?.State}): {_url}: {e.Message}");
    Error?.Invoke(this, e);
}
195 
/// <summary>
/// Event invocator for the <see cref="Open"/> event; logs the connection state before notifying subscribers
/// </summary>
protected virtual void OnOpen()
{
    // '_client?.State' mirrors the null handling of IsOpen: logging must not throw when no socket exists yet
    Log.Trace($"WebSocketClientWrapper.OnOpen(): Connection opened (IsOpen:{IsOpen}, State:{_client?.State}): {_url}");
    Open?.Invoke(this, EventArgs.Empty);
}
204 
/// <summary>
/// Event invocator for the <see cref="Closed"/> event; logs the connection state before notifying subscribers
/// </summary>
/// <param name="e">The close details to hand to the subscribers</param>
protected virtual void OnClose(WebSocketCloseData e)
{
    // '_client?.State' mirrors the null handling of IsOpen: logging must not throw when no socket exists
    Log.Trace($"WebSocketClientWrapper.OnClose(): Connection closed (IsOpen:{IsOpen}, State:{_client?.State}): {_url}");
    Closed?.Invoke(this, e);
}
213 
/// <summary>
/// Body of the connection task: connects, pumps received messages into <see cref="OnMessage"/>
/// and reconnects after a failure until the cancellation source is cancelled or cleared
/// </summary>
/// <remarks>
/// A <see cref="WebSocketException"/> is reported through <see cref="OnError"/> and followed by a
/// wait that doubles after every consecutive failure, from 2 seconds up to a cap of 120 seconds,
/// and goes back to 2 seconds once a message is received. Other exceptions are reported without
/// waiting; cancellation and disposal exceptions are ignored.
/// </remarks>
private void HandleConnection()
{
    const int maximumWaitTimeOnError = 120 * 1000;
    const int minimumWaitTimeOnError = 2 * 1000;

    var receiveBuffer = new byte[ReceiveBufferSize];

    // kept outside the reconnect loop on purpose: declaring it per attempt would reset the wait
    // to the minimum before every reconnect and the back-off would never grow
    var waitTimeOnError = minimumWaitTimeOnError;

    while (_cts is { IsCancellationRequested: false })
    {
        Log.Trace($"WebSocketClientWrapper.HandleConnection({_url}): Connecting...");

        // per-attempt token source: cancelling it ends this attempt only, cancelling '_cts' ends them all
        using (var connectionCts = CancellationTokenSource.CreateLinkedTokenSource(_cts.Token))
        {
            try
            {
                lock (_locker)
                {
                    // every attempt uses a brand new socket, the previous one is disposed first
                    _client.DisposeSafely();
                    _client = new ClientWebSocket();
                    if (_sessionToken != null)
                    {
                        _client.Options.SetRequestHeader("x-session-token", _sessionToken);
                    }
                    _client.ConnectAsync(new Uri(_url), connectionCts.Token).SynchronouslyAwaitTask();
                }
                OnOpen();

                while ((_client.State == WebSocketState.Open || _client.State == WebSocketState.CloseSent) &&
                    !connectionCts.IsCancellationRequested)
                {
                    var messageData = ReceiveMessage(_client, connectionCts.Token, receiveBuffer);

                    if (messageData == null)
                    {
                        // close frame or unsupported message type: leave the receive loop
                        break;
                    }

                    // reset wait time
                    waitTimeOnError = minimumWaitTimeOnError;
                    OnMessage(new WebSocketMessage(this, messageData));
                }
            }
            catch (OperationCanceledException) { }
            catch (ObjectDisposedException) { }
            catch (WebSocketException ex)
            {
                if (!connectionCts.IsCancellationRequested)
                {
                    OnError(new WebSocketError(ex.Message, ex));

                    // waiting on the token's handle lets a cancellation interrupt the wait immediately
                    connectionCts.Token.WaitHandle.WaitOne(waitTimeOnError);

                    // increase wait time until a maximum value. This is useful during brokerage down times
                    waitTimeOnError = Math.Min(maximumWaitTimeOnError, waitTimeOnError * 2);
                }
            }
            catch (Exception ex)
            {
                if (!connectionCts.IsCancellationRequested)
                {
                    OnError(new WebSocketError(ex.Message, ex));
                }
            }

            // make sure nothing started with this attempt's token keeps running
            if (!connectionCts.IsCancellationRequested)
            {
                connectionCts.Cancel();
            }
        }
    }
}
284 
/// <summary>
/// Receives one complete message, accumulating fragments until the end of the message is flagged
/// </summary>
/// <param name="webSocket">The socket to read from</param>
/// <param name="ct">Token passed to every receive call</param>
/// <param name="receiveBuffer">Reusable buffer each fragment is received into</param>
/// <returns>
/// A <see cref="BinaryMessage"/> or <see cref="TextMessage"/> holding the whole payload;
/// null for a close message or any other message type
/// </returns>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private MessageData ReceiveMessage(
    ClientWebSocket webSocket,
    CancellationToken ct,
    byte[] receiveBuffer)
{
    var buffer = new ArraySegment<byte>(receiveBuffer);

    using (var ms = new MemoryStream())
    {
        WebSocketReceiveResult result;

        do
        {
            // each call yields at most one buffer worth of data, so larger messages need several calls
            result = webSocket.ReceiveAsync(buffer, ct).SynchronouslyAwaitTask();
            ms.Write(buffer.Array, buffer.Offset, result.Count);
        }
        while (!result.EndOfMessage);

        switch (result.MessageType)
        {
            case WebSocketMessageType.Binary:
            {
                var data = ms.ToArray();
                return new BinaryMessage
                {
                    Data = data,
                    // 'result.Count' only covers the last fragment; report the size of the whole message
                    Count = data.Length,
                };
            }

            case WebSocketMessageType.Text:
                return new TextMessage
                {
                    // decode straight from the stream's internal buffer to avoid an extra copy
                    Message = Encoding.UTF8.GetString(ms.GetBuffer(), 0, (int)ms.Length),
                };

            case WebSocketMessageType.Close:
                Log.Trace($"WebSocketClientWrapper.HandleConnection({_url}): WebSocketMessageType.Close - Data: {Encoding.UTF8.GetString(ms.GetBuffer(), 0, (int)ms.Length)}");
                return null;

            default:
                return null;
        }
    }
}
327 
/// <summary>
/// Base type for the payload of a received websocket message
/// </summary>
public abstract class MessageData
{
    /// <summary>
    /// The websocket message type of this payload
    /// </summary>
    public WebSocketMessageType MessageType { get; set; }
}

/// <summary>
/// Payload of a text message
/// </summary>
public class TextMessage : MessageData
{
    /// <summary>
    /// The text carried by the message
    /// </summary>
    public string Message { get; set; }

    /// <summary>
    /// Creates an instance whose type is preset to <see cref="WebSocketMessageType.Text"/>
    /// </summary>
    public TextMessage() => MessageType = WebSocketMessageType.Text;
}

/// <summary>
/// Payload of a binary message
/// </summary>
public class BinaryMessage : MessageData
{
    /// <summary>
    /// The bytes carried by the message
    /// </summary>
    public byte[] Data { get; set; }

    /// <summary>
    /// Byte count reported for the message
    /// </summary>
    public int Count { get; set; }

    /// <summary>
    /// Creates an instance whose type is preset to <see cref="WebSocketMessageType.Binary"/>
    /// </summary>
    public BinaryMessage() => MessageType = WebSocketMessageType.Binary;
}
381  }
382 }