Lean  $LEAN_TAG$
UniverseSelection.cs
1 /*
2  * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
3  * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14 */
15 
16 using System;
17 using System.Collections.Generic;
18 using System.Linq;
20 using QuantConnect.Data;
23 using QuantConnect.Logging;
25 using QuantConnect.Util;
27 
29 {
30  /// <summary>
31  /// Provides methods for apply the results of universe selection to an algorithm
32  /// </summary>
33  public class UniverseSelection
34  {
35  private IDataFeedSubscriptionManager _dataManager;
36  private readonly IAlgorithm _algorithm;
37  private readonly ISecurityService _securityService;
38  private readonly Dictionary<DateTime, Dictionary<Symbol, Security>> _pendingSecurityAdditions = new Dictionary<DateTime, Dictionary<Symbol, Security>>();
39  private readonly PendingRemovalsManager _pendingRemovalsManager;
40  private readonly CurrencySubscriptionDataConfigManager _currencySubscriptionDataConfigManager;
41  private readonly InternalSubscriptionManager _internalSubscriptionManager;
42  private bool _initializedSecurityBenchmark;
43  private readonly IDataProvider _dataProvider;
44  private bool _anyDoesNotHaveFundamentalDataWarningLogged;
45  private readonly SecurityChangesConstructor _securityChangesConstructor;
46 
47  /// <summary>
48  /// Initializes a new instance of the <see cref="UniverseSelection"/> class
49  /// </summary>
50  /// <param name="algorithm">The algorithm to add securities to</param>
51  /// <param name="securityService">The security service</param>
52  /// <param name="dataPermissionManager">The data permissions manager</param>
53  /// <param name="dataProvider">The data provider to use</param>
54  /// <param name="internalConfigResolution">The resolution to use for internal configuration</param>
56  IAlgorithm algorithm,
57  ISecurityService securityService,
58  IDataPermissionManager dataPermissionManager,
59  IDataProvider dataProvider,
60  Resolution internalConfigResolution = Resolution.Minute)
61  {
62  _dataProvider = dataProvider;
63  _algorithm = algorithm;
64  _securityService = securityService;
65  _pendingRemovalsManager = new PendingRemovalsManager(algorithm.Transactions);
66  _currencySubscriptionDataConfigManager = new CurrencySubscriptionDataConfigManager(algorithm.Portfolio.CashBook,
67  algorithm.Securities,
68  algorithm.SubscriptionManager,
69  _securityService,
70  Resolution.Minute);
71  // TODO: next step is to merge currency internal subscriptions under the same 'internal manager' instance and we could move this directly into the DataManager class
72  _internalSubscriptionManager = new InternalSubscriptionManager(_algorithm, internalConfigResolution);
73  _securityChangesConstructor = new SecurityChangesConstructor();
74  }
75 
76  /// <summary>
77  /// Sets the data manager
78  /// </summary>
80  {
81  if (_dataManager != null)
82  {
83  throw new Exception("UniverseSelection.SetDataManager(): can only be set once");
84  }
85  _dataManager = dataManager;
86 
87  _internalSubscriptionManager.Added += (sender, request) =>
88  {
89  _dataManager.AddSubscription(request);
90  };
91  _internalSubscriptionManager.Removed += (sender, request) =>
92  {
93  _dataManager.RemoveSubscription(request.Configuration);
94  };
95  }
96 
97  /// <summary>
98  /// Applies universe selection the the data feed and algorithm
99  /// </summary>
100  /// <param name="universe">The universe to perform selection on</param>
101  /// <param name="dateTimeUtc">The current date time in utc</param>
102  /// <param name="universeData">The data provided to perform selection with</param>
103  public SecurityChanges ApplyUniverseSelection(Universe universe, DateTime dateTimeUtc, BaseDataCollection universeData)
104  {
105  var algorithmEndDateUtc = _algorithm.EndDate.ConvertToUtc(_algorithm.TimeZone);
106  if (dateTimeUtc > algorithmEndDateUtc)
107  {
108  return SecurityChanges.None;
109  }
110 
111  IEnumerable<Symbol> selectSymbolsResult;
112 
113  // check if this universe must be filtered with fine fundamental data
115  fineFiltered ??= (universe as FundamentalFilteredUniverse)?.FundamentalUniverse;
116 
117  if (fineFiltered != null
118  // if the universe has been disposed we don't perform selection. This us handled bellow by 'Universe.PerformSelection'
119  // but in this case we directly call 'SelectSymbols' because we want to perform fine selection even if coarse returns the same
120  // symbols, see 'Universe.PerformSelection', which detects this and returns 'Universe.Unchanged'
121  && !universe.DisposeRequested)
122  {
123  // perform initial filtering and limit the result
124  selectSymbolsResult = universe.SelectSymbols(dateTimeUtc, universeData);
125 
126  if (!ReferenceEquals(selectSymbolsResult, Universe.Unchanged))
127  {
128  // prepare a BaseDataCollection of FineFundamental instances
129  var fineCollection = new BaseDataCollection();
130 
131  // if the input is already fundamental data we just need to filter it and pass it through
132  var hasFundamentalData = universeData.Data.Count > 0 && universeData.Data[0] is Fundamental;
133  if(hasFundamentalData)
134  {
135  // Remove selected symbols that does not have fine fundamental data
136  var anyDoesNotHaveFundamentalData = false;
137 
138  // only pre filter selected symbols if there actually is any fundamental data. This way we can support custom universe filtered by fine fundamental data
139  // which do not use coarse data as underlying, in which case it could happen that we try to load fine fundamental data that is missing, but no problem,
140  // 'FineFundamentalSubscriptionEnumeratorFactory' won't emit it
141  var set = selectSymbolsResult.ToHashSet();
142  fineCollection.Data.AddRange(universeData.Data.OfType<Fundamental>().Where(fundamental => {
143  // we remove to we distict by symbol
144  if (set.Remove(fundamental.Symbol))
145  {
146  if (!fundamental.HasFundamentalData)
147  {
148  anyDoesNotHaveFundamentalData = true;
149  return false;
150  }
151  return true;
152  }
153  return false;
154  }));
155 
156  if (!_anyDoesNotHaveFundamentalDataWarningLogged && anyDoesNotHaveFundamentalData)
157  {
158  _algorithm.Debug("Note: Your coarse selection filter was updated to exclude symbols without fine fundamental data. Make sure your coarse filter excludes symbols where HasFundamental is false.");
159  _anyDoesNotHaveFundamentalDataWarningLogged = true;
160  }
161  }
162  else
163  {
164  // we need to load the fundamental data
165  var currentTime = dateTimeUtc.ConvertFromUtc(TimeZones.NewYork);
166  foreach (var symbol in selectSymbolsResult)
167  {
168  fineCollection.Data.Add(new Fundamental(currentTime, symbol));
169  }
170  }
171 
172  universeData.Data = fineCollection.Data;
173  // perform the fine fundamental universe selection
174  selectSymbolsResult = fineFiltered.PerformSelection(dateTimeUtc, fineCollection);
175  }
176  }
177  else
178  {
179  // perform initial filtering and limit the result
180  selectSymbolsResult = universe.PerformSelection(dateTimeUtc, universeData);
181  }
182 
183  if (!ReferenceEquals(selectSymbolsResult, Universe.Unchanged))
184  {
185  // materialize the enumerable into a set for processing
186  universe.Selected = selectSymbolsResult.ToHashSet();
187  }
188 
189  // first check for no pending removals, even if the universe selection
190  // didn't change we might need to remove a security because a position was closed
191  RemoveSecurityFromUniverse(
192  _pendingRemovalsManager.CheckPendingRemovals(universe.Selected, universe),
193  dateTimeUtc,
194  algorithmEndDateUtc);
195 
196  // check for no changes second
197  if (ReferenceEquals(selectSymbolsResult, Universe.Unchanged))
198  {
199  return SecurityChanges.None;
200  }
201 
202  // determine which data subscriptions need to be removed from this universe
203  foreach (var member in universe.Securities.Values.OrderBy(member => member.Security.Symbol.SecurityType).ThenBy(x => x.Security.Symbol.ID))
204  {
205  var security = member.Security;
206  // if we've selected this subscription again, keep it
207  if (universe.Selected.Contains(security.Symbol)) continue;
208 
209  // don't remove if the universe wants to keep him in
210  if (!universe.CanRemoveMember(dateTimeUtc, security)) continue;
211 
212  if (!member.Security.IsDelisted)
213  {
214  // TODO: here we are not checking if other universes have this security still selected
215  _securityChangesConstructor.Remove(member.Security, member.IsInternal);
216  }
217 
218  RemoveSecurityFromUniverse(_pendingRemovalsManager.TryRemoveMember(security, universe),
219  dateTimeUtc,
220  algorithmEndDateUtc);
221  }
222 
223  Dictionary<Symbol, Security> pendingAdditions;
224  if (!_pendingSecurityAdditions.TryGetValue(dateTimeUtc, out pendingAdditions))
225  {
226  // if the frontier moved forward then we've added these securities to the algorithm
227  _pendingSecurityAdditions.Clear();
228 
229  // keep track of created securities so we don't create the same security twice, leads to bad things :)
230  pendingAdditions = new Dictionary<Symbol, Security>();
231  _pendingSecurityAdditions[dateTimeUtc] = pendingAdditions;
232  }
233 
234  // find new selections and add them to the algorithm
235  foreach (var symbol in universe.Selected)
236  {
237  if (universe.Securities.ContainsKey(symbol))
238  {
239  // if its already part of the universe no need to re add it
240  continue;
241  }
242 
243  Security underlying = null;
244  if (symbol.HasUnderlying)
245  {
246  underlying = GetOrCreateSecurity(pendingAdditions, symbol.Underlying, universe.UniverseSettings);
247  }
248  // create the new security, the algorithm thread will add this at the appropriate time
249  var security = GetOrCreateSecurity(pendingAdditions, symbol, universe.UniverseSettings, underlying);
250 
251  var addedSubscription = false;
252  var dataFeedAdded = false;
253  var internalFeed = true;
254  foreach (var request in universe.GetSubscriptionRequests(security, dateTimeUtc, algorithmEndDateUtc,
255  _algorithm.SubscriptionManager.SubscriptionDataConfigService))
256  {
257  if (security.Symbol == request.Configuration.Symbol // Just in case check its the same symbol, else AddData will throw.
258  && !security.Subscriptions.Contains(request.Configuration))
259  {
260  // For now this is required for retro compatibility with usages of security.Subscriptions
261  security.AddData(request.Configuration);
262  }
263 
264  var toRemove = _currencySubscriptionDataConfigManager.GetSubscriptionDataConfigToRemove(request.Configuration.Symbol);
265  if (toRemove != null)
266  {
267  Log.Trace($"UniverseSelection.ApplyUniverseSelection(): Removing internal currency data feed {toRemove}");
268  _dataManager.RemoveSubscription(toRemove);
269  }
270 
271  // 'dataFeedAdded' will help us notify the user for security changes only once per non internal subscription
272  // for example two universes adding the sample configuration, we don't want two notifications
273  dataFeedAdded = _dataManager.AddSubscription(request);
274 
275  // only update our security changes if we actually added data
276  if (!request.IsUniverseSubscription)
277  {
278  addedSubscription = true;
279  // if any config isn't internal then it's not internal
280  internalFeed &= request.Configuration.IsInternalFeed;
281  _internalSubscriptionManager.AddedSubscriptionRequest(request);
282  }
283  }
284 
285  if (addedSubscription)
286  {
287  var addedMember = universe.AddMember(dateTimeUtc, security, internalFeed);
288 
289  if (addedMember && dataFeedAdded)
290  {
291  _securityChangesConstructor.Add(security, internalFeed);
292  }
293  }
294  }
295 
296  var securityChanges = _securityChangesConstructor.Flush();
297 
298  // Add currency data feeds that weren't explicitly added in Initialize
299  if (securityChanges.AddedSecurities.Count > 0)
300  {
301  EnsureCurrencyDataFeeds(securityChanges);
302  }
303 
304  if (securityChanges != SecurityChanges.None && Log.DebuggingEnabled)
305  {
306  // for performance lets not create the message string if debugging is not enabled
307  // this can be executed many times and its in the algorithm thread
308  Log.Debug("UniverseSelection.ApplyUniverseSelection(): " + dateTimeUtc + ": " + securityChanges);
309  }
310 
311  return securityChanges;
312  }
313 
314  /// <summary>
315  /// Will add any pending internal currency subscriptions
316  /// </summary>
317  /// <param name="utcStart">The current date time in utc</param>
318  /// <returns>Will return true if any subscription was added</returns>
319  public bool AddPendingInternalDataFeeds(DateTime utcStart)
320  {
321  var added = false;
322  if (!_initializedSecurityBenchmark)
323  {
324  _initializedSecurityBenchmark = true;
325 
326  var securityBenchmark = _algorithm.Benchmark as SecurityBenchmark;
327  if (securityBenchmark != null)
328  {
329  var resolution = _algorithm.LiveMode ? Resolution.Minute : Resolution.Hour;
330 
331  // Check that the tradebar subscription we are using can support this resolution GH #5893
332  var subscriptionType = _algorithm.SubscriptionManager.SubscriptionDataConfigService.LookupSubscriptionConfigDataTypes(securityBenchmark.Security.Type, resolution, securityBenchmark.Security.Symbol.IsCanonical()).First();
333  var symbol = securityBenchmark.Security.Symbol;
334  var isCustomData = false;
335 
336  // Check if the benchmark security is a custom data in order to make sure we get the correct
337  // type
338  if (symbol.SecurityType == SecurityType.Base)
339  {
340  var symbolDataConfigs = _algorithm.SubscriptionManager.SubscriptionDataConfigService.GetSubscriptionDataConfigs(symbol);
341  if (symbolDataConfigs.Any())
342  {
343  subscriptionType = new Tuple<Type, TickType>(symbolDataConfigs.First().Type, TickType.Trade);
344  isCustomData = true;
345  }
346  }
347 
348  var baseInstance = subscriptionType.Item1.GetBaseDataInstance();
349  baseInstance.Symbol = securityBenchmark.Security.Symbol;
350  var supportedResolutions = baseInstance.SupportedResolutions();
351  if (!supportedResolutions.Contains(resolution))
352  {
353  resolution = supportedResolutions.OrderByDescending(x => x).First();
354  }
355 
356  var subscriptionList = new List<Tuple<Type, TickType>>() {subscriptionType};
357  var dataConfig = _algorithm.SubscriptionManager.SubscriptionDataConfigService.Add(
358  securityBenchmark.Security.Symbol,
359  resolution,
360  isInternalFeed: true,
361  fillForward: false,
362  isCustomData: isCustomData,
363  subscriptionDataTypes: subscriptionList
364  ).First();
365 
366  // we want to start from the previous tradable bar so the benchmark security
367  // never has 0 price
368  var previousTradableBar = Time.GetStartTimeForTradeBars(
369  securityBenchmark.Security.Exchange.Hours,
370  utcStart.ConvertFromUtc(securityBenchmark.Security.Exchange.TimeZone),
371  _algorithm.LiveMode ? Time.OneMinute : Time.OneDay,
372  1,
373  false,
374  dataConfig.DataTimeZone,
375  LeanData.UseStrictEndTime(_algorithm.Settings.DailyPreciseEndTime, securityBenchmark.Security.Symbol, _algorithm.LiveMode ? Time.OneMinute : Time.OneDay, securityBenchmark.Security.Exchange.Hours)
376  ).ConvertToUtc(securityBenchmark.Security.Exchange.TimeZone);
377 
378  if (dataConfig != null)
379  {
380  added |= _dataManager.AddSubscription(new SubscriptionRequest(
381  false,
382  null,
383  securityBenchmark.Security,
384  dataConfig,
385  previousTradableBar,
386  _algorithm.EndDate.ConvertToUtc(_algorithm.TimeZone)));
387 
388  Log.Trace($"UniverseSelection.AddPendingInternalDataFeeds(): Adding internal benchmark data feed {dataConfig}");
389  }
390  }
391  }
392 
393  if (_currencySubscriptionDataConfigManager.UpdatePendingSubscriptionDataConfigs(_algorithm.BrokerageModel))
394  {
395  foreach (var subscriptionDataConfig in _currencySubscriptionDataConfigManager
396  .GetPendingSubscriptionDataConfigs())
397  {
398  var security = _algorithm.Securities[subscriptionDataConfig.Symbol];
399  added |= _dataManager.AddSubscription(new SubscriptionRequest(
400  false,
401  null,
402  security,
403  subscriptionDataConfig,
404  utcStart,
405  _algorithm.EndDate.ConvertToUtc(_algorithm.TimeZone)));
406  }
407  }
408  return added;
409  }
410 
411  /// <summary>
412  /// Checks the current subscriptions and adds necessary currency pair feeds to provide real time conversion data
413  /// </summary>
414  public void EnsureCurrencyDataFeeds(SecurityChanges securityChanges)
415  {
416  _currencySubscriptionDataConfigManager.EnsureCurrencySubscriptionDataConfigs(securityChanges, _algorithm.BrokerageModel);
417  }
418 
419  /// <summary>
420  /// Handles the delisting process of the given data symbol from the algorithm securities
421  /// </summary>
422  public SecurityChanges HandleDelisting(BaseData data, bool isInternalFeed)
423  {
424  if (_algorithm.Securities.TryGetValue(data.Symbol, out var security))
425  {
426  // don't allow users to open a new position once delisted
427  security.IsDelisted = true;
428  security.IsTradable = false;
429 
430  if (_algorithm.Securities.Remove(data.Symbol))
431  {
432  _securityChangesConstructor.Remove(security, isInternalFeed);
433 
434  return _securityChangesConstructor.Flush();
435  }
436  }
437 
438  return SecurityChanges.None;
439  }
440 
441  private void RemoveSecurityFromUniverse(
442  List<PendingRemovalsManager.RemovedMember> removedMembers,
443  DateTime dateTimeUtc,
444  DateTime algorithmEndDateUtc)
445  {
446  if (removedMembers == null)
447  {
448  return;
449  }
450  foreach (var removedMember in removedMembers)
451  {
452  var universe = removedMember.Universe;
453  var member = removedMember.Security;
454 
455  // safe to remove the member from the universe
456  universe.RemoveMember(dateTimeUtc, member);
457 
458  var isActive = _algorithm.UniverseManager.ActiveSecurities.ContainsKey(member.Symbol);
459  foreach (var subscription in universe.GetSubscriptionRequests(member, dateTimeUtc, algorithmEndDateUtc,
460  _algorithm.SubscriptionManager.SubscriptionDataConfigService))
461  {
462  if (_dataManager.RemoveSubscription(subscription.Configuration, universe))
463  {
464  _internalSubscriptionManager.RemovedSubscriptionRequest(subscription);
465 
466  // if not used by any universe
467  if (!isActive)
468  {
469  member.IsTradable = false;
470  // We need to mark this security as untradeable while it has no data subscription
471  // it is expected that this function is called while in sync with the algo thread,
472  // so we can make direct edits to the security here.
473  // We only clear the cache once the subscription is removed from the data stack
474  member.Cache.Reset();
475 
476  _algorithm.Securities.Remove(member.Symbol);
477  }
478  }
479  }
480  }
481  }
482 
483  private Security GetOrCreateSecurity(Dictionary<Symbol, Security> pendingAdditions, Symbol symbol, UniverseSettings universeSettings, Security underlying = null)
484  {
485  // create the new security, the algorithm thread will add this at the appropriate time
486  Security security;
487  if (!pendingAdditions.TryGetValue(symbol, out security) && !_algorithm.Securities.TryGetValue(symbol, out security))
488  {
489  security = _securityService.CreateSecurity(symbol, new List<SubscriptionDataConfig>(), universeSettings.Leverage, symbol.ID.SecurityType.IsOption(), underlying);
490 
491  pendingAdditions.Add(symbol, security);
492  }
493 
494  return security;
495  }
496  }
497 }