Lean  $LEAN_TAG$
Isolator.cs
1 /*
2  * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
3  * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14 */
15 
16 using System;
17 using System.Threading;
18 using System.Threading.Tasks;
19 using QuantConnect.Logging;
20 using QuantConnect.Util;
21 
22 namespace QuantConnect
23 {
24  /// <summary>
25  /// Isolator class - create a new instance of the algorithm and ensure it doesn't
26  /// exceed memory or time execution limits.
27  /// </summary>
28  public class Isolator
29  {
30  /// <summary>
31  /// Algo cancellation controls - cancel source.
32  /// </summary>
34  {
35  get; private set;
36  }
37 
38  /// <summary>
39  /// Initializes a new instance of the <see cref="Isolator"/> class
40  /// </summary>
41  public Isolator()
42  {
44  }
45 
46  /// <summary>
47  /// Execute a code block with a maximum limit on time and memory.
48  /// </summary>
49  /// <param name="timeSpan">Timeout in timespan</param>
50  /// <param name="withinCustomLimits">Function used to determine if the codeBlock is within custom limits, such as with algorithm manager
51  /// timing individual time loops, return a non-null and non-empty string with a message indicating the error/reason for stoppage</param>
52  /// <param name="codeBlock">Action codeblock to execute</param>
53  /// <param name="memoryCap">Maximum memory allocation, default 1024Mb</param>
54  /// <param name="sleepIntervalMillis">Sleep interval between each check in ms</param>
55  /// <param name="workerThread">The worker thread instance that will execute the provided action, if null
56  /// will use a <see cref="Task"/></param>
57  /// <returns>True if algorithm exited successfully, false if cancelled because it exceeded limits.</returns>
58  public bool ExecuteWithTimeLimit(TimeSpan timeSpan, Func<IsolatorLimitResult> withinCustomLimits, Action codeBlock, long memoryCap = 1024, int sleepIntervalMillis = 1000, WorkerThread workerThread = null)
59  {
60  workerThread?.Add(codeBlock);
61 
62  var task = workerThread == null
63  //Launch task
64  ? Task.Factory.StartNew(codeBlock, CancellationTokenSource.Token)
65  // wrapper task so we can reuse MonitorTask
66  : Task.Factory.StartNew(() => workerThread.FinishedWorkItem.WaitOne(), CancellationTokenSource.Token);
67  try
68  {
69  return MonitorTask(task, timeSpan, withinCustomLimits, memoryCap, sleepIntervalMillis);
70  }
71  catch (Exception)
72  {
73  if (!task.IsCompleted)
74  {
75  // lets free the wrapper task even if the worker thread didn't finish
76  workerThread?.FinishedWorkItem.Set();
77  }
78  throw;
79  }
80  }
81 
82  private bool MonitorTask(Task task,
83  TimeSpan timeSpan,
84  Func<IsolatorLimitResult> withinCustomLimits,
85  long memoryCap = 1024,
86  int sleepIntervalMillis = 1000)
87  {
88  // default to always within custom limits
89  withinCustomLimits = withinCustomLimits ?? (() => new IsolatorLimitResult(TimeSpan.Zero, string.Empty));
90 
91  var message = string.Empty;
92  var emaPeriod = 60d;
93  var memoryUsed = 0L;
94  var utcNow = DateTime.UtcNow;
95  var end = utcNow + timeSpan;
96  var memoryLogger = utcNow + Time.OneMinute;
97  var isolatorLimitResult = new IsolatorLimitResult(TimeSpan.Zero, string.Empty);
98 
99  //Convert to bytes
100  memoryCap *= 1024 * 1024;
101  var spikeLimit = memoryCap*2;
102 
103  if (memoryCap <= 0)
104  {
105  memoryCap = int.MaxValue;
106  spikeLimit = int.MaxValue;
107  }
108 
109  while (!task.IsCompleted && !CancellationTokenSource.IsCancellationRequested && utcNow < end)
110  {
111  // if over 80% allocation force GC then sample
112  var sample = Convert.ToDouble(GC.GetTotalMemory(memoryUsed > memoryCap * 0.8));
113 
114  // find the EMA of the memory used to prevent spikes killing stategy
115  memoryUsed = Convert.ToInt64((emaPeriod-1)/emaPeriod * memoryUsed + (1/emaPeriod)*sample);
116 
117  // if the rolling EMA > cap; or the spike is more than 2x the allocation.
118  if (memoryUsed > memoryCap || sample > spikeLimit)
119  {
120  message = Messages.Isolator.MemoryUsageMaxedOut(PrettyFormatRam(memoryCap), PrettyFormatRam((long)sample));
121  break;
122  }
123 
124  if (utcNow > memoryLogger)
125  {
126  if (memoryUsed > memoryCap * 0.8)
127  {
128  Log.Error(Messages.Isolator.MemoryUsageOver80Percent(sample));
129  }
130 
131  Log.Trace("Isolator.ExecuteWithTimeLimit(): " +
132  Messages.Isolator.MemoryUsageInfo(
133  PrettyFormatRam(memoryUsed),
134  PrettyFormatRam((long)sample),
135  PrettyFormatRam(OS.ApplicationMemoryUsed * 1024 * 1024),
136  isolatorLimitResult.CurrentTimeStepElapsed,
137  (int)Math.Ceiling(OS.CpuUsage)));
138 
139  memoryLogger = utcNow.AddMinutes(1);
140  }
141 
142  // check to see if we're within other custom limits defined by the caller
143  isolatorLimitResult = withinCustomLimits();
144  if (!isolatorLimitResult.IsWithinCustomLimits)
145  {
146  message = isolatorLimitResult.ErrorMessage;
147  break;
148  }
149 
150  if (task.Wait(utcNow.GetSecondUnevenWait(sleepIntervalMillis)))
151  {
152  break;
153  }
154 
155  utcNow = DateTime.UtcNow;
156  }
157 
158  if (task.IsCompleted == false)
159  {
160  if (CancellationTokenSource.IsCancellationRequested)
161  {
162  Log.Trace($"Isolator.ExecuteWithTimeLimit(): Operation was canceled");
163  throw new OperationCanceledException("Operation was canceled");
164  }
165  else if (string.IsNullOrEmpty(message))
166  {
167  message = Messages.Isolator.MemoryUsageMonitorTaskTimedOut(timeSpan);
168  Log.Trace($"Isolator.ExecuteWithTimeLimit(): {message}");
169  }
170  }
171 
172  if (!string.IsNullOrEmpty(message))
173  {
174  if (!CancellationTokenSource.IsCancellationRequested)
175  {
176  CancellationTokenSource.Cancel();
177  }
178  Log.Error($"Security.ExecuteWithTimeLimit(): {message}");
179  throw new TimeoutException(message);
180  }
181  return task.IsCompleted;
182  }
183 
184  /// <summary>
185  /// Execute a code block with a maximum limit on time and memory.
186  /// </summary>
187  /// <param name="timeSpan">Timeout in timespan</param>
188  /// <param name="codeBlock">Action codeblock to execute</param>
189  /// <param name="memoryCap">Maximum memory allocation, default 1024Mb</param>
190  /// <param name="sleepIntervalMillis">Sleep interval between each check in ms</param>
191  /// <param name="workerThread">The worker thread instance that will execute the provided action, if null
192  /// will use a <see cref="Task"/></param>
193  /// <returns>True if algorithm exited successfully, false if cancelled because it exceeded limits.</returns>
194  public bool ExecuteWithTimeLimit(TimeSpan timeSpan, Action codeBlock, long memoryCap, int sleepIntervalMillis = 1000, WorkerThread workerThread = null)
195  {
196  return ExecuteWithTimeLimit(timeSpan, null, codeBlock, memoryCap, sleepIntervalMillis, workerThread);
197  }
198 
199  /// <summary>
200  /// Convert the bytes to a MB in double format for string display
201  /// </summary>
202  /// <param name="ramInBytes"></param>
203  /// <returns></returns>
204  private static string PrettyFormatRam(long ramInBytes)
205  {
206  return Math.Round(Convert.ToDouble(ramInBytes/(1024*1024))).ToStringInvariant();
207  }
208  }
209 }