Lean  $LEAN_TAG$
LeanOptimizer.cs
1 /*
2  * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
3  * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14 */
15 
16 using System;
17 using System.Threading;
18 using QuantConnect.Util;
19 using QuantConnect.Logging;
21 using System.Collections.Concurrent;
22 using System.Collections.Generic;
23 using System.Globalization;
27 
28 namespace QuantConnect.Optimizer
29 {
30  /// <summary>
31  /// Base Lean optimizer class in charge of handling an optimization job packet
32  /// </summary>
33  public abstract class LeanOptimizer : IDisposable
34  {
35  private readonly int _optimizationUpdateInterval = Config.GetInt("optimization-update-interval", 10);
36 
37  private DateTime _startedAt = DateTime.UtcNow;
38 
39  private DateTime _lastUpdate;
40  private int _failedBacktest;
41  private int _completedBacktest;
42  private volatile bool _disposed;
43 
44  /// <summary>
45  /// The total completed backtests count
46  /// </summary>
47  protected int CompletedBacktests => _failedBacktest + _completedBacktest;
48 
49  /// <summary>
50  /// Lock to update optimization status
51  /// </summary>
52  private object _statusLock = new object();
53 
54  /// <summary>
55  /// The current optimization status
56  /// </summary>
57  protected OptimizationStatus Status { get; private set; } = OptimizationStatus.New;
58 
59  /// <summary>
60  /// The optimization target
61  /// </summary>
62  protected Target OptimizationTarget { get; }
63 
64  /// <summary>
65  /// Collection holding <see cref="ParameterSet"/> for each backtest id we are waiting to finish
66  /// </summary>
67  protected ConcurrentDictionary<string, ParameterSet> RunningParameterSetForBacktest { get; init; }
68 
69  /// <summary>
70  /// Collection holding <see cref="ParameterSet"/> for each backtest id we are waiting to launch
71  /// </summary>
72  /// <remarks>We can't launch 1 million backtests at the same time</remarks>
73  protected ConcurrentQueue<ParameterSet> PendingParameterSet { get; init; }
74 
75  /// <summary>
76  /// The optimization strategy being used
77  /// </summary>
78  protected IOptimizationStrategy Strategy { get; init; }
79 
80  /// <summary>
81  /// The optimization packet
82  /// </summary>
83  protected OptimizationNodePacket NodePacket { get; init; }
84 
85  /// <summary>
86  /// Indicates whether optimizer was disposed
87  /// </summary>
88  protected bool Disposed => _disposed;
89 
90  /// <summary>
91  /// Event triggered when the optimization work ended
92  /// </summary>
93  public event EventHandler<OptimizationResult> Ended;
94 
95  /// <summary>
96  /// Creates a new instance
97  /// </summary>
98  /// <param name="nodePacket">The optimization node packet to handle</param>
99  protected LeanOptimizer(OptimizationNodePacket nodePacket)
100  {
101  if (nodePacket.OptimizationParameters.IsNullOrEmpty())
102  {
103  throw new ArgumentException("Cannot start an optimization job with no parameter to optimize");
104  }
105 
106  if (string.IsNullOrEmpty(nodePacket.Criterion?.Target))
107  {
108  throw new ArgumentException("Cannot start an optimization job with no target to optimize");
109  }
110 
111  NodePacket = nodePacket;
113  OptimizationTarget.Reached += (s, e) =>
114  {
115  // we've reached the optimization target
117  };
118 
119  Strategy = (IOptimizationStrategy)Activator.CreateInstance(Type.GetType(NodePacket.OptimizationStrategy));
120 
121  RunningParameterSetForBacktest = new ConcurrentDictionary<string, ParameterSet>();
122  PendingParameterSet = new ConcurrentQueue<ParameterSet>();
123 
125 
126  Strategy.NewParameterSet += (s, parameterSet) =>
127  {
128  if (parameterSet == null)
129  {
130  // shouldn't happen
131  Log.Error($"Strategy.NewParameterSet({GetLogDetails()}): generated a null {nameof(ParameterSet)} instance");
132  return;
133  }
134  LaunchLeanForParameterSet(parameterSet);
135  };
136  }
137 
138  /// <summary>
139  /// Starts the optimization
140  /// </summary>
141  public virtual void Start()
142  {
144  {
146 
147  // if after we started there are no running parameter sets means we have failed to start
148  if (RunningParameterSetForBacktest.Count == 0)
149  {
151  throw new InvalidOperationException($"LeanOptimizer.Start({GetLogDetails()}): failed to start");
152  }
153  Log.Trace($"LeanOptimizer.Start({GetLogDetails()}): start ended. Waiting on {RunningParameterSetForBacktest.Count + PendingParameterSet.Count} backtests");
154  }
155 
157  ProcessUpdate(forceSend: true);
158  }
159 
160  /// <summary>
161  /// Triggers the optimization job end event
162  /// </summary>
163  protected virtual void TriggerOnEndEvent()
164  {
165  if (_disposed)
166  {
167  return;
168  }
170 
171  var result = Strategy.Solution;
172  if (result != null)
173  {
174  var constraint = NodePacket.Constraints != null ? $"Constraints: ({string.Join(",", NodePacket.Constraints)})" : string.Empty;
175  Log.Trace($"LeanOptimizer.TriggerOnEndEvent({GetLogDetails()}): Optimization has ended. " +
176  $"Result for {OptimizationTarget}: was reached using ParameterSet: ({result.ParameterSet}) backtestId '{result.BacktestId}'. " +
177  $"{constraint}");
178  }
179  else
180  {
181  Log.Trace($"LeanOptimizer.TriggerOnEndEvent({GetLogDetails()}): Optimization has ended. Result was not reached");
182  }
183 
184  // we clean up before we send an update so that the runtime stats are updated
185  CleanUpRunningInstance();
186  ProcessUpdate(forceSend: true);
187 
188  Ended?.Invoke(this, result);
189  }
190 
191  /// <summary>
192  /// Handles starting Lean for a given parameter set
193  /// </summary>
194  /// <param name="parameterSet">The parameter set for the backtest to run</param>
195  /// <param name="backtestName">The backtest name to use</param>
196  /// <returns>The new unique backtest id</returns>
197  protected abstract string RunLean(ParameterSet parameterSet, string backtestName);
198 
199  /// <summary>
200  /// Get's a new backtest name
201  /// </summary>
202  protected virtual string GetBacktestName(ParameterSet parameterSet)
203  {
204  return "OptimizationBacktest";
205  }
206 
207  /// <summary>
208  /// Handles a new backtest json result matching a requested backtest id
209  /// </summary>
210  /// <param name="jsonBacktestResult">The backtest json result</param>
211  /// <param name="backtestId">The associated backtest id</param>
212  protected virtual void NewResult(string jsonBacktestResult, string backtestId)
213  {
215  {
216  ParameterSet parameterSet;
217 
218  // we take a lock so that there is no race condition with launching Lean adding the new backtest id and receiving the backtest result for that id
219  // before it's even in the collection 'ParameterSetForBacktest'
220 
221  if (!RunningParameterSetForBacktest.TryRemove(backtestId, out parameterSet))
222  {
223  Interlocked.Increment(ref _failedBacktest);
224  Log.Error(
225  $"LeanOptimizer.NewResult({GetLogDetails()}): Optimization compute job with id '{backtestId}' was not found");
226  return;
227  }
228 
229  // we got a new result if there are any pending parameterSet to run we can now trigger 1
230  // we do this before 'Strategy.PushNewResults' so FIFO is respected
231  if (PendingParameterSet.TryDequeue(out var pendingParameterSet))
232  {
233  LaunchLeanForParameterSet(pendingParameterSet);
234  }
235 
236  var result = new OptimizationResult(null, parameterSet, backtestId);
237  if (string.IsNullOrEmpty(jsonBacktestResult))
238  {
239  Interlocked.Increment(ref _failedBacktest);
240  Log.Error(
241  $"LeanOptimizer.NewResult({GetLogDetails()}): Got null/empty backtest result for backtest id '{backtestId}'");
242  }
243  else
244  {
245  Interlocked.Increment(ref _completedBacktest);
246  result = new OptimizationResult(jsonBacktestResult, parameterSet, backtestId);
247  }
248 
249  // always notify the strategy
250  Strategy.PushNewResults(result);
251 
252  // strategy could of added more
253  if (RunningParameterSetForBacktest.Count == 0)
254  {
256  }
257  else
258  {
259  ProcessUpdate();
260  }
261  }
262  }
263 
264  /// <summary>
265  /// Disposes of any resources
266  /// </summary>
267  public virtual void Dispose()
268  {
269  if (_disposed)
270  {
271  return;
272  }
273  _disposed = true;
274  CleanUpRunningInstance();
275  }
276 
277  /// <summary>
278  /// Returns the current optimization status and strategy estimates
279  /// </summary>
280  public int GetCurrentEstimate()
281  {
283  }
284 
285  /// <summary>
286  /// Get the current runtime statistics
287  /// </summary>
288  public Dictionary<string, string> GetRuntimeStatistics()
289  {
290  var completedCount = _completedBacktest;
291  var totalEndedCount = completedCount + _failedBacktest;
292  var runtime = DateTime.UtcNow - _startedAt;
293  var result = new Dictionary<string, string>
294  {
295  { "Completed", $"{completedCount}"},
296  { "Failed", $"{_failedBacktest}"},
297  { "Running", $"{RunningParameterSetForBacktest.Count}"},
298  { "In Queue", $"{PendingParameterSet.Count}"},
299  { "Average Length", $"{(totalEndedCount > 0 ? new TimeSpan(runtime.Ticks / totalEndedCount) : TimeSpan.Zero).ToString(@"hh\:mm\:ss", CultureInfo.InvariantCulture)}"},
300  { "Total Runtime", $"{runtime.ToString(@"hh\:mm\:ss", CultureInfo.InvariantCulture)}" }
301  };
302 
303  return result;
304  }
305 
306  /// <summary>
307  /// Helper method to have pretty more informative logs
308  /// </summary>
309  protected string GetLogDetails()
310  {
311  if (NodePacket.UserId == 0)
312  {
313  return $"OID {NodePacket.OptimizationId}";
314  }
315  return $"UI {NodePacket.UserId} PID {NodePacket.ProjectId} OID {NodePacket.OptimizationId} S {Status}";
316  }
317 
318  /// <summary>
319  /// Handles stopping Lean process
320  /// </summary>
321  /// <param name="backtestId">Specified backtest id</param>
322  protected abstract void AbortLean(string backtestId);
323 
324  /// <summary>
325  /// Sends an update of the current optimization status to the user
326  /// </summary>
327  protected abstract void SendUpdate();
328 
329  /// <summary>
330  /// Sets the current optimization status
331  /// </summary>
332  /// <param name="optimizationStatus">The new optimization status</param>
333  protected virtual void SetOptimizationStatus(OptimizationStatus optimizationStatus)
334  {
335  lock (_statusLock)
336  {
337  // we never come back from an aborted/ended status
338  if (Status != OptimizationStatus.Aborted && Status != OptimizationStatus.Completed)
339  {
340  Status = optimizationStatus;
341  }
342  }
343  }
344 
345  /// <summary>
346  /// Clean up any pending or running lean instance
347  /// </summary>
348  private void CleanUpRunningInstance()
349  {
350  PendingParameterSet.Clear();
351 
353  {
354  foreach (var backtestId in RunningParameterSetForBacktest.Keys)
355  {
356  ParameterSet parameterSet;
357  if (RunningParameterSetForBacktest.TryRemove(backtestId, out parameterSet))
358  {
359  Interlocked.Increment(ref _failedBacktest);
360  try
361  {
362  AbortLean(backtestId);
363  }
364  catch
365  {
366  // pass
367  }
368  }
369  }
370  }
371  }
372 
373  /// <summary>
374  /// Will determine if it's right time to trigger an update call
375  /// </summary>
376  /// <param name="forceSend">True will force send, skipping interval, useful on start and end</param>
377  private void ProcessUpdate(bool forceSend = false)
378  {
379  if (!forceSend && Status == OptimizationStatus.New)
380  {
381  // don't send any update until we finish the Start(), will be creating a bunch of backtests don't want to send partial/multiple updates
382  return;
383  }
384 
385  try
386  {
387  var now = DateTime.UtcNow;
388  if (forceSend || (now - _lastUpdate > TimeSpan.FromSeconds(_optimizationUpdateInterval)))
389  {
390  _lastUpdate = now;
391  Log.Debug($"LeanOptimizer.ProcessUpdate({GetLogDetails()}): start sending update...");
392 
393  SendUpdate();
394 
395  Log.Debug($"LeanOptimizer.ProcessUpdate({GetLogDetails()}): finished sending update successfully.");
396  }
397  }
398  catch (Exception e)
399  {
400  Log.Error(e, "Failed to send status update");
401  }
402  }
403 
404  private void LaunchLeanForParameterSet(ParameterSet parameterSet)
405  {
406  if (_disposed || Status == OptimizationStatus.Completed || Status == OptimizationStatus.Aborted)
407  {
408  return;
409  }
410 
412  {
414  {
415  // we hit the limit on the concurrent backtests
416  PendingParameterSet.Enqueue(parameterSet);
417  return;
418  }
419 
420  try
421  {
422  var backtestName = GetBacktestName(parameterSet);
423  var backtestId = RunLean(parameterSet, backtestName);
424 
425  if (!string.IsNullOrEmpty(backtestId))
426  {
427  Log.Trace($"LeanOptimizer.LaunchLeanForParameterSet({GetLogDetails()}): launched backtest '{backtestId}' with parameters '{parameterSet}'");
428  RunningParameterSetForBacktest.TryAdd(backtestId, parameterSet);
429  }
430  else
431  {
432  Interlocked.Increment(ref _failedBacktest);
433  // always notify the strategy
434  Strategy.PushNewResults(new OptimizationResult(null, parameterSet, backtestId));
435  Log.Error($"LeanOptimizer.LaunchLeanForParameterSet({GetLogDetails()}): Initial/null optimization compute job could not be placed into the queue");
436  }
437 
438  ProcessUpdate();
439  }
440  catch (Exception ex)
441  {
442  Log.Error($"LeanOptimizer.LaunchLeanForParameterSet({GetLogDetails()}): Error encountered while placing optimization message into the queue: {ex.Message}");
443  }
444  }
445  }
446  }
447 }