Lean  $LEAN_TAG$
AlgorithmManager.cs
1 /*
2  * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
3  * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  *
15 */
16 
17 using System;
18 using System.Collections.Generic;
19 using System.Linq;
20 using System.Threading;
21 using Fasterflect;
24 using QuantConnect.Data;
33 using QuantConnect.Logging;
34 using QuantConnect.Orders;
35 using QuantConnect.Packets;
40 
42 {
43  /// <summary>
44  /// Algorithm manager class executes the algorithm and generates and passes through the algorithm events.
45  /// </summary>
46  public class AlgorithmManager
47  {
  // The algorithm being executed; assigned at the start of Run() and null before that.
48  private IAlgorithm _algorithm;
  // Guards the status/cancellation logic in SetStatus().
49  private readonly object _lock;
  // Fixed at construction: true for live mode, false for backtest mode.
50  private readonly bool _liveMode;
  // Set by SetStatus() once a delayed cancellation has been scheduled, so it is only scheduled once.
51  private bool _cancelRequested;
  // The token source handed to Run(); SetStatus() uses it to schedule a delayed cancellation.
52  private CancellationTokenSource _cancellationTokenSource;
53 
54  /// <summary>
55  /// Publicly accessible algorithm status. Reports <see cref="AlgorithmStatus.Running"/> while no
  /// algorithm instance has been assigned yet.
56  /// </summary>
57  public AlgorithmStatus State => _algorithm?.Status ?? AlgorithmStatus.Running;
58 
59  /// <summary>
60  /// Public access to the currently running algorithm id. Empty until Run() copies it from the job.
61  /// </summary>
62  public string AlgorithmId { get; private set; }
63 
  // NOTE(review): the member documented by the following summary is not visible in this listing -
  // confirm its declaration was not dropped, otherwise this summary attaches to QuitState below.
64  /// <summary>
65  /// Provides the isolator with a function for verifying that we're not spending too much time in each
66  /// algorithm manager time loop
67  /// </summary>
69 
70  /// <summary>
71  /// Quit state flag for the running algorithm. When true the user has requested the backtest stops through a Quit() method.
  /// True exactly when <see cref="State"/> is <see cref="AlgorithmStatus.Deleted"/>.
72  /// </summary>
73  /// <seealso cref="QCAlgorithm.Quit(String)"/>
74  public bool QuitState => State == AlgorithmStatus.Deleted;
75 
76  /// <summary>
77  /// Gets the running total of data points processed. Reset to zero when Run() starts and
  /// increased by each time slice's data point count.
78  /// </summary>
79  public long DataPoints { get; private set; }
80 
81  /// <summary>
82  /// Gets the number of data points of algorithm history provider
83  /// </summary>
85 
86  /// <summary>
87  /// Initializes a new instance of the <see cref="AlgorithmManager"/> class
88  /// </summary>
89  /// <param name="liveMode">True if we're running in live mode, false for backtest mode</param>
90  /// <param name="job">Provided by LEAN when creating a new algo manager. This is the job
91  /// that the algo manager is about to execute. Research and other consumers can provide the
92  /// default value of null</param>
93  public AlgorithmManager(bool liveMode, AlgorithmNodePacket job = null)
94  {
  // start with an empty id; Run() replaces it with the job's algorithm id
95  AlgorithmId = "";
96  _liveMode = liveMode;
97  _lock = new object();
98 
99  // initialize the time limit manager
  // NOTE(review): the head of the statement that receives the two arguments below is not
  // visible in this listing - confirm it was not dropped.
  // The job (and its Controls) may be null, hence the null-conditional access; the loop
  // maximum is read from the "algorithm-manager-time-loop-maximum" setting, defaulting to 20 minutes.
101  CreateTokenBucket(job?.Controls?.TrainingLimits),
102  TimeSpan.FromMinutes(Config.GetDouble("algorithm-manager-time-loop-maximum", 20))
103  );
104  }
105 
106  /// <summary>
107  /// Launch the algorithm manager to run this strategy.
  /// Iterates the time slices produced by the synchronizer and, for each slice, advances algorithm time,
  /// updates securities and cash, processes orders, margin calls, dividends and splits, and fires the
  /// algorithm event handlers. After the stream ends it fires OnEndOfAlgorithm and sends the final status.
  /// The method returns early, without firing OnEndOfAlgorithm, when cancellation is requested or when
  /// a runtime error is recorded on the algorithm.
108  /// </summary>
109  /// <param name="job">Algorithm job</param>
110  /// <param name="algorithm">Algorithm instance</param>
111  /// <param name="synchronizer">Instance which implements <see cref="ISynchronizer"/>. Used to stream the data</param>
112  /// <param name="transactions">Transaction manager object</param>
113  /// <param name="results">Result handler object</param>
114  /// <param name="realtime">Realtime processing object</param>
115  /// <param name="leanManager">ILeanManager implementation that is updated periodically with the IAlgorithm instance</param>
116  /// <param name="cancellationTokenSource">Cancellation token source to monitor</param>
117  /// <remarks>Modify with caution</remarks>
118  public void Run(AlgorithmNodePacket job, IAlgorithm algorithm, ISynchronizer synchronizer, ITransactionHandler transactions, IResultHandler results, IRealTimeHandler realtime, ILeanManager leanManager, CancellationTokenSource cancellationTokenSource)
119  {
120  //Initialize:
121  DataPoints = 0;
122  _algorithm = algorithm;
123 
124  var token = cancellationTokenSource.Token;
125  _cancellationTokenSource = cancellationTokenSource;
126 
127  var backtestMode = (job.Type == PacketType.BacktestNode);
128  var methodInvokers = new Dictionary<Type, MethodInvoker>();
129  var marginCallFrequency = TimeSpan.FromMinutes(5);
130  var nextMarginCallTime = DateTime.MinValue;
131  var nextSecurityModelScan = algorithm.UtcTime.RoundDown(Time.OneHour) + Time.OneHour;
132  var time = algorithm.StartDate.Date;
133 
134  var pendingDelistings = new List<Delisting>();
135  var splitWarnings = new List<Split>();
136 
137  //Initialize Properties:
138  AlgorithmId = job.AlgorithmId;
139 
140  //Go through the subscription types and create invokers to trigger the event handlers for each custom type:
141  foreach (var config in algorithm.SubscriptionManager.Subscriptions)
142  {
143  //If type is a custom feed, check for a dedicated event handler
144  if (config.IsCustomData)
145  {
146  //Get the matching method for this event handler - e.g. public void OnData(Quandl data) { .. }
147  var genericMethod = (algorithm.GetType()).GetMethod("OnData", new[] { config.Type });
148 
149  //If we already have this Type-handler then don't add it to invokers again.
150  if (methodInvokers.ContainsKey(config.Type)) continue;
151 
152  if (genericMethod != null)
153  {
154  methodInvokers.Add(config.Type, genericMethod.DelegateForCallMethod());
155  }
156  }
157  }
158 
159  // Schedule a daily event for sampling at midnight every night
160  algorithm.Schedule.On("Daily Sampling", algorithm.Schedule.DateRules.EveryDay(),
161  algorithm.Schedule.TimeRules.Midnight, () =>
162  {
163  results.Sample(algorithm.UtcTime);
164  });
165 
166  //Loop over the queues: get a data collection, then pass them all into relevant methods in the algorithm.
167  Log.Trace($"AlgorithmManager.Run(): Begin DataStream - Start: {algorithm.StartDate} Stop: {algorithm.EndDate} Time: {algorithm.Time} Warmup: {algorithm.IsWarmingUp}");
168  foreach (var timeSlice in Stream(algorithm, synchronizer, results, token))
169  {
170  // reset our timer on each loop
  // NOTE(review): no timer-reset statement is visible after this comment in this listing -
  // confirm it was not dropped.
172 
173  //Check this backtest is still running:
174  if (_algorithm.Status != AlgorithmStatus.Running && _algorithm.RunTimeError == null)
175  {
176  Log.Error($"AlgorithmManager.Run(): Algorithm state changed to {_algorithm.Status} at {timeSlice.Time.ToStringInvariant()}");
177  break;
178  }
179 
180  //Stop immediately, skipping the end-of-algorithm handling below, if cancellation was requested:
181  if (token.IsCancellationRequested)
182  {
183  Log.Error($"AlgorithmManager.Run(): CancellationRequestion at {timeSlice.Time.ToStringInvariant()}");
184  return;
185  }
186 
187  // Update the ILeanManager
188  leanManager.Update();
189 
190  time = timeSlice.Time;
191  DataPoints += timeSlice.DataPointCount;
192 
  // in backtests a non-positive portfolio value ends the run (the loop is left via break,
  // so the end-of-algorithm handling below still executes)
193  if (backtestMode && algorithm.Portfolio.TotalPortfolioValue <= 0)
194  {
195  var logMessage = "AlgorithmManager.Run(): Portfolio value is less than or equal to zero, stopping algorithm.";
196  Log.Error(logMessage);
197  results.SystemDebugMessage(logMessage);
198  break;
199  }
200 
201  // If backtesting/warmup, we need to check if there are realtime events in the past
202  // which didn't fire because at the scheduled times there was no data (i.e. markets closed)
203  // and fire them with the correct date/time.
204  realtime.ScanPastEvents(time);
205 
206  // will scan registered consolidators for which we've past the expected scan call.
207  // In live mode we want to round down to the second, so we don't scan too far into the future:
208  // The time slice might carry the data needed to complete a current consolidated bar but the
209  // time slice time might be slightly ahead (a few milliseconds or even ticks) because in live we
210  // use DateTime.UtcNow. So we don't want to scan past the data time so that the consolidators can
211  // complete the current bar.
212  var pastConsolidatorsScanTime = _liveMode ? time.RoundDown(Time.OneSecond) : time;
213  algorithm.SubscriptionManager.ScanPastConsolidators(pastConsolidatorsScanTime, algorithm);
214 
215  //Set the algorithm and real time handler's time
216  algorithm.SetDateTime(time);
217 
218  // the time pulse are just to advance algorithm time, lets shortcut the loop here
219  if (timeSlice.IsTimePulse)
220  {
221  continue;
222  }
223 
224  // Update the current slice before firing scheduled events or any other task
225  algorithm.SetCurrentSlice(timeSlice.Slice);
226 
227  if (timeSlice.Slice.SymbolChangedEvents.Count != 0)
228  {
229  try
230  {
231  algorithm.OnSymbolChangedEvents(timeSlice.Slice.SymbolChangedEvents);
232  }
233  catch (Exception err)
234  {
235  algorithm.SetRuntimeError(err, "OnSymbolChangedEvents");
236  return;
237  }
238 
239  foreach (var symbol in timeSlice.Slice.SymbolChangedEvents.Keys)
240  {
241  // cancel all orders for the old symbol
242  foreach (var ticket in transactions.GetOpenOrderTickets(x => x.Symbol == symbol))
243  {
244  ticket.Cancel("Open order cancelled on symbol changed event");
245  }
246  }
247  }
248 
  // apply security changes to the algorithm first, then notify the engine handlers;
  // the user-facing OnSecuritiesChanged callback is fired further below
249  if (timeSlice.SecurityChanges != SecurityChanges.None)
250  {
251  algorithm.ProcessSecurityChanges(timeSlice.SecurityChanges);
252 
253  leanManager.OnSecuritiesChanged(timeSlice.SecurityChanges);
254  realtime.OnSecuritiesChanged(timeSlice.SecurityChanges);
255  results.OnSecuritiesChanged(timeSlice.SecurityChanges);
256  }
257 
258  //Update the securities properties: first before calling user code to avoid issues with data
259  foreach (var update in timeSlice.SecuritiesUpdateData)
260  {
261  var security = update.Target;
262 
263  security.Update(update.Data, update.DataType, update.ContainsFillForwardData);
264 
265  // Send market price updates to the TradeBuilder
266  algorithm.TradeBuilder.SetMarketPrice(security.Symbol, security.Price);
267  }
268 
269  // TODO: potentially push into a scheduled event
  // runs once the slice time reaches the next hour boundary: applies margin interest and
  // scans settlement models for every security, then moves the boundary to the following hour
270  if (time >= nextSecurityModelScan)
271  {
272  foreach (var security in algorithm.Securities.Values)
273  {
274  security.MarginInterestRateModel.ApplyMarginInterestRate(new MarginInterestRateParameters(security, time));
275 
276  // perform check for settlement of unsettled funds
277  security.SettlementModel.Scan(new ScanSettlementModelParameters(algorithm.Portfolio, security, time));
278  }
279  nextSecurityModelScan = time.RoundDown(Time.OneHour) + Time.OneHour;
280  }
281 
282  //Update the securities properties with any universe data
283  if (timeSlice.UniverseData.Count > 0)
284  {
285  foreach (var dataCollection in timeSlice.UniverseData.Values)
286  {
287  if (!dataCollection.ShouldCacheToSecurity()) continue;
288 
289  foreach (var data in dataCollection.Data)
290  {
291  if (algorithm.Securities.TryGetValue(data.Symbol, out var security))
292  {
293  security.Cache.StoreData(new[] { data }, data.GetType());
294  }
295  }
296  }
297  }
298 
299  // poke each cash object to update from the recent security data
300  foreach (var cash in algorithm.Portfolio.CashBook.Values.Where(x => x.CurrencyConversion != null))
301  {
302  cash.Update();
303  }
304 
305  // security prices got updated
306  algorithm.Portfolio.InvalidateTotalPortfolioValue();
307 
308  // process fill models on the updated data before entering algorithm, applies to all non-market orders
309  transactions.ProcessSynchronousEvents();
310 
311  // fire real time events after we've updated based on the new data
312  realtime.SetTime(timeSlice.Time);
313 
314  // process split warnings for options
315  ProcessSplitSymbols(algorithm, splitWarnings, pendingDelistings);
316 
317  //Check if the user's signalled Quit: loop over data until day changes.
318  if (_algorithm.Status != AlgorithmStatus.Running && _algorithm.RunTimeError == null)
319  {
320  Log.Error($"AlgorithmManager.Run(): Algorithm state changed to {_algorithm.Status} at {timeSlice.Time.ToStringInvariant()}");
321  break;
322  }
323  if (algorithm.RunTimeError != null)
324  {
325  Log.Error($"AlgorithmManager.Run(): Stopping, encountered a runtime error at {algorithm.UtcTime} UTC.");
326  return;
327  }
328 
329  // perform margin calls, in live mode we can also use realtime to emit these
  // NOTE(review): nextMarginCallTime is set to the slice time plus marginCallFrequency below, so in
  // live mode the second condition holds whenever that time is still ahead of the wall clock -
  // confirm the comparison direction is intended.
330  if (time >= nextMarginCallTime || (_liveMode && nextMarginCallTime > DateTime.UtcNow))
331  {
332  // determine if there are possible margin call orders to be executed
333  bool issueMarginCallWarning;
334  var marginCallOrders = algorithm.Portfolio.MarginCallModel.GetMarginCallOrders(out issueMarginCallWarning);
335  if (marginCallOrders.Count != 0)
336  {
  // tracks which of the two calls below threw, so the runtime error names the right stage
337  var executingMarginCall = false;
338  try
339  {
340  // tell the algorithm we're about to issue the margin call
341  algorithm.OnMarginCall(marginCallOrders);
342 
343  executingMarginCall = true;
344 
345  // execute the margin call orders
346  var executedTickets = algorithm.Portfolio.MarginCallModel.ExecuteMarginCall(marginCallOrders);
347  foreach (var ticket in executedTickets)
348  {
349  algorithm.Error($"{algorithm.Time.ToStringInvariant()} - Executed MarginCallOrder: {ticket.Symbol} - " +
350  $"Quantity: {ticket.Quantity.ToStringInvariant()} @ {ticket.AverageFillPrice.ToStringInvariant()}"
351  );
352  }
353  }
354  catch (Exception err)
355  {
356  algorithm.SetRuntimeError(err, executingMarginCall ? "Portfolio.MarginCallModel.ExecuteMarginCall" : "OnMarginCall");
357  return;
358  }
359  }
360  // we didn't perform a margin call, but got the warning flag back, so issue the warning to the algorithm
361  else if (issueMarginCallWarning)
362  {
363  try
364  {
365  algorithm.OnMarginCallWarning();
366  }
367  catch (Exception err)
368  {
369  algorithm.SetRuntimeError(err, "OnMarginCallWarning");
370  return;
371  }
372  }
373 
374  nextMarginCallTime = time + marginCallFrequency;
375  }
376 
377  // before we call any events, let the algorithm know about universe changes
378  if (timeSlice.SecurityChanges != SecurityChanges.None)
379  {
380  try
381  {
382  var algorithmSecurityChanges = new SecurityChanges(timeSlice.SecurityChanges)
383  {
384  // by default for user code we want to filter out custom securities
385  FilterCustomSecurities = true,
386  // by default for user code we want to filter out internal securities
387  FilterInternalSecurities = true
388  };
389 
390  algorithm.OnSecuritiesChanged(algorithmSecurityChanges);
391  algorithm.OnFrameworkSecuritiesChanged(algorithmSecurityChanges);
392  }
393  catch (Exception err)
394  {
395  algorithm.SetRuntimeError(err, "OnSecuritiesChanged");
396  return;
397  }
398  }
399 
400  // apply dividends
401  HandleDividends(timeSlice, algorithm, _liveMode);
402 
403  // apply splits
404  HandleSplits(timeSlice, algorithm, _liveMode);
405 
406  //Update registered consolidators for this symbol index
407  try
408  {
409  if (timeSlice.ConsolidatorUpdateData.Count > 0)
410  {
411  var timeKeeper = algorithm.TimeKeeper;
412  foreach (var update in timeSlice.ConsolidatorUpdateData)
413  {
414  var localTime = timeKeeper.GetLocalTimeKeeper(update.Target.ExchangeTimeZone).LocalTime;
415  var consolidators = update.Target.Consolidators;
416  foreach (var consolidator in consolidators)
417  {
418  foreach (var dataPoint in update.Data)
419  {
420  consolidator.Update(dataPoint);
421  }
422 
423  // scan for time after we've pumped all the data through for this consolidator
424  consolidator.Scan(localTime);
425  }
426  }
427  }
428  }
429  catch (Exception err)
430  {
431  algorithm.SetRuntimeError(err, "Consolidators update");
432  return;
433  }
434 
435  // fire custom event handlers
436  foreach (var update in timeSlice.CustomData)
437  {
438  MethodInvoker methodInvoker;
  // only types for which a dedicated OnData(T) handler was found above have an invoker
439  if (!methodInvokers.TryGetValue(update.DataType, out methodInvoker))
440  {
441  continue;
442  }
443 
444  try
445  {
446  foreach (var dataPoint in update.Data)
447  {
448  if (update.DataType.IsInstanceOfType(dataPoint))
449  {
450  methodInvoker(algorithm, dataPoint);
451  }
452  }
453  }
454  catch (Exception err)
455  {
456  algorithm.SetRuntimeError(err, "Custom Data");
457  return;
458  }
459  }
460 
461  try
462  {
463  if (timeSlice.Slice.Splits.Count != 0)
464  {
465  algorithm.OnSplits(timeSlice.Slice.Splits);
466  }
467  }
468  catch (Exception err)
469  {
470  algorithm.SetRuntimeError(err, "OnSplits");
471  return;
472  }
473 
474  try
475  {
476  if (timeSlice.Slice.Dividends.Count != 0)
477  {
478  algorithm.OnDividends(timeSlice.Slice.Dividends);
479  }
480  }
481  catch (Exception err)
482  {
483  algorithm.SetRuntimeError(err, "OnDividends");
484  return;
485  }
486 
487  try
488  {
489  if (timeSlice.Slice.Delistings.Count != 0)
490  {
491  algorithm.OnDelistings(timeSlice.Slice.Delistings);
492  }
493  }
494  catch (Exception err)
495  {
496  algorithm.SetRuntimeError(err, "OnDelistings");
497  return;
498  }
499 
500  // Only track pending delistings in non-live mode.
501  if (!algorithm.LiveMode)
502  {
503  // Keep this up to date even though we don't process delistings here anymore
504  foreach (var delisting in timeSlice.Slice.Delistings.Values)
505  {
506  if (delisting.Type == DelistingType.Warning)
507  {
508  // Store our delistings warnings because they are still used by ProcessSplitSymbols above
509  pendingDelistings.Add(delisting);
510  }
511  else
512  {
513  // If we have an actual delisting event, remove it from pending delistings
514  var index = pendingDelistings.FindIndex(x => x.Symbol == delisting.Symbol);
515  if (index != -1)
516  {
517  pendingDelistings.RemoveAt(index);
518  }
519  }
520  }
521  }
522 
523  // run split logic after firing split events
524  HandleSplitSymbols(timeSlice.Slice.Splits, splitWarnings);
525 
526  try
527  {
528  if (timeSlice.Slice.HasData)
529  {
530  // EVENT HANDLER v3.0 -- all data in a single event
531  algorithm.OnData(algorithm.CurrentSlice);
532  }
533 
534  // always turn the crank on this method to ensure universe selection models function properly on day changes w/out data
535  algorithm.OnFrameworkData(timeSlice.Slice);
536  }
537  catch (Exception err)
538  {
539  algorithm.SetRuntimeError(err, "OnData");
540  return;
541  }
542 
543  //If its the historical/paper trading models, wait until market orders have been "filled"
544  // Manually trigger the event handler to prevent thread switch.
545  transactions.ProcessSynchronousEvents();
546 
547  // Process any required events of the results handler such as sampling assets, equity, or stock prices.
548  results.ProcessSynchronousEvents();
549 
550  // poke the algorithm at the end of each time step
551  algorithm.OnEndOfTimeStep();
552 
553  } // End of foreach over Stream(algorithm, synchronizer, results, token)
554 
555  // stop timing the loops
  // NOTE: the early `return` paths inside the loop above never reach this call.
556  TimeLimit.StopEnforcingTimeLimit();
557 
558  //Stream over:: Send the final packet and fire final events:
559  Log.Trace("AlgorithmManager.Run(): Firing On End Of Algorithm...");
560  try
561  {
562  algorithm.OnEndOfAlgorithm();
563  }
564  catch (Exception err)
565  {
566  algorithm.SetRuntimeError(err, "OnEndOfAlgorithm");
567  return;
568  }
569 
570  // Process any required events of the results handler such as sampling assets, equity, or stock prices.
571  results.ProcessSynchronousEvents(forceProcess: true);
572 
573  //Liquidate Holdings for Calculations:
574  if (_algorithm.Status == AlgorithmStatus.Liquidated && _liveMode)
575  {
576  Log.Trace("AlgorithmManager.Run(): Liquidating algorithm holdings...");
577  algorithm.Liquidate();
578  results.LogMessage("Algorithm Liquidated");
579  results.SendStatusUpdate(AlgorithmStatus.Liquidated);
580  }
581 
582  //Manually stopped the algorithm
583  if (_algorithm.Status == AlgorithmStatus.Stopped)
584  {
585  Log.Trace("AlgorithmManager.Run(): Stopping algorithm...");
586  results.LogMessage("Algorithm Stopped");
587  results.SendStatusUpdate(AlgorithmStatus.Stopped);
588  }
589 
590  //Backtest deleted.
591  if (_algorithm.Status == AlgorithmStatus.Deleted)
592  {
593  Log.Trace("AlgorithmManager.Run(): Deleting algorithm...");
594  results.DebugMessage("Algorithm Id:(" + job.AlgorithmId + ") Deleted by request.");
595  results.SendStatusUpdate(AlgorithmStatus.Deleted);
596  }
597 
598  //Algorithm finished, send regardless of commands:
599  results.SendStatusUpdate(AlgorithmStatus.Completed);
600  SetStatus(AlgorithmStatus.Completed);
601 
602  //Take final samples:
603  results.Sample(time);
604 
605  } // End of Run();
606 
607  /// <summary>
608  /// Set the quit state.
609  /// </summary>
610  public void SetStatus(AlgorithmStatus state)
611  {
612  lock (_lock)
613  {
614  //We don't want anyone else to set our internal state to "Running".
615  //This is controlled by the algorithm private variable only.
616  //Algorithm could be null after it's initialized and they call Run on us
617  if (state != AlgorithmStatus.Running && _algorithm != null)
618  {
619  _algorithm.SetStatus(state);
620  }
621 
622  if (_cancellationTokenSource != null && !_cancellationTokenSource.IsCancellationRequested && !_cancelRequested)
623  {
624  if (state == AlgorithmStatus.Deleted)
625  {
626  _cancelRequested = true;
627  // if the algorithm was deleted, let's give the algorithm a few seconds to shutdown and cancel it out
628  _cancellationTokenSource.CancelAfter(TimeSpan.FromSeconds(5));
629  }
630  else if (state == AlgorithmStatus.Stopped)
631  {
632  _cancelRequested = true;
633  // if the algorithm was stopped, let's give the algorithm a few seconds to shutdown and cancel it out
634  _cancellationTokenSource.CancelAfter(TimeSpan.FromMinutes(1));
635  }
636  }
637  }
638  }
639 
640  private IEnumerable<TimeSlice> Stream(IAlgorithm algorithm, ISynchronizer synchronizer, IResultHandler results, CancellationToken cancellationToken)
641  {
642  var nextWarmupStatusTime = DateTime.MinValue;
643  var warmingUp = algorithm.IsWarmingUp;
644  var warmingUpPercent = 0;
645  if (warmingUp)
646  {
647  nextWarmupStatusTime = DateTime.UtcNow.AddSeconds(1);
648  algorithm.Debug("Algorithm starting warm up...");
649  results.SendStatusUpdate(AlgorithmStatus.History, $"{warmingUpPercent}");
650  }
651  else
652  {
653  results.SendStatusUpdate(AlgorithmStatus.Running);
654  // let's be polite, and call warmup finished even though there was no warmup period and avoid algorithms having to handle it instead.
655  // we trigger this callback here and not internally in the algorithm so that we can go through python if required
656  algorithm.OnWarmupFinished();
657  }
658 
659  // bellow we compare with slice.Time which is in UTC
660  var startTimeTicks = algorithm.UtcTime.Ticks;
661  var warmupEndTicks = algorithm.StartDate.ConvertToUtc(algorithm.TimeZone).Ticks;
662 
663  // fulfilling history requirements of volatility models in live mode
664  if (algorithm.LiveMode)
665  {
666  warmupEndTicks = DateTime.UtcNow.Ticks;
667  ProcessVolatilityHistoryRequirements(algorithm, _liveMode);
668  }
669 
670  foreach (var timeSlice in synchronizer.StreamData(cancellationToken))
671  {
672  if (algorithm.IsWarmingUp)
673  {
674  var now = DateTime.UtcNow;
675  if (now > nextWarmupStatusTime)
676  {
677  // send some status to the user letting them know we're done history, but still warming up,
678  // catching up to real time data
679  nextWarmupStatusTime = now.AddSeconds(2);
680  var newPercent = (int)(100 * (timeSlice.Time.Ticks - startTimeTicks) / (double)(warmupEndTicks - startTimeTicks));
681  // if there isn't any progress don't send the same update many times
682  if (newPercent != warmingUpPercent)
683  {
684  warmingUpPercent = newPercent;
685  algorithm.Debug($"Processing algorithm warm-up request {warmingUpPercent}%...");
686  results.SendStatusUpdate(AlgorithmStatus.History, $"{warmingUpPercent}");
687  }
688  }
689  }
690  else if (warmingUp)
691  {
692  // warmup finished, send an update
693  warmingUp = false;
694  // we trigger this callback here and not internally in the algorithm so that we can go through python if required
695  algorithm.OnWarmupFinished();
696  algorithm.Debug("Algorithm finished warming up.");
697  results.SendStatusUpdate(AlgorithmStatus.Running, "100");
698  }
699  yield return timeSlice;
700  }
701  }
702 
703  /// <summary>
704  /// Helper method used to process securities volatility history requirements
705  /// </summary>
706  /// <remarks>Implemented as static to facilitate testing</remarks>
707  /// <param name="algorithm">The algorithm instance</param>
708  /// <param name="liveMode">Whether the algorithm is in live mode</param>
709  public static void ProcessVolatilityHistoryRequirements(IAlgorithm algorithm, bool liveMode)
710  {
711  Log.Trace("ProcessVolatilityHistoryRequirements(): Updating volatility models with historical data...");
712 
713  foreach (var security in algorithm.Securities.Values)
714  {
715  security.VolatilityModel.WarmUp(algorithm.HistoryProvider, algorithm.SubscriptionManager, security, algorithm.UtcTime,
716  algorithm.TimeZone, liveMode);
717  }
718 
719  Log.Trace("ProcessVolatilityHistoryRequirements(): finished.");
720  }
721 
722  /// <summary>
723  /// Helper method to apply a split to an algorithm instance
724  /// </summary>
725  public static void HandleSplits(TimeSlice timeSlice, IAlgorithm algorithm, bool liveMode)
726  {
727  foreach (var split in timeSlice.Slice.Splits.Values)
728  {
729  try
730  {
731  // only process split occurred events (ignore warnings)
732  if (split.Type != SplitType.SplitOccurred)
733  {
734  continue;
735  }
736 
737  if (liveMode && algorithm.IsWarmingUp)
738  {
739  // skip past split during live warmup, the algorithms position already reflects them
740  Log.Trace($"AlgorithmManager.Run(): {algorithm.Time}: Skip Split during live warmup: {split}");
741  continue;
742  }
743 
744  if (Log.DebuggingEnabled)
745  {
746  Log.Debug($"AlgorithmManager.Run(): {algorithm.Time}: Applying Split for {split.Symbol}");
747  }
748 
749  Security security = null;
750  if (algorithm.Securities.TryGetValue(split.Symbol, out security) && liveMode)
751  {
752  Log.Trace($"AlgorithmManager.Run(): {algorithm.Time}: Pre-Split for {split}. Security Price: {security.Price} Holdings: {security.Holdings.Quantity}");
753  }
754 
756  .GetSubscriptionDataConfigs(split.Symbol)
757  .DataNormalizationMode();
758 
759  // apply the split event to the portfolio
760  algorithm.Portfolio.ApplySplit(split, security, liveMode, mode);
761 
762  // apply the split event to the trade builder
763  algorithm.TradeBuilder.ApplySplit(split, liveMode, mode);
764 
765  // apply the split event to the security volatility model
766  ApplySplitOrDividendToVolatilityModel(algorithm, security, liveMode, mode);
767 
768  if (liveMode && security != null)
769  {
770  Log.Trace($"AlgorithmManager.Run(): {algorithm.Time}: Post-Split for {split}. Security Price: {security.Price} Holdings: {security.Holdings.Quantity}");
771  }
772 
773  // apply the split to open orders as well in raw mode, all other modes are split adjusted
774  if (liveMode || mode == DataNormalizationMode.Raw)
775  {
776  // in live mode we always want to have our order match the order at the brokerage, so apply the split to the orders
777  var openOrders = algorithm.Transactions.GetOpenOrderTickets(ticket => ticket.Symbol == split.Symbol);
778  algorithm.BrokerageModel.ApplySplit(openOrders.ToList(), split);
779  }
780  }
781  catch (Exception err)
782  {
783  algorithm.SetRuntimeError(err, "Split event");
784  return;
785  }
786  }
787  }
788 
789  /// <summary>
790  /// Helper method to apply a dividend to an algorithm instance
791  /// </summary>
792  public static void HandleDividends(TimeSlice timeSlice, IAlgorithm algorithm, bool liveMode)
793  {
794  foreach (var dividend in timeSlice.Slice.Dividends.Values)
795  {
796  if (liveMode && algorithm.IsWarmingUp)
797  {
798  // skip past dividends during live warmup, the algorithms position already reflects them
799  Log.Trace($"AlgorithmManager.Run(): {algorithm.Time}: Skip Dividend during live warmup: {dividend}");
800  continue;
801  }
802 
803  if (Log.DebuggingEnabled)
804  {
805  Log.Debug($"AlgorithmManager.Run(): {algorithm.Time}: Applying Dividend: {dividend}");
806  }
807 
808  Security security = null;
809  if (algorithm.Securities.TryGetValue(dividend.Symbol, out security) && liveMode)
810  {
811  Log.Trace($"AlgorithmManager.Run(): {algorithm.Time}: Pre-Dividend: {dividend}. " +
812  $"Security Holdings: {security.Holdings.Quantity} Account Currency Holdings: " +
813  $"{algorithm.Portfolio.CashBook[algorithm.AccountCurrency].Amount}");
814  }
815 
817  .GetSubscriptionDataConfigs(dividend.Symbol)
818  .DataNormalizationMode();
819 
820  // apply the dividend event to the portfolio
821  algorithm.Portfolio.ApplyDividend(dividend, liveMode, mode);
822 
823  // apply the dividend event to the security volatility model
824  ApplySplitOrDividendToVolatilityModel(algorithm, security, liveMode, mode);
825 
826  if (liveMode && security != null)
827  {
828  Log.Trace($"AlgorithmManager.Run(): {algorithm.Time}: Post-Dividend: {dividend}. Security " +
829  $"Holdings: {security.Holdings.Quantity} Account Currency Holdings: " +
830  $"{algorithm.Portfolio.CashBook[algorithm.AccountCurrency].Amount}");
831  }
832  }
833  }
834 
835  /// <summary>
836  /// Keeps track of split warnings so we can later liquidate option contracts
837  /// </summary>
838  private void HandleSplitSymbols(Splits newSplits, List<Split> splitWarnings)
839  {
840  foreach (var split in newSplits.Values)
841  {
842  if (split.Type != SplitType.Warning)
843  {
844  if (Log.DebuggingEnabled)
845  {
846  Log.Debug($"AlgorithmManager.HandleSplitSymbols(): {_algorithm.Time} - Security split occurred: Split Factor: {split} Reference Price: {split.ReferencePrice}");
847  }
848  continue;
849  }
850 
851  if (Log.DebuggingEnabled)
852  {
853  Log.Debug($"AlgorithmManager.HandleSplitSymbols(): {_algorithm.Time} - Security split warning: {split}");
854  }
855 
856  if (!splitWarnings.Any(x => x.Symbol == split.Symbol && x.Type == SplitType.Warning))
857  {
858  splitWarnings.Add(split);
859  }
860  }
861  }
862 
/// <summary>
/// Liquidates option contract holdings whose underlying security has a pending split
/// </summary>
/// <param name="algorithm">The algorithm whose securities, subscriptions and transactions are used to
/// detect the affected option contracts and to submit the liquidation orders</param>
/// <param name="splitWarnings">The split warnings still waiting to be processed. Entries are removed from this
/// list once they have been handled, or when their security is neither tradable nor an active security</param>
/// <param name="pendingDelistings">The pending delistings. Option contracts with a delisting dated on their
/// current local date are skipped, no market on close order is sent for them</param>
/// <exception cref="Exception">Thrown when no subscription data configuration is found for a security
/// that still has a split warning</exception>
private void ProcessSplitSymbols(IAlgorithm algorithm, List<Split> splitWarnings, List<Delisting> pendingDelistings)
{
    // NOTE: This method assumes option contracts have the same core trading hours as their underlying contract
    // This is a small performance optimization to prevent scanning every contract on every time step,
    // instead we scan just the underlyings, thereby reducing the time footprint of this method by a factor
    // of N, the number of derivative subscriptions

    // iterate backwards so entries can be removed by index without shifting the ones still to be visited
    for (int i = splitWarnings.Count - 1; i >= 0; i--)
    {
        var split = splitWarnings[i];
        var security = algorithm.Securities[split.Symbol];

        if (!security.IsTradable
            && !algorithm.UniverseManager.ActiveSecurities.Keys.Contains(split.Symbol))
        {
            // only build the message when debug logging is enabled, same as the other split handlers
            if (Log.DebuggingEnabled)
            {
                Log.Debug($"AlgorithmManager.ProcessSplitSymbols(): {algorithm.Time} - Removing split warning for {security.Symbol}");
            }

            // remove the warning from our list
            splitWarnings.RemoveAt(i);
            // Since we are storing the split warnings for a loop
            // we need to check if the security was removed.
            // When removed, it will be marked as non tradable but just in case
            // we expect it not to be an active security either
            continue;
        }

        var nextMarketClose = security.Exchange.Hours.GetNextMarketClose(security.LocalTime, false);

        // determine the latest possible time we can submit a MOC order
        var configs = algorithm.SubscriptionManager.SubscriptionDataConfigService
            .GetSubscriptionDataConfigs(security.Symbol);

        if (configs.Count == 0)
        {
            // should never happen at this point, if it does let's give some extra info
            throw new Exception(
                $"AlgorithmManager.ProcessSplitSymbols(): {algorithm.Time} - No subscriptions found for {security.Symbol}" +
                $", IsTradable: {security.IsTradable}" +
                $", Active: {algorithm.UniverseManager.ActiveSecurities.Keys.Contains(split.Symbol)}");
        }

        var latestMarketOnCloseTimeRoundedDownByResolution = nextMarketClose.Subtract(MarketOnCloseOrder.SubmissionTimeBuffer)
            .RoundDownInTimeZone(configs.GetHighestResolution().ToTimeSpan(), security.Exchange.TimeZone, configs.First().DataTimeZone);

        // we don't need to do anything until the latest market on close submission time is reached
        if (security.LocalTime < latestMarketOnCloseTimeRoundedDownByResolution)
        {
            continue;
        }

        // fetch all option derivatives of the underlying with holdings (excluding the canonical security)
        // NOTE(review): the canonical check below is applied to the underlying symbol rather than to the
        // option symbol itself - confirm this is the intended filter
        var derivatives = algorithm.Securities.Values.Where(potentialDerivate =>
            potentialDerivate.Symbol.SecurityType.IsOption() &&
            potentialDerivate.Symbol.Underlying == security.Symbol &&
            !potentialDerivate.Symbol.Underlying.IsCanonical() &&
            potentialDerivate.HoldStock
        );

        foreach (var derivative in derivatives)
        {
            var optionContractSymbol = derivative.Symbol;
            var optionContractSecurity = (Option)derivative;

            if (pendingDelistings.Any(x => x.Symbol == optionContractSymbol
                && x.Time.Date == optionContractSecurity.LocalTime.Date))
            {
                // if the option is going to be delisted today we skip sending the market on close order
                continue;
            }

            // close any open orders
            algorithm.Transactions.CancelOpenOrders(optionContractSymbol, "Canceled due to impending split. Separate MarketOnClose order submitted to liquidate position.");

            var request = new SubmitOrderRequest(OrderType.MarketOnClose, optionContractSecurity.Type, optionContractSymbol,
                -optionContractSecurity.Holdings.Quantity, 0, 0, algorithm.UtcTime,
                "Liquidated due to impending split. Option splits are not currently supported."
            );

            // send MOC order to liquidate option contract holdings
            algorithm.Transactions.AddOrder(request);

            // mark option contract as not tradable
            optionContractSecurity.IsTradable = false;

            algorithm.Debug($"MarketOnClose order submitted for option contract '{optionContractSymbol}' due to impending {split.Symbol.Value} split event. "
                + "Option splits are not currently supported.");
        }

        // remove the warning from our list
        splitWarnings.RemoveAt(i);
    }
}
954 
/// <summary>
/// Warms up the security's volatility model in the case of a split or dividend to avoid discontinuities when data is raw or in live mode
/// </summary>
/// <param name="algorithm">The algorithm providing the history provider, subscription manager, current UTC time and time zone used for the warm up</param>
/// <param name="security">The security whose volatility model should be warmed up. Nothing is done when it is null or not an equity</param>
/// <param name="liveMode">True when running in live mode, in which case equities are always warmed up</param>
/// <param name="dataNormalizationMode">The data normalization mode in use; outside of live mode only <see cref="DataNormalizationMode.Raw"/> triggers the warm up</param>
private static void ApplySplitOrDividendToVolatilityModel(IAlgorithm algorithm, Security security, bool liveMode,
    DataNormalizationMode dataNormalizationMode)
{
    // the null check is part of the condition: a null security compares unequal to Equity and is skipped,
    // instead of being dereferenced before the null-conditional call could protect it
    if (security?.Type == SecurityType.Equity && (liveMode || dataNormalizationMode == DataNormalizationMode.Raw))
    {
        security.VolatilityModel.WarmUp(algorithm.HistoryProvider, algorithm.SubscriptionManager, security, algorithm.UtcTime,
            algorithm.TimeZone, liveMode, dataNormalizationMode);
    }
}
967 
/// <summary>
/// Builds the <see cref="ITokenBucket"/> instance matching the provided controls.
/// The provided controls will be null when the AlgorithmManager is being initialized outside of LEAN,
/// in which case <see cref="TokenBucket.Null"/> is returned and no leaky bucket restrictions are enforced
/// </summary>
/// <param name="controls">The leaky bucket control parameters, may be null</param>
/// <returns>A new <see cref="LeakyBucket"/> configured from the controls, or <see cref="TokenBucket.Null"/> when there are none</returns>
private static ITokenBucket CreateTokenBucket(LeakyBucketControlParameters controls)
{
    if (controls != null)
    {
        var settings = $"Capacity: {controls.Capacity} RefillAmount: {controls.RefillAmount} TimeInterval: {controls.TimeIntervalMinutes}";
        Log.Trace("AlgorithmManager.CreateTokenBucket(): Initializing LeakyBucket: " + settings);

        // 'minutes' is the resource being rate limited here: the capacity is the total amount of minutes
        // that can be spent in a burst, and every time the refill interval elapses the refill amount
        // is added back to the available minutes, never going above the capacity
        var capacity = controls.Capacity;
        var refillAmount = controls.RefillAmount;
        var refillInterval = TimeSpan.FromMinutes(controls.TimeIntervalMinutes);

        return new LeakyBucket(capacity, refillAmount, refillInterval);
    }

    // no controls means the AlgorithmManager is being initialized outside of LEAN, for example
    // from unit tests that don't provide a job packet or from Research, in those cases
    // it seems best to not enforce the leaky bucket restrictions
    return TokenBucket.Null;
}
997  }
998 }