Lean  $LEAN_TAG$
LiveTradingResultHandler.cs
1 /*
2  * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
3  * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  *
15 */
16 
17 using System;
18 using System.Collections.Generic;
19 using System.Diagnostics;
20 using System.IO;
21 using System.Linq;
22 using System.Runtime.CompilerServices;
23 using System.Threading;
24 using System.Threading.Tasks;
25 using Newtonsoft.Json;
30 using QuantConnect.Logging;
32 using QuantConnect.Orders;
33 using QuantConnect.Packets;
37 using QuantConnect.Util;
38 
40 {
41  /// <summary>
42  /// Live trading result handler implementation passes the messages to the QC live trading interface.
43  /// </summary>
44  /// <remarks>Live trading result handler is quite busy. It sends constant price updates, equity updates and order/holdings updates.</remarks>
46  {
47  // Required properties for the cloud app.
48  private LiveNodePacket _job;
49 
50  //Update loop:
51  private DateTime _nextUpdate;
52  private DateTime _nextChartsUpdate;
53  private DateTime _nextChartTrimming;
54  private DateTime _nextLogStoreUpdate;
55  private DateTime _nextStatisticsUpdate;
56  private DateTime _nextInsightStoreUpdate;
57  private DateTime _currentUtcDate;
58 
59  private readonly TimeSpan _storeInsightPeriod;
60 
61  private DateTime _nextPortfolioMarginUpdate;
62  private DateTime _previousPortfolioMarginUpdate;
63  private readonly TimeSpan _samplePortfolioPeriod;
64  private readonly Chart _intradayPortfolioState = new(PortfolioMarginKey);
65 
66  /// <summary>
67  /// The earliest time of next dump to the status file
68  /// </summary>
69  private DateTime _nextStatusUpdate;
70 
71  //Log Message Store:
72  private DateTime _nextSample;
73  private IApi _api;
74  private readonly CancellationTokenSource _cancellationTokenSource;
75  private readonly int _streamedChartLimit;
76  private readonly int _streamedChartGroupSize;
77 
78  private bool _sampleChartAlways;
79  private bool _userExchangeIsOpen;
80  private ReferenceWrapper<decimal> _portfolioValue;
81  private ReferenceWrapper<decimal> _benchmarkValue;
82  private DateTime _lastChartSampleLogicCheck;
83  private readonly Dictionary<string, SecurityExchangeHours> _exchangeHours;
84 
85 
86  /// <summary>
87  /// Creates a new instance
88  /// </summary>
90  {
91  _exchangeHours = new Dictionary<string, SecurityExchangeHours>();
92  _cancellationTokenSource = new CancellationTokenSource();
93  ResamplePeriod = TimeSpan.FromSeconds(2);
94  NotificationPeriod = TimeSpan.FromSeconds(1);
95  _samplePortfolioPeriod = _storeInsightPeriod = TimeSpan.FromMinutes(10);
96  _streamedChartLimit = Config.GetInt("streamed-chart-limit", 12);
97  _streamedChartGroupSize = Config.GetInt("streamed-chart-group-size", 3);
98 
99  _portfolioValue = new ReferenceWrapper<decimal>(0);
100  _benchmarkValue = new ReferenceWrapper<decimal>(0);
101  }
102 
103  /// <summary>
104  /// Initialize the result handler with this result packet.
105  /// </summary>
106  /// <param name="parameters">DTO parameters class to initialize a result handler</param>
107  public override void Initialize(ResultHandlerInitializeParameters parameters)
108  {
109  _api = parameters.Api;
110  _job = (LiveNodePacket)parameters.Job;
111  if (_job == null) throw new Exception("LiveResultHandler.Constructor(): Submitted Job type invalid.");
112  var utcNow = DateTime.UtcNow;
113  _currentUtcDate = utcNow.Date;
114 
115  _nextPortfolioMarginUpdate = utcNow.RoundDown(_samplePortfolioPeriod).Add(_samplePortfolioPeriod);
116  base.Initialize(parameters);
117  }
118 
119  /// <summary>
120  /// Live trading result handler thread.
121  /// </summary>
122  protected override void Run()
123  {
124  // give the algorithm time to initialize, else we will log an error right away
125  ExitEvent.WaitOne(3000);
126 
127  // -> 1. Run Primary Sender Loop: Continually process messages from queue as soon as they arrive.
128  while (!(ExitTriggered && Messages.IsEmpty))
129  {
130  try
131  {
132  //1. Process Simple Messages in Queue
133  Packet packet;
134  if (Messages.TryDequeue(out packet))
135  {
136  MessagingHandler.Send(packet);
137  }
138 
139  //2. Update the packet scanner:
140  Update();
141 
142  if (Messages.IsEmpty)
143  {
144  // prevent thread lock/tight loop when there's no work to be done
145  ExitEvent.WaitOne(Time.GetSecondUnevenWait(1000));
146  }
147  }
148  catch (Exception err)
149  {
150  Log.Error(err);
151  }
152  } // While !End.
153 
154  Log.Trace("LiveTradingResultHandler.Run(): Ending Thread...");
155  } // End Run();
156 
157 
158  /// <summary>
159  /// Every so often send an update to the browser with the current state of the algorithm.
160  /// </summary>
161  private void Update()
162  {
163  //Error checks if the algorithm & threads have not loaded yet, or are closing down.
164  if (Algorithm?.Transactions == null || TransactionHandler.Orders == null || !Algorithm.GetLocked())
165  {
166  Log.Debug("LiveTradingResultHandler.Update(): Algorithm not yet initialized.");
167  ExitEvent.WaitOne(1000);
168  return;
169  }
170 
171  if (ExitTriggered)
172  {
173  return;
174  }
175 
176  var utcNow = DateTime.UtcNow;
177  if (utcNow > _nextUpdate)
178  {
179  try
180  {
181  Dictionary<int, Order> deltaOrders;
182  {
183  var stopwatch = Stopwatch.StartNew();
184  deltaOrders = GetDeltaOrders(LastDeltaOrderPosition, shouldStop: orderCount => stopwatch.ElapsedMilliseconds > 15);
185  }
186  var deltaOrderEvents = TransactionHandler.OrderEvents.Skip(LastDeltaOrderEventsPosition).Take(50).ToList();
187  LastDeltaOrderEventsPosition += deltaOrderEvents.Count;
188 
189  //Create and send back the changes in chart since the algorithm started.
190  var deltaCharts = new Dictionary<string, Chart>();
191  Log.Debug("LiveTradingResultHandler.Update(): Build delta charts");
192  var performanceCharts = new Dictionary<string, Chart>();
193  lock (ChartLock)
194  {
195  //Get the updates since the last chart
196  foreach (var chart in Charts)
197  {
198  var chartUpdates = chart.Value.GetUpdates();
199  // we only want to stream charts that have new updates
200  if (!chartUpdates.IsEmpty())
201  {
202  // remove directory pathing characters from chart names
203  var safeName = chart.Value.Name.Replace('/', '-');
204  DictionarySafeAdd(deltaCharts, safeName, chartUpdates, "deltaCharts");
205  }
206 
207  if (AlgorithmPerformanceCharts.Contains(chart.Key))
208  {
209  performanceCharts[chart.Key] = chart.Value.Clone();
210  }
211 
212  if (chartUpdates.Name == PortfolioMarginKey)
213  {
215  }
216  }
217  }
218  Log.Debug("LiveTradingResultHandler.Update(): End build delta charts");
219 
220  //Profit loss changes, get the banner statistics, summary information on the performance for the headers.
221  var deltaStatistics = new Dictionary<string, string>();
222  var serverStatistics = GetServerStatistics(utcNow);
223  var holdings = GetHoldings(Algorithm.Securities.Values, Algorithm.SubscriptionManager.SubscriptionDataConfigService);
224 
225  //Add the algorithm statistics first.
226  Log.Debug("LiveTradingResultHandler.Update(): Build run time stats");
227 
228  var summary = GenerateStatisticsResults(performanceCharts).Summary;
229  var runtimeStatistics = GetAlgorithmRuntimeStatistics(summary);
230  Log.Debug("LiveTradingResultHandler.Update(): End build run time stats");
231 
232 
233  // since we're sending multiple packets, let's do it async and forget about it
234  // chart data can get big so let's break them up into groups
235  var splitPackets = SplitPackets(deltaCharts, deltaOrders, holdings, Algorithm.Portfolio.CashBook, deltaStatistics, runtimeStatistics, serverStatistics, deltaOrderEvents);
236 
237  foreach (var liveResultPacket in splitPackets)
238  {
239  MessagingHandler.Send(liveResultPacket);
240  }
241 
242  //Send full packet to storage.
243  if (utcNow > _nextChartsUpdate)
244  {
245  Log.Debug("LiveTradingResultHandler.Update(): Pre-store result");
246  var chartComplete = new Dictionary<string, Chart>();
247  lock (ChartLock)
248  {
249  foreach (var chart in Charts)
250  {
251  // remove directory pathing characters from chart names
252  var safeName = chart.Value.Name.Replace('/', '-');
253  DictionarySafeAdd(chartComplete, safeName, chart.Value.Clone(), "chartComplete");
254  }
255  }
256 
257  var orderEvents = GetOrderEventsToStore();
258 
259  var orders = new Dictionary<int, Order>(TransactionHandler.Orders);
260  var complete = new LiveResultPacket(_job, new LiveResult(new LiveResultParameters(chartComplete, orders, Algorithm.Transactions.TransactionRecord, holdings, Algorithm.Portfolio.CashBook, deltaStatistics, runtimeStatistics, orderEvents, serverStatistics, state: GetAlgorithmState())));
261  StoreResult(complete);
262  _nextChartsUpdate = DateTime.UtcNow.Add(ChartUpdateInterval);
263  Log.Debug("LiveTradingResultHandler.Update(): End-store result");
264  }
265 
266  // Upload the logs every 1-2 minutes; this can be a heavy operation depending on amount of live logging and should probably be done asynchronously.
267  if (utcNow > _nextLogStoreUpdate)
268  {
269  List<LogEntry> logs;
270  Log.Debug("LiveTradingResultHandler.Update(): Storing log...");
271  lock (LogStore)
272  {
273  // we need a new container instance so we can store the logs outside the lock
274  logs = new List<LogEntry>(LogStore);
275  LogStore.Clear();
276  }
277  SaveLogs(AlgorithmId, logs);
278 
279  _nextLogStoreUpdate = DateTime.UtcNow.AddMinutes(2);
280  Log.Debug("LiveTradingResultHandler.Update(): Finished storing log");
281  }
282 
283  // Every minute send usage statistics:
284  if (utcNow > _nextStatisticsUpdate)
285  {
286  try
287  {
288  _api.SendStatistics(
289  _job.AlgorithmId,
295  GetNetReturn(),
297  TotalTradesCount(), 0);
298  }
299  catch (Exception err)
300  {
301  Log.Error(err, "Error sending statistics:");
302  }
303  _nextStatisticsUpdate = utcNow.AddMinutes(1);
304  }
305 
306  if (utcNow > _nextStatusUpdate)
307  {
308  var chartComplete = new Dictionary<string, Chart>();
309  lock (ChartLock)
310  {
311  foreach (var chart in Charts)
312  {
313  // remove directory pathing characters from chart names
314  var safeName = chart.Value.Name.Replace('/', '-');
315  DictionarySafeAdd(chartComplete, safeName, chart.Value.Clone(), "chartComplete");
316  }
317  }
318  StoreStatusFile(
319  runtimeStatistics,
320  // only store holdings we are invested in
321  holdings.Where(pair => pair.Value.Quantity != 0).ToDictionary(pair => pair.Key, pair => pair.Value),
322  chartComplete,
324  new SortedDictionary<DateTime, decimal>(Algorithm.Transactions.TransactionRecord),
325  serverStatistics);
326 
328  }
329 
330  if (_currentUtcDate != utcNow.Date)
331  {
332  StoreOrderEvents(_currentUtcDate, GetOrderEventsToStore());
333  // start storing in a new date file
334  _currentUtcDate = utcNow.Date;
335  }
336 
337  if (utcNow > _nextChartTrimming)
338  {
339  Log.Debug("LiveTradingResultHandler.Update(): Trimming charts");
340  var timeLimitUtc = utcNow.AddDays(-2);
341  lock (ChartLock)
342  {
343  foreach (var chart in Charts)
344  {
345  foreach (var series in chart.Value.Series)
346  {
347  // trim data that's older than 2 days
348  series.Value.Values =
349  (from v in series.Value.Values
350  where v.Time > timeLimitUtc
351  select v).ToList();
352  }
353  }
354  }
355  _nextChartTrimming = DateTime.UtcNow.AddMinutes(10);
356  Log.Debug("LiveTradingResultHandler.Update(): Finished trimming charts");
357  }
358 
359  if (utcNow > _nextInsightStoreUpdate)
360  {
361  StoreInsights();
362 
363  _nextInsightStoreUpdate = DateTime.UtcNow.Add(_storeInsightPeriod);
364  }
365  }
366  catch (Exception err)
367  {
368  Log.Error(err, "LiveTradingResultHandler().Update(): ", true);
369  }
370 
371  //Set the new update time after we've finished processing.
372  // The processing can takes time depending on how large the packets are.
373  _nextUpdate = DateTime.UtcNow.Add(MainUpdateInterval);
374  } // End Update Charts:
375  }
376 
377  /// <summary>
378  /// Assigns the next earliest status update time
379  /// </summary>
380  protected virtual void SetNextStatusUpdate()
381  {
382  // Update the status json file every X
383  _nextStatusUpdate = DateTime.UtcNow.AddMinutes(10);
384  }
385 
386  /// <summary>
387  /// Stores the order events
388  /// </summary>
389  /// <param name="utcTime">The utc date associated with these order events</param>
390  /// <param name="orderEvents">The order events to store</param>
391  protected override void StoreOrderEvents(DateTime utcTime, List<OrderEvent> orderEvents)
392  {
393  if (orderEvents.Count <= 0)
394  {
395  return;
396  }
397 
398  var filename = $"{AlgorithmId}-{utcTime:yyyy-MM-dd}-order-events.json";
399  var path = GetResultsPath(filename);
400 
401  var data = JsonConvert.SerializeObject(orderEvents, Formatting.None, SerializerSettings);
402 
403  File.WriteAllText(path, data);
404  }
405 
406  /// <summary>
407  /// Gets the order events generated in '_currentUtcDate'
408  /// </summary>
409  private List<OrderEvent> GetOrderEventsToStore()
410  {
411  return TransactionHandler.OrderEvents.Where(orderEvent => orderEvent.UtcTime >= _currentUtcDate).ToList();
412  }
413 
414  /// <summary>
415  /// Will store the complete status of the algorithm in a single json file
416  /// </summary>
417  /// <remarks>Will sample charts every 12 hours, 2 data points per day at maximum,
418  /// to reduce file size</remarks>
419  private void StoreStatusFile(SortedDictionary<string, string> runtimeStatistics,
420  Dictionary<string, Holding> holdings,
421  Dictionary<string, Chart> chartComplete,
422  Dictionary<string, string> algorithmState,
423  SortedDictionary<DateTime, decimal> profitLoss,
424  Dictionary<string, string> serverStatistics = null,
425  StatisticsResults statistics = null)
426  {
427  try
428  {
429  Log.Debug("LiveTradingResultHandler.Update(): status update start...");
430 
431  if (statistics == null)
432  {
433  statistics = GenerateStatisticsResults(chartComplete, profitLoss);
434  }
435 
436  // sample the entire charts with a 12 hours resolution
437  var dailySampler = new SeriesSampler(TimeSpan.FromHours(12));
438  chartComplete = dailySampler.SampleCharts(chartComplete, Time.Start, Time.EndOfTime);
439 
440  if (chartComplete.TryGetValue(PortfolioMarginKey, out var marginChart))
441  {
443  }
444 
445  var result = new LiveResult(new LiveResultParameters(chartComplete,
446  new Dictionary<int, Order>(TransactionHandler.Orders),
448  holdings,
449  Algorithm?.Portfolio.CashBook ?? new(),
450  statistics: statistics.Summary,
451  runtimeStatistics: runtimeStatistics,
452  orderEvents: null, // we stored order events separately
453  serverStatistics: serverStatistics,
454  state: algorithmState));
455 
456  SaveResults($"{AlgorithmId}.json", result);
457  Log.Debug("LiveTradingResultHandler.Update(): status update end.");
458  }
459  catch (Exception err)
460  {
461  Log.Error(err, "Error storing status update");
462  }
463  }
464 
465  /// <summary>
466  /// Run over all the data and break it into smaller packets to ensure they all arrive at the terminal
467  /// </summary>
468  private IEnumerable<LiveResultPacket> SplitPackets(Dictionary<string, Chart> deltaCharts,
469  Dictionary<int, Order> deltaOrders,
470  Dictionary<string, Holding> holdings,
471  CashBook cashbook,
472  Dictionary<string, string> deltaStatistics,
473  SortedDictionary<string, string> runtimeStatistics,
474  Dictionary<string, string> serverStatistics,
475  List<OrderEvent> deltaOrderEvents)
476  {
477  // break the charts into groups
478  var current = new Dictionary<string, Chart>();
479  var chartPackets = new List<LiveResultPacket>();
480 
481  // First add send charts
482 
483  // Loop through all the charts, add them to packets to be sent.
484  // Group three charts per packet
485  foreach (var deltaChart in deltaCharts.Values)
486  {
487  current.Add(deltaChart.Name, deltaChart);
488 
489  if (current.Count >= _streamedChartGroupSize)
490  {
491  // Add the micro packet to transport.
492  chartPackets.Add(new LiveResultPacket(_job, new LiveResult { Charts = current }));
493 
494  // Reset the carrier variable.
495  current = new Dictionary<string, Chart>();
496  if (chartPackets.Count * _streamedChartGroupSize >= _streamedChartLimit)
497  {
498  // stream a maximum number of charts
499  break;
500  }
501  }
502  }
503 
504  // Add whatever is left over here too
505  // unless it is a wildcard subscription
506  if (current.Count > 0)
507  {
508  chartPackets.Add(new LiveResultPacket(_job, new LiveResult { Charts = current }));
509  }
510 
511  // these are easier to split up, not as big as the chart objects
512  var packets = new[]
513  {
514  new LiveResultPacket(_job, new LiveResult { Holdings = holdings, CashBook = cashbook}),
515  new LiveResultPacket(_job, new LiveResult
516  {
517  Statistics = deltaStatistics,
518  RuntimeStatistics = runtimeStatistics,
519  ServerStatistics = serverStatistics
520  })
521  };
522 
523  var result = packets.Concat(chartPackets);
524 
525  // only send order and order event packet if there is actually any update
526  if (deltaOrders.Count > 0 || deltaOrderEvents.Count > 0)
527  {
528  result = result.Concat(new[] { new LiveResultPacket(_job, new LiveResult { Orders = deltaOrders, OrderEvents = deltaOrderEvents }) });
529  }
530 
531  return result;
532  }
533 
534 
535  /// <summary>
536  /// Send a live trading debug message to the live console.
537  /// </summary>
538  /// <param name="message">Message we'd like shown in console.</param>
539  /// <remarks>When there are already 500 messages in the queue it stops adding new messages.</remarks>
540  public void DebugMessage(string message)
541  {
542  if (Messages.Count > 500) return; //if too many in the queue already skip the logging.
543  Messages.Enqueue(new DebugPacket(_job.ProjectId, AlgorithmId, CompileId, message));
544  AddToLogStore(message);
545  }
546 
547  /// <summary>
548  /// Send a live trading system debug message to the live console.
549  /// </summary>
550  /// <param name="message">Message we'd like shown in console.</param>
551  public void SystemDebugMessage(string message)
552  {
553  Messages.Enqueue(new SystemDebugPacket(_job.ProjectId, AlgorithmId, CompileId, message));
554  AddToLogStore(message);
555  }
556 
557 
558  /// <summary>
559  /// Log string messages and send them to the console.
560  /// </summary>
561  /// <param name="message">String message wed like logged.</param>
562  /// <remarks>When there are already 500 messages in the queue it stops adding new messages.</remarks>
563  public void LogMessage(string message)
564  {
565  //Send the logging messages out immediately for live trading:
566  if (Messages.Count > 500) return;
567  Messages.Enqueue(new LogPacket(AlgorithmId, message));
568  AddToLogStore(message);
569  }
570 
571  /// <summary>
572  /// Save an algorithm message to the log store. Uses a different timestamped method of adding messaging to interweve debug and logging messages.
573  /// </summary>
574  /// <param name="message">String message to send to browser.</param>
575  protected override void AddToLogStore(string message)
576  {
577  Log.Debug("LiveTradingResultHandler.AddToLogStore(): Adding");
578  base.AddToLogStore(DateTime.Now.ToStringInvariant(DateFormat.UI) + " " + message);
579  Log.Debug("LiveTradingResultHandler.AddToLogStore(): Finished adding");
580  }
581 
582  /// <summary>
583  /// Send an error message back to the browser console and highlight it read.
584  /// </summary>
585  /// <param name="message">Message we'd like shown in console.</param>
586  /// <param name="stacktrace">Stacktrace to show in the console.</param>
587  public void ErrorMessage(string message, string stacktrace = "")
588  {
589  if (Messages.Count > 500) return;
590  Messages.Enqueue(new HandledErrorPacket(AlgorithmId, message, stacktrace));
591  AddToLogStore(message + (!string.IsNullOrEmpty(stacktrace) ? ": StackTrace: " + stacktrace : string.Empty));
592  }
593 
594  /// <summary>
595  /// Send a list of secutity types that the algorithm trades to the browser to show the market clock - is this market open or closed!
596  /// </summary>
597  /// <param name="types">List of security types</param>
598  public void SecurityType(List<SecurityType> types)
599  {
600  var packet = new SecurityTypesPacket { Types = types };
601  Messages.Enqueue(packet);
602  }
603 
604  /// <summary>
605  /// Send a runtime error back to the users browser and highlight it red.
606  /// </summary>
607  /// <param name="message">Runtime error message</param>
608  /// <param name="stacktrace">Associated error stack trace.</param>
609  public virtual void RuntimeError(string message, string stacktrace = "")
610  {
611  Messages.Enqueue(new RuntimeErrorPacket(_job.UserId, AlgorithmId, message, stacktrace));
612  AddToLogStore(message + (!string.IsNullOrEmpty(stacktrace) ? ": StackTrace: " + stacktrace : string.Empty));
613  SetAlgorithmState(message, stacktrace);
614  }
615 
616  /// <summary>
617  /// Process brokerage message events
618  /// </summary>
619  /// <param name="brokerageMessageEvent">The brokerage message event</param>
620  public virtual void BrokerageMessage(BrokerageMessageEvent brokerageMessageEvent)
621  {
622  // NOP
623  }
624 
625  /// <summary>
626  /// Add a sample to the chart specified by the chartName, and seriesName.
627  /// </summary>
628  /// <param name="chartName">String chart name to place the sample.</param>
629  /// <param name="seriesName">Series name for the chart.</param>
630  /// <param name="seriesIndex">Series chart index - which chart should this series belong</param>
631  /// <param name="seriesType">Series type for the chart.</param>
632  /// <param name="value">Value for the chart sample.</param>
633  /// <param name="unit">Unit for the chart axis</param>
634  /// <remarks>Sample can be used to create new charts or sample equity - daily performance.</remarks>
635  protected override void Sample(string chartName, string seriesName, int seriesIndex, SeriesType seriesType, ISeriesPoint value,
636  string unit = "$")
637  {
638  // Sampling during warming up period skews statistics
640  {
641  return;
642  }
643 
644  Log.Debug("LiveTradingResultHandler.Sample(): Sampling " + chartName + "." + seriesName);
645  lock (ChartLock)
646  {
647  //Add a copy locally:
648  if (!Charts.TryGetValue(chartName, out var chart))
649  {
650  Charts.AddOrUpdate(chartName, new Chart(chartName));
651  chart = Charts[chartName];
652  }
653 
654  //Add the sample to our chart:
655  if (!chart.Series.TryGetValue(seriesName, out var series))
656  {
657  series = BaseSeries.Create(seriesType, seriesName, seriesIndex, unit);
658  chart.Series.Add(seriesName, series);
659  }
660 
661  //Add our value:
662  series.Values.Add(value);
663  }
664  Log.Debug("LiveTradingResultHandler.Sample(): Done sampling " + chartName + "." + seriesName);
665  }
666 
667  /// <summary>
668  /// Add a range of samples from the users algorithms to the end of our current list.
669  /// </summary>
670  /// <param name="updates">Chart updates since the last request.</param>
671  /// <seealso cref="Sample(string,string,int,SeriesType,ISeriesPoint,string)"/>
672  protected void SampleRange(IEnumerable<Chart> updates)
673  {
674  Log.Debug("LiveTradingResultHandler.SampleRange(): Begin sampling");
675  lock (ChartLock)
676  {
677  foreach (var update in updates)
678  {
679  //Create the chart if it doesn't exist already:
680  Chart chart;
681  if (!Charts.TryGetValue(update.Name, out chart))
682  {
683  chart = new Chart(update.Name);
684  Charts.AddOrUpdate(update.Name, chart);
685  }
686 
687  //Add these samples to this chart.
688  foreach (BaseSeries series in update.Series.Values)
689  {
690  if (series.Values.Count > 0)
691  {
692  var thisSeries = chart.TryAddAndGetSeries(series.Name, series, forceAddNew: false);
693  if (series.SeriesType == SeriesType.Pie)
694  {
695  var dataPoint = series.ConsolidateChartPoints();
696  if (dataPoint != null)
697  {
698  thisSeries.AddPoint(dataPoint);
699  }
700  }
701  else
702  {
703  //We already have this record, so just the new samples to the end:
704  thisSeries.Values.AddRange(series.Values);
705  }
706  }
707  }
708  }
709  }
710  Log.Debug("LiveTradingResultHandler.SampleRange(): Finished sampling");
711  }
712 
713  /// <summary>
714  /// Set the algorithm of the result handler after its been initialized.
715  /// </summary>
716  /// <param name="algorithm">Algorithm object matching IAlgorithm interface</param>
717  /// <param name="startingPortfolioValue">Algorithm starting capital for statistics calculations</param>
718  public virtual void SetAlgorithm(IAlgorithm algorithm, decimal startingPortfolioValue)
719  {
720  Algorithm = algorithm;
722  DailyPortfolioValue = StartingPortfolioValue = startingPortfolioValue;
723  _portfolioValue = new ReferenceWrapper<decimal>(startingPortfolioValue);
726 
727  var types = new List<SecurityType>();
728  foreach (var kvp in Algorithm.Securities)
729  {
730  var security = kvp.Value;
731 
732  if (!types.Contains(security.Type)) types.Add(security.Type);
733  }
734  SecurityType(types);
735 
736  // we need to forward Console.Write messages to the algorithm's Debug function
737  var debug = new FuncTextWriter(algorithm.Debug);
738  var error = new FuncTextWriter(algorithm.Error);
739  Console.SetOut(debug);
740  Console.SetError(error);
741 
742  UpdateAlgorithmStatus();
743 
744  // Wire algorithm name and tags updates
745  algorithm.NameUpdated += (sender, name) => AlgorithmNameUpdated(name);
746  algorithm.TagsUpdated += (sender, tags) => AlgorithmTagsUpdated(tags);
747  }
748 
749 
750  /// <summary>
751  /// Send a algorithm status update to the user of the algorithms running state.
752  /// </summary>
753  /// <param name="status">Status enum of the algorithm.</param>
754  /// <param name="message">Optional string message describing reason for status change.</param>
755  public void SendStatusUpdate(AlgorithmStatus status, string message = "")
756  {
757  Log.Trace($"LiveTradingResultHandler.SendStatusUpdate(): status: '{status}'. {(string.IsNullOrEmpty(message) ? string.Empty : " " + message)}");
758  var packet = new AlgorithmStatusPacket(_job.AlgorithmId, _job.ProjectId, status, message);
759  Messages.Enqueue(packet);
760  }
761 
762 
763  /// <summary>
764  /// Set a dynamic runtime statistic to show in the (live) algorithm header
765  /// </summary>
766  /// <param name="key">Runtime headline statistic name</param>
767  /// <param name="value">Runtime headline statistic value</param>
768  public void RuntimeStatistic(string key, string value)
769  {
770  Log.Debug("LiveTradingResultHandler.RuntimeStatistic(): Begin setting statistic");
771  lock (RuntimeStatistics)
772  {
773  if (!RuntimeStatistics.ContainsKey(key))
774  {
775  RuntimeStatistics.Add(key, value);
776  }
777  RuntimeStatistics[key] = value;
778  }
779  Log.Debug("LiveTradingResultHandler.RuntimeStatistic(): End setting statistic");
780  }
781 
782  /// <summary>
783  /// Send a final analysis result back to the IDE.
784  /// </summary>
785  protected void SendFinalResult()
786  {
787  Log.Trace("LiveTradingResultHandler.SendFinalResult(): Starting...");
788  try
789  {
790  var endTime = DateTime.UtcNow;
791  var endState = GetAlgorithmState(endTime);
792  LiveResultPacket result;
793  // could happen if algorithm failed to init
794  if (Algorithm != null)
795  {
796  //Convert local dictionary:
797  var charts = new Dictionary<string, Chart>();
798  lock (ChartLock)
799  {
800  foreach (var kvp in Charts)
801  {
802  charts.Add(kvp.Key, kvp.Value.Clone());
803  }
804  }
805 
806  var orders = new Dictionary<int, Order>(TransactionHandler.Orders);
807  var profitLoss = new SortedDictionary<DateTime, decimal>(Algorithm.Transactions.TransactionRecord);
808  var holdings = GetHoldings(Algorithm.Securities.Values, Algorithm.SubscriptionManager.SubscriptionDataConfigService, onlyInvested: true);
809  var statisticsResults = GenerateStatisticsResults(charts, profitLoss);
810  var runtime = GetAlgorithmRuntimeStatistics(statisticsResults.Summary);
811 
812  StoreStatusFile(runtime, holdings, charts, endState, profitLoss, statistics: statisticsResults);
813 
814  //Create a packet:
815  result = new LiveResultPacket(_job,
816  new LiveResult(new LiveResultParameters(charts, orders, profitLoss, new Dictionary<string, Holding>(),
817  Algorithm.Portfolio.CashBook, statisticsResults.Summary, runtime, GetOrderEventsToStore(),
818  algorithmConfiguration: AlgorithmConfiguration.Create(Algorithm, null), state: endState)));
819  }
820  else
821  {
822  StoreStatusFile(new(), new(), new(), endState, new());
823 
824  result = LiveResultPacket.CreateEmpty(_job);
825  result.Results.State = endState;
826  }
827  result.ProcessingTime = (endTime - StartTime).TotalSeconds;
828 
829  StoreInsights();
830 
831  //Store to S3:
832  StoreResult(result);
833  Log.Trace("LiveTradingResultHandler.SendFinalResult(): Finished storing results. Start sending...");
834  //Truncate packet to fit within 32kb:
835  result.Results = new LiveResult();
836 
837  //Send the truncated packet:
838  MessagingHandler.Send(result);
839  }
840  catch (Exception err)
841  {
842  Log.Error(err);
843  }
844  Log.Trace("LiveTradingResultHandler.SendFinalResult(): Ended");
845  }
846 
847  /// <summary>
848  /// Process the log entries and save it to permanent storage
849  /// </summary>
850  /// <param name="id">Id that will be incorporated into the algorithm log name</param>
851  /// <param name="logs">Log list</param>
852  /// <returns>Returns the location of the logs</returns>
853  public override string SaveLogs(string id, List<LogEntry> logs)
854  {
855  try
856  {
857  var logLines = logs.Select(x => x.Message);
858  var filename = $"{id}-log.txt";
859  var path = GetResultsPath(filename);
860  File.AppendAllLines(path, logLines);
861  return path;
862  }
863  catch (Exception err)
864  {
865  Log.Error(err);
866  }
867  return "";
868  }
869 
870  /// <summary>
871  /// Save the snapshot of the total results to storage.
872  /// </summary>
873  /// <param name="packet">Packet to store.</param>
874  protected override void StoreResult(Packet packet)
875  {
876  try
877  {
878  Log.Debug("LiveTradingResultHandler.StoreResult(): Begin store result sampling");
879 
880  // Make sure this is the right type of packet:
881  if (packet.Type != PacketType.LiveResult) return;
882 
883  // Port to packet format:
884  var live = packet as LiveResultPacket;
885 
886  if (live != null)
887  {
888  if (live.Results.OrderEvents != null)
889  {
890  // we store order events separately
891  StoreOrderEvents(_currentUtcDate, live.Results.OrderEvents);
892  // lets null the orders events so that they aren't stored again and generate a giant file
893  live.Results.OrderEvents = null;
894  }
895 
896  // we need to down sample
897  var start = DateTime.UtcNow.Date;
898  var stop = start.AddDays(1);
899 
900  // truncate to just today, we don't need more than this for anyone
901  Truncate(live.Results, start, stop);
902 
903  var highResolutionCharts = new Dictionary<string, Chart>(live.Results.Charts);
904 
905  // minute resolution data, save today
906  var minuteSampler = new SeriesSampler(TimeSpan.FromMinutes(1));
907  var minuteCharts = minuteSampler.SampleCharts(live.Results.Charts, start, stop);
908 
909  // swap out our charts with the sampled data
910  minuteCharts.Remove(PortfolioMarginKey);
911  live.Results.Charts = minuteCharts;
912  SaveResults(CreateKey("minute"), live.Results);
913 
914  // 10 minute resolution data, save today
915  var tenminuteSampler = new SeriesSampler(TimeSpan.FromMinutes(10));
916  var tenminuteCharts = tenminuteSampler.SampleCharts(live.Results.Charts, start, stop);
917  lock (_intradayPortfolioState)
918  {
919  var clone = _intradayPortfolioState.Clone();
921  tenminuteCharts[PortfolioMarginKey] = clone;
922  }
923 
924  live.Results.Charts = tenminuteCharts;
925  SaveResults(CreateKey("10minute"), live.Results);
926 
927  // high resolution data, we only want to save an hour
928  highResolutionCharts.Remove(PortfolioMarginKey);
929  live.Results.Charts = highResolutionCharts;
930  start = DateTime.UtcNow.RoundDown(TimeSpan.FromHours(1));
931  stop = DateTime.UtcNow.RoundUp(TimeSpan.FromHours(1));
932 
933  Truncate(live.Results, start, stop);
934 
935  foreach (var name in live.Results.Charts.Keys)
936  {
937  var result = new LiveResult
938  {
939  Orders = new Dictionary<int, Order>(live.Results.Orders),
940  Holdings = new Dictionary<string, Holding>(live.Results.Holdings),
941  Charts = new Dictionary<string, Chart> { { name, live.Results.Charts[name] } }
942  };
943 
944  SaveResults(CreateKey("second_" + CreateSafeChartName(name), "yyyy-MM-dd-HH"), result);
945  }
946  }
947  else
948  {
949  Log.Error("LiveResultHandler.StoreResult(): Result Null.");
950  }
951 
952  Log.Debug("LiveTradingResultHandler.StoreResult(): End store result sampling");
953  }
954  catch (Exception err)
955  {
956  Log.Error(err);
957  }
958  }
959 
960  /// <summary>
961  /// New order event for the algorithm
962  /// </summary>
963  /// <param name="newEvent">New event details</param>
964  public override void OrderEvent(OrderEvent newEvent)
965  {
966  var brokerIds = string.Empty;
967  var order = TransactionHandler.GetOrderById(newEvent.OrderId);
968  if (order != null && order.BrokerId.Count > 0) brokerIds = string.Join(", ", order.BrokerId);
969 
970  //Send the message to frontend as packet:
971  Log.Trace("LiveTradingResultHandler.OrderEvent(): " + newEvent + " BrokerId: " + brokerIds, true);
972  Messages.Enqueue(new OrderEventPacket(AlgorithmId, newEvent));
973 
974  var message = "New Order Event: " + newEvent;
975  DebugMessage(message);
976  }
977 
978  /// <summary>
979  /// Terminate the result thread and apply any required exit procedures like sending final results
980  /// </summary>
981  public override void Exit()
982  {
983  if (!ExitTriggered)
984  {
985  _cancellationTokenSource.Cancel();
986 
987  if (Algorithm != null)
988  {
989  // first process synchronous events so we add any new message or log
991  }
992 
993  // Set exit flag, update task will send any message before stopping
994  ExitTriggered = true;
995  ExitEvent.Set();
996 
997  lock (LogStore)
998  {
1000  LogStore.Clear();
1001  }
1002 
1003  StopUpdateRunner();
1004 
1005  SendFinalResult();
1006 
1007  base.Exit();
1008 
1009  _cancellationTokenSource.DisposeSafely();
1010  }
1011  }
1012 
1013  /// <summary>
1014  /// Truncates the chart and order data in the result packet to within the specified time frame
1015  /// </summary>
1016  private static void Truncate(LiveResult result, DateTime start, DateTime stop)
1017  {
1018  //Log.Trace("LiveTradingResultHandler.Truncate: Start: " + start.ToString("u") + " Stop : " + stop.ToString("u"));
1019  //Log.Trace("LiveTradingResultHandler.Truncate: Truncate Delta: " + (unixDateStop - unixDateStart) + " Incoming Points: " + result.Charts["Strategy Equity"].Series["Equity"].Values.Count);
1020 
1021  var charts = new Dictionary<string, Chart>();
1022  foreach (var kvp in result.Charts)
1023  {
1024  var chart = kvp.Value;
1025  var newChart = new Chart(chart.Name);
1026  charts.Add(kvp.Key, newChart);
1027  foreach (var series in chart.Series.Values)
1028  {
1029  var newSeries = series.Clone(empty: true);
1030  newSeries.Values.AddRange(series.Values.Where(chartPoint => chartPoint.Time >= start && chartPoint.Time <= stop));
1031  newChart.AddSeries(newSeries);
1032  }
1033  }
1034  result.Charts = charts;
1035  result.Orders = result.Orders.Values.Where(x =>
1036  (x.Time >= start && x.Time <= stop) ||
1037  (x.LastFillTime != null && x.LastFillTime >= start && x.LastFillTime <= stop) ||
1038  (x.LastUpdateTime != null && x.LastUpdateTime >= start && x.LastUpdateTime <= stop)
1039  ).ToDictionary(x => x.Id);
1040 
1041  //Log.Trace("LiveTradingResultHandler.Truncate: Truncate Outgoing: " + result.Charts["Strategy Equity"].Series["Equity"].Values.Count);
1042  }
1043 
1044  private string CreateKey(string suffix, string dateFormat = "yyyy-MM-dd")
1045  {
1046  return $"{AlgorithmId}-{DateTime.UtcNow.ToStringInvariant(dateFormat)}_{suffix}.json";
1047  }
1048 
1049  /// <summary>
1050  /// Escape the chartname so that it can be saved to a file system
1051  /// </summary>
1052  /// <param name="chartName">The name of a chart</param>
1053  /// <returns>The name of the chart will all escape all characters except RFC 2396 unreserved characters</returns>
1054  protected virtual string CreateSafeChartName(string chartName)
1055  {
1056  return Uri.EscapeDataString(chartName);
1057  }
1058 
1059  /// <summary>
1060  /// Process the synchronous result events, sampling and message reading.
1061  /// This method is triggered from the algorithm manager thread.
1062  /// </summary>
1063  /// <remarks>Prime candidate for putting into a base class. Is identical across all result handlers.</remarks>
1064  public virtual void ProcessSynchronousEvents(bool forceProcess = false)
1065  {
1066  var time = DateTime.UtcNow;
1067 
1068  // Check to see if we should update stored portfolio values
1069  UpdatePortfolioValue(time, forceProcess);
1070 
1071  // Update the equity bar
1073 
1074  if (time > _nextPortfolioMarginUpdate || forceProcess)
1075  {
1076  _nextPortfolioMarginUpdate = time.RoundDown(_samplePortfolioPeriod).Add(_samplePortfolioPeriod);
1077 
1078  var newState = PortfolioState.Create(Algorithm.Portfolio, time, GetPortfolioValue());
1079  lock (_intradayPortfolioState)
1080  {
1081  if (_previousPortfolioMarginUpdate.Date != time.Date)
1082  {
1083  // we crossed into a new day
1084  _previousPortfolioMarginUpdate = time.Date;
1085  _intradayPortfolioState.Series.Clear();
1086  }
1087 
1088  if (newState != null)
1089  {
1090  PortfolioMarginChart.AddSample(_intradayPortfolioState, newState, MapFileProvider, time);
1091  }
1092  }
1093  }
1094 
1095  if (time > _nextSample || forceProcess)
1096  {
1097  Log.Debug("LiveTradingResultHandler.ProcessSynchronousEvents(): Enter");
1098 
1099  //Set next sample time: 4000 samples per backtest
1100  _nextSample = time.Add(ResamplePeriod);
1101 
1102  // Check to see if we should update stored bench values
1103  UpdateBenchmarkValue(time, forceProcess);
1104 
1105  //Sample the portfolio value over time for chart.
1106  SampleEquity(time);
1107 
1108  //Also add the user samples / plots to the result handler tracking:
1110  }
1111 
1112  ProcessAlgorithmLogs(messageQueueLimit: 500);
1113 
1114  //Set the running statistics:
1115  foreach (var pair in Algorithm.RuntimeStatistics)
1116  {
1117  RuntimeStatistic(pair.Key, pair.Value);
1118  }
1119 
1120  //Send all the notification messages but timeout within a second, or if this is a force process, wait till its done.
1121  var timeout = DateTime.UtcNow.AddSeconds(1);
1122  while (!Algorithm.Notify.Messages.IsEmpty && (DateTime.UtcNow < timeout || forceProcess))
1123  {
1124  Notification message;
1125  if (Algorithm.Notify.Messages.TryDequeue(out message))
1126  {
1127  //Process the notification messages:
1128  Log.Trace("LiveTradingResultHandler.ProcessSynchronousEvents(): Processing Notification...");
1129  try
1130  {
1132  }
1133  catch (Exception err)
1134  {
1135  Algorithm.Debug(err.Message);
1136  Log.Error(err, "Sending notification: " + message.GetType().FullName);
1137  }
1138  }
1139  }
1140 
1141  Log.Debug("LiveTradingResultHandler.ProcessSynchronousEvents(): Exit");
1142  }
1143 
1144  /// <summary>
1145  /// Event fired each time that we add/remove securities from the data feed.
1146  /// On Security change we re determine when should we sample charts, if the user added Crypto, Forex or an extended market hours subscription
1147  /// we will always sample charts. Else, we will keep the exchange per market to query later on demand
1148  /// </summary>
1149  public override void OnSecuritiesChanged(SecurityChanges changes)
1150  {
1151  if (_sampleChartAlways)
1152  {
1153  return;
1154  }
1155  foreach (var securityChange in changes.AddedSecurities)
1156  {
1157  var symbol = securityChange.Symbol;
1158  if (symbol.SecurityType == QuantConnect.SecurityType.Base)
1159  {
1160  // ignore custom data
1161  continue;
1162  }
1163 
1164  // if the user added Crypto, Forex, Daily or an extended market hours subscription just sample always, one way trip.
1165  _sampleChartAlways = symbol.SecurityType == QuantConnect.SecurityType.Crypto
1166  || symbol.SecurityType == QuantConnect.SecurityType.Forex
1167  || Algorithm.SubscriptionManager.SubscriptionDataConfigService.GetSubscriptionDataConfigs(symbol)
1168  .Any(config => config.ExtendedMarketHours || config.Resolution == Resolution.Daily);
1169  if (_sampleChartAlways)
1170  {
1171  // we set it once to true
1172  return;
1173  }
1174 
1175  if (!_exchangeHours.ContainsKey(securityChange.Symbol.ID.Market))
1176  {
1177  // per market we keep track of the exchange hours
1178  _exchangeHours[securityChange.Symbol.ID.Market] = securityChange.Exchange.Hours;
1179  }
1180  }
1181  }
1182 
1183  /// <summary>
1184  /// Samples portfolio equity, benchmark, and daily performance
1185  /// </summary>
1186  /// <param name="time">Current UTC time in the AlgorithmManager loop</param>
1187  public void Sample(DateTime time)
1188  {
1189  // Force an update for our values before doing our daily sample
1190  UpdatePortfolioValue(time);
1191  UpdateBenchmarkValue(time);
1192  base.Sample(time);
1193  }
1194 
1195  /// <summary>
1196  /// Gets the current portfolio value
1197  /// </summary>
1198  /// <remarks>Useful so that live trading implementation can freeze the returned value if there is no user exchange open
1199  /// so we ignore extended market hours updates</remarks>
1200  protected override decimal GetPortfolioValue()
1201  {
1202  return _portfolioValue.Value;
1203  }
1204 
1205  /// <summary>
1206  /// Gets the current benchmark value
1207  /// </summary>
1208  /// <remarks>Useful so that live trading implementation can freeze the returned value if there is no user exchange open
1209  /// so we ignore extended market hours updates</remarks>
1210  /// <param name="time">Time to resolve benchmark value at</param>
1211  protected override decimal GetBenchmarkValue(DateTime time)
1212  {
1213  return _benchmarkValue.Value;
1214  }
1215 
1216  /// <summary>
1217  /// True if user exchange are open and we should update portfolio and benchmark value
1218  /// </summary>
1219  /// <remarks>Useful so that live trading implementation can freeze the returned value if there is no user exchange open
1220  /// so we ignore extended market hours updates</remarks>
1221  [MethodImpl(MethodImplOptions.AggressiveInlining)]
1222  private bool UserExchangeIsOpen(DateTime utcDateTime)
1223  {
1224  if (_sampleChartAlways || _exchangeHours.Count == 0)
1225  {
1226  return true;
1227  }
1228 
1229  if (_lastChartSampleLogicCheck.Day == utcDateTime.Day
1230  && _lastChartSampleLogicCheck.Hour == utcDateTime.Hour
1231  && _lastChartSampleLogicCheck.Minute == utcDateTime.Minute)
1232  {
1233  // we cache the value for a minute
1234  return _userExchangeIsOpen;
1235  }
1236  _lastChartSampleLogicCheck = utcDateTime;
1237 
1238  foreach (var exchangeHour in _exchangeHours.Values)
1239  {
1240  if (exchangeHour.IsOpen(utcDateTime.ConvertFromUtc(exchangeHour.TimeZone), false))
1241  {
1242  // one of the users exchanges is open
1243  _userExchangeIsOpen = true;
1244  return true;
1245  }
1246  }
1247 
1248  // no user exchange is open
1249  _userExchangeIsOpen = false;
1250  return false;
1251  }
1252 
1253  private static void DictionarySafeAdd<T>(Dictionary<string, T> dictionary, string key, T value, string dictionaryName)
1254  {
1255  if (!dictionary.TryAdd(key, value))
1256  {
1257  Log.Error($"LiveTradingResultHandler.DictionarySafeAdd(): dictionary {dictionaryName} already contains key {key}");
1258  }
1259  }
1260 
1261  /// <summary>
1262  /// Will launch a task which will call the API and update the algorithm status every minute
1263  /// </summary>
1264  private void UpdateAlgorithmStatus()
1265  {
1266  if (!ExitTriggered
1267  && !_cancellationTokenSource.IsCancellationRequested) // just in case
1268  {
1269  // wait until after we're warmed up to start sending running status each minute
1270  if (!Algorithm.IsWarmingUp)
1271  {
1272  _api.SetAlgorithmStatus(_job.AlgorithmId, AlgorithmStatus.Running);
1273  }
1274  Task.Delay(TimeSpan.FromMinutes(1), _cancellationTokenSource.Token).ContinueWith(_ => UpdateAlgorithmStatus());
1275  }
1276  }
1277 
1278  [MethodImpl(MethodImplOptions.AggressiveInlining)]
1279  private void UpdateBenchmarkValue(DateTime time, bool force = false)
1280  {
1281  if (force || UserExchangeIsOpen(time))
1282  {
1283  _benchmarkValue = new ReferenceWrapper<decimal>(base.GetBenchmarkValue(time));
1284  }
1285  }
1286 
1287  [MethodImpl(MethodImplOptions.AggressiveInlining)]
1288  private void UpdatePortfolioValue(DateTime time, bool force = false)
1289  {
1290  if (force || UserExchangeIsOpen(time))
1291  {
1292  _portfolioValue = new ReferenceWrapper<decimal>(base.GetPortfolioValue());
1293  }
1294  }
1295 
1296  /// <summary>
1297  /// Helper method to fetch the algorithm holdings
1298  /// </summary>
1299  public static Dictionary<string, Holding> GetHoldings(IEnumerable<Security> securities, ISubscriptionDataConfigService subscriptionDataConfigService, bool onlyInvested = false)
1300  {
1301  var holdings = new Dictionary<string, Holding>();
1302 
1303  foreach (var security in securities
1304  // If we are invested we send it always, if not, we send non internal, non canonical and tradable securities. When securities are removed they are marked as non tradable.
1305  .Where(s => s.Invested || !onlyInvested && (!s.IsInternalFeed() && s.IsTradable && !s.Symbol.IsCanonical()
1306  // Continuous futures are different because it's mapped securities are internal and the continuous contract is canonical and non tradable but we want to send them anyways
1307  // but we don't want to sent non canonical, non tradable futures, these would be the future chain assets, or continuous mapped contracts that have been removed
1308  || s.Symbol.SecurityType == QuantConnect.SecurityType.Future && (s.IsTradable || s.Symbol.IsCanonical() && subscriptionDataConfigService.GetSubscriptionDataConfigs(s.Symbol).Any())))
1309  .OrderBy(x => x.Symbol.Value))
1310  {
1311  DictionarySafeAdd(holdings, security.Symbol.ID.ToString(), new Holding(security), "holdings");
1312  }
1313 
1314  return holdings;
1315  }
1316 
1317  /// <summary>
1318  /// Calculates and gets the current statistics for the algorithm
1319  /// </summary>
1320  /// <returns>The current statistics</returns>
1322  {
1323  return GenerateStatisticsResults();
1324  }
1325 
1326  /// <summary>
1327  /// Sets or updates a custom summary statistic
1328  /// </summary>
1329  /// <param name="name">The statistic name</param>
1330  /// <param name="value">The statistic value</param>
1331  public void SetSummaryStatistic(string name, string value)
1332  {
1333  SummaryStatistic(name, value);
1334  }
1335 
1336  /// <summary>
1337  /// Handles updates to the algorithm's name
1338  /// </summary>
1339  /// <param name="name">The new name</param>
1340  public virtual void AlgorithmNameUpdated(string name)
1341  {
1342  Messages.Enqueue(new AlgorithmNameUpdatePacket(AlgorithmId, name));
1343  }
1344 
1345  /// <summary>
1346  /// Handles updates to the algorithm's tags
1347  /// </summary>
1348  /// <param name="tags">The new tags</param>
1349  public virtual void AlgorithmTagsUpdated(HashSet<string> tags)
1350  {
1351  Messages.Enqueue(new AlgorithmTagsUpdatePacket(AlgorithmId, tags));
1352  }
1353  }
1354 }