18 using System.Collections.Generic;
20 using System.Threading;
49 private readonly
object _lock;
50 private readonly
bool _liveMode;
51 private bool _cancelRequested;
52 private CancellationTokenSource _cancellationTokenSource;
101 CreateTokenBucket(job?.
Controls?.TrainingLimits),
102 TimeSpan.FromMinutes(
Config.
GetDouble(
"algorithm-manager-time-loop-maximum", 20))
122 _algorithm = algorithm;
124 var token = cancellationTokenSource.Token;
125 _cancellationTokenSource = cancellationTokenSource;
128 var methodInvokers =
new Dictionary<Type, MethodInvoker>();
129 var marginCallFrequency = TimeSpan.FromMinutes(5);
130 var nextMarginCallTime = DateTime.MinValue;
134 var pendingDelistings =
new List<Delisting>();
135 var splitWarnings =
new List<Split>();
141 foreach (var config
in algorithm.SubscriptionManager.Subscriptions)
144 if (config.IsCustomData)
147 var genericMethod = (algorithm.GetType()).GetMethod(
"OnData",
new[] { config.Type });
150 if (methodInvokers.ContainsKey(config.Type))
continue;
152 if (genericMethod !=
null)
154 methodInvokers.Add(config.Type, genericMethod.DelegateForCallMethod());
160 algorithm.Schedule.On(
"Daily Sampling", algorithm.Schedule.DateRules.EveryDay(),
161 algorithm.Schedule.TimeRules.Midnight, () =>
163 results.Sample(algorithm.UtcTime);
167 Log.
Trace($
"AlgorithmManager.Run(): Begin DataStream - Start: {algorithm.StartDate} Stop: {algorithm.EndDate} Time: {algorithm.Time} Warmup: {algorithm.IsWarmingUp}");
168 foreach (var timeSlice
in Stream(algorithm, synchronizer, results, token))
176 Log.
Error($
"AlgorithmManager.Run(): Algorithm state changed to {_algorithm.Status} at {timeSlice.Time.ToStringInvariant()}");
181 if (token.IsCancellationRequested)
183 Log.
Error($
"AlgorithmManager.Run(): CancellationRequestion at {timeSlice.Time.ToStringInvariant()}");
188 leanManager.Update();
190 time = timeSlice.Time;
193 if (backtestMode && algorithm.Portfolio.TotalPortfolioValue <= 0)
195 var logMessage =
"AlgorithmManager.Run(): Portfolio value is less than or equal to zero, stopping algorithm.";
197 results.SystemDebugMessage(logMessage);
204 realtime.ScanPastEvents(time);
212 var pastConsolidatorsScanTime = _liveMode ? time.RoundDown(
Time.
OneSecond) : time;
213 algorithm.SubscriptionManager.ScanPastConsolidators(pastConsolidatorsScanTime, algorithm);
216 algorithm.SetDateTime(time);
219 if (timeSlice.IsTimePulse)
225 algorithm.SetCurrentSlice(timeSlice.Slice);
227 if (timeSlice.Slice.SymbolChangedEvents.Count != 0)
231 algorithm.OnSymbolChangedEvents(timeSlice.Slice.SymbolChangedEvents);
233 catch (Exception err)
235 algorithm.SetRuntimeError(err,
"OnSymbolChangedEvents");
239 foreach (var symbol
in timeSlice.Slice.SymbolChangedEvents.Keys)
242 foreach (var ticket
in transactions.GetOpenOrderTickets(x => x.Symbol == symbol))
244 ticket.Cancel(
"Open order cancelled on symbol changed event");
251 algorithm.ProcessSecurityChanges(timeSlice.SecurityChanges);
253 leanManager.OnSecuritiesChanged(timeSlice.SecurityChanges);
254 realtime.OnSecuritiesChanged(timeSlice.SecurityChanges);
255 results.OnSecuritiesChanged(timeSlice.SecurityChanges);
259 foreach (var update
in timeSlice.SecuritiesUpdateData)
261 var security = update.Target;
263 security.Update(update.Data, update.DataType, update.ContainsFillForwardData);
266 algorithm.TradeBuilder.SetMarketPrice(security.Symbol, security.Price);
270 if (time >= nextSecurityModelScan)
272 foreach (var security
in algorithm.Securities.Values)
283 if (timeSlice.UniverseData.Count > 0)
285 foreach (var dataCollection
in timeSlice.UniverseData.Values)
287 if (!dataCollection.ShouldCacheToSecurity())
continue;
289 foreach (var data
in dataCollection.Data)
291 if (algorithm.Securities.TryGetValue(data.Symbol, out var security))
293 security.Cache.StoreData(
new[] { data }, data.GetType());
300 foreach (var cash
in algorithm.Portfolio.CashBook.Values.Where(x => x.CurrencyConversion !=
null))
306 algorithm.Portfolio.InvalidateTotalPortfolioValue();
309 transactions.ProcessSynchronousEvents();
312 realtime.SetTime(timeSlice.Time);
315 ProcessSplitSymbols(algorithm, splitWarnings, pendingDelistings);
320 Log.
Error($
"AlgorithmManager.Run(): Algorithm state changed to {_algorithm.Status} at {timeSlice.Time.ToStringInvariant()}");
323 if (algorithm.RunTimeError !=
null)
325 Log.
Error($
"AlgorithmManager.Run(): Stopping, encountered a runtime error at {algorithm.UtcTime} UTC.");
330 if (time >= nextMarginCallTime || (_liveMode && nextMarginCallTime > DateTime.UtcNow))
333 bool issueMarginCallWarning;
334 var marginCallOrders = algorithm.Portfolio.MarginCallModel.GetMarginCallOrders(out issueMarginCallWarning);
335 if (marginCallOrders.Count != 0)
337 var executingMarginCall =
false;
341 algorithm.OnMarginCall(marginCallOrders);
343 executingMarginCall =
true;
346 var executedTickets = algorithm.Portfolio.MarginCallModel.ExecuteMarginCall(marginCallOrders);
347 foreach (var ticket
in executedTickets)
349 algorithm.Error($
"{algorithm.Time.ToStringInvariant()} - Executed MarginCallOrder: {ticket.Symbol} - " +
350 $
"Quantity: {ticket.Quantity.ToStringInvariant()} @ {ticket.AverageFillPrice.ToStringInvariant()}"
354 catch (Exception err)
356 algorithm.SetRuntimeError(err, executingMarginCall ?
"Portfolio.MarginCallModel.ExecuteMarginCall" :
"OnMarginCall");
361 else if (issueMarginCallWarning)
365 algorithm.OnMarginCallWarning();
367 catch (Exception err)
369 algorithm.SetRuntimeError(err,
"OnMarginCallWarning");
374 nextMarginCallTime = time + marginCallFrequency;
382 var algorithmSecurityChanges =
new SecurityChanges(timeSlice.SecurityChanges)
385 FilterCustomSecurities =
true,
387 FilterInternalSecurities =
true
390 algorithm.OnSecuritiesChanged(algorithmSecurityChanges);
391 algorithm.OnFrameworkSecuritiesChanged(algorithmSecurityChanges);
393 catch (Exception err)
395 algorithm.SetRuntimeError(err,
"OnSecuritiesChanged");
409 if (timeSlice.ConsolidatorUpdateData.Count > 0)
411 var timeKeeper = algorithm.TimeKeeper;
412 foreach (var update
in timeSlice.ConsolidatorUpdateData)
414 var localTime = timeKeeper.GetLocalTimeKeeper(update.Target.ExchangeTimeZone).LocalTime;
415 var consolidators = update.Target.Consolidators;
416 foreach (var consolidator
in consolidators)
418 foreach (var dataPoint
in update.Data)
420 consolidator.Update(dataPoint);
424 consolidator.Scan(localTime);
429 catch (Exception err)
431 algorithm.SetRuntimeError(err,
"Consolidators update");
436 foreach (var update
in timeSlice.CustomData)
438 MethodInvoker methodInvoker;
439 if (!methodInvokers.TryGetValue(update.DataType, out methodInvoker))
446 foreach (var dataPoint
in update.Data)
448 if (update.DataType.IsInstanceOfType(dataPoint))
450 methodInvoker(algorithm, dataPoint);
454 catch (Exception err)
456 algorithm.SetRuntimeError(err,
"Custom Data");
463 if (timeSlice.Slice.Splits.Count != 0)
465 algorithm.OnSplits(timeSlice.Slice.Splits);
468 catch (Exception err)
470 algorithm.SetRuntimeError(err,
"OnSplits");
476 if (timeSlice.Slice.Dividends.Count != 0)
478 algorithm.OnDividends(timeSlice.Slice.Dividends);
481 catch (Exception err)
483 algorithm.SetRuntimeError(err,
"OnDividends");
489 if (timeSlice.Slice.Delistings.Count != 0)
491 algorithm.OnDelistings(timeSlice.Slice.Delistings);
494 catch (Exception err)
496 algorithm.SetRuntimeError(err,
"OnDelistings");
501 if (!algorithm.LiveMode)
504 foreach (var delisting
in timeSlice.Slice.Delistings.Values)
509 pendingDelistings.Add(delisting);
514 var index = pendingDelistings.FindIndex(x => x.Symbol == delisting.Symbol);
517 pendingDelistings.RemoveAt(index);
524 HandleSplitSymbols(timeSlice.Slice.Splits, splitWarnings);
528 if (timeSlice.Slice.HasData)
531 algorithm.OnData(algorithm.CurrentSlice);
535 algorithm.OnFrameworkData(timeSlice.Slice);
537 catch (Exception err)
539 algorithm.SetRuntimeError(err,
"OnData");
545 transactions.ProcessSynchronousEvents();
548 results.ProcessSynchronousEvents();
551 algorithm.OnEndOfTimeStep();
559 Log.
Trace(
"AlgorithmManager.Run(): Firing On End Of Algorithm...");
562 algorithm.OnEndOfAlgorithm();
564 catch (Exception err)
566 algorithm.SetRuntimeError(err,
"OnEndOfAlgorithm");
571 results.ProcessSynchronousEvents(forceProcess:
true);
576 Log.
Trace(
"AlgorithmManager.Run(): Liquidating algorithm holdings...");
577 algorithm.Liquidate();
578 results.LogMessage(
"Algorithm Liquidated");
585 Log.
Trace(
"AlgorithmManager.Run(): Stopping algorithm...");
586 results.LogMessage(
"Algorithm Stopped");
593 Log.
Trace(
"AlgorithmManager.Run(): Deleting algorithm...");
594 results.DebugMessage(
"Algorithm Id:(" + job.AlgorithmId +
") Deleted by request.");
603 results.Sample(time);
622 if (_cancellationTokenSource !=
null && !_cancellationTokenSource.IsCancellationRequested && !_cancelRequested)
626 _cancelRequested =
true;
628 _cancellationTokenSource.CancelAfter(TimeSpan.FromSeconds(5));
632 _cancelRequested =
true;
634 _cancellationTokenSource.CancelAfter(TimeSpan.FromMinutes(1));
642 var nextWarmupStatusTime = DateTime.MinValue;
644 var warmingUpPercent = 0;
647 nextWarmupStatusTime = DateTime.UtcNow.AddSeconds(1);
648 algorithm.
Debug(
"Algorithm starting warm up...");
660 var startTimeTicks = algorithm.
UtcTime.Ticks;
666 warmupEndTicks = DateTime.UtcNow.Ticks;
670 foreach (var timeSlice
in synchronizer.
StreamData(cancellationToken))
674 var now = DateTime.UtcNow;
675 if (now > nextWarmupStatusTime)
679 nextWarmupStatusTime = now.AddSeconds(2);
680 var newPercent = (int)(100 * (timeSlice.Time.Ticks - startTimeTicks) / (double)(warmupEndTicks - startTimeTicks));
682 if (newPercent != warmingUpPercent)
684 warmingUpPercent = newPercent;
685 algorithm.
Debug($
"Processing algorithm warm-up request {warmingUpPercent}%...");
696 algorithm.
Debug(
"Algorithm finished warming up.");
699 yield
return timeSlice;
711 Log.
Trace(
"ProcessVolatilityHistoryRequirements(): Updating volatility models with historical data...");
719 Log.
Trace(
"ProcessVolatilityHistoryRequirements(): finished.");
727 foreach (var split
in timeSlice.Slice.Splits.Values)
732 if (split.Type !=
SplitType.SplitOccurred)
740 Log.
Trace($
"AlgorithmManager.Run(): {algorithm.Time}: Skip Split during live warmup: {split}");
746 Log.
Debug($
"AlgorithmManager.Run(): {algorithm.Time}: Applying Split for {split.Symbol}");
752 Log.
Trace($
"AlgorithmManager.Run(): {algorithm.Time}: Pre-Split for {split}. Security Price: {security.Price} Holdings: {security.Holdings.Quantity}");
757 .DataNormalizationMode();
766 ApplySplitOrDividendToVolatilityModel(algorithm, security, liveMode, mode);
768 if (liveMode && security !=
null)
770 Log.
Trace($
"AlgorithmManager.Run(): {algorithm.Time}: Post-Split for {split}. Security Price: {security.Price} Holdings: {security.Holdings.Quantity}");
781 catch (Exception err)
783 algorithm.SetRuntimeError(err,
"Split event");
794 foreach (var dividend
in timeSlice.Slice.Dividends.Values)
799 Log.
Trace($
"AlgorithmManager.Run(): {algorithm.Time}: Skip Dividend during live warmup: {dividend}");
805 Log.
Debug($
"AlgorithmManager.Run(): {algorithm.Time}: Applying Dividend: {dividend}");
811 Log.
Trace($
"AlgorithmManager.Run(): {algorithm.Time}: Pre-Dividend: {dividend}. " +
812 $
"Security Holdings: {security.Holdings.Quantity} Account Currency Holdings: " +
813 $
"{algorithm.Portfolio.CashBook[algorithm.AccountCurrency].Amount}");
818 .DataNormalizationMode();
824 ApplySplitOrDividendToVolatilityModel(algorithm, security, liveMode, mode);
826 if (liveMode && security !=
null)
828 Log.
Trace($
"AlgorithmManager.Run(): {algorithm.Time}: Post-Dividend: {dividend}. Security " +
829 $
"Holdings: {security.Holdings.Quantity} Account Currency Holdings: " +
830 $
"{algorithm.Portfolio.CashBook[algorithm.AccountCurrency].Amount}");
// Walks the incoming splits and records split warnings in the caller-supplied splitWarnings list.
// newSplits: the splits to inspect (its Values are enumerated below).
// splitWarnings: list that is appended to in place; entries are later consumed elsewhere.
// NOTE(review): this excerpt omits lines (braces and at least some conditions), so the exact
// branching between the "split occurred" and "split warning" paths is not visible here - confirm
// against the full file before relying on these notes.
838 private void HandleSplitSymbols(
Splits newSplits, List<Split> splitWarnings)
840 foreach (var split
in newSplits.
Values)
// Debug log emitted for a split that has occurred (guarding condition not visible in this excerpt).
846 Log.
Debug($
"AlgorithmManager.HandleSplitSymbols(): {_algorithm.Time} - Security split occurred: Split Factor: {split} Reference Price: {split.ReferencePrice}");
// Debug log emitted for a split warning (guarding condition not visible in this excerpt).
853 Log.
Debug($
"AlgorithmManager.HandleSplitSymbols(): {_algorithm.Time} - Security split warning: {split}");
// Only add the split when the list holds no entry with the same Symbol and Type == SplitType.Warning,
// which keeps at most one warning per symbol in splitWarnings.
856 if (!splitWarnings.Any(x => x.Symbol == split.Symbol && x.Type ==
SplitType.Warning))
858 splitWarnings.Add(split);
// Processes the accumulated split warnings: for each warned security it looks up option contracts
// on that underlying, cancels their open orders, marks them non-tradable and reports that a
// MarketOnClose liquidation was submitted, then drops the warning from the list.
// algorithm: algorithm whose Securities, Transactions and UniverseManager are read/used below.
// splitWarnings: list of pending split warnings; entries are removed in place.
// pendingDelistings: consulted to detect contracts that have a delisting on the same local date.
// NOTE(review): this excerpt omits lines (braces, some conditions and call targets), so several
// statements below are documented only as far as the visible text allows.
866 private void ProcessSplitSymbols(
IAlgorithm algorithm, List<Split> splitWarnings, List<Delisting> pendingDelistings)
// Iterate from the last index down to 0 so that RemoveAt(i) never shifts an entry that has
// not been visited yet.
872 for (
int i = splitWarnings.Count - 1; i >= 0; i--)
874 var split = splitWarnings[i];
// Indexer lookup of the security the warning refers to.
875 var security = algorithm.
Securities[split.Symbol];
// Non-tradable securities have their warning removed (remainder of this condition is not visible).
877 if (!security.IsTradable
880 Log.
Debug($
"AlgorithmManager.ProcessSplitSymbols(): {_algorithm.Time} - Removing split warning for {security.Symbol}");
883 splitWarnings.RemoveAt(i);
// Next market close relative to the security's local time; the second argument is passed as false
// (presumably "extended market hours" - TODO confirm against GetNextMarketClose's signature).
891 var nextMarketClose = security.Exchange.Hours.GetNextMarketClose(security.LocalTime,
false);
// No subscription configs for the security: a diagnostic message is built from the symbol,
// its IsTradable flag and whether the symbol is among the active securities. How the message is
// used (log vs. throw) is not visible in this excerpt.
897 if (configs.Count == 0)
901 $
"AlgorithmManager.ProcessSplitSymbols(): {_algorithm.Time} - No subscriptions found for {security.Symbol}" +
902 $
", IsTradable: {security.IsTradable}" +
903 $
", Active: {algorithm.UniverseManager.ActiveSecurities.Keys.Contains(split.Symbol)}");
// Tail of the expression that rounds a time down by the highest subscribed resolution, converting
// between the exchange time zone and the first config's data time zone (receiver not visible).
907 .RoundDownInTimeZone(configs.GetHighestResolution().ToTimeSpan(), security.Exchange.TimeZone, configs.First().DataTimeZone);
// Too early: leave the warning in the list and revisit it on a later call.
910 if (security.LocalTime < latestMarketOnCloseTimeRoundedDownByResolution)
continue;
// Predicate terms selecting derivatives (enclosing query not visible): option security type,
// underlying equal to this security's symbol, underlying not canonical, and HoldStock true.
914 potentialDerivate.Symbol.SecurityType.IsOption() &&
915 potentialDerivate.Symbol.Underlying == security.Symbol &&
916 !potentialDerivate.Symbol.Underlying.IsCanonical() &&
917 potentialDerivate.HoldStock
920 foreach (var derivative
in derivatives)
922 var optionContractSymbol = derivative.Symbol;
// Direct cast: the filter above only admits option-typed securities.
923 var optionContractSecurity = (
Option)derivative;
// Detects a pending delisting for this contract dated on the contract's current local date;
// the action taken when it matches is not visible in this excerpt.
925 if (pendingDelistings.Any(x => x.Symbol == optionContractSymbol
926 && x.Time.Date == optionContractSecurity.LocalTime.Date))
// Cancel any open orders on the contract before the liquidation order is submitted.
933 algorithm.
Transactions.
CancelOpenOrders(optionContractSymbol,
"Canceled due to impending split. Separate MarketOnClose order submitted to liquidate position.");
// Arguments of a call whose target is not visible here: the negated holdings quantity (an
// offsetting quantity), two zeros, the algorithm's UTC time and a tag string.
936 -optionContractSecurity.Holdings.Quantity, 0, 0, algorithm.
UtcTime,
937 "Liquidated due to impending split. Option splits are not currently supported."
// Flag the contract as no longer tradable.
944 optionContractSecurity.IsTradable =
false;
946 algorithm.
Debug($
"MarketOnClose order submitted for option contract '{optionContractSymbol}' due to impending {split.Symbol.Value} split event. "
947 +
"Option splits are not currently supported.");
// Warning fully handled: drop it (safe because the loop runs from the end of the list).
951 splitWarnings.RemoveAt(i);
// Static helper invoked after a split or dividend has been applied (see the call sites that pass
// algorithm, security, liveMode and a normalization mode).
// algorithm: algorithm instance; its TimeZone is forwarded in the call below.
// security: the security affected by the split/dividend.
// liveMode: forwarded unchanged in the call below.
// NOTE(review): the parameter list is truncated in this excerpt; a further parameter named
// dataNormalizationMode is presumably declared on a line not shown, since it is used below.
958 private static void ApplySplitOrDividendToVolatilityModel(
IAlgorithm algorithm,
Security security,
bool liveMode,
// Trailing arguments of a call whose target and leading arguments are not visible in this excerpt.
964 algorithm.
TimeZone, liveMode, dataNormalizationMode);
974 if (controls ==
null)
982 Log.
Trace(
"AlgorithmManager.CreateTokenBucket(): Initializing LeakyBucket: " +
983 $
"Capacity: {controls.Capacity} " +
984 $
"RefillAmount: {controls.RefillAmount} " +
985 $
"TimeInterval: {controls.TimeIntervalMinutes}"