Lean  $LEAN_TAG$
AlgorithmManager.cs
1 /*
2  * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
3  * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  *
15 */
16 
17 using System;
18 using System.Collections.Generic;
19 using System.Linq;
20 using System.Threading;
21 using Fasterflect;
24 using QuantConnect.Data;
33 using QuantConnect.Logging;
34 using QuantConnect.Orders;
35 using QuantConnect.Packets;
40 
42 {
43  /// <summary>
44  /// Algorithm manager class executes the algorithm and generates and passes through the algorithm events.
45  /// </summary>
46  public class AlgorithmManager
47  {
48  private IAlgorithm _algorithm;
49  private readonly object _lock;
50  private readonly bool _liveMode;
51  private bool _cancelRequested;
52  private CancellationTokenSource _cancellationTokenSource;
53 
54  /// <summary>
55  /// Publicly accessible algorithm status
56  /// </summary>
57  public AlgorithmStatus State => _algorithm?.Status ?? AlgorithmStatus.Running;
58 
59  /// <summary>
60  /// Public access to the currently running algorithm id.
61  /// </summary>
62  public string AlgorithmId { get; private set; }
63 
64  /// <summary>
65  /// Provides the isolator with a function for verifying that we're not spending too much time in each
66  /// algorithm manager time loop
67  /// </summary>
69 
70  /// <summary>
71  /// Quit state flag for the running algorithm. When true the user has requested the backtest stops through a Quit() method.
72  /// </summary>
73  /// <seealso cref="QCAlgorithm.Quit(String)"/>
74  public bool QuitState => State == AlgorithmStatus.Deleted;
75 
76  /// <summary>
77  /// Gets the number of data points processed per second
78  /// </summary>
79  public long DataPoints { get; private set; }
80 
81  /// <summary>
82  /// Gets the number of data points of algorithm history provider
83  /// </summary>
85 
86  /// <summary>
87  /// Initializes a new instance of the <see cref="AlgorithmManager"/> class
88  /// </summary>
89  /// <param name="liveMode">True if we're running in live mode, false for backtest mode</param>
90  /// <param name="job">Provided by LEAN when creating a new algo manager. This is the job
91  /// that the algo manager is about to execute. Research and other consumers can provide the
92  /// default value of null</param>
93  public AlgorithmManager(bool liveMode, AlgorithmNodePacket job = null)
94  {
95  AlgorithmId = "";
96  _liveMode = liveMode;
97  _lock = new object();
98 
99  // initialize the time limit manager
101  CreateTokenBucket(job?.Controls?.TrainingLimits),
102  TimeSpan.FromMinutes(Config.GetDouble("algorithm-manager-time-loop-maximum", 20))
103  );
104  }
105 
106  /// <summary>
107  /// Launch the algorithm manager to run this strategy
108  /// </summary>
109  /// <param name="job">Algorithm job</param>
110  /// <param name="algorithm">Algorithm instance</param>
111  /// <param name="synchronizer">Instance which implements <see cref="ISynchronizer"/>. Used to stream the data</param>
112  /// <param name="transactions">Transaction manager object</param>
113  /// <param name="results">Result handler object</param>
114  /// <param name="realtime">Realtime processing object</param>
115  /// <param name="leanManager">ILeanManager implementation that is updated periodically with the IAlgorithm instance</param>
116  /// <param name="cancellationTokenSource">Cancellation token source to monitor</param>
117  /// <remarks>Modify with caution</remarks>
118  public void Run(AlgorithmNodePacket job, IAlgorithm algorithm, ISynchronizer synchronizer, ITransactionHandler transactions, IResultHandler results, IRealTimeHandler realtime, ILeanManager leanManager, CancellationTokenSource cancellationTokenSource)
119  {
120  //Initialize:
121  DataPoints = 0;
122  _algorithm = algorithm;
123 
124  var token = cancellationTokenSource.Token;
125  _cancellationTokenSource = cancellationTokenSource;
126 
127  var backtestMode = (job.Type == PacketType.BacktestNode);
128  var methodInvokers = new Dictionary<Type, MethodInvoker>();
129  var marginCallFrequency = TimeSpan.FromMinutes(5);
130  var nextMarginCallTime = DateTime.MinValue;
131  var nextSecurityModelScan = algorithm.UtcTime.RoundDown(Time.OneHour) + Time.OneHour;
132  var time = algorithm.StartDate.Date;
133 
134  var pendingDelistings = new List<Delisting>();
135  var splitWarnings = new List<Split>();
136 
137  //Initialize Properties:
138  AlgorithmId = job.AlgorithmId;
139 
140  //Go through the subscription types and create invokers to trigger the event handlers for each custom type:
141  foreach (var config in algorithm.SubscriptionManager.Subscriptions)
142  {
143  //If type is a custom feed, check for a dedicated event handler
144  if (config.IsCustomData)
145  {
146  //Get the matching method for this event handler - e.g. public void OnData(Quandl data) { .. }
147  var genericMethod = (algorithm.GetType()).GetMethod("OnData", new[] { config.Type });
148 
149  //If we already have this Type-handler then don't add it to invokers again.
150  if (methodInvokers.ContainsKey(config.Type)) continue;
151 
152  if (genericMethod != null)
153  {
154  methodInvokers.Add(config.Type, genericMethod.DelegateForCallMethod());
155  }
156  }
157  }
158 
159  // Schedule a daily event for sampling at midnight every night
160  algorithm.Schedule.On("Daily Sampling", algorithm.Schedule.DateRules.EveryDay(),
161  algorithm.Schedule.TimeRules.Midnight, () =>
162  {
163  results.Sample(algorithm.UtcTime);
164  });
165 
166  //Loop over the queues: get a data collection, then pass them all into relevent methods in the algorithm.
167  Log.Trace($"AlgorithmManager.Run(): Begin DataStream - Start: {algorithm.StartDate} Stop: {algorithm.EndDate} Time: {algorithm.Time} Warmup: {algorithm.IsWarmingUp}");
168  foreach (var timeSlice in Stream(algorithm, synchronizer, results, token))
169  {
170  // reset our timer on each loop
172 
173  //Check this backtest is still running:
174  if (_algorithm.Status != AlgorithmStatus.Running && _algorithm.RunTimeError == null)
175  {
176  Log.Error($"AlgorithmManager.Run(): Algorithm state changed to {_algorithm.Status} at {timeSlice.Time.ToStringInvariant()}");
177  break;
178  }
179 
180  //Execute with TimeLimit Monitor:
181  if (token.IsCancellationRequested)
182  {
183  Log.Error($"AlgorithmManager.Run(): CancellationRequestion at {timeSlice.Time.ToStringInvariant()}");
184  return;
185  }
186 
187  // Update the ILeanManager
188  leanManager.Update();
189 
190  time = timeSlice.Time;
191  DataPoints += timeSlice.DataPointCount;
192 
193  if (backtestMode && algorithm.Portfolio.TotalPortfolioValue <= 0)
194  {
195  var logMessage = "AlgorithmManager.Run(): Portfolio value is less than or equal to zero, stopping algorithm.";
196  Log.Error(logMessage);
197  results.SystemDebugMessage(logMessage);
198  break;
199  }
200 
201  // If backtesting/warmup, we need to check if there are realtime events in the past
202  // which didn't fire because at the scheduled times there was no data (i.e. markets closed)
203  // and fire them with the correct date/time.
204  realtime.ScanPastEvents(time);
205 
206  // will scan registered consolidators for which we've past the expected scan call
207  algorithm.SubscriptionManager.ScanPastConsolidators(time, algorithm);
208 
209  //Set the algorithm and real time handler's time
210  algorithm.SetDateTime(time);
211 
212  // the time pulse are just to advance algorithm time, lets shortcut the loop here
213  if (timeSlice.IsTimePulse)
214  {
215  continue;
216  }
217 
218  // Update the current slice before firing scheduled events or any other task
219  algorithm.SetCurrentSlice(timeSlice.Slice);
220 
221  if (timeSlice.Slice.SymbolChangedEvents.Count != 0)
222  {
223  try
224  {
225  algorithm.OnSymbolChangedEvents(timeSlice.Slice.SymbolChangedEvents);
226  }
227  catch (Exception err)
228  {
229  algorithm.SetRuntimeError(err, "OnSymbolChangedEvents");
230  return;
231  }
232 
233  foreach (var symbol in timeSlice.Slice.SymbolChangedEvents.Keys)
234  {
235  // cancel all orders for the old symbol
236  foreach (var ticket in transactions.GetOpenOrderTickets(x => x.Symbol == symbol))
237  {
238  ticket.Cancel("Open order cancelled on symbol changed event");
239  }
240  }
241  }
242 
243  if (timeSlice.SecurityChanges != SecurityChanges.None)
244  {
245  algorithm.ProcessSecurityChanges(timeSlice.SecurityChanges);
246 
247  leanManager.OnSecuritiesChanged(timeSlice.SecurityChanges);
248  realtime.OnSecuritiesChanged(timeSlice.SecurityChanges);
249  results.OnSecuritiesChanged(timeSlice.SecurityChanges);
250  }
251 
252  //Update the securities properties: first before calling user code to avoid issues with data
253  foreach (var update in timeSlice.SecuritiesUpdateData)
254  {
255  var security = update.Target;
256 
257  security.Update(update.Data, update.DataType, update.ContainsFillForwardData);
258 
259  // Send market price updates to the TradeBuilder
260  algorithm.TradeBuilder.SetMarketPrice(security.Symbol, security.Price);
261  }
262 
263  // TODO: potentially push into a scheduled event
264  if (time >= nextSecurityModelScan)
265  {
266  foreach (var security in algorithm.Securities.Values)
267  {
268  security.MarginInterestRateModel.ApplyMarginInterestRate(new MarginInterestRateParameters(security, time));
269 
270  // perform check for settlement of unsettled funds
271  security.SettlementModel.Scan(new ScanSettlementModelParameters(algorithm.Portfolio, security, time));
272  }
273  nextSecurityModelScan = time.RoundDown(Time.OneHour) + Time.OneHour;
274  }
275 
276  //Update the securities properties with any universe data
277  if (timeSlice.UniverseData.Count > 0)
278  {
279  foreach (var dataCollection in timeSlice.UniverseData.Values)
280  {
281  if (!dataCollection.ShouldCacheToSecurity()) continue;
282 
283  foreach (var data in dataCollection.Data)
284  {
285  if (algorithm.Securities.TryGetValue(data.Symbol, out var security))
286  {
287  security.Cache.StoreData(new[] { data }, data.GetType());
288  }
289  }
290  }
291  }
292 
293  // poke each cash object to update from the recent security data
294  foreach (var cash in algorithm.Portfolio.CashBook.Values.Where(x => x.CurrencyConversion != null))
295  {
296  cash.Update();
297  }
298 
299  // security prices got updated
300  algorithm.Portfolio.InvalidateTotalPortfolioValue();
301 
302  // process fill models on the updated data before entering algorithm, applies to all non-market orders
303  transactions.ProcessSynchronousEvents();
304 
305  // fire real time events after we've updated based on the new data
306  realtime.SetTime(timeSlice.Time);
307 
308  // process split warnings for options
309  ProcessSplitSymbols(algorithm, splitWarnings, pendingDelistings);
310 
311  //Check if the user's signalled Quit: loop over data until day changes.
312  if (_algorithm.Status != AlgorithmStatus.Running && _algorithm.RunTimeError == null)
313  {
314  Log.Error($"AlgorithmManager.Run(): Algorithm state changed to {_algorithm.Status} at {timeSlice.Time.ToStringInvariant()}");
315  break;
316  }
317  if (algorithm.RunTimeError != null)
318  {
319  Log.Error($"AlgorithmManager.Run(): Stopping, encountered a runtime error at {algorithm.UtcTime} UTC.");
320  return;
321  }
322 
323  // perform margin calls, in live mode we can also use realtime to emit these
324  if (time >= nextMarginCallTime || (_liveMode && nextMarginCallTime > DateTime.UtcNow))
325  {
326  // determine if there are possible margin call orders to be executed
327  bool issueMarginCallWarning;
328  var marginCallOrders = algorithm.Portfolio.MarginCallModel.GetMarginCallOrders(out issueMarginCallWarning);
329  if (marginCallOrders.Count != 0)
330  {
331  var executingMarginCall = false;
332  try
333  {
334  // tell the algorithm we're about to issue the margin call
335  algorithm.OnMarginCall(marginCallOrders);
336 
337  executingMarginCall = true;
338 
339  // execute the margin call orders
340  var executedTickets = algorithm.Portfolio.MarginCallModel.ExecuteMarginCall(marginCallOrders);
341  foreach (var ticket in executedTickets)
342  {
343  algorithm.Error($"{algorithm.Time.ToStringInvariant()} - Executed MarginCallOrder: {ticket.Symbol} - " +
344  $"Quantity: {ticket.Quantity.ToStringInvariant()} @ {ticket.AverageFillPrice.ToStringInvariant()}"
345  );
346  }
347  }
348  catch (Exception err)
349  {
350  algorithm.SetRuntimeError(err, executingMarginCall ? "Portfolio.MarginCallModel.ExecuteMarginCall" : "OnMarginCall");
351  return;
352  }
353  }
354  // we didn't perform a margin call, but got the warning flag back, so issue the warning to the algorithm
355  else if (issueMarginCallWarning)
356  {
357  try
358  {
359  algorithm.OnMarginCallWarning();
360  }
361  catch (Exception err)
362  {
363  algorithm.SetRuntimeError(err, "OnMarginCallWarning");
364  return;
365  }
366  }
367 
368  nextMarginCallTime = time + marginCallFrequency;
369  }
370 
371  // before we call any events, let the algorithm know about universe changes
372  if (timeSlice.SecurityChanges != SecurityChanges.None)
373  {
374  try
375  {
376  var algorithmSecurityChanges = new SecurityChanges(timeSlice.SecurityChanges)
377  {
378  // by default for user code we want to filter out custom securities
379  FilterCustomSecurities = true,
380  // by default for user code we want to filter out internal securities
381  FilterInternalSecurities = true
382  };
383 
384  algorithm.OnSecuritiesChanged(algorithmSecurityChanges);
385  algorithm.OnFrameworkSecuritiesChanged(algorithmSecurityChanges);
386  }
387  catch (Exception err)
388  {
389  algorithm.SetRuntimeError(err, "OnSecuritiesChanged");
390  return;
391  }
392  }
393 
394  // apply dividends
395  HandleDividends(timeSlice, algorithm, _liveMode);
396 
397  // apply splits
398  HandleSplits(timeSlice, algorithm, _liveMode);
399 
400  //Update registered consolidators for this symbol index
401  try
402  {
403  if (timeSlice.ConsolidatorUpdateData.Count > 0)
404  {
405  var timeKeeper = algorithm.TimeKeeper;
406  foreach (var update in timeSlice.ConsolidatorUpdateData)
407  {
408  var localTime = timeKeeper.GetLocalTimeKeeper(update.Target.ExchangeTimeZone).LocalTime;
409  var consolidators = update.Target.Consolidators;
410  foreach (var consolidator in consolidators)
411  {
412  foreach (var dataPoint in update.Data)
413  {
414  consolidator.Update(dataPoint);
415  }
416 
417  // scan for time after we've pumped all the data through for this consolidator
418  consolidator.Scan(localTime);
419  }
420  }
421  }
422  }
423  catch (Exception err)
424  {
425  algorithm.SetRuntimeError(err, "Consolidators update");
426  return;
427  }
428 
429  // fire custom event handlers
430  foreach (var update in timeSlice.CustomData)
431  {
432  MethodInvoker methodInvoker;
433  if (!methodInvokers.TryGetValue(update.DataType, out methodInvoker))
434  {
435  continue;
436  }
437 
438  try
439  {
440  foreach (var dataPoint in update.Data)
441  {
442  if (update.DataType.IsInstanceOfType(dataPoint))
443  {
444  methodInvoker(algorithm, dataPoint);
445  }
446  }
447  }
448  catch (Exception err)
449  {
450  algorithm.SetRuntimeError(err, "Custom Data");
451  return;
452  }
453  }
454 
455  try
456  {
457  if (timeSlice.Slice.Splits.Count != 0)
458  {
459  algorithm.OnSplits(timeSlice.Slice.Splits);
460  }
461  }
462  catch (Exception err)
463  {
464  algorithm.SetRuntimeError(err, "OnSplits");
465  return;
466  }
467 
468  try
469  {
470  if (timeSlice.Slice.Dividends.Count != 0)
471  {
472  algorithm.OnDividends(timeSlice.Slice.Dividends);
473  }
474  }
475  catch (Exception err)
476  {
477  algorithm.SetRuntimeError(err, "OnDividends");
478  return;
479  }
480 
481  try
482  {
483  if (timeSlice.Slice.Delistings.Count != 0)
484  {
485  algorithm.OnDelistings(timeSlice.Slice.Delistings);
486  }
487  }
488  catch (Exception err)
489  {
490  algorithm.SetRuntimeError(err, "OnDelistings");
491  return;
492  }
493 
494  // Only track pending delistings in non-live mode.
495  if (!algorithm.LiveMode)
496  {
497  // Keep this up to date even though we don't process delistings here anymore
498  foreach (var delisting in timeSlice.Slice.Delistings.Values)
499  {
500  if (delisting.Type == DelistingType.Warning)
501  {
502  // Store our delistings warnings because they are still used by ProcessSplitSymbols above
503  pendingDelistings.Add(delisting);
504  }
505  else
506  {
507  // If we have an actual delisting event, remove it from pending delistings
508  var index = pendingDelistings.FindIndex(x => x.Symbol == delisting.Symbol);
509  if (index != -1)
510  {
511  pendingDelistings.RemoveAt(index);
512  }
513  }
514  }
515  }
516 
517  // run split logic after firing split events
518  HandleSplitSymbols(timeSlice.Slice.Splits, splitWarnings);
519 
520  try
521  {
522  if (timeSlice.Slice.HasData)
523  {
524  // EVENT HANDLER v3.0 -- all data in a single event
525  algorithm.OnData(algorithm.CurrentSlice);
526  }
527 
528  // always turn the crank on this method to ensure universe selection models function properly on day changes w/out data
529  algorithm.OnFrameworkData(timeSlice.Slice);
530  }
531  catch (Exception err)
532  {
533  algorithm.SetRuntimeError(err, "OnData");
534  return;
535  }
536 
537  //If its the historical/paper trading models, wait until market orders have been "filled"
538  // Manually trigger the event handler to prevent thread switch.
539  transactions.ProcessSynchronousEvents();
540 
541  // Process any required events of the results handler such as sampling assets, equity, or stock prices.
542  results.ProcessSynchronousEvents();
543 
544  // poke the algorithm at the end of each time step
545  algorithm.OnEndOfTimeStep();
546 
547  } // End of ForEach feed.Bridge.GetConsumingEnumerable
548 
549  // stop timing the loops
550  TimeLimit.StopEnforcingTimeLimit();
551 
552  //Stream over:: Send the final packet and fire final events:
553  Log.Trace("AlgorithmManager.Run(): Firing On End Of Algorithm...");
554  try
555  {
556  algorithm.OnEndOfAlgorithm();
557  }
558  catch (Exception err)
559  {
560  algorithm.SetRuntimeError(err, "OnEndOfAlgorithm");
561  return;
562  }
563 
564  // Process any required events of the results handler such as sampling assets, equity, or stock prices.
565  results.ProcessSynchronousEvents(forceProcess: true);
566 
567  //Liquidate Holdings for Calculations:
568  if (_algorithm.Status == AlgorithmStatus.Liquidated && _liveMode)
569  {
570  Log.Trace("AlgorithmManager.Run(): Liquidating algorithm holdings...");
571  algorithm.Liquidate();
572  results.LogMessage("Algorithm Liquidated");
573  results.SendStatusUpdate(AlgorithmStatus.Liquidated);
574  }
575 
576  //Manually stopped the algorithm
577  if (_algorithm.Status == AlgorithmStatus.Stopped)
578  {
579  Log.Trace("AlgorithmManager.Run(): Stopping algorithm...");
580  results.LogMessage("Algorithm Stopped");
581  results.SendStatusUpdate(AlgorithmStatus.Stopped);
582  }
583 
584  //Backtest deleted.
585  if (_algorithm.Status == AlgorithmStatus.Deleted)
586  {
587  Log.Trace("AlgorithmManager.Run(): Deleting algorithm...");
588  results.DebugMessage("Algorithm Id:(" + job.AlgorithmId + ") Deleted by request.");
589  results.SendStatusUpdate(AlgorithmStatus.Deleted);
590  }
591 
592  //Algorithm finished, send regardless of commands:
593  results.SendStatusUpdate(AlgorithmStatus.Completed);
594  SetStatus(AlgorithmStatus.Completed);
595 
596  //Take final samples:
597  results.Sample(time);
598 
599  } // End of Run();
600 
601  /// <summary>
602  /// Set the quit state.
603  /// </summary>
604  public void SetStatus(AlgorithmStatus state)
605  {
606  lock (_lock)
607  {
608  //We don't want anyone else to set our internal state to "Running".
609  //This is controlled by the algorithm private variable only.
610  //Algorithm could be null after it's initialized and they call Run on us
611  if (state != AlgorithmStatus.Running && _algorithm != null)
612  {
613  _algorithm.SetStatus(state);
614  }
615 
616  if (_cancellationTokenSource != null && !_cancellationTokenSource.IsCancellationRequested && !_cancelRequested)
617  {
618  if (state == AlgorithmStatus.Deleted)
619  {
620  _cancelRequested = true;
621  // if the algorithm was deleted, let's give the algorithm a few seconds to shutdown and cancel it out
622  _cancellationTokenSource.CancelAfter(TimeSpan.FromSeconds(5));
623  }
624  else if (state == AlgorithmStatus.Stopped)
625  {
626  _cancelRequested = true;
627  // if the algorithm was stopped, let's give the algorithm a few seconds to shutdown and cancel it out
628  _cancellationTokenSource.CancelAfter(TimeSpan.FromMinutes(1));
629  }
630  }
631  }
632  }
633 
634  private IEnumerable<TimeSlice> Stream(IAlgorithm algorithm, ISynchronizer synchronizer, IResultHandler results, CancellationToken cancellationToken)
635  {
636  var nextWarmupStatusTime = DateTime.MinValue;
637  var warmingUp = algorithm.IsWarmingUp;
638  var warmingUpPercent = 0;
639  if (warmingUp)
640  {
641  nextWarmupStatusTime = DateTime.UtcNow.AddSeconds(1);
642  algorithm.Debug("Algorithm starting warm up...");
643  results.SendStatusUpdate(AlgorithmStatus.History, $"{warmingUpPercent}");
644  }
645  else
646  {
647  results.SendStatusUpdate(AlgorithmStatus.Running);
648  // let's be polite, and call warmup finished even though there was no warmup period and avoid algorithms having to handle it instead.
649  // we trigger this callback here and not internally in the algorithm so that we can go through python if required
650  algorithm.OnWarmupFinished();
651  }
652 
653  // bellow we compare with slice.Time which is in UTC
654  var startTimeTicks = algorithm.UtcTime.Ticks;
655  var warmupEndTicks = algorithm.StartDate.ConvertToUtc(algorithm.TimeZone).Ticks;
656 
657  // fulfilling history requirements of volatility models in live mode
658  if (algorithm.LiveMode)
659  {
660  warmupEndTicks = DateTime.UtcNow.Ticks;
661  ProcessVolatilityHistoryRequirements(algorithm, _liveMode);
662  }
663 
664  foreach (var timeSlice in synchronizer.StreamData(cancellationToken))
665  {
666  if (algorithm.IsWarmingUp)
667  {
668  var now = DateTime.UtcNow;
669  if (now > nextWarmupStatusTime)
670  {
671  // send some status to the user letting them know we're done history, but still warming up,
672  // catching up to real time data
673  nextWarmupStatusTime = now.AddSeconds(2);
674  var newPercent = (int)(100 * (timeSlice.Time.Ticks - startTimeTicks) / (double)(warmupEndTicks - startTimeTicks));
675  // if there isn't any progress don't send the same update many times
676  if (newPercent != warmingUpPercent)
677  {
678  warmingUpPercent = newPercent;
679  algorithm.Debug($"Processing algorithm warm-up request {warmingUpPercent}%...");
680  results.SendStatusUpdate(AlgorithmStatus.History, $"{warmingUpPercent}");
681  }
682  }
683  }
684  else if (warmingUp)
685  {
686  // warmup finished, send an update
687  warmingUp = false;
688  // we trigger this callback here and not internally in the algorithm so that we can go through python if required
689  algorithm.OnWarmupFinished();
690  algorithm.Debug("Algorithm finished warming up.");
691  results.SendStatusUpdate(AlgorithmStatus.Running, "100");
692  }
693  yield return timeSlice;
694  }
695  }
696 
697  /// <summary>
698  /// Helper method used to process securities volatility history requirements
699  /// </summary>
700  /// <remarks>Implemented as static to facilitate testing</remarks>
701  /// <param name="algorithm">The algorithm instance</param>
702  /// <param name="liveMode">Whether the algorithm is in live mode</param>
703  public static void ProcessVolatilityHistoryRequirements(IAlgorithm algorithm, bool liveMode)
704  {
705  Log.Trace("ProcessVolatilityHistoryRequirements(): Updating volatility models with historical data...");
706 
707  foreach (var security in algorithm.Securities.Values)
708  {
709  security.VolatilityModel.WarmUp(algorithm.HistoryProvider, algorithm.SubscriptionManager, security, algorithm.UtcTime,
710  algorithm.TimeZone, liveMode);
711  }
712 
713  Log.Trace("ProcessVolatilityHistoryRequirements(): finished.");
714  }
715 
716  /// <summary>
717  /// Helper method to apply a split to an algorithm instance
718  /// </summary>
719  public static void HandleSplits(TimeSlice timeSlice, IAlgorithm algorithm, bool liveMode)
720  {
721  foreach (var split in timeSlice.Slice.Splits.Values)
722  {
723  try
724  {
725  // only process split occurred events (ignore warnings)
726  if (split.Type != SplitType.SplitOccurred)
727  {
728  continue;
729  }
730 
731  if (liveMode && algorithm.IsWarmingUp)
732  {
733  // skip past split during live warmup, the algorithms position already reflects them
734  Log.Trace($"AlgorithmManager.Run(): {algorithm.Time}: Skip Split during live warmup: {split}");
735  continue;
736  }
737 
738  if (Log.DebuggingEnabled)
739  {
740  Log.Debug($"AlgorithmManager.Run(): {algorithm.Time}: Applying Split for {split.Symbol}");
741  }
742 
743  Security security = null;
744  if (algorithm.Securities.TryGetValue(split.Symbol, out security) && liveMode)
745  {
746  Log.Trace($"AlgorithmManager.Run(): {algorithm.Time}: Pre-Split for {split}. Security Price: {security.Price} Holdings: {security.Holdings.Quantity}");
747  }
748 
750  .GetSubscriptionDataConfigs(split.Symbol)
751  .DataNormalizationMode();
752 
753  // apply the split event to the portfolio
754  algorithm.Portfolio.ApplySplit(split, security, liveMode, mode);
755 
756  // apply the split event to the trade builder
757  algorithm.TradeBuilder.ApplySplit(split, liveMode, mode);
758 
759  // apply the split event to the security volatility model
760  ApplySplitOrDividendToVolatilityModel(algorithm, security, liveMode, mode);
761 
762  if (liveMode && security != null)
763  {
764  Log.Trace($"AlgorithmManager.Run(): {algorithm.Time}: Post-Split for {split}. Security Price: {security.Price} Holdings: {security.Holdings.Quantity}");
765  }
766 
767  // apply the split to open orders as well in raw mode, all other modes are split adjusted
768  if (liveMode || mode == DataNormalizationMode.Raw)
769  {
770  // in live mode we always want to have our order match the order at the brokerage, so apply the split to the orders
771  var openOrders = algorithm.Transactions.GetOpenOrderTickets(ticket => ticket.Symbol == split.Symbol);
772  algorithm.BrokerageModel.ApplySplit(openOrders.ToList(), split);
773  }
774  }
775  catch (Exception err)
776  {
777  algorithm.SetRuntimeError(err, "Split event");
778  return;
779  }
780  }
781  }
782 
783  /// <summary>
784  /// Helper method to apply a dividend to an algorithm instance
785  /// </summary>
786  public static void HandleDividends(TimeSlice timeSlice, IAlgorithm algorithm, bool liveMode)
787  {
788  foreach (var dividend in timeSlice.Slice.Dividends.Values)
789  {
790  if (liveMode && algorithm.IsWarmingUp)
791  {
792  // skip past dividends during live warmup, the algorithms position already reflects them
793  Log.Trace($"AlgorithmManager.Run(): {algorithm.Time}: Skip Dividend during live warmup: {dividend}");
794  continue;
795  }
796 
797  if (Log.DebuggingEnabled)
798  {
799  Log.Debug($"AlgorithmManager.Run(): {algorithm.Time}: Applying Dividend: {dividend}");
800  }
801 
802  Security security = null;
803  if (algorithm.Securities.TryGetValue(dividend.Symbol, out security) && liveMode)
804  {
805  Log.Trace($"AlgorithmManager.Run(): {algorithm.Time}: Pre-Dividend: {dividend}. " +
806  $"Security Holdings: {security.Holdings.Quantity} Account Currency Holdings: " +
807  $"{algorithm.Portfolio.CashBook[algorithm.AccountCurrency].Amount}");
808  }
809 
811  .GetSubscriptionDataConfigs(dividend.Symbol)
812  .DataNormalizationMode();
813 
814  // apply the dividend event to the portfolio
815  algorithm.Portfolio.ApplyDividend(dividend, liveMode, mode);
816 
817  // apply the dividend event to the security volatility model
818  ApplySplitOrDividendToVolatilityModel(algorithm, security, liveMode, mode);
819 
820  if (liveMode && security != null)
821  {
822  Log.Trace($"AlgorithmManager.Run(): {algorithm.Time}: Post-Dividend: {dividend}. Security " +
823  $"Holdings: {security.Holdings.Quantity} Account Currency Holdings: " +
824  $"{algorithm.Portfolio.CashBook[algorithm.AccountCurrency].Amount}");
825  }
826  }
827  }
828 
829  /// <summary>
830  /// Keeps track of split warnings so we can later liquidate option contracts
831  /// </summary>
832  private void HandleSplitSymbols(Splits newSplits, List<Split> splitWarnings)
833  {
834  foreach (var split in newSplits.Values)
835  {
836  if (split.Type != SplitType.Warning)
837  {
838  if (Log.DebuggingEnabled)
839  {
840  Log.Debug($"AlgorithmManager.HandleSplitSymbols(): {_algorithm.Time} - Security split occurred: Split Factor: {split} Reference Price: {split.ReferencePrice}");
841  }
842  continue;
843  }
844 
845  if (Log.DebuggingEnabled)
846  {
847  Log.Debug($"AlgorithmManager.HandleSplitSymbols(): {_algorithm.Time} - Security split warning: {split}");
848  }
849 
850  if (!splitWarnings.Any(x => x.Symbol == split.Symbol && x.Type == SplitType.Warning))
851  {
852  splitWarnings.Add(split);
853  }
854  }
855  }
856 
857  /// <summary>
858  /// Liquidate option contact holdings who's underlying security has split
859  /// </summary>
860  private void ProcessSplitSymbols(IAlgorithm algorithm, List<Split> splitWarnings, List<Delisting> pendingDelistings)
861  {
862  // NOTE: This method assumes option contracts have the same core trading hours as their underlying contract
863  // This is a small performance optimization to prevent scanning every contract on every time step,
864  // instead we scan just the underlyings, thereby reducing the time footprint of this methods by a factor
865  // of N, the number of derivative subscriptions
866  for (int i = splitWarnings.Count - 1; i >= 0; i--)
867  {
868  var split = splitWarnings[i];
869  var security = algorithm.Securities[split.Symbol];
870 
871  if (!security.IsTradable
872  && !algorithm.UniverseManager.ActiveSecurities.Keys.Contains(split.Symbol))
873  {
874  Log.Debug($"AlgorithmManager.ProcessSplitSymbols(): {_algorithm.Time} - Removing split warning for {security.Symbol}");
875 
876  // remove the warning from out list
877  splitWarnings.RemoveAt(i);
878  // Since we are storing the split warnings for a loop
879  // we need to check if the security was removed.
880  // When removed, it will be marked as non tradable but just in case
881  // we expect it not to be an active security either
882  continue;
883  }
884 
885  var nextMarketClose = security.Exchange.Hours.GetNextMarketClose(security.LocalTime, false);
886 
887  // determine the latest possible time we can submit a MOC order
888  var configs = algorithm.SubscriptionManager.SubscriptionDataConfigService
889  .GetSubscriptionDataConfigs(security.Symbol);
890 
891  if (configs.Count == 0)
892  {
893  // should never happen at this point, if it does let's give some extra info
894  throw new Exception(
895  $"AlgorithmManager.ProcessSplitSymbols(): {_algorithm.Time} - No subscriptions found for {security.Symbol}" +
896  $", IsTradable: {security.IsTradable}" +
897  $", Active: {algorithm.UniverseManager.ActiveSecurities.Keys.Contains(split.Symbol)}");
898  }
899 
900  var latestMarketOnCloseTimeRoundedDownByResolution = nextMarketClose.Subtract(MarketOnCloseOrder.SubmissionTimeBuffer)
901  .RoundDownInTimeZone(configs.GetHighestResolution().ToTimeSpan(), security.Exchange.TimeZone, configs.First().DataTimeZone);
902 
903  // we don't need to do anyhing until the market closes
904  if (security.LocalTime < latestMarketOnCloseTimeRoundedDownByResolution) continue;
905 
906  // fetch all option derivatives of the underlying with holdings (excluding the canonical security)
907  var derivatives = algorithm.Securities.Values.Where(potentialDerivate =>
908  potentialDerivate.Symbol.SecurityType.IsOption() &&
909  potentialDerivate.Symbol.Underlying == security.Symbol &&
910  !potentialDerivate.Symbol.Underlying.IsCanonical() &&
911  potentialDerivate.HoldStock
912  );
913 
914  foreach (var derivative in derivatives)
915  {
916  var optionContractSymbol = derivative.Symbol;
917  var optionContractSecurity = (Option)derivative;
918 
919  if (pendingDelistings.Any(x => x.Symbol == optionContractSymbol
920  && x.Time.Date == optionContractSecurity.LocalTime.Date))
921  {
922  // if the option is going to be delisted today we skip sending the market on close order
923  continue;
924  }
925 
926  // close any open orders
927  algorithm.Transactions.CancelOpenOrders(optionContractSymbol, "Canceled due to impending split. Separate MarketOnClose order submitted to liquidate position.");
928 
929  var request = new SubmitOrderRequest(OrderType.MarketOnClose, optionContractSecurity.Type, optionContractSymbol,
930  -optionContractSecurity.Holdings.Quantity, 0, 0, algorithm.UtcTime,
931  "Liquidated due to impending split. Option splits are not currently supported."
932  );
933 
934  // send MOC order to liquidate option contract holdings
935  algorithm.Transactions.AddOrder(request);
936 
937  // mark option contract as not tradable
938  optionContractSecurity.IsTradable = false;
939 
940  algorithm.Debug($"MarketOnClose order submitted for option contract '{optionContractSymbol}' due to impending {split.Symbol.Value} split event. "
941  + "Option splits are not currently supported.");
942  }
943 
944  // remove the warning from out list
945  splitWarnings.RemoveAt(i);
946  }
947  }
948 
949  /// <summary>
950  /// Warms up the security's volatility model in the case of a split or dividend to avoid discontinuities when data is raw or in live mode
951  /// </summary>
952  private static void ApplySplitOrDividendToVolatilityModel(IAlgorithm algorithm, Security security, bool liveMode,
953  DataNormalizationMode dataNormalizationMode)
954  {
955  if (security.Type == SecurityType.Equity && (liveMode || dataNormalizationMode == DataNormalizationMode.Raw))
956  {
957  security?.VolatilityModel.WarmUp(algorithm.HistoryProvider, algorithm.SubscriptionManager, security, algorithm.UtcTime,
958  algorithm.TimeZone, liveMode, dataNormalizationMode);
959  }
960  }
961 
962  /// <summary>
963  /// Constructs the correct <see cref="ITokenBucket"/> instance per the provided controls.
964  /// The provided controls will be null when
965  /// </summary>
966  private static ITokenBucket CreateTokenBucket(LeakyBucketControlParameters controls)
967  {
968  if (controls == null)
969  {
970  // this will only be null when the AlgorithmManager is being initialized outside of LEAN
971  // for example, in unit tests that don't provide a job package as well as from Research
972  // in each of the above cases, it seems best to not enforce the leaky bucket restrictions
973  return TokenBucket.Null;
974  }
975 
976  Log.Trace("AlgorithmManager.CreateTokenBucket(): Initializing LeakyBucket: " +
977  $"Capacity: {controls.Capacity} " +
978  $"RefillAmount: {controls.RefillAmount} " +
979  $"TimeInterval: {controls.TimeIntervalMinutes}"
980  );
981 
982  // these parameters view 'minutes' as the resource being rate limited. the capacity is the total
983  // number of minutes available for burst operations and after controls.TimeIntervalMinutes time
984  // has passed, we'll add controls.RefillAmount to the 'minutes' available, maxing at controls.Capacity
985  return new LeakyBucket(
986  controls.Capacity,
987  controls.RefillAmount,
988  TimeSpan.FromMinutes(controls.TimeIntervalMinutes)
989  );
990  }
991  }
992 }