Lean  $LEAN_TAG$
StreamingMessageHandler.cs
1 /*
2  * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
3  * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14 */
15 
16 using System;
17 using System.Net;
18 using System.Net.Sockets;
19 using Newtonsoft.Json;
22 using QuantConnect.Logging;
24 using QuantConnect.Packets;
25 using NetMQ;
26 using NetMQ.Sockets;
28 
29 namespace QuantConnect.Messaging
30 {
31  /// <summary>
32  /// Message handler that sends messages over tcp using NetMQ.
33  /// </summary>
35  {
36  private string _port;
37  private PushSocket _server;
38  private AlgorithmNodePacket _job;
39  private OrderEventJsonConverter _orderEventJsonConverter;
40 
41  /// <summary>
42  /// Gets or sets whether this messaging handler has any current subscribers.
43  /// This is not used in this message handler. Messages are sent via tcp as they arrive
44  /// </summary>
45  public bool HasSubscribers { get; set; }
46 
47  /// <summary>
48  /// Initialize the messaging system
49  /// </summary>
50  /// <param name="initializeParameters">The parameters required for initialization</param>
51  public void Initialize(MessagingHandlerInitializeParameters initializeParameters)
52  {
53  _port = Config.Get("desktop-http-port");
54  CheckPort();
55  _server = new PushSocket("@tcp://*:" + _port);
56  }
57 
58  /// <summary>
59  /// Set the user communication channel
60  /// </summary>
61  /// <param name="job"></param>
63  {
64  _job = job;
65  _orderEventJsonConverter = new OrderEventJsonConverter(job.AlgorithmId);
66  Transmit(_job);
67  }
68 
69  /// <summary>
70  /// Send any notification with a base type of Notification.
71  /// </summary>
72  /// <param name="notification">The notification to be sent.</param>
73  public void SendNotification(Notification notification)
74  {
75  if (!notification.CanSend())
76  {
77  Log.Error("Messaging.SendNotification(): Send not implemented for notification of type: " + notification.GetType().Name);
78  return;
79  }
80  notification.Send();
81  }
82 
83  /// <summary>
84  /// Send all types of packets
85  /// </summary>
86  public void Send(Packet packet)
87  {
88  Transmit(packet);
89  }
90 
91  /// <summary>
92  /// Send a message to the _server using ZeroMQ
93  /// </summary>
94  /// <param name="packet">Packet to transmit</param>
95  public void Transmit(Packet packet)
96  {
97  var payload = JsonConvert.SerializeObject(packet, _orderEventJsonConverter);
98 
99  var message = new NetMQMessage();
100 
101  message.Append(payload);
102 
103  _server.SendMultipartMessage(message);
104  }
105 
106  /// <summary>
107  /// Check if port to be used by the desktop application is available.
108  /// </summary>
109  private void CheckPort()
110  {
111  try
112  {
113  TcpListener tcpListener = new TcpListener(IPAddress.Any, _port.ToInt32());
114  tcpListener.Start();
115  tcpListener.Stop();
116  }
117  catch
118  {
119  throw new Exception("The port configured in config.json is either being used or blocked by a firewall." +
120  "Please choose a new port or open the port in the firewall.");
121  }
122  }
123 
124  /// <summary>
125  /// Dispose any resources used before destruction
126  /// </summary>
127  public void Dispose()
128  {
129  }
130  }
131 }