18 using System.Collections.Generic;
20 using System.Threading;
49 private readonly
object _lock;
50 private readonly
bool _liveMode;
51 private bool _cancelRequested;
52 private CancellationTokenSource _cancellationTokenSource;
101 CreateTokenBucket(job?.
Controls?.TrainingLimits),
102 TimeSpan.FromMinutes(
Config.
GetDouble(
"algorithm-manager-time-loop-maximum", 20))
122 _algorithm = algorithm;
124 var token = cancellationTokenSource.Token;
125 _cancellationTokenSource = cancellationTokenSource;
128 var methodInvokers =
new Dictionary<Type, MethodInvoker>();
129 var marginCallFrequency = TimeSpan.FromMinutes(5);
130 var nextMarginCallTime = DateTime.MinValue;
134 var pendingDelistings =
new List<Delisting>();
135 var splitWarnings =
new List<Split>();
141 foreach (var config
in algorithm.SubscriptionManager.Subscriptions)
144 if (config.IsCustomData)
147 var genericMethod = (algorithm.GetType()).GetMethod(
"OnData",
new[] { config.Type });
150 if (methodInvokers.ContainsKey(config.Type))
continue;
152 if (genericMethod !=
null)
154 methodInvokers.Add(config.Type, genericMethod.DelegateForCallMethod());
160 algorithm.Schedule.On(
"Daily Sampling", algorithm.Schedule.DateRules.EveryDay(),
161 algorithm.Schedule.TimeRules.Midnight, () =>
163 results.Sample(algorithm.UtcTime);
167 Log.
Trace($
"AlgorithmManager.Run(): Begin DataStream - Start: {algorithm.StartDate} Stop: {algorithm.EndDate} Time: {algorithm.Time} Warmup: {algorithm.IsWarmingUp}");
168 foreach (var timeSlice
in Stream(algorithm, synchronizer, results, token))
176 Log.
Error($
"AlgorithmManager.Run(): Algorithm state changed to {_algorithm.Status} at {timeSlice.Time.ToStringInvariant()}");
181 if (token.IsCancellationRequested)
183 Log.
Error($
"AlgorithmManager.Run(): CancellationRequestion at {timeSlice.Time.ToStringInvariant()}");
188 leanManager.Update();
190 time = timeSlice.Time;
193 if (backtestMode && algorithm.Portfolio.TotalPortfolioValue <= 0)
195 var logMessage =
"AlgorithmManager.Run(): Portfolio value is less than or equal to zero, stopping algorithm.";
197 results.SystemDebugMessage(logMessage);
204 realtime.ScanPastEvents(time);
207 algorithm.SubscriptionManager.ScanPastConsolidators(time, algorithm);
210 algorithm.SetDateTime(time);
213 if (timeSlice.IsTimePulse)
219 algorithm.SetCurrentSlice(timeSlice.Slice);
221 if (timeSlice.Slice.SymbolChangedEvents.Count != 0)
225 algorithm.OnSymbolChangedEvents(timeSlice.Slice.SymbolChangedEvents);
227 catch (Exception err)
229 algorithm.SetRuntimeError(err,
"OnSymbolChangedEvents");
233 foreach (var symbol
in timeSlice.Slice.SymbolChangedEvents.Keys)
236 foreach (var ticket
in transactions.GetOpenOrderTickets(x => x.Symbol == symbol))
238 ticket.Cancel(
"Open order cancelled on symbol changed event");
245 algorithm.ProcessSecurityChanges(timeSlice.SecurityChanges);
247 leanManager.OnSecuritiesChanged(timeSlice.SecurityChanges);
248 realtime.OnSecuritiesChanged(timeSlice.SecurityChanges);
249 results.OnSecuritiesChanged(timeSlice.SecurityChanges);
253 foreach (var update
in timeSlice.SecuritiesUpdateData)
255 var security = update.Target;
257 security.Update(update.Data, update.DataType, update.ContainsFillForwardData);
260 algorithm.TradeBuilder.SetMarketPrice(security.Symbol, security.Price);
264 if (time >= nextSecurityModelScan)
266 foreach (var security
in algorithm.Securities.Values)
277 if (timeSlice.UniverseData.Count > 0)
279 foreach (var dataCollection
in timeSlice.UniverseData.Values)
281 if (!dataCollection.ShouldCacheToSecurity())
continue;
283 foreach (var data
in dataCollection.Data)
285 if (algorithm.Securities.TryGetValue(data.Symbol, out var security))
287 security.Cache.StoreData(
new[] { data }, data.GetType());
294 foreach (var cash
in algorithm.Portfolio.CashBook.Values.Where(x => x.CurrencyConversion !=
null))
300 algorithm.Portfolio.InvalidateTotalPortfolioValue();
303 transactions.ProcessSynchronousEvents();
306 realtime.SetTime(timeSlice.Time);
309 ProcessSplitSymbols(algorithm, splitWarnings, pendingDelistings);
314 Log.
Error($
"AlgorithmManager.Run(): Algorithm state changed to {_algorithm.Status} at {timeSlice.Time.ToStringInvariant()}");
317 if (algorithm.RunTimeError !=
null)
319 Log.
Error($
"AlgorithmManager.Run(): Stopping, encountered a runtime error at {algorithm.UtcTime} UTC.");
324 if (time >= nextMarginCallTime || (_liveMode && nextMarginCallTime > DateTime.UtcNow))
327 bool issueMarginCallWarning;
328 var marginCallOrders = algorithm.Portfolio.MarginCallModel.GetMarginCallOrders(out issueMarginCallWarning);
329 if (marginCallOrders.Count != 0)
331 var executingMarginCall =
false;
335 algorithm.OnMarginCall(marginCallOrders);
337 executingMarginCall =
true;
340 var executedTickets = algorithm.Portfolio.MarginCallModel.ExecuteMarginCall(marginCallOrders);
341 foreach (var ticket
in executedTickets)
343 algorithm.Error($
"{algorithm.Time.ToStringInvariant()} - Executed MarginCallOrder: {ticket.Symbol} - " +
344 $
"Quantity: {ticket.Quantity.ToStringInvariant()} @ {ticket.AverageFillPrice.ToStringInvariant()}"
348 catch (Exception err)
350 algorithm.SetRuntimeError(err, executingMarginCall ?
"Portfolio.MarginCallModel.ExecuteMarginCall" :
"OnMarginCall");
355 else if (issueMarginCallWarning)
359 algorithm.OnMarginCallWarning();
361 catch (Exception err)
363 algorithm.SetRuntimeError(err,
"OnMarginCallWarning");
368 nextMarginCallTime = time + marginCallFrequency;
376 var algorithmSecurityChanges =
new SecurityChanges(timeSlice.SecurityChanges)
379 FilterCustomSecurities =
true,
381 FilterInternalSecurities =
true
384 algorithm.OnSecuritiesChanged(algorithmSecurityChanges);
385 algorithm.OnFrameworkSecuritiesChanged(algorithmSecurityChanges);
387 catch (Exception err)
389 algorithm.SetRuntimeError(err,
"OnSecuritiesChanged");
403 if (timeSlice.ConsolidatorUpdateData.Count > 0)
405 var timeKeeper = algorithm.TimeKeeper;
406 foreach (var update
in timeSlice.ConsolidatorUpdateData)
408 var localTime = timeKeeper.GetLocalTimeKeeper(update.Target.ExchangeTimeZone).LocalTime;
409 var consolidators = update.Target.Consolidators;
410 foreach (var consolidator
in consolidators)
412 foreach (var dataPoint
in update.Data)
414 consolidator.Update(dataPoint);
418 consolidator.Scan(localTime);
423 catch (Exception err)
425 algorithm.SetRuntimeError(err,
"Consolidators update");
430 foreach (var update
in timeSlice.CustomData)
432 MethodInvoker methodInvoker;
433 if (!methodInvokers.TryGetValue(update.DataType, out methodInvoker))
440 foreach (var dataPoint
in update.Data)
442 if (update.DataType.IsInstanceOfType(dataPoint))
444 methodInvoker(algorithm, dataPoint);
448 catch (Exception err)
450 algorithm.SetRuntimeError(err,
"Custom Data");
457 if (timeSlice.Slice.Splits.Count != 0)
459 algorithm.OnSplits(timeSlice.Slice.Splits);
462 catch (Exception err)
464 algorithm.SetRuntimeError(err,
"OnSplits");
470 if (timeSlice.Slice.Dividends.Count != 0)
472 algorithm.OnDividends(timeSlice.Slice.Dividends);
475 catch (Exception err)
477 algorithm.SetRuntimeError(err,
"OnDividends");
483 if (timeSlice.Slice.Delistings.Count != 0)
485 algorithm.OnDelistings(timeSlice.Slice.Delistings);
488 catch (Exception err)
490 algorithm.SetRuntimeError(err,
"OnDelistings");
495 if (!algorithm.LiveMode)
498 foreach (var delisting
in timeSlice.Slice.Delistings.Values)
503 pendingDelistings.Add(delisting);
508 var index = pendingDelistings.FindIndex(x => x.Symbol == delisting.Symbol);
511 pendingDelistings.RemoveAt(index);
518 HandleSplitSymbols(timeSlice.Slice.Splits, splitWarnings);
522 if (timeSlice.Slice.HasData)
525 algorithm.OnData(algorithm.CurrentSlice);
529 algorithm.OnFrameworkData(timeSlice.Slice);
531 catch (Exception err)
533 algorithm.SetRuntimeError(err,
"OnData");
539 transactions.ProcessSynchronousEvents();
542 results.ProcessSynchronousEvents();
545 algorithm.OnEndOfTimeStep();
553 Log.
Trace(
"AlgorithmManager.Run(): Firing On End Of Algorithm...");
556 algorithm.OnEndOfAlgorithm();
558 catch (Exception err)
560 algorithm.SetRuntimeError(err,
"OnEndOfAlgorithm");
565 results.ProcessSynchronousEvents(forceProcess:
true);
570 Log.
Trace(
"AlgorithmManager.Run(): Liquidating algorithm holdings...");
571 algorithm.Liquidate();
572 results.LogMessage(
"Algorithm Liquidated");
579 Log.
Trace(
"AlgorithmManager.Run(): Stopping algorithm...");
580 results.LogMessage(
"Algorithm Stopped");
587 Log.
Trace(
"AlgorithmManager.Run(): Deleting algorithm...");
588 results.DebugMessage(
"Algorithm Id:(" + job.AlgorithmId +
") Deleted by request.");
597 results.Sample(time);
616 if (_cancellationTokenSource !=
null && !_cancellationTokenSource.IsCancellationRequested && !_cancelRequested)
620 _cancelRequested =
true;
622 _cancellationTokenSource.CancelAfter(TimeSpan.FromSeconds(5));
626 _cancelRequested =
true;
628 _cancellationTokenSource.CancelAfter(TimeSpan.FromMinutes(1));
636 var nextWarmupStatusTime = DateTime.MinValue;
638 var warmingUpPercent = 0;
641 nextWarmupStatusTime = DateTime.UtcNow.AddSeconds(1);
642 algorithm.
Debug(
"Algorithm starting warm up...");
654 var startTimeTicks = algorithm.
UtcTime.Ticks;
660 warmupEndTicks = DateTime.UtcNow.Ticks;
664 foreach (var timeSlice
in synchronizer.
StreamData(cancellationToken))
668 var now = DateTime.UtcNow;
669 if (now > nextWarmupStatusTime)
673 nextWarmupStatusTime = now.AddSeconds(2);
674 var newPercent = (int)(100 * (timeSlice.Time.Ticks - startTimeTicks) / (double)(warmupEndTicks - startTimeTicks));
676 if (newPercent != warmingUpPercent)
678 warmingUpPercent = newPercent;
679 algorithm.
Debug($
"Processing algorithm warm-up request {warmingUpPercent}%...");
690 algorithm.
Debug(
"Algorithm finished warming up.");
693 yield
return timeSlice;
705 Log.
Trace(
"ProcessVolatilityHistoryRequirements(): Updating volatility models with historical data...");
713 Log.
Trace(
"ProcessVolatilityHistoryRequirements(): finished.");
721 foreach (var split
in timeSlice.Slice.Splits.Values)
726 if (split.Type !=
SplitType.SplitOccurred)
734 Log.
Trace($
"AlgorithmManager.Run(): {algorithm.Time}: Skip Split during live warmup: {split}");
740 Log.
Debug($
"AlgorithmManager.Run(): {algorithm.Time}: Applying Split for {split.Symbol}");
746 Log.
Trace($
"AlgorithmManager.Run(): {algorithm.Time}: Pre-Split for {split}. Security Price: {security.Price} Holdings: {security.Holdings.Quantity}");
751 .DataNormalizationMode();
760 ApplySplitOrDividendToVolatilityModel(algorithm, security, liveMode, mode);
762 if (liveMode && security !=
null)
764 Log.
Trace($
"AlgorithmManager.Run(): {algorithm.Time}: Post-Split for {split}. Security Price: {security.Price} Holdings: {security.Holdings.Quantity}");
775 catch (Exception err)
777 algorithm.SetRuntimeError(err,
"Split event");
788 foreach (var dividend
in timeSlice.Slice.Dividends.Values)
793 Log.
Trace($
"AlgorithmManager.Run(): {algorithm.Time}: Skip Dividend during live warmup: {dividend}");
799 Log.
Debug($
"AlgorithmManager.Run(): {algorithm.Time}: Applying Dividend: {dividend}");
805 Log.
Trace($
"AlgorithmManager.Run(): {algorithm.Time}: Pre-Dividend: {dividend}. " +
806 $
"Security Holdings: {security.Holdings.Quantity} Account Currency Holdings: " +
807 $
"{algorithm.Portfolio.CashBook[algorithm.AccountCurrency].Amount}");
812 .DataNormalizationMode();
818 ApplySplitOrDividendToVolatilityModel(algorithm, security, liveMode, mode);
820 if (liveMode && security !=
null)
822 Log.
Trace($
"AlgorithmManager.Run(): {algorithm.Time}: Post-Dividend: {dividend}. Security " +
823 $
"Holdings: {security.Holdings.Quantity} Account Currency Holdings: " +
824 $
"{algorithm.Portfolio.CashBook[algorithm.AccountCurrency].Amount}");
832 private void HandleSplitSymbols(
Splits newSplits, List<Split> splitWarnings)
834 foreach (var split
in newSplits.
Values)
840 Log.
Debug($
"AlgorithmManager.HandleSplitSymbols(): {_algorithm.Time} - Security split occurred: Split Factor: {split} Reference Price: {split.ReferencePrice}");
847 Log.
Debug($
"AlgorithmManager.HandleSplitSymbols(): {_algorithm.Time} - Security split warning: {split}");
850 if (!splitWarnings.Any(x => x.Symbol == split.Symbol && x.Type ==
SplitType.Warning))
852 splitWarnings.Add(split);
860 private void ProcessSplitSymbols(
IAlgorithm algorithm, List<Split> splitWarnings, List<Delisting> pendingDelistings)
866 for (
int i = splitWarnings.Count - 1; i >= 0; i--)
868 var split = splitWarnings[i];
869 var security = algorithm.
Securities[split.Symbol];
871 if (!security.IsTradable
874 Log.
Debug($
"AlgorithmManager.ProcessSplitSymbols(): {_algorithm.Time} - Removing split warning for {security.Symbol}");
877 splitWarnings.RemoveAt(i);
885 var nextMarketClose = security.Exchange.Hours.GetNextMarketClose(security.LocalTime,
false);
891 if (configs.Count == 0)
895 $
"AlgorithmManager.ProcessSplitSymbols(): {_algorithm.Time} - No subscriptions found for {security.Symbol}" +
896 $
", IsTradable: {security.IsTradable}" +
897 $
", Active: {algorithm.UniverseManager.ActiveSecurities.Keys.Contains(split.Symbol)}");
901 .RoundDownInTimeZone(configs.GetHighestResolution().ToTimeSpan(), security.Exchange.TimeZone, configs.First().DataTimeZone);
904 if (security.LocalTime < latestMarketOnCloseTimeRoundedDownByResolution)
continue;
908 potentialDerivate.Symbol.SecurityType.IsOption() &&
909 potentialDerivate.Symbol.Underlying == security.Symbol &&
910 !potentialDerivate.Symbol.Underlying.IsCanonical() &&
911 potentialDerivate.HoldStock
914 foreach (var derivative
in derivatives)
916 var optionContractSymbol = derivative.Symbol;
917 var optionContractSecurity = (
Option)derivative;
919 if (pendingDelistings.Any(x => x.Symbol == optionContractSymbol
920 && x.Time.Date == optionContractSecurity.LocalTime.Date))
927 algorithm.
Transactions.
CancelOpenOrders(optionContractSymbol,
"Canceled due to impending split. Separate MarketOnClose order submitted to liquidate position.");
930 -optionContractSecurity.Holdings.Quantity, 0, 0, algorithm.
UtcTime,
931 "Liquidated due to impending split. Option splits are not currently supported."
938 optionContractSecurity.IsTradable =
false;
940 algorithm.
Debug($
"MarketOnClose order submitted for option contract '{optionContractSymbol}' due to impending {split.Symbol.Value} split event. "
941 +
"Option splits are not currently supported.");
945 splitWarnings.RemoveAt(i);
952 private static void ApplySplitOrDividendToVolatilityModel(
IAlgorithm algorithm,
Security security,
bool liveMode,
958 algorithm.
TimeZone, liveMode, dataNormalizationMode);
968 if (controls ==
null)
976 Log.
Trace(
"AlgorithmManager.CreateTokenBucket(): Initializing LeakyBucket: " +
977 $
"Capacity: {controls.Capacity} " +
978 $
"RefillAmount: {controls.RefillAmount} " +
979 $
"TimeInterval: {controls.TimeIntervalMinutes}"