Lean  $LEAN_TAG$
JobQueue.cs
1 /*
2  * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
3  * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14 */
15 
16 using Fasterflect;
17 using Newtonsoft.Json;
20 using QuantConnect.Data;
22 using QuantConnect.Logging;
23 using QuantConnect.Packets;
24 using QuantConnect.Python;
25 using QuantConnect.Util;
26 using System;
27 using System.Collections.Generic;
28 using System.IO;
29 using System.Linq;
30 using System.Reflection;
31 
33 {
34  /// <summary>
35  /// Implementation of the job queue for local/desktop runs: builds the next live or backtest job packet from the local configuration.
36  /// </summary>
37  public class JobQueue : IJobQueueHandler
38  {
39  // Writer for console output (System.Console.Out). The PaperBrokerageTypeName constant below holds the type name of the QuantConnect.Brokerages.Paper.PaperBrokerage
40  private static readonly TextWriter Console = System.Console.Out;
41 
42  private const string PaperBrokerageTypeName = "PaperBrokerage";
43  private const string DefaultHistoryProvider = "SubscriptionDataReaderHistoryProvider";
44  private const string DefaultDataQueueHandler = "LiveDataQueue";
45  private const string DefaultDataChannelProvider = "DataChannelProvider";
46  private static readonly string Channel = Config.Get("data-channel");
47  private readonly string AlgorithmTypeName = Config.Get("algorithm-type-name");
48  private Language? _language;
49 
/// <summary>
/// Gets the algorithm language, resolving it on first access and caching the result.
/// This property is protected for testing purposes.
/// </summary>
/// <remarks>
/// The "algorithm-language" configuration value takes precedence. When it is not set, the
/// language is inferred from the file extension of the algorithm location.
/// </remarks>
/// <exception cref="ArgumentException">
/// Thrown when no language is configured and the algorithm extension is neither ".dll" nor ".py".
/// Enum.Parse also throws it when the configured value does not name a Language member.
/// </exception>
protected Language Language
{
    get
    {
        // Already resolved on a previous access
        if (_language.HasValue)
        {
            return _language.Value;
        }

        var configuredLanguage = Config.Get("algorithm-language");
        if (!string.IsNullOrEmpty(configuredLanguage))
        {
            _language = (Language)Enum.Parse(typeof(Language), configuredLanguage, ignoreCase: true);
            return _language.Value;
        }

        // File extensions are identifiers, not linguistic text: normalise them with the invariant
        // culture so the comparison below does not depend on the current thread culture
        var extension = Path.GetExtension(AlgorithmLocation).ToLowerInvariant();
        switch (extension)
        {
            case ".dll":
                _language = Language.CSharp;
                break;
            case ".py":
                _language = Language.Python;
                break;
            default:
                throw new ArgumentException($"Unknown extension, algorithm extension was {extension}");
        }

        return _language.Value;
    }
}
84 
85  /// <summary>
86  /// Physical location of the algorithm file (a compiled DLL or a Python script).
87  /// </summary>
88  /// <remarks>We expect this dll to be copied into the output directory</remarks>
89  private string AlgorithmLocation { get; } = Config.Get("algorithm-location", "QuantConnect.Algorithm.CSharp.dll");
90 
91  /// <summary>
92  /// Initialize the job queue:
93  /// </summary>
94  public void Initialize(IApi api)
95  {
97  }
98 
/// <summary>
/// Gets the brokerage factory associated with the provided data queue handler name.
/// </summary>
/// <param name="dataQueueHandler">
/// Name of the data queue handler. It is matched, ignoring case, against the end of the full type
/// name of the exported <see cref="IBrokerage"/> types that carry a <see cref="BrokerageFactoryAttribute"/>
/// </param>
/// <returns>An instance of the brokerage factory if possible, otherwise null</returns>
public static IBrokerageFactory GetFactoryFromDataQueueHandler(string dataQueueHandler)
{
    // A null name would throw inside EndsWith and an empty name would match every type name,
    // so neither can identify a brokerage
    if (string.IsNullOrEmpty(dataQueueHandler))
    {
        return null;
    }

    var brokerageType = Composer.Instance.GetExportedTypes<IBrokerage>()
        .FirstOrDefault(type =>
            type.FullName != null &&
            type.FullName.EndsWith(dataQueueHandler, StringComparison.InvariantCultureIgnoreCase) &&
            type.HasAttribute(typeof(BrokerageFactoryAttribute)));

    if (brokerageType == null)
    {
        return null;
    }

    var attribute = brokerageType.GetCustomAttribute<BrokerageFactoryAttribute>();

    // Cast to the interface this method returns rather than to a concrete base class, so any
    // factory implementing IBrokerageFactory is accepted
    return (IBrokerageFactory)Activator.CreateInstance(attribute.Type);
}
120 
/// <summary>
/// Desktop/Local Get Next Task - builds the next job packet from the local configuration.
/// </summary>
/// <param name="algorithmPath">Receives the location of the algorithm file selected for the job</param>
/// <returns>A live job packet when Globals.LiveMode is set, otherwise a backtest job packet</returns>
public AlgorithmNodePacket NextJob(out string algorithmPath)
{
    algorithmPath = GetAlgorithmLocation();

    Log.Trace($"JobQueue.NextJob(): Selected {algorithmPath}");

    // check for parameters in the config
    var parameters = new Dictionary<string, string>();

    var parametersConfigString = Config.Get("parameters");
    if (!string.IsNullOrEmpty(parametersConfigString))
    {
        // A JSON "null" literal deserializes to a null reference: keep the empty dictionary in that case
        parameters = JsonConvert.DeserializeObject<Dictionary<string, string>>(parametersConfigString) ?? parameters;
    }

    var controls = new Controls()
    {
        MinuteLimit = Config.GetInt("symbol-minute-limit", 10000),
        SecondLimit = Config.GetInt("symbol-second-limit", 10000),
        TickLimit = Config.GetInt("symbol-tick-limit", 10000),
        RamAllocation = int.MaxValue,
        MaximumDataPointsPerChartSeries = Config.GetInt("maximum-data-points-per-chart-series", 1000000),
        MaximumChartSeries = Config.GetInt("maximum-chart-series", 30),
        StorageLimit = Config.GetValue("storage-limit", 10737418240L),
        StorageFileCount = Config.GetInt("storage-file-count", 10000),
        StoragePermissions = (FileAccess)Config.GetInt("storage-permissions", (int)FileAccess.ReadWrite)
    };

    var algorithmId = Config.Get("algorithm-id", AlgorithmTypeName);

    //If this isn't a backtesting mode/request, attempt a live job.
    if (Globals.LiveMode)
    {
        return CreateLiveJob(algorithmId, parameters, controls);
    }

    //Default run a backtesting job.
    return CreateBacktestJob(algorithmId, parameters, controls);
}

/// <summary>
/// Builds the live job packet and fills its brokerage data from the configured brokerage,
/// the data queue handlers and the brokerage based history providers.
/// </summary>
/// <param name="algorithmId">Identifier used as the deploy id of the job</param>
/// <param name="parameters">Algorithm parameters read from the configuration</param>
/// <param name="controls">Resource limits applied to the job</param>
/// <returns>The live job packet</returns>
private LiveNodePacket CreateLiveJob(string algorithmId, Dictionary<string, string> parameters, Controls controls)
{
    var dataHandlers = Config.Get("data-queue-handler", DefaultDataQueueHandler);
    var liveJob = new LiveNodePacket
    {
        Type = PacketType.LiveNode,
        Algorithm = File.ReadAllBytes(AlgorithmLocation),
        Brokerage = Config.Get("live-mode-brokerage", PaperBrokerageTypeName),
        HistoryProvider = Config.Get("history-provider", DefaultHistoryProvider),
        DataQueueHandler = dataHandlers,
        DataChannelProvider = Config.Get("data-channel-provider", DefaultDataChannelProvider),
        Channel = Channel,
        UserToken = Globals.UserToken,
        UserId = Globals.UserId,
        ProjectId = Globals.ProjectId,
        OrganizationId = Globals.OrganizationID,
        Version = Globals.Version,
        DeployId = algorithmId,
        Parameters = parameters,
        Language = Language,
        Controls = controls,
        PythonVirtualEnvironment = Config.Get("python-venv"),
        DeploymentTarget = DeploymentTarget.LocalPlatform,
    };

    // Type of the configured brokerage; stays null when its factory can not be resolved
    Type brokerageType = null;
    try
    {
        // import the brokerage data for the configured brokerage
        var brokerageFactory = Composer.Instance.Single<IBrokerageFactory>(factory => factory.BrokerageType.MatchesTypeName(liveJob.Brokerage));
        brokerageType = brokerageFactory.BrokerageType;
        liveJob.BrokerageData = brokerageFactory.BrokerageData;
    }
    catch (Exception err)
    {
        // Best effort: the job is still returned without the brokerage data of the configured brokerage
        Log.Error(err, $"Error resolving BrokerageData for live job for brokerage {liveJob.Brokerage}");
    }

    // Brokerage names referenced by the configured history providers; providers without one are dropped
    var historyProviderBrokerages = liveJob.HistoryProvider.DeserializeList()
        .Select(historyProviderName =>
        {
            HistoryExtensions.TryGetBrokerageName(historyProviderName, out var historyBrokerageName);
            return historyBrokerageName;
        })
        .Where(name => name != null);

    foreach (var dataHandlerName in dataHandlers.DeserializeList().Concat(historyProviderBrokerages).Distinct())
    {
        var brokerageFactoryForDataHandler = GetFactoryFromDataQueueHandler(dataHandlerName);
        if (brokerageFactoryForDataHandler == null)
        {
            Log.Trace($"JobQueue.NextJob(): Not able to fetch brokerage factory with name: {dataHandlerName}");
            continue;
        }
        if (brokerageFactoryForDataHandler.BrokerageType == brokerageType)
        {
            //Don't need to add brokerageData again if added by brokerage
            continue;
        }
        foreach (var data in brokerageFactoryForDataHandler.BrokerageData)
        {
            if (data.Key == "live-holdings" || data.Key == "live-cash-balance")
            {
                //live holdings & cash balance not required for data handler
                continue;
            }

            // TryAdd keeps any value already present for the key
            liveJob.BrokerageData.TryAdd(data.Key, data.Value);
        }
    }

    return liveJob;
}

/// <summary>
/// Builds the backtest job packet, including the out of sample settings and the optimization id when configured.
/// </summary>
/// <param name="algorithmId">Identifier used as the backtest id of the job</param>
/// <param name="parameters">Algorithm parameters read from the configuration</param>
/// <param name="controls">Resource limits applied to the job</param>
/// <returns>The backtest job packet</returns>
private BacktestNodePacket CreateBacktestJob(string algorithmId, Dictionary<string, string> parameters, Controls controls)
{
    var optimizationId = Config.Get("optimization-id");
    var backtestJob = new BacktestNodePacket(0, 0, "", new byte[] { }, Config.Get("backtest-name", "local"))
    {
        Type = PacketType.BacktestNode,
        Algorithm = File.ReadAllBytes(AlgorithmLocation),
        HistoryProvider = Config.Get("history-provider", DefaultHistoryProvider),
        Channel = Channel,
        UserToken = Globals.UserToken,
        UserId = Globals.UserId,
        ProjectId = Globals.ProjectId,
        OrganizationId = Globals.OrganizationID,
        Version = Globals.Version,
        BacktestId = algorithmId,
        Language = Language,
        Parameters = parameters,
        Controls = controls,
        PythonVirtualEnvironment = Config.Get("python-venv"),
        DeploymentTarget = DeploymentTarget.LocalPlatform,
    };

    var outOfSampleMaxEndDate = Config.Get("out-of-sample-max-end-date");
    if (!string.IsNullOrEmpty(outOfSampleMaxEndDate))
    {
        backtestJob.OutOfSampleMaxEndDate = Time.ParseDate(outOfSampleMaxEndDate);
    }
    backtestJob.OutOfSampleDays = Config.GetInt("out-of-sample-days");

    // Only set optimization id when backtest is for optimization
    if (!optimizationId.IsNullOrEmpty())
    {
        backtestJob.OptimizationId = optimizationId;
    }

    return backtestJob;
}
263 
/// <summary>
/// Get the algorithm location for client side backtests.
/// </summary>
/// <remarks>
/// For Python algorithms the file must exist, and its directory is registered through
/// PythonInitializer.AddAlgorithmLocationPath so the algorithm may be imported.
/// </remarks>
/// <returns>The configured algorithm location</returns>
/// <exception cref="FileNotFoundException">Thrown when the language is Python and the algorithm file does not exist</exception>
private string GetAlgorithmLocation()
{
    // Nothing to prepare for non Python algorithms
    if (Language != Language.Python)
    {
        return AlgorithmLocation;
    }

    if (!File.Exists(AlgorithmLocation))
    {
        // The message prefix names this method, and the missing path is also exposed through FileNotFoundException.FileName
        throw new FileNotFoundException($"JobQueue.GetAlgorithmLocation(): Unable to find py file: {AlgorithmLocation}", AlgorithmLocation);
    }

    // Add this directory to our Python Path so it may be imported properly
    var pythonFile = new FileInfo(AlgorithmLocation);
    PythonInitializer.AddAlgorithmLocationPath(pythonFile.Directory.FullName);

    return AlgorithmLocation;
}
284 
285  /// <summary>
286  /// Desktop/Local acknowledge the task processed. Nothing to do.
287  /// </summary>
288  /// <param name="job"></param>
290  {
291  // Make the console window pause so we can read log output before exiting and killing the application completely
292  Console.WriteLine("Engine.Main(): Analysis Complete.");
293  // closing automatically is useful for optimization, we don't want to leave open all the ended lean instances
294  if (!Config.GetBool("close-automatically"))
295  {
296  Console.WriteLine("Engine.Main(): Press any key to continue.");
297  System.Console.Read();
298  }
299  }
300  }
301 }