18 using System.Collections.Generic;
19 using System.Diagnostics;
22 using System.Runtime.CompilerServices;
23 using System.Threading;
24 using System.Threading.Tasks;
25 using Newtonsoft.Json;
// UTC deadlines: Update() compares the current UTC time against each of these before running the matching step.
51 private DateTime _nextUpdate;
52 private DateTime _nextChartsUpdate;
53 private DateTime _nextChartTrimming;
54 private DateTime _nextLogStoreUpdate;
55 private DateTime _nextStatisticsUpdate;
56 private DateTime _nextInsightStoreUpdate;
// Date component of the UTC clock as last recorded; Update() refreshes it when utcNow.Date differs.
57 private DateTime _currentUtcDate;
// Assigned TimeSpan.FromMinutes(10) in the same statement as _samplePortfolioPeriod.
59 private readonly TimeSpan _storeInsightPeriod;
// Next portfolio margin sample deadline: the sample period rounded down from the current time, plus one period.
61 private DateTime _nextPortfolioMarginUpdate;
// Date of the previous portfolio margin update; a change of date clears the intraday portfolio state series.
62 private DateTime _previousPortfolioMarginUpdate;
63 private readonly TimeSpan _samplePortfolioPeriod;
// Update() also gates its status-file step on this deadline; it is re-armed 10 minutes ahead elsewhere in the file.
69 private DateTime _nextStatusUpdate;
72 private DateTime _nextSample;
// Cancelled and disposed elsewhere in the file; its token bounds the delay used by UpdateAlgorithmStatus().
74 private readonly CancellationTokenSource _cancellationTokenSource;
// Read from configuration key "streamed-chart-limit" (default 12).
75 private readonly
int _streamedChartLimit;
// Read from configuration key "streamed-chart-group-size" (default 3).
76 private readonly
int _streamedChartGroupSize;
// When true, UserExchangeIsOpen() takes its first guard branch without consulting exchange hours.
78 private bool _sampleChartAlways;
// Cached result of the last exchange-hours evaluation in UserExchangeIsOpen().
79 private bool _userExchangeIsOpen;
// UTC time of the last exchange-hours evaluation; used to reuse the cached result within the same minute.
82 private DateTime _lastChartSampleLogicCheck;
// Exchange hours keyed by Symbol.ID.Market of the securities added to the algorithm.
83 private readonly Dictionary<string, SecurityExchangeHours> _exchangeHours;
91 _exchangeHours =
new Dictionary<string, SecurityExchangeHours>();
92 _cancellationTokenSource =
new CancellationTokenSource();
95 _samplePortfolioPeriod = _storeInsightPeriod = TimeSpan.FromMinutes(10);
96 _streamedChartLimit =
Config.
GetInt(
"streamed-chart-limit", 12);
97 _streamedChartGroupSize =
Config.
GetInt(
"streamed-chart-group-size", 3);
109 _api = parameters.
Api;
111 if (_job ==
null)
throw new Exception(
"LiveResultHandler.Constructor(): Submitted Job type invalid.");
112 var utcNow = DateTime.UtcNow;
113 _currentUtcDate = utcNow.Date;
115 _nextPortfolioMarginUpdate = utcNow.RoundDown(_samplePortfolioPeriod).Add(_samplePortfolioPeriod);
116 base.Initialize(parameters);
/// <summary>
/// Worker routine of the result handler. The visible statements dequeue packets from
/// <c>Messages</c>, guard the work with an exception handler and trace a message when the thread ends.
/// </summary>
/// <remarks>
/// NOTE(review): the loop condition, the packet handling and the catch body are elided in this excerpt.
/// </remarks>
122 protected override void Run()
// Non-blocking poll: TryDequeue reports through its return value whether a packet was available.
134 if (
Messages.TryDequeue(out packet))
148 catch (Exception err)
154 Log.
Trace(
"LiveTradingResultHandler.Run(): Ending Thread...");
/// <summary>
/// Periodic update routine. It captures the UTC clock once and compares it against a set of
/// deadlines (<c>_nextUpdate</c>, <c>_nextChartsUpdate</c>, <c>_nextLogStoreUpdate</c>,
/// <c>_nextStatisticsUpdate</c>, <c>_nextStatusUpdate</c>, <c>_nextChartTrimming</c>,
/// <c>_nextInsightStoreUpdate</c>) to decide which steps run: building and splitting delta packets,
/// assembling a complete result, storing logs, sending statistics, writing the status update,
/// trimming chart history and storing insights.
/// </summary>
/// <remarks>
/// NOTE(review): many statements (and all scope braces) are elided in this excerpt, so the nesting
/// of the individual deadline checks cannot be confirmed from here.
/// </remarks>
161 private void Update()
166 Log.
Debug(
"LiveTradingResultHandler.Update(): Algorithm not yet initialized.");
// Single clock reading that every deadline comparison below uses.
176 var utcNow = DateTime.UtcNow;
177 if (utcNow > _nextUpdate)
181 Dictionary<int, Order> deltaOrders;
183 var stopwatch = Stopwatch.StartNew();
190 var deltaCharts =
new Dictionary<string, Chart>();
191 Log.
Debug(
"LiveTradingResultHandler.Update(): Build delta charts");
192 var performanceCharts =
new Dictionary<string, Chart>();
196 foreach (var chart
in Charts)
198 var chartUpdates = chart.Value.GetUpdates();
200 if (!chartUpdates.IsEmpty())
// '/' in the chart name is replaced with '-' to form the key used in deltaCharts.
203 var safeName = chart.Value.Name.Replace(
'/',
'-');
204 DictionarySafeAdd(deltaCharts, safeName, chartUpdates,
"deltaCharts");
// Keyed by the original chart key (not the sanitized name) and holding a clone of the chart.
209 performanceCharts[chart.Key] = chart.Value.Clone();
218 Log.
Debug(
"LiveTradingResultHandler.Update(): End build delta charts");
221 var deltaStatistics =
new Dictionary<string, string>();
226 Log.
Debug(
"LiveTradingResultHandler.Update(): Build run time stats");
230 Log.
Debug(
"LiveTradingResultHandler.Update(): End build run time stats");
235 var splitPackets = SplitPackets(deltaCharts, deltaOrders, holdings,
Algorithm.
Portfolio.
CashBook, deltaStatistics, runtimeStatistics, serverStatistics, deltaOrderEvents);
237 foreach (var liveResultPacket
in splitPackets)
243 if (utcNow > _nextChartsUpdate)
245 Log.
Debug(
"LiveTradingResultHandler.Update(): Pre-store result");
246 var chartComplete =
new Dictionary<string, Chart>();
249 foreach (var chart
in Charts)
252 var safeName = chart.Value.Name.Replace(
'/',
'-');
253 DictionarySafeAdd(chartComplete, safeName, chart.Value.Clone(),
"chartComplete");
257 var orderEvents = GetOrderEventsToStore();
// Complete (non-delta) result: all cloned charts, orders, transaction record, holdings, cash book and algorithm state.
260 var complete =
new LiveResultPacket(_job,
new LiveResult(
new LiveResultParameters(chartComplete, orders,
Algorithm.
Transactions.
TransactionRecord, holdings,
Algorithm.
Portfolio.
CashBook, deltaStatistics, runtimeStatistics, orderEvents, serverStatistics, state:
GetAlgorithmState())));
263 Log.
Debug(
"LiveTradingResultHandler.Update(): End-store result");
267 if (utcNow > _nextLogStoreUpdate)
270 Log.
Debug(
"LiveTradingResultHandler.Update(): Storing log...");
// Copies LogStore into a new list.
274 logs =
new List<LogEntry>(
LogStore);
// Re-armed from a fresh clock reading (not the utcNow captured above), two minutes ahead.
279 _nextLogStoreUpdate = DateTime.UtcNow.AddMinutes(2);
280 Log.
Debug(
"LiveTradingResultHandler.Update(): Finished storing log");
284 if (utcNow > _nextStatisticsUpdate)
299 catch (Exception err)
301 Log.
Error(err,
"Error sending statistics:");
// Re-armed one minute after the utcNow captured at the top of this method.
303 _nextStatisticsUpdate = utcNow.AddMinutes(1);
306 if (utcNow > _nextStatusUpdate)
308 var chartComplete =
new Dictionary<string, Chart>();
311 foreach (var chart
in Charts)
314 var safeName = chart.Value.Name.Replace(
'/',
'-');
315 DictionarySafeAdd(chartComplete, safeName, chart.Value.Clone(),
"chartComplete");
// Only holdings with a non-zero quantity are passed on here.
321 holdings.Where(pair => pair.Value.Quantity != 0).ToDictionary(pair => pair.Key, pair => pair.Value),
// Detects that the UTC calendar date changed since it was last recorded.
330 if (_currentUtcDate != utcNow.Date)
334 _currentUtcDate = utcNow.Date;
337 if (utcNow > _nextChartTrimming)
339 Log.
Debug(
"LiveTradingResultHandler.Update(): Trimming charts");
// Cut-off two days before utcNow; the query below keeps only values whose Time is later than it.
340 var timeLimitUtc = utcNow.AddDays(-2);
343 foreach (var chart
in Charts)
345 foreach (var series
in chart.Value.Series)
348 series.Value.Values =
349 (from v in series.Value.Values
350 where v.Time > timeLimitUtc
// Re-armed from a fresh clock reading, ten minutes ahead.
355 _nextChartTrimming = DateTime.UtcNow.AddMinutes(10);
356 Log.
Debug(
"LiveTradingResultHandler.Update(): Finished trimming charts");
359 if (utcNow > _nextInsightStoreUpdate)
// Re-armed from a fresh clock reading, one _storeInsightPeriod ahead.
363 _nextInsightStoreUpdate = DateTime.UtcNow.Add(_storeInsightPeriod);
366 catch (Exception err)
368 Log.
Error(err,
"LiveTradingResultHandler().Update(): ",
true);
383 _nextStatusUpdate = DateTime.UtcNow.AddMinutes(10);
393 if (orderEvents.Count <= 0)
398 var filename = $
"{AlgorithmId}-{utcTime:yyyy-MM-dd}-order-events.json";
401 var data = JsonConvert.SerializeObject(orderEvents, Formatting.None,
SerializerSettings);
403 File.WriteAllText(path, data);
/// <summary>
/// Selects the order events to store: those in <c>TransactionHandler.OrderEvents</c> whose
/// <c>UtcTime</c> is at or after <c>_currentUtcDate</c>.
/// </summary>
/// <returns>A new list holding the matching order events</returns>
409 private List<OrderEvent> GetOrderEventsToStore()
411 return TransactionHandler.OrderEvents.Where(orderEvent => orderEvent.UtcTime >= _currentUtcDate).ToList();
/// <summary>
/// Assembles and stores a status update from the supplied runtime statistics, holdings, charts,
/// algorithm state and profit/loss. Failures are caught and logged as "Error storing status update"
/// rather than propagated.
/// </summary>
/// <param name="runtimeStatistics">Runtime statistics forwarded to the stored result</param>
/// <param name="holdings">Holdings keyed by string</param>
/// <param name="chartComplete">Charts keyed by string</param>
/// <param name="algorithmState">Forwarded to the stored result as its <c>state</c> argument</param>
/// <param name="profitLoss">Profit and loss values keyed by time</param>
/// <param name="serverStatistics">Optional server statistics; defaults to null</param>
/// <remarks>
/// NOTE(review): the end of the parameter list (the <c>statistics</c> parameter used below) and parts
/// of the body are elided in this excerpt. The debug messages are prefixed "Update()" although they
/// are emitted from this method.
/// </remarks>
419 private void StoreStatusFile(SortedDictionary<string, string> runtimeStatistics,
420 Dictionary<string, Holding> holdings,
421 Dictionary<string, Chart> chartComplete,
422 Dictionary<string, string> algorithmState,
423 SortedDictionary<DateTime, decimal> profitLoss,
424 Dictionary<string, string> serverStatistics =
null,
429 Log.
Debug(
"LiveTradingResultHandler.Update(): status update start...");
// Null statistics are handled explicitly; the handling itself is not visible here.
431 if (statistics ==
null)
// Sampler constructed with a 12 hour step.
437 var dailySampler =
new SeriesSampler(TimeSpan.FromHours(12));
// Only the summary part of the statistics is forwarded.
450 statistics: statistics.Summary,
451 runtimeStatistics: runtimeStatistics,
453 serverStatistics: serverStatistics,
454 state: algorithmState));
457 Log.
Debug(
"LiveTradingResultHandler.Update(): status update end.");
459 catch (Exception err)
461 Log.
Error(err,
"Error storing status update");
/// <summary>
/// Splits one update into several result packets. Delta charts are accumulated into groups, a group
/// being closed once it holds <c>_streamedChartGroupSize</c> charts; the number of charts already
/// packed is checked against <c>_streamedChartLimit</c>. The chart packets are appended to the other
/// packets, and a further packet carrying orders and order events is appended when either is non-empty.
/// </summary>
/// <param name="deltaCharts">Charts holding only the updates to send</param>
/// <param name="deltaOrders">Orders to send, keyed by int</param>
/// <param name="holdings">Holdings keyed by string</param>
/// <param name="deltaStatistics">Statistics to send</param>
/// <param name="runtimeStatistics">Runtime statistics to send</param>
/// <param name="serverStatistics">Server statistics, placed in a result's <c>ServerStatistics</c></param>
/// <param name="deltaOrderEvents">Order events to send</param>
/// <returns>The sequence of packets produced</returns>
/// <remarks>
/// NOTE(review): one parameter line and the construction of <c>packets</c> are elided in this excerpt,
/// as is the action taken when the chart limit check succeeds.
/// </remarks>
468 private IEnumerable<LiveResultPacket> SplitPackets(Dictionary<string, Chart> deltaCharts,
469 Dictionary<int, Order> deltaOrders,
470 Dictionary<string, Holding> holdings,
472 Dictionary<string, string> deltaStatistics,
473 SortedDictionary<string, string> runtimeStatistics,
474 Dictionary<string, string> serverStatistics,
475 List<OrderEvent> deltaOrderEvents)
// Charts collected for the group currently being filled.
478 var current =
new Dictionary<string, Chart>();
479 var chartPackets =
new List<LiveResultPacket>();
485 foreach (var deltaChart
in deltaCharts.Values)
// Keyed by the chart's own Name; Add throws on a duplicate key.
487 current.Add(deltaChart.Name, deltaChart);
489 if (current.Count >= _streamedChartGroupSize)
// Start a fresh group rather than clearing the one just handed off.
495 current =
new Dictionary<string, Chart>();
// Packets emitted so far times the group size is compared with the streamed chart limit.
496 if (chartPackets.Count * _streamedChartGroupSize >= _streamedChartLimit)
// Charts left over in a group that never reached the full group size.
506 if (current.Count > 0)
519 ServerStatistics = serverStatistics
523 var result = packets.Concat(chartPackets);
526 if (deltaOrders.Count > 0 || deltaOrderEvents.Count > 0)
528 result = result.Concat(
new[] {
new LiveResultPacket(_job,
new LiveResult { Orders = deltaOrders, OrderEvents = deltaOrderEvents }) });
577 Log.
Debug(
"LiveTradingResultHandler.AddToLogStore(): Adding");
578 base.AddToLogStore(DateTime.Now.ToStringInvariant(
DateFormat.
UI) +
" " + message);
579 Log.
Debug(
"LiveTradingResultHandler.AddToLogStore(): Finished adding");
591 AddToLogStore(message + (!
string.IsNullOrEmpty(stacktrace) ?
": StackTrace: " + stacktrace :
string.Empty));
/// <summary>
/// Records a runtime error. The visible statement adds the message to the log store, appending
/// ": StackTrace: " followed by the stack trace when one is supplied.
/// </summary>
/// <param name="message">Error message</param>
/// <param name="stacktrace">Optional stack trace; a null or empty value adds nothing to the message</param>
/// <remarks>
/// NOTE(review): statements preceding the log-store call are elided in this excerpt.
/// </remarks>
609 public virtual void RuntimeError(
string message,
string stacktrace =
"")
612 AddToLogStore(message + (!
string.IsNullOrEmpty(stacktrace) ?
": StackTrace: " + stacktrace :
string.Empty));
644 Log.
Debug(
"LiveTradingResultHandler.Sample(): Sampling " + chartName +
"." + seriesName);
648 if (!
Charts.TryGetValue(chartName, out var chart))
650 Charts.AddOrUpdate(chartName,
new Chart(chartName));
651 chart =
Charts[chartName];
655 if (!chart.Series.TryGetValue(seriesName, out var series))
658 chart.Series.Add(seriesName, series);
664 Log.
Debug(
"LiveTradingResultHandler.Sample(): Done sampling " + chartName +
"." + seriesName);
674 Log.
Debug(
"LiveTradingResultHandler.SampleRange(): Begin sampling");
677 foreach (var update
in updates)
681 if (!
Charts.TryGetValue(update.Name, out chart))
683 chart =
new Chart(update.Name);
684 Charts.AddOrUpdate(update.Name, chart);
690 if (series.
Values.Count > 0)
692 var thisSeries = chart.TryAddAndGetSeries(series.
Name, series, forceAddNew:
false);
696 if (dataPoint !=
null)
698 thisSeries.AddPoint(dataPoint);
704 thisSeries.Values.AddRange(series.
Values);
710 Log.
Debug(
"LiveTradingResultHandler.SampleRange(): Finished sampling");
727 var types =
new List<SecurityType>();
730 var security = kvp.Value;
732 if (!types.Contains(security.Type)) types.Add(security.Type);
739 Console.SetOut(debug);
740 Console.SetError(error);
742 UpdateAlgorithmStatus();
757 Log.
Trace($
"LiveTradingResultHandler.SendStatusUpdate(): status: '{status}'. {(string.IsNullOrEmpty(message) ? string.Empty : " " + message)}");
770 Log.
Debug(
"LiveTradingResultHandler.RuntimeStatistic(): Begin setting statistic");
779 Log.
Debug(
"LiveTradingResultHandler.RuntimeStatistic(): End setting statistic");
787 Log.
Trace(
"LiveTradingResultHandler.SendFinalResult(): Starting...");
790 var endTime = DateTime.UtcNow;
797 var charts =
new Dictionary<string, Chart>();
800 foreach (var kvp
in Charts)
802 charts.Add(kvp.Key, kvp.Value.Clone());
812 StoreStatusFile(runtime, holdings, charts, endState, profitLoss, statistics: statisticsResults);
822 StoreStatusFile(
new(),
new(),
new(), endState,
new());
833 Log.
Trace(
"LiveTradingResultHandler.SendFinalResult(): Finished storing results. Start sending...");
840 catch (Exception err)
844 Log.
Trace(
"LiveTradingResultHandler.SendFinalResult(): Ended");
/// <summary>
/// Saves log entries by appending the <c>Message</c> of each entry, one per line, to a file.
/// The file name is built as "{id}-log.txt"; exceptions raised along the way are caught.
/// </summary>
/// <param name="id">Identifier used as the file name prefix</param>
/// <param name="logs">Log entries to persist</param>
/// <returns>NOTE(review): the returned value is not visible in this excerpt</returns>
853 public override string SaveLogs(
string id, List<LogEntry> logs)
857 var logLines = logs.Select(x => x.Message);
858 var filename = $
"{id}-log.txt";
// AppendAllLines adds to an existing file instead of overwriting it; how `path` is derived is elided here.
860 File.AppendAllLines(path, logLines);
863 catch (Exception err)
878 Log.
Debug(
"LiveTradingResultHandler.StoreResult(): Begin store result sampling");
888 if (live.Results.OrderEvents !=
null)
893 live.Results.OrderEvents =
null;
897 var start = DateTime.UtcNow.Date;
898 var stop = start.AddDays(1);
901 Truncate(live.Results, start, stop);
903 var highResolutionCharts =
new Dictionary<string, Chart>(live.Results.Charts);
906 var minuteSampler =
new SeriesSampler(TimeSpan.FromMinutes(1));
907 var minuteCharts = minuteSampler.SampleCharts(live.Results.Charts, start, stop);
911 live.Results.Charts = minuteCharts;
915 var tenminuteSampler =
new SeriesSampler(TimeSpan.FromMinutes(10));
916 var tenminuteCharts = tenminuteSampler.SampleCharts(live.Results.Charts, start, stop);
917 lock (_intradayPortfolioState)
919 var clone = _intradayPortfolioState.Clone();
924 live.Results.Charts = tenminuteCharts;
929 live.Results.Charts = highResolutionCharts;
930 start = DateTime.UtcNow.RoundDown(TimeSpan.FromHours(1));
931 stop = DateTime.UtcNow.RoundUp(TimeSpan.FromHours(1));
933 Truncate(live.Results, start, stop);
935 foreach (var name
in live.Results.Charts.Keys)
939 Orders =
new Dictionary<int, Order>(live.Results.Orders),
940 Holdings =
new Dictionary<string, Holding>(live.Results.Holdings),
941 Charts =
new Dictionary<string, Chart> { { name, live.Results.Charts[name] } }
949 Log.
Error(
"LiveResultHandler.StoreResult(): Result Null.");
952 Log.
Debug(
"LiveTradingResultHandler.StoreResult(): End store result sampling");
954 catch (Exception err)
966 var brokerIds =
string.Empty;
968 if (order !=
null && order.BrokerId.Count > 0) brokerIds =
string.Join(
", ", order.BrokerId);
971 Log.
Trace(
"LiveTradingResultHandler.OrderEvent(): " + newEvent +
" BrokerId: " + brokerIds,
true);
974 var message =
"New Order Event: " + newEvent;
985 _cancellationTokenSource.Cancel();
1009 _cancellationTokenSource.DisposeSafely();
/// <summary>
/// Restricts a live result to a time window. A new chart dictionary is built in which every series
/// is an empty clone of the original, refilled with the points whose <c>Time</c> lies between
/// <paramref name="start"/> and <paramref name="stop"/>, both bounds included. The visible order
/// predicate keeps an order when its <c>Time</c>, <c>LastFillTime</c> or <c>LastUpdateTime</c> falls
/// inside the same inclusive window, and the kept orders are keyed by <c>Id</c>.
/// </summary>
/// <param name="result">Result whose charts are read</param>
/// <param name="start">Inclusive start of the window</param>
/// <param name="stop">Inclusive end of the window</param>
/// <remarks>
/// NOTE(review): the statements that write the truncated collections back onto the result, and the
/// source collection of the order filter, are elided in this excerpt.
/// </remarks>
1016 private static void Truncate(
LiveResult result, DateTime start, DateTime stop)
1021 var charts =
new Dictionary<string, Chart>();
1022 foreach (var kvp
in result.
Charts)
1024 var chart = kvp.Value;
// New chart carries the same name and is stored under the same key as the original.
1025 var newChart =
new Chart(chart.Name);
1026 charts.Add(kvp.Key, newChart);
1027 foreach (var series
in chart.Series.Values)
// Clone without points, then copy over only the points inside the window.
1029 var newSeries = series.Clone(empty:
true);
1030 newSeries.Values.AddRange(series.Values.Where(chartPoint => chartPoint.Time >= start && chartPoint.Time <= stop));
1031 newChart.AddSeries(newSeries);
// An order qualifies if any of its three timestamps is inside the window; the null checks guard the two nullable ones.
1036 (x.Time >= start && x.Time <= stop) ||
1037 (x.LastFillTime !=
null && x.LastFillTime >= start && x.LastFillTime <= stop) ||
1038 (x.LastUpdateTime !=
null && x.LastUpdateTime >= start && x.LastUpdateTime <= stop)
1039 ).ToDictionary(x => x.Id);
/// <summary>
/// Builds a key of the form "{AlgorithmId}-{date}_{suffix}.json", where the date is the current
/// UTC time formatted with <paramref name="dateFormat"/> through <c>ToStringInvariant</c>.
/// </summary>
/// <param name="suffix">Text placed after the underscore</param>
/// <param name="dateFormat">Date format string; defaults to "yyyy-MM-dd"</param>
/// <returns>The composed key</returns>
1044 private string CreateKey(
string suffix,
string dateFormat =
"yyyy-MM-dd")
1046 return $
"{AlgorithmId}-{DateTime.UtcNow.ToStringInvariant(dateFormat)}_{suffix}.json";
1056 return Uri.EscapeDataString(chartName);
1066 var time = DateTime.UtcNow;
1069 UpdatePortfolioValue(time, forceProcess);
1074 if (time > _nextPortfolioMarginUpdate || forceProcess)
1076 _nextPortfolioMarginUpdate = time.RoundDown(_samplePortfolioPeriod).Add(_samplePortfolioPeriod);
1079 lock (_intradayPortfolioState)
1081 if (_previousPortfolioMarginUpdate.Date != time.Date)
1084 _previousPortfolioMarginUpdate = time.Date;
1085 _intradayPortfolioState.Series.Clear();
1088 if (newState !=
null)
1095 if (time > _nextSample || forceProcess)
1097 Log.
Debug(
"LiveTradingResultHandler.ProcessSynchronousEvents(): Enter");
1103 UpdateBenchmarkValue(time, forceProcess);
1121 var timeout = DateTime.UtcNow.AddSeconds(1);
1128 Log.
Trace(
"LiveTradingResultHandler.ProcessSynchronousEvents(): Processing Notification...");
1133 catch (Exception err)
1136 Log.
Error(err,
"Sending notification: " + message.GetType().FullName);
1141 Log.
Debug(
"LiveTradingResultHandler.ProcessSynchronousEvents(): Exit");
1151 if (_sampleChartAlways)
1155 foreach (var securityChange
in changes.AddedSecurities)
1157 var symbol = securityChange.Symbol;
1158 if (symbol.SecurityType ==
QuantConnect.SecurityType.Base)
1165 _sampleChartAlways = symbol.SecurityType ==
QuantConnect.SecurityType.Crypto
1166 || symbol.SecurityType ==
QuantConnect.SecurityType.Forex
1168 .Any(config => config.ExtendedMarketHours || config.Resolution ==
Resolution.Daily);
1169 if (_sampleChartAlways)
1175 if (!_exchangeHours.ContainsKey(securityChange.Symbol.ID.Market))
1178 _exchangeHours[securityChange.Symbol.ID.Market] = securityChange.Exchange.Hours;
1190 UpdatePortfolioValue(time);
1191 UpdateBenchmarkValue(time);
1202 return _portfolioValue.
Value;
1213 return _benchmarkValue.
Value;
/// <summary>
/// Evaluates whether any exchange registered in <c>_exchangeHours</c> is open at the given UTC time,
/// recording the answer in <c>_userExchangeIsOpen</c> and reusing it for later calls that fall in
/// the same minute.
/// </summary>
/// <param name="utcDateTime">Time to evaluate, in UTC</param>
/// <returns>
/// The cached <c>_userExchangeIsOpen</c> value on the cache-hit path. NOTE(review): the return
/// statements of the other paths are elided in this excerpt.
/// </returns>
1221 [MethodImpl(MethodImplOptions.AggressiveInlining)]
1222 private bool UserExchangeIsOpen(DateTime utcDateTime)
// Short-circuit when sampling is unconditional or no exchange hours have been registered.
1224 if (_sampleChartAlways || _exchangeHours.Count == 0)
// Cache hit when the previous check shares day-of-month, hour and minute with this call.
// NOTE(review): month and year are not compared.
1229 if (_lastChartSampleLogicCheck.Day == utcDateTime.Day
1230 && _lastChartSampleLogicCheck.Hour == utcDateTime.Hour
1231 && _lastChartSampleLogicCheck.Minute == utcDateTime.Minute)
1234 return _userExchangeIsOpen;
1236 _lastChartSampleLogicCheck = utcDateTime;
1238 foreach (var exchangeHour
in _exchangeHours.Values)
// The UTC time is converted into the exchange's own time zone before querying IsOpen.
// NOTE(review): the `false` argument is presumably the extended-market-hours flag — confirm.
1240 if (exchangeHour.IsOpen(utcDateTime.ConvertFromUtc(exchangeHour.TimeZone),
false))
1243 _userExchangeIsOpen =
true;
1249 _userExchangeIsOpen =
false;
/// <summary>
/// Adds <paramref name="value"/> under <paramref name="key"/> using <c>TryAdd</c>. When the add is
/// rejected, an error naming the dictionary and the key is logged instead.
/// </summary>
/// <typeparam name="T">Type of the dictionary values</typeparam>
/// <param name="dictionary">Dictionary to add to</param>
/// <param name="key">Key to add</param>
/// <param name="value">Value to store under the key</param>
/// <param name="dictionaryName">Label for the dictionary, used only in the error message</param>
1253 private static void DictionarySafeAdd<T>(Dictionary<string, T> dictionary,
string key, T value,
string dictionaryName)
// TryAdd returns false, leaving the existing entry untouched, when the key is already present.
1255 if (!dictionary.TryAdd(key, value))
1257 Log.
Error($
"LiveTradingResultHandler.DictionarySafeAdd(): dictionary {dictionaryName} already contains key {key}");
/// <summary>
/// Self-rescheduling status update: after its work it queues another call to itself one minute
/// later, using a delay bound to <c>_cancellationTokenSource.Token</c>.
/// </summary>
/// <remarks>
/// NOTE(review): the first part of the condition and the guarded work are elided in this excerpt.
/// With default continuation options, <c>ContinueWith</c> also runs when the delay task is cancelled,
/// so the visible <c>IsCancellationRequested</c> check appears to be what ends the chain — confirm.
/// </remarks>
1264 private void UpdateAlgorithmStatus()
1267 && !_cancellationTokenSource.IsCancellationRequested)
1274 Task.Delay(TimeSpan.FromMinutes(1), _cancellationTokenSource.Token).ContinueWith(_ => UpdateAlgorithmStatus());
/// <summary>
/// Updates the benchmark value when <paramref name="force"/> is set or when
/// <c>UserExchangeIsOpen</c> reports true for <paramref name="time"/>.
/// </summary>
/// <param name="time">Time passed to the exchange-open check</param>
/// <param name="force">True to skip the exchange-open check; defaults to false</param>
/// <remarks>
/// NOTE(review): the guarded statements are elided in this excerpt.
/// </remarks>
1278 [MethodImpl(MethodImplOptions.AggressiveInlining)]
1279 private void UpdateBenchmarkValue(DateTime time,
bool force =
false)
// `force` is tested first, so a forced call never evaluates UserExchangeIsOpen.
1281 if (force || UserExchangeIsOpen(time))
/// <summary>
/// Updates the portfolio value when <paramref name="force"/> is set or when
/// <c>UserExchangeIsOpen</c> reports true for <paramref name="time"/>.
/// </summary>
/// <param name="time">Time passed to the exchange-open check</param>
/// <param name="force">True to skip the exchange-open check; defaults to false</param>
/// <remarks>
/// NOTE(review): the guarded statements are elided in this excerpt.
/// </remarks>
1287 [MethodImpl(MethodImplOptions.AggressiveInlining)]
1288 private void UpdatePortfolioValue(DateTime time,
bool force =
false)
// `force` is tested first, so a forced call never evaluates UserExchangeIsOpen.
1290 if (force || UserExchangeIsOpen(time))
1301 var holdings =
new Dictionary<string, Holding>();
1303 foreach (var security
in securities
1305 .Where(s => s.Invested || !onlyInvested && (!s.IsInternalFeed() && s.IsTradable && !s.Symbol.IsCanonical()
1308 || s.Symbol.SecurityType ==
QuantConnect.SecurityType.Future && (s.IsTradable || s.Symbol.IsCanonical() && subscriptionDataConfigService.GetSubscriptionDataConfigs(s.Symbol).Any())))
1309 .OrderBy(x => x.Symbol.Value))
1311 DictionarySafeAdd(holdings, security.Symbol.ID.ToString(),
new Holding(security),
"holdings");