20 using System.Runtime.CompilerServices;
21 using System.Threading;
22 using System.Threading.Tasks;
49 private bool _historyStartDateLimitedWarningEmitted;
50 private bool _historyNumericalPrecisionLimitedWarningEmitted;
51 private readonly
bool _liveMode;
85 var marketHoursDatabaseTask = Task.Run(() => StaticInitializations());
88 var algorithmManager = manager;
92 Log.
Trace($
"Engine.Run(): Resource limits '{job.Controls.CpuAllocation}' CPUs. {job.Controls.RamAllocation} MB RAM.");
96 var initializeComplete =
false;
111 var marketHoursDatabase = marketHoursDatabaseTask.Result;
145 var securityService =
new SecurityService(algorithm.Portfolio.CashBook,
147 symbolPropertiesDatabase,
149 registeredTypesProvider,
151 mapFilePrimaryExchangeProvider,
154 algorithm.Securities.SetSecurityService(securityService);
163 algorithm.TimeKeeper,
166 registeredTypesProvider,
169 algorithm.SubscriptionManager.SetDataManager(dataManager);
171 synchronizer.Initialize(algorithm, dataManager);
189 var historyProvider = GetHistoryProvider();
190 historyProvider.SetBrokerage(brokerage);
191 historyProvider.Initialize(
202 if (!algorithm.GetLocked() || algorithm.IsWarmingUp)
204 AlgorithmHandlers.Results.SendStatusUpdate(AlgorithmStatus.History,
205 Invariant($
"Processing history {progress}%..."));
209 parallelHistoryRequestsEnabled: !_liveMode,
211 objectStore: algorithm.ObjectStore,
212 algorithmSettings: algorithm.Settings
216 historyProvider.InvalidConfigurationDetected += (sender, args) => {
AlgorithmHandlers.
Results.ErrorMessage(args.Message); };
217 historyProvider.DownloadFailed += (sender, args) => {
AlgorithmHandlers.
Results.ErrorMessage(args.Message, args.StackTrace); };
218 historyProvider.ReaderErrorDetected += (sender, args) => {
AlgorithmHandlers.
Results.RuntimeError(args.Message, args.StackTrace); };
220 algorithm.HistoryProvider = historyProvider;
223 algorithm.BrokerageMessageHandler = factory.CreateBrokerageMessageHandler(algorithm, job,
SystemHandlers.
Api);
236 initializeComplete =
false;
238 var errorMessage =
string.Join(
",", algorithm.ErrorMessages);
239 string stackTrace =
"";
242 var message = e.Message;
243 if (e.InnerException !=
null)
246 var err = interpreter.Interpret(e.InnerException);
247 var stackMessage = interpreter.GetExceptionMessageHeader(err);
248 message += stackMessage;
249 stackTrace += stackMessage;
253 Log.
Error(
"Engine.Run(): " + errorMessage);
258 catch (Exception err)
263 var stackTrace = job.Language ==
Language.Python ? err.Message : err.ToString();
265 var runtimeMessage =
"Algorithm.Initialize() Error: " + err.Message +
" Stack Trace: " + stackTrace;
271 var historyProviderName = algorithm?.HistoryProvider !=
null ? algorithm.HistoryProvider.GetType().FullName :
string.Empty;
273 Log.
Trace($
"JOB HANDLERS:{Environment.NewLine}" +
274 $
" DataFeed: {AlgorithmHandlers.DataFeed.GetType().FullName}{Environment.NewLine}" +
275 $
" Setup: {AlgorithmHandlers.Setup.GetType().FullName}{Environment.NewLine}" +
276 $
" RealTime: {AlgorithmHandlers.RealTime.GetType().FullName}{Environment.NewLine}" +
277 $
" Results: {AlgorithmHandlers.Results.GetType().FullName}{Environment.NewLine}" +
278 $
" Transactions: {AlgorithmHandlers.Transactions.GetType().FullName}{Environment.NewLine}" +
279 $
" Object Store: {AlgorithmHandlers.ObjectStore.GetType().FullName}{Environment.NewLine}" +
280 $
" History Provider: {historyProviderName}{Environment.NewLine}" +
281 $
" Brokerage: {brokerage?.GetType().FullName}{Environment.NewLine}" +
282 $
" Data Provider: {AlgorithmHandlers.DataProvider.GetType().FullName}{Environment.NewLine}");
285 if (initializeComplete)
291 var startTime = DateTime.UtcNow;
294 algorithm.SetAlgorithmId(job.AlgorithmId);
295 algorithm.SetLocked();
302 brokerage.Message += (sender, message) =>
304 algorithm.BrokerageMessageHandler.HandleMessage(message);
307 algorithm.OnBrokerageMessage(message);
308 switch (message.Type)
311 algorithm.OnBrokerageDisconnect();
314 algorithm.OnBrokerageReconnect();
321 $
"Launching analysis for {job.AlgorithmId} with LEAN Engine v{Globals.Version}");
326 var isolator =
new Isolator();
337 algorithmManager.Run(job, algorithm, synchronizer, AlgorithmHandlers.Transactions, AlgorithmHandlers.Results, AlgorithmHandlers.RealTime, SystemHandlers.LeanManager, isolator.CancellationTokenSource);
339 catch (Exception err)
341 algorithm.SetRuntimeError(err,
"AlgorithmManager.Run");
345 Log.
Trace(
"Engine.Run(): Exiting Algorithm Manager");
346 }, job.Controls.RamAllocation, workerThread:workerThread, sleepIntervalMillis: algorithm.LiveMode ? 10000 : 1000);
352 +
" seconds. Please make it run faster.");
355 catch (Exception err)
358 algorithm.SetRuntimeError(err,
"Engine Isolator");
362 if (algorithm.RunTimeError !=
null)
364 HandleAlgorithmError(job, algorithm.RunTimeError);
372 var csvTransactionsFileName =
Config.
Get(
"transaction-log");
373 if (!
string.IsNullOrEmpty(csvTransactionsFileName))
381 var totalSeconds = (DateTime.UtcNow - startTime).TotalSeconds;
382 var dataPoints = algorithmManager.DataPoints + algorithm.HistoryProvider.DataPointCount;
383 var kps = dataPoints / (double) 1000 / totalSeconds;
384 AlgorithmHandlers.
Results.DebugMessage($
"Algorithm Id:({job.AlgorithmId}) completed in {totalSeconds:F2} seconds at {kps:F0}k data points per second. Processing total of {dataPoints:N0} data points.");
387 catch (Exception err)
389 Log.
Error(err,
"Error sending analysis results");
395 dataManager?.RemoveAllSubscriptions();
396 workerThread?.Dispose();
399 synchronizer.DisposeSafely();
401 AlgorithmHandlers.DataFeed.Exit();
404 AlgorithmHandlers.Results.Exit();
407 var millisecondInterval = 10;
408 var millisecondTotalWait = 0;
409 while ((AlgorithmHandlers.Results.IsActive
410 || (AlgorithmHandlers.Transactions !=
null && AlgorithmHandlers.Transactions.IsActive)
411 || (AlgorithmHandlers.DataFeed !=
null && AlgorithmHandlers.DataFeed.IsActive)
412 || (AlgorithmHandlers.RealTime !=
null && AlgorithmHandlers.RealTime.IsActive))
413 && millisecondTotalWait < 30*1000)
415 Thread.Sleep(millisecondInterval);
416 if (millisecondTotalWait % (millisecondInterval * 10) == 0)
418 Log.
Trace(
"Waiting for threads to exit...");
420 millisecondTotalWait += millisecondInterval;
423 if (brokerage !=
null)
425 Log.
Trace(
"Engine.Run(): Disconnecting from brokerage...");
426 brokerage.Disconnect();
429 if (AlgorithmHandlers.Setup !=
null)
431 Log.
Trace(
"Engine.Run(): Disposing of setup handler...");
432 AlgorithmHandlers.Setup.Dispose();
435 Log.
Trace(
"Engine.Main(): Analysis Completed and Results Posted.");
437 catch (Exception err)
439 Log.
Error(err,
"Error running algorithm");
445 SystemHandlers.Api.SetAlgorithmStatus(job.AlgorithmId, algorithmManager.State);
447 AlgorithmHandlers.Results.Exit();
448 AlgorithmHandlers.DataFeed.Exit();
449 AlgorithmHandlers.Transactions.Exit();
450 AlgorithmHandlers.RealTime.Exit();
451 AlgorithmHandlers.DataMonitor.Exit();
462 AlgorithmHandlers.DataFeed?.Exit();
463 if (AlgorithmHandlers.Results !=
null)
465 var message = $
"Runtime Error: {err.Message}";
466 Log.
Trace(
"Engine.Run(): Sending runtime error to user...");
467 AlgorithmHandlers.Results.LogMessage(message);
470 var stackTrace = job.
Language ==
Language.Python ? err.Message : err.ToString();
472 AlgorithmHandlers.Results.RuntimeError(message, stackTrace);
484 provider.InvalidConfigurationDetected += (sender, args) => { AlgorithmHandlers.Results.ErrorMessage(args.Message); };
485 provider.NumericalPrecisionLimited += (sender, args) =>
487 if (!_historyNumericalPrecisionLimitedWarningEmitted)
489 _historyNumericalPrecisionLimitedWarningEmitted =
true;
490 AlgorithmHandlers.Results.DebugMessage(
"Warning: when performing history requests, the start date will be adjusted if there are numerical precision errors in the factor files.");
493 provider.StartDateLimited += (sender, args) =>
495 if (!_historyStartDateLimitedWarningEmitted)
497 _historyStartDateLimitedWarningEmitted =
true;
498 AlgorithmHandlers.Results.DebugMessage(
"Warning: when performing history requests, the start date will be adjusted if it is before the first known date for the symbol.");
501 provider.DownloadFailed += (sender, args) => { AlgorithmHandlers.Results.ErrorMessage(args.Message, args.StackTrace); };
502 provider.ReaderErrorDetected += (sender, args) => { AlgorithmHandlers.Results.RuntimeError(args.Message, args.StackTrace); };
/// <summary>
/// Writes every order whose status satisfies <c>IsFill()</c> to a CSV file, one line per order
/// in the form <c>time,symbol,direction,quantity,price</c>. No header row is written.
/// </summary>
/// <param name="transactions">Order provider that is queried for the orders to write</param>
/// <param name="csvFileName">Path of the CSV file to create; its directory is created when missing</param>
private static void SaveListOfTrades(IOrderProvider transactions, string csvFileName)
{
    var orders = transactions.GetOrders(x => x.Status.IsFill());

    // Path.GetDirectoryName returns an empty string (not null) for a bare file name such as
    // "trades.csv"; Directory.CreateDirectory("") would throw, so skip directory creation
    // whenever there is no directory component at all.
    var path = Path.GetDirectoryName(csvFileName);
    if (!string.IsNullOrEmpty(path) && !Directory.Exists(path))
    {
        Directory.CreateDirectory(path);
    }

    using (var writer = new StreamWriter(csvFileName))
    {
        foreach (var order in orders)
        {
            // culture-invariant formatting keeps the file machine readable on any host locale
            var line = Invariant($"{order.Time.ToStringInvariant("yyyy-MM-dd HH:mm:ss")},") +
                Invariant($"{order.Symbol.Value},{order.Direction},{order.Quantity},{order.Price}");
            writer.WriteLine(line);
        }
    }
}
534 [MethodImpl(MethodImplOptions.NoOptimization | MethodImplOptions.NoInlining)]
538 var nyTime = TimeZones.NewYork;