Lean  $LEAN_TAG$
Engine.cs
1 /*
2  * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
3  * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  *
15 */
16 
17 using System;
18 using System.IO;
19 using System.Linq;
20 using System.Runtime.CompilerServices;
21 using System.Threading;
22 using System.Threading.Tasks;
25 using QuantConnect.Data;
32 using QuantConnect.Logging;
33 using QuantConnect.Orders;
34 using QuantConnect.Packets;
36 using QuantConnect.Util;
37 using static QuantConnect.StringExtensions;
38 
40 {
41  /// <summary>
42  /// LEAN ALGORITHMIC TRADING ENGINE: ENTRY POINT.
43  ///
44  /// The engine loads new tasks, creates the algorithms and threads, and sends them
45  /// to Algorithm Manager to be executed. It is the primary operating loop.
46  /// </summary>
47  public class Engine
48  {
 // set to true the first time the history provider raises StartDateLimited, so the warning is only sent once
49  private bool _historyStartDateLimitedWarningEmitted;
 // set to true the first time the history provider raises NumericalPrecisionLimited, so the warning is only sent once
50  private bool _historyNumericalPrecisionLimitedWarningEmitted;
 // value of the constructor's liveMode argument
51  private readonly bool _liveMode;
52 
53  /// <summary>
54  /// Gets the configured system handlers for this engine instance
55  /// </summary>
57 
58  /// <summary>
59  /// Gets the configured algorithm handlers for this engine instance
60  /// </summary>
62 
/// <summary>
/// Creates an <see cref="Engine"/> that works with the supplied handler sets
/// </summary>
/// <param name="systemHandlers">Handlers for job acquisition, messaging and api calls</param>
/// <param name="algorithmHandlers">Handlers for algorithm initialization, data, results, transactions and real time events</param>
/// <param name="liveMode">True when the engine runs in live mode, false otherwise</param>
public Engine(LeanEngineSystemHandlers systemHandlers, LeanEngineAlgorithmHandlers algorithmHandlers, bool liveMode)
{
    // plain capture of the arguments; nothing is started here
    SystemHandlers = systemHandlers;
    AlgorithmHandlers = algorithmHandlers;
    _liveMode = liveMode;
}
75 
76  /// <summary>
77  /// Runs a single backtest/live job from the job queue.
 /// Initialization runs first; the algorithm itself is only executed when initialization
 /// completes without setup errors. The handlers are shut down afterwards in either case.
78  /// </summary>
79  /// <param name="job">The algorithm job to be processed</param>
80  /// <param name="manager">The algorithm manager instance</param>
81  /// <param name="assemblyPath">The path to the algorithm's assembly</param>
82  /// <param name="workerThread">The worker thread instance</param>
83  public void Run(AlgorithmNodePacket job, AlgorithmManager manager, string assemblyPath, WorkerThread workerThread)
84  {
 // NOTE(review): several statements appear to be missing from this listing (the embedded line
 // numbers skip and some calls are never closed) - confirm against the original file.

 // kick off the slow static initializations in the background while the rest of the setup proceeds
85  var marketHoursDatabaseTask = Task.Run(() => StaticInitializations());
86 
87  var algorithm = default(IAlgorithm);
88  var algorithmManager = manager;
89 
90  try
91  {
92  Log.Trace($"Engine.Run(): Resource limits '{job.Controls.CpuAllocation}' CPUs. {job.Controls.RamAllocation} MB RAM.");
94 
95  // Tracks whether setup succeeded; the run phase further down is skipped while this is false.
96  var initializeComplete = false;
97 
98  //-> Initialize messaging system
100 
101  //-> Set the result handler type for this algorithm job, and launch the associated result thread.
 // NOTE(review): the statements the two comments above describe are not present in this listing.
103 
104  IBrokerage brokerage = null;
105  DataManager dataManager = null;
106  var synchronizer = _liveMode ? new LiveSynchronizer() : new Synchronizer();
107  try
108  {
109  // we get the market hours database (mhdb) before creating the algorithm instance,
110  // since the algorithm constructor will use it
111  var marketHoursDatabase = marketHoursDatabaseTask.Result;
112 
113  AlgorithmHandlers.Setup.WorkerThread = workerThread;
114 
115  // Save algorithm to cache, load algorithm instance:
116  algorithm = AlgorithmHandlers.Setup.CreateAlgorithmInstance(job, assemblyPath);
117 
118  algorithm.ProjectId = job.ProjectId;
119 
120  // Set algorithm in ILeanManager
121  SystemHandlers.LeanManager.SetAlgorithm(algorithm);
122 
123  // initialize the object store
125 
126  // initialize the data permission manager
128 
129  // notify the user of any errors w/ object store persistence
130  AlgorithmHandlers.ObjectStore.ErrorRaised += (sender, args) => algorithm.Debug($"ObjectStore Persistence Error: {args.Error.Message}");
131 
132  // set the order processor on the transaction manager; this needs to be done before initializing the brokerage, which might start using it
133  algorithm.Transactions.SetOrderProcessor(AlgorithmHandlers.Transactions);
134 
135  // Initialize the brokerage
136  IBrokerageFactory factory;
137  brokerage = AlgorithmHandlers.Setup.CreateBrokerage(job, algorithm, out factory);
138 
139  // forward brokerage message events to the result handler
140  brokerage.Message += (_, e) => AlgorithmHandlers.Results.BrokerageMessage(e);
141 
142  var symbolPropertiesDatabase = SymbolPropertiesDatabase.FromDataFolder();
143  var mapFilePrimaryExchangeProvider = new MapFilePrimaryExchangeProvider(AlgorithmHandlers.MapFileProvider);
144  var registeredTypesProvider = new RegisteredSecurityDataTypesProvider();
145  var securityService = new SecurityService(algorithm.Portfolio.CashBook,
146  marketHoursDatabase,
147  symbolPropertiesDatabase,
148  algorithm,
149  registeredTypesProvider,
150  new SecurityCacheProvider(algorithm.Portfolio),
151  mapFilePrimaryExchangeProvider,
152  algorithm);
153 
154  algorithm.Securities.SetSecurityService(securityService);
155 
 // NOTE(review): some arguments and the closing of this constructor call are missing from this listing.
156  dataManager = new DataManager(AlgorithmHandlers.DataFeed,
157  new UniverseSelection(
158  algorithm,
159  securityService,
162  algorithm,
163  algorithm.TimeKeeper,
164  marketHoursDatabase,
165  _liveMode,
166  registeredTypesProvider,
168 
169  algorithm.SubscriptionManager.SetDataManager(dataManager);
170 
171  synchronizer.Initialize(algorithm, dataManager);
172 
173  // Set the algorithm's object store before initializing the data feed, which might use it
174  algorithm.SetObjectStore(AlgorithmHandlers.ObjectStore);
175 
176  // Initialize the data feed before we initialize the algorithm so it can intercept added securities/universes via events
 // NOTE(review): the head and tail of the call these arguments belong to are missing from this listing.
178  algorithm,
179  job,
184  dataManager,
185  (IDataFeedTimeProvider) synchronizer,
187 
188  // set the history provider before setting up the algorithm
189  var historyProvider = GetHistoryProvider();
190  historyProvider.SetBrokerage(brokerage);
191  historyProvider.Initialize(
193  job,
199  progress =>
200  {
201  // send progress updates to the result handler only during initialization
202  if (!algorithm.GetLocked() || algorithm.IsWarmingUp)
203  {
204  AlgorithmHandlers.Results.SendStatusUpdate(AlgorithmStatus.History,
205  Invariant($"Processing history {progress}%..."));
206  }
207  },
208  // disable parallel history requests for live trading
209  parallelHistoryRequestsEnabled: !_liveMode,
210  dataPermissionManager: AlgorithmHandlers.DataPermissionsManager,
211  objectStore: algorithm.ObjectStore,
212  algorithmSettings: algorithm.Settings
213  )
214  );
215 
 // NOTE(review): GetHistoryProvider() already attaches handlers to these same three events on the
 // returned instance, so these look like duplicate subscriptions - confirm whether each message is reported twice.
216  historyProvider.InvalidConfigurationDetected += (sender, args) => { AlgorithmHandlers.Results.ErrorMessage(args.Message); };
217  historyProvider.DownloadFailed += (sender, args) => { AlgorithmHandlers.Results.ErrorMessage(args.Message, args.StackTrace); };
218  historyProvider.ReaderErrorDetected += (sender, args) => { AlgorithmHandlers.Results.RuntimeError(args.Message, args.StackTrace); };
219 
220  algorithm.HistoryProvider = historyProvider;
221 
222  // initialize the default brokerage message handler
223  algorithm.BrokerageMessageHandler = factory.CreateBrokerageMessageHandler(algorithm, job, SystemHandlers.Api);
224 
225  //Initialize the internal state of algorithm and job: executes the algorithm.Initialize() method.
 // NOTE(review): the remaining arguments of this call are missing from this listing.
226  initializeComplete = AlgorithmHandlers.Setup.Setup(new SetupHandlerParameters(dataManager.UniverseSelection, algorithm,
229 
230  // set this again now that we've actually added securities
232 
233  //If there are any reasons it failed, pass these back to the IDE.
234  if (!initializeComplete || AlgorithmHandlers.Setup.Errors.Count > 0)
235  {
236  initializeComplete = false;
237  //Get all the error messages: internal in algorithm and external in setup handler.
238  var errorMessage = string.Join(",", algorithm.ErrorMessages);
239  string stackTrace = "";
240  errorMessage += string.Join(",", AlgorithmHandlers.Setup.Errors.Select(e =>
241  {
242  var message = e.Message;
243  if (e.InnerException != null)
244  {
 // append the interpreted inner exception header to both the message and the stack trace text
245  var interpreter = StackExceptionInterpreter.Instance.Value;
246  var err = interpreter.Interpret(e.InnerException);
247  var stackMessage = interpreter.GetExceptionMessageHeader(err);
248  message += stackMessage;
249  stackTrace += stackMessage;
250  }
251  return message;
252  }));
253  Log.Error("Engine.Run(): " + errorMessage);
254  AlgorithmHandlers.Results.RuntimeError(errorMessage, stackTrace);
255  SystemHandlers.Api.SetAlgorithmStatus(job.AlgorithmId, AlgorithmStatus.RuntimeError, errorMessage);
256  }
257  }
258  catch (Exception err)
259  {
 // an exception during setup leaves initializeComplete false, so the run phase below is skipped
260  Log.Error(err);
261 
262  // for python we don't add the ugly pythonNet stack trace
263  var stackTrace = job.Language == Language.Python ? err.Message : err.ToString();
264 
265  var runtimeMessage = "Algorithm.Initialize() Error: " + err.Message + " Stack Trace: " + stackTrace;
266  AlgorithmHandlers.Results.RuntimeError(runtimeMessage, stackTrace);
267  SystemHandlers.Api.SetAlgorithmStatus(job.AlgorithmId, AlgorithmStatus.RuntimeError, runtimeMessage);
268  }
269 
270 
 // algorithm may still be null here if creating the instance failed, hence the null-conditional access
271  var historyProviderName = algorithm?.HistoryProvider != null ? algorithm.HistoryProvider.GetType().FullName : string.Empty;
272  // log the job endpoints
273  Log.Trace($"JOB HANDLERS:{Environment.NewLine}" +
274  $" DataFeed: {AlgorithmHandlers.DataFeed.GetType().FullName}{Environment.NewLine}" +
275  $" Setup: {AlgorithmHandlers.Setup.GetType().FullName}{Environment.NewLine}" +
276  $" RealTime: {AlgorithmHandlers.RealTime.GetType().FullName}{Environment.NewLine}" +
277  $" Results: {AlgorithmHandlers.Results.GetType().FullName}{Environment.NewLine}" +
278  $" Transactions: {AlgorithmHandlers.Transactions.GetType().FullName}{Environment.NewLine}" +
279  $" Object Store: {AlgorithmHandlers.ObjectStore.GetType().FullName}{Environment.NewLine}" +
280  $" History Provider: {historyProviderName}{Environment.NewLine}" +
281  $" Brokerage: {brokerage?.GetType().FullName}{Environment.NewLine}" +
282  $" Data Provider: {AlgorithmHandlers.DataProvider.GetType().FullName}{Environment.NewLine}");
283 
284  //-> Using the job + initialization: load the designated handlers:
285  if (initializeComplete)
286  {
287  // notify the LEAN manager that the algorithm is initialized and starting
288  SystemHandlers.LeanManager.OnAlgorithmStart();
289 
290  //-> Reset the backtest stopwatch; we're now running the algorithm.
291  var startTime = DateTime.UtcNow;
292 
293  // Assign the algorithm id and lock the algorithm so no further updates are made.
294  algorithm.SetAlgorithmId(job.AlgorithmId);
295  algorithm.SetLocked();
296 
297  //Load the associated handlers for transaction and realtime events:
298  AlgorithmHandlers.Transactions.Initialize(algorithm, brokerage, AlgorithmHandlers.Results);
299  AlgorithmHandlers.RealTime.Setup(algorithm, job, AlgorithmHandlers.Results, SystemHandlers.Api, algorithmManager.TimeLimit);
300 
301  // wire up the brokerage message handler
302  brokerage.Message += (sender, message) =>
303  {
304  algorithm.BrokerageMessageHandler.HandleMessage(message);
305 
306  // fire brokerage message events
307  algorithm.OnBrokerageMessage(message);
308  switch (message.Type)
309  {
310  case BrokerageMessageType.Disconnect:
311  algorithm.OnBrokerageDisconnect();
312  break;
313  case BrokerageMessageType.Reconnect:
314  algorithm.OnBrokerageReconnect();
315  break;
316  }
317  };
318 
319  // Result manager scanning message queue: (started earlier)
320  AlgorithmHandlers.Results.DebugMessage(
321  $"Launching analysis for {job.AlgorithmId} with LEAN Engine v{Globals.Version}");
322 
323  try
324  {
325  //Create a new engine isolator class
326  var isolator = new Isolator();
327 
328  // Execute the Algorithm Code:
329  var complete = isolator.ExecuteWithTimeLimit(AlgorithmHandlers.Setup.MaximumRuntime, algorithmManager.TimeLimit.IsWithinLimit, () =>
330  {
331  try
332  {
333  //Run Algorithm Job:
334  // -> Using this Data Feed,
335  // -> Send Orders to this TransactionHandler,
336  // -> Send Results to ResultHandler.
337  algorithmManager.Run(job, algorithm, synchronizer, AlgorithmHandlers.Transactions, AlgorithmHandlers.Results, AlgorithmHandlers.RealTime, SystemHandlers.LeanManager, isolator.CancellationTokenSource);
338  }
339  catch (Exception err)
340  {
 // record the failure on the algorithm; it is reported after the isolator returns
341  algorithm.SetRuntimeError(err, "AlgorithmManager.Run");
342  return;
343  }
344 
345  Log.Trace("Engine.Run(): Exiting Algorithm Manager");
346  }, job.Controls.RamAllocation, workerThread:workerThread, sleepIntervalMillis: algorithm.LiveMode ? 10000 : 1000);
347 
348  if (!complete)
349  {
350  Log.Error("Engine.Main(): Failed to complete in time: " + AlgorithmHandlers.Setup.MaximumRuntime.ToStringInvariant("F"));
351  throw new Exception("Failed to complete algorithm within " + AlgorithmHandlers.Setup.MaximumRuntime.ToStringInvariant("F")
352  + " seconds. Please make it run faster.");
353  }
354  }
355  catch (Exception err)
356  {
357  //Error running the user algorithm: record it on the algorithm; it is reported just below.
358  algorithm.SetRuntimeError(err, "Engine Isolator");
359  }
360 
361  // Algorithm runtime error:
362  if (algorithm.RunTimeError != null)
363  {
364  HandleAlgorithmError(job, algorithm.RunTimeError);
365  }
366 
367  // notify the LEAN manager that the algorithm has finished
368  SystemHandlers.LeanManager.OnAlgorithmEnd();
369 
370  try
371  {
 // optionally write the filled orders to the file named by the "transaction-log" setting
372  var csvTransactionsFileName = Config.Get("transaction-log");
373  if (!string.IsNullOrEmpty(csvTransactionsFileName))
374  {
375  SaveListOfTrades(AlgorithmHandlers.Transactions, csvTransactionsFileName);
376  }
377 
378  if (!_liveMode)
379  {
380  //Diagnostics Completed, Send Result Packet:
381  var totalSeconds = (DateTime.UtcNow - startTime).TotalSeconds;
382  var dataPoints = algorithmManager.DataPoints + algorithm.HistoryProvider.DataPointCount;
 // throughput in thousands of data points per second
383  var kps = dataPoints / (double) 1000 / totalSeconds;
384  AlgorithmHandlers.Results.DebugMessage($"Algorithm Id:({job.AlgorithmId}) completed in {totalSeconds:F2} seconds at {kps:F0}k data points per second. Processing total of {dataPoints:N0} data points.");
385  }
386  }
387  catch (Exception err)
388  {
389  Log.Error(err, "Error sending analysis results");
390  }
391 
392  //Before we return, send terminate commands to close up the threads
395  dataManager?.RemoveAllSubscriptions();
396  workerThread?.Dispose();
397  }
398 
399  synchronizer.DisposeSafely();
400  // Close data feed, alphas. Could be running even if algorithm initialization failed
401  AlgorithmHandlers.DataFeed.Exit();
402 
403  //Close result handler:
404  AlgorithmHandlers.Results.Exit();
405 
406  //Wait for the threads to complete: poll every 10 ms, for at most 30 seconds in total
407  var millisecondInterval = 10;
408  var millisecondTotalWait = 0;
409  while ((AlgorithmHandlers.Results.IsActive
410  || (AlgorithmHandlers.Transactions != null && AlgorithmHandlers.Transactions.IsActive)
411  || (AlgorithmHandlers.DataFeed != null && AlgorithmHandlers.DataFeed.IsActive)
412  || (AlgorithmHandlers.RealTime != null && AlgorithmHandlers.RealTime.IsActive))
413  && millisecondTotalWait < 30*1000)
414  {
415  Thread.Sleep(millisecondInterval);
 // only log on every tenth iteration (every 100 ms of waiting)
416  if (millisecondTotalWait % (millisecondInterval * 10) == 0)
417  {
418  Log.Trace("Waiting for threads to exit...");
419  }
420  millisecondTotalWait += millisecondInterval;
421  }
422 
423  if (brokerage != null)
424  {
425  Log.Trace("Engine.Run(): Disconnecting from brokerage...");
426  brokerage.Disconnect();
427  brokerage.Dispose();
428  }
429  if (AlgorithmHandlers.Setup != null)
430  {
431  Log.Trace("Engine.Run(): Disposing of setup handler...");
432  AlgorithmHandlers.Setup.Dispose();
433  }
434 
435  Log.Trace("Engine.Main(): Analysis Completed and Results Posted.");
436  }
437  catch (Exception err)
438  {
439  Log.Error(err, "Error running algorithm");
440  }
441  finally
442  {
443  //No matter what for live mode; make sure we've set algorithm status in the API for "not running" conditions:
444  if (_liveMode && algorithmManager.State != AlgorithmStatus.Running && algorithmManager.State != AlgorithmStatus.RuntimeError)
445  SystemHandlers.Api.SetAlgorithmStatus(job.AlgorithmId, algorithmManager.State);
446 
 // this block runs on every path; Results and DataFeed may already have had Exit() called above
447  AlgorithmHandlers.Results.Exit();
448  AlgorithmHandlers.DataFeed.Exit();
449  AlgorithmHandlers.Transactions.Exit();
450  AlgorithmHandlers.RealTime.Exit();
451  AlgorithmHandlers.DataMonitor.Exit();
452  }
453  }
454 
/// <summary>
/// Reports a runtime error raised while running the algorithm: stops the data feed and,
/// when a result handler is available, sends the error to it and to the API.
/// </summary>
/// <param name="job">Job we're processing; its language decides how the stack trace text is built</param>
/// <param name="err">Error from algorithm stack</param>
private void HandleAlgorithmError(AlgorithmNodePacket job, Exception err)
{
    AlgorithmHandlers.DataFeed?.Exit();

    // without a result handler nothing further is reported
    if (AlgorithmHandlers.Results == null)
    {
        return;
    }

    var summary = $"Runtime Error: {err.Message}";
    Log.Trace("Engine.Run(): Sending runtime error to user...");
    AlgorithmHandlers.Results.LogMessage(summary);

    // for python we don't add the ugly pythonNet stack trace
    string details;
    if (job.Language == Language.Python)
    {
        details = err.Message;
    }
    else
    {
        details = err.ToString();
    }

    AlgorithmHandlers.Results.RuntimeError(summary, details);
    SystemHandlers.Api.SetAlgorithmStatus(job.AlgorithmId, AlgorithmStatus.RuntimeError, $"{summary} Stack Trace: {details}");
}
476 
/// <summary>
/// Creates a new <see cref="HistoryProviderManager"/> and forwards its events to the result handler
/// </summary>
/// <returns>The newly created manager with its event handlers attached</returns>
private HistoryProviderManager GetHistoryProvider()
{
    var manager = new HistoryProviderManager();

    manager.InvalidConfigurationDetected += (sender, args) => AlgorithmHandlers.Results.ErrorMessage(args.Message);

    manager.NumericalPrecisionLimited += (sender, args) =>
    {
        // this warning is only sent the first time the event fires
        if (_historyNumericalPrecisionLimitedWarningEmitted)
        {
            return;
        }

        _historyNumericalPrecisionLimitedWarningEmitted = true;
        AlgorithmHandlers.Results.DebugMessage("Warning: when performing history requests, the start date will be adjusted if there are numerical precision errors in the factor files.");
    };

    manager.StartDateLimited += (sender, args) =>
    {
        // this warning is only sent the first time the event fires
        if (_historyStartDateLimitedWarningEmitted)
        {
            return;
        }

        _historyStartDateLimitedWarningEmitted = true;
        AlgorithmHandlers.Results.DebugMessage("Warning: when performing history requests, the start date will be adjusted if it is before the first known date for the symbol.");
    };

    manager.DownloadFailed += (sender, args) => AlgorithmHandlers.Results.ErrorMessage(args.Message, args.StackTrace);
    manager.ReaderErrorDetected += (sender, args) => AlgorithmHandlers.Results.RuntimeError(args.Message, args.StackTrace);

    return manager;
}
506 
/// <summary>
/// Save a list of trades to disk for a given path.
/// Writes one CSV line per order whose status is a fill: time, symbol, direction, quantity, price.
/// </summary>
/// <param name="transactions">Transactions list via an OrderProvider</param>
/// <param name="csvFileName">File path to create; any missing parent directory is created first</param>
private static void SaveListOfTrades(IOrderProvider transactions, string csvFileName)
{
    var orders = transactions.GetOrders(x => x.Status.IsFill());

    // GetDirectoryName returns an empty string (not null) for a bare file name such as "trades.csv".
    // Directory.CreateDirectory("") throws, so only create the directory when there is one to create.
    // CreateDirectory itself does nothing when the directory already exists.
    var directory = Path.GetDirectoryName(csvFileName);
    if (!string.IsNullOrEmpty(directory))
    {
        Directory.CreateDirectory(directory);
    }

    using (var writer = new StreamWriter(csvFileName))
    {
        foreach (var order in orders)
        {
            var line = Invariant($"{order.Time.ToStringInvariant("yyyy-MM-dd HH:mm:ss")},") +
                Invariant($"{order.Symbol.Value},{order.Direction},{order.Quantity},{order.Price}");
            writer.WriteLine(line);
        }
    }
}
530 
531  /// <summary>
532  /// Initialize slow static variables
533  /// </summary>
 /// <returns>A <see cref="MarketHoursDatabase"/>; NOTE(review): the statement that produces it is not present in this listing</returns>
534  [MethodImpl(MethodImplOptions.NoOptimization | MethodImplOptions.NoInlining)]
535  private static MarketHoursDatabase StaticInitializations()
536  {
537  // This is slow because it creates all static timezones
538  var nyTime = TimeZones.NewYork;
539  // slow because it goes to disk and parses json
 // NOTE(review): the statement that followed this comment (and the method's return) is missing from this listing - restore it from the original file
541  }
542 
543  } // End Algorithm Node Core Thread
544 } // End Namespace