Lean  $LEAN_TAG$
EventMessagingHandler.cs
1 /*
2  * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
3  * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14 */
15 
16 using System.Collections.Generic;
18 using QuantConnect.Logging;
20 using QuantConnect.Packets;
21 
23 {
24  /// <summary>
25  /// Desktop implementation of messaging system for Lean Engine
26  /// </summary>
28  {
29  private AlgorithmNodePacket _job;
30  private volatile bool _loaded;
31  private Queue<Packet> _queue;
32 
33  /// <summary>
34  /// Gets or sets whether this messaging handler has any current subscribers.
35  /// When set to false, messages won't be sent.
36  /// </summary>
37  public bool HasSubscribers
38  {
39  get;
40  set;
41  }
42 
43  /// <summary>
44  /// Initialize the Messaging System Plugin.
45  /// </summary>
46  /// <param name="initializeParameters">The parameters required for initialization</param>
47  public void Initialize(MessagingHandlerInitializeParameters initializeParameters)
48  {
49  _queue = new Queue<Packet>();
50 
51  ConsumerReadyEvent += () => { _loaded = true; };
52  }
53 
54  /// <summary>
55  /// Set Loaded to true
56  /// </summary>
57  public void LoadingComplete()
58  {
59  _loaded = true;
60  }
61 
62  /// <summary>
63  /// Set the user communication channel
64  /// </summary>
65  /// <param name="job"></param>
67  {
68  _job = job;
69  }
70 
71 #pragma warning disable 1591
72  public delegate void DebugEventRaised(DebugPacket packet);
73  public event DebugEventRaised DebugEvent;
74 
75  public delegate void SystemDebugEventRaised(SystemDebugPacket packet);
76 #pragma warning disable 0067 // SystemDebugEvent is not used currently; ignore the warning
77  public event SystemDebugEventRaised SystemDebugEvent;
78 #pragma warning restore 0067
79 
80  public delegate void LogEventRaised(LogPacket packet);
81  public event LogEventRaised LogEvent;
82 
83  public delegate void RuntimeErrorEventRaised(RuntimeErrorPacket packet);
84  public event RuntimeErrorEventRaised RuntimeErrorEvent;
85 
86  public delegate void HandledErrorEventRaised(HandledErrorPacket packet);
87  public event HandledErrorEventRaised HandledErrorEvent;
88 
89  public delegate void BacktestResultEventRaised(BacktestResultPacket packet);
90  public event BacktestResultEventRaised BacktestResultEvent;
91 
92  public delegate void ConsumerReadyEventRaised();
93  public event ConsumerReadyEventRaised ConsumerReadyEvent;
94 #pragma warning restore 1591
95 
96  /// <summary>
97  /// Send any message with a base type of Packet.
98  /// </summary>
99  public void Send(Packet packet)
100  {
101  //Until we're loaded queue it up
102  if (!_loaded)
103  {
104  _queue.Enqueue(packet);
105  return;
106  }
107 
108  //Catch up if this is the first time
109  while (_queue.TryDequeue(out var item))
110  {
111  ProcessPacket(item);
112  }
113 
114  //Finally process this new packet
115  ProcessPacket(packet);
116  }
117 
118  /// <summary>
119  /// Send any notification with a base type of Notification.
120  /// </summary>
121  /// <param name="notification">The notification to be sent.</param>
122  public void SendNotification(Notification notification)
123  {
124  if (!notification.CanSend())
125  {
126  Log.Error("Messaging.SendNotification(): Send not implemented for notification of type: " + notification.GetType().Name);
127  return;
128  }
129  notification.Send();
130  }
131 
132  /// <summary>
133  /// Send any message with a base type of Packet that has been enqueued.
134  /// </summary>
135  public void SendEnqueuedPackets()
136  {
137  while (_loaded && _queue.TryDequeue(out var item))
138  {
139  ProcessPacket(item);
140  }
141  }
142 
143  /// <summary>
144  /// Packet processing implementation
145  /// </summary>
146  private void ProcessPacket(Packet packet)
147  {
148  //Packets we handled in the UX.
149  switch (packet.Type)
150  {
151  case PacketType.Debug:
152  var debug = (DebugPacket)packet;
153  OnDebugEvent(debug);
154  break;
155 
156  case PacketType.SystemDebug:
157  var systemDebug = (SystemDebugPacket)packet;
158  OnSystemDebugEvent(systemDebug);
159  break;
160 
161  case PacketType.Log:
162  var log = (LogPacket)packet;
163  OnLogEvent(log);
164  break;
165 
166  case PacketType.RuntimeError:
167  var runtime = (RuntimeErrorPacket)packet;
168  OnRuntimeErrorEvent(runtime);
169  break;
170 
171  case PacketType.HandledError:
172  var handled = (HandledErrorPacket)packet;
173  OnHandledErrorEvent(handled);
174  break;
175 
176  case PacketType.BacktestResult:
177  var result = (BacktestResultPacket)packet;
178  OnBacktestResultEvent(result);
179  break;
180  }
181  }
182 
183  /// <summary>
184  /// Raise a debug event safely
185  /// </summary>
186  protected virtual void OnDebugEvent(DebugPacket packet)
187  {
188  var handler = DebugEvent;
189 
190  if (handler != null)
191  {
192  handler(packet);
193  }
194  }
195 
196 
197  /// <summary>
198  /// Raise a system debug event safely
199  /// </summary>
200  protected virtual void OnSystemDebugEvent(SystemDebugPacket packet)
201  {
202  var handler = DebugEvent;
203 
204  if (handler != null)
205  {
206  handler(packet);
207  }
208  }
209 
210 
211  /// <summary>
212  /// Handler for consumer ready code.
213  /// </summary>
214  public virtual void OnConsumerReadyEvent()
215  {
216  var handler = ConsumerReadyEvent;
217  if (handler != null)
218  {
219  handler();
220  }
221  }
222 
223  /// <summary>
224  /// Raise a log event safely
225  /// </summary>
226  protected virtual void OnLogEvent(LogPacket packet)
227  {
228  var handler = LogEvent;
229  if (handler != null)
230  {
231  handler(packet);
232  }
233  }
234 
235  /// <summary>
236  /// Raise a handled error event safely
237  /// </summary>
238  protected virtual void OnHandledErrorEvent(HandledErrorPacket packet)
239  {
240  var handler = HandledErrorEvent;
241  if (handler != null)
242  {
243  handler(packet);
244  }
245  }
246 
247  /// <summary>
248  /// Raise runtime error safely
249  /// </summary>
250  protected virtual void OnRuntimeErrorEvent(RuntimeErrorPacket packet)
251  {
252  var handler = RuntimeErrorEvent;
253  if (handler != null)
254  {
255  handler(packet);
256  }
257  }
258 
259  /// <summary>
260  /// Raise a backtest result event safely.
261  /// </summary>
262  protected virtual void OnBacktestResultEvent(BacktestResultPacket packet)
263  {
264  var handler = BacktestResultEvent;
265  if (handler != null)
266  {
267  handler(packet);
268  }
269  }
270 
271  /// <summary>
272  /// Dispose of any resources
273  /// </summary>
274  public virtual void Dispose()
275  {
276  }
277  }
278 }