Lean  $LEAN_TAG$
UniverseSelection.cs
1 /*
2  * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
3  * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14 */
15 
16 using System;
17 using System.Collections.Generic;
18 using System.Linq;
20 using QuantConnect.Data;
23 using QuantConnect.Logging;
25 using QuantConnect.Util;
27 
29 {
30  /// <summary>
31  /// Provides methods for apply the results of universe selection to an algorithm
32  /// </summary>
33  public class UniverseSelection
34  {
35  private IDataFeedSubscriptionManager _dataManager;
36  private readonly IAlgorithm _algorithm;
37  private readonly ISecurityService _securityService;
38  private readonly Dictionary<DateTime, Dictionary<Symbol, Security>> _pendingSecurityAdditions = new Dictionary<DateTime, Dictionary<Symbol, Security>>();
39  private readonly PendingRemovalsManager _pendingRemovalsManager;
40  private readonly CurrencySubscriptionDataConfigManager _currencySubscriptionDataConfigManager;
41  private readonly InternalSubscriptionManager _internalSubscriptionManager;
42  private bool _initializedSecurityBenchmark;
43  private bool _anyDoesNotHaveFundamentalDataWarningLogged;
44  private readonly SecurityChangesConstructor _securityChangesConstructor;
45 
46  /// <summary>
47  /// Initializes a new instance of the <see cref="UniverseSelection"/> class
48  /// </summary>
49  /// <param name="algorithm">The algorithm to add securities to</param>
50  /// <param name="securityService">The security service</param>
51  /// <param name="dataPermissionManager">The data permissions manager</param>
52  /// <param name="dataProvider">The data provider to use</param>
53  /// <param name="internalConfigResolution">The resolution to use for internal configuration</param>
55  IAlgorithm algorithm,
56  ISecurityService securityService,
57  IDataPermissionManager dataPermissionManager,
58  IDataProvider dataProvider,
59  Resolution internalConfigResolution = Resolution.Minute)
60  {
61  _algorithm = algorithm;
62  _securityService = securityService;
63  _pendingRemovalsManager = new PendingRemovalsManager(algorithm.Transactions);
64  _currencySubscriptionDataConfigManager = new CurrencySubscriptionDataConfigManager(algorithm.Portfolio.CashBook,
65  algorithm.Securities,
66  algorithm.SubscriptionManager,
67  _securityService,
68  Resolution.Minute);
69  // TODO: next step is to merge currency internal subscriptions under the same 'internal manager' instance and we could move this directly into the DataManager class
70  _internalSubscriptionManager = new InternalSubscriptionManager(_algorithm, internalConfigResolution);
71  _securityChangesConstructor = new SecurityChangesConstructor();
72  }
73 
74  /// <summary>
75  /// Sets the data manager
76  /// </summary>
78  {
79  if (_dataManager != null)
80  {
81  throw new Exception("UniverseSelection.SetDataManager(): can only be set once");
82  }
83  _dataManager = dataManager;
84 
85  _internalSubscriptionManager.Added += (sender, request) =>
86  {
87  _dataManager.AddSubscription(request);
88  };
89  _internalSubscriptionManager.Removed += (sender, request) =>
90  {
91  _dataManager.RemoveSubscription(request.Configuration);
92  };
93  }
94 
95  /// <summary>
96  /// Applies universe selection the the data feed and algorithm
97  /// </summary>
98  /// <param name="universe">The universe to perform selection on</param>
99  /// <param name="dateTimeUtc">The current date time in utc</param>
100  /// <param name="universeData">The data provided to perform selection with</param>
101  public SecurityChanges ApplyUniverseSelection(Universe universe, DateTime dateTimeUtc, BaseDataCollection universeData)
102  {
103  var algorithmEndDateUtc = _algorithm.EndDate.ConvertToUtc(_algorithm.TimeZone);
104  if (dateTimeUtc > algorithmEndDateUtc)
105  {
106  return SecurityChanges.None;
107  }
108 
109  IEnumerable<Symbol> selectSymbolsResult;
110 
111  // check if this universe must be filtered with fine fundamental data
113  fineFiltered ??= (universe as FundamentalFilteredUniverse)?.FundamentalUniverse;
114 
115  if (fineFiltered != null
116  // if the universe has been disposed we don't perform selection. This us handled bellow by 'Universe.PerformSelection'
117  // but in this case we directly call 'SelectSymbols' because we want to perform fine selection even if coarse returns the same
118  // symbols, see 'Universe.PerformSelection', which detects this and returns 'Universe.Unchanged'
119  && !universe.DisposeRequested)
120  {
121  // perform initial filtering and limit the result
122  selectSymbolsResult = universe.SelectSymbols(dateTimeUtc, universeData);
123 
124  if (!ReferenceEquals(selectSymbolsResult, Universe.Unchanged))
125  {
126  // prepare a BaseDataCollection of FineFundamental instances
127  var fineCollection = new BaseDataCollection();
128 
129  // if the input is already fundamental data we just need to filter it and pass it through
130  var hasFundamentalData = universeData.Data.Count > 0 && universeData.Data[0] is Fundamental;
131  if(hasFundamentalData)
132  {
133  // Remove selected symbols that does not have fine fundamental data
134  var anyDoesNotHaveFundamentalData = false;
135 
136  // only pre filter selected symbols if there actually is any fundamental data. This way we can support custom universe filtered by fine fundamental data
137  // which do not use coarse data as underlying, in which case it could happen that we try to load fine fundamental data that is missing, but no problem,
138  // 'FineFundamentalSubscriptionEnumeratorFactory' won't emit it
139  var set = selectSymbolsResult.ToHashSet();
140  fineCollection.Data.AddRange(universeData.Data.OfType<Fundamental>().Where(fundamental => {
141  // we remove to we distict by symbol
142  if (set.Remove(fundamental.Symbol))
143  {
144  if (!fundamental.HasFundamentalData)
145  {
146  anyDoesNotHaveFundamentalData = true;
147  return false;
148  }
149  return true;
150  }
151  return false;
152  }));
153 
154  if (!_anyDoesNotHaveFundamentalDataWarningLogged && anyDoesNotHaveFundamentalData)
155  {
156  _algorithm.Debug("Note: Your coarse selection filter was updated to exclude symbols without fine fundamental data. Make sure your coarse filter excludes symbols where HasFundamental is false.");
157  _anyDoesNotHaveFundamentalDataWarningLogged = true;
158  }
159  }
160  else
161  {
162  // we need to load the fundamental data
163  var currentTime = dateTimeUtc.ConvertFromUtc(TimeZones.NewYork);
164  foreach (var symbol in selectSymbolsResult)
165  {
166  fineCollection.Data.Add(new Fundamental(currentTime, symbol));
167  }
168  }
169 
170  universeData.Data = fineCollection.Data;
171  // perform the fine fundamental universe selection
172  selectSymbolsResult = fineFiltered.PerformSelection(dateTimeUtc, fineCollection);
173  }
174  }
175  else
176  {
177  // perform initial filtering and limit the result
178  selectSymbolsResult = universe.PerformSelection(dateTimeUtc, universeData);
179  }
180 
181  if (!ReferenceEquals(selectSymbolsResult, Universe.Unchanged))
182  {
183  // materialize the enumerable into a set for processing
184  universe.Selected = selectSymbolsResult.ToHashSet();
185  }
186 
187  // first check for no pending removals, even if the universe selection
188  // didn't change we might need to remove a security because a position was closed
189  RemoveSecurityFromUniverse(
190  _pendingRemovalsManager.CheckPendingRemovals(universe.Selected, universe),
191  dateTimeUtc,
192  algorithmEndDateUtc);
193 
194  // check for no changes second
195  if (ReferenceEquals(selectSymbolsResult, Universe.Unchanged))
196  {
197  return SecurityChanges.None;
198  }
199 
200  // determine which data subscriptions need to be removed from this universe
201  foreach (var member in universe.Securities.Values.OrderBy(member => member.Security.Symbol.SecurityType).ThenBy(x => x.Security.Symbol.ID))
202  {
203  var security = member.Security;
204  // if we've selected this subscription again, keep it
205  if (universe.Selected.Contains(security.Symbol)) continue;
206 
207  // don't remove if the universe wants to keep him in
208  if (!universe.CanRemoveMember(dateTimeUtc, security)) continue;
209 
210  if (!member.Security.IsDelisted)
211  {
212  // TODO: here we are not checking if other universes have this security still selected
213  _securityChangesConstructor.Remove(member.Security, member.IsInternal);
214  }
215 
216  RemoveSecurityFromUniverse(_pendingRemovalsManager.TryRemoveMember(security, universe),
217  dateTimeUtc,
218  algorithmEndDateUtc);
219  }
220 
221  Dictionary<Symbol, Security> pendingAdditions;
222  if (!_pendingSecurityAdditions.TryGetValue(dateTimeUtc, out pendingAdditions))
223  {
224  // if the frontier moved forward then we've added these securities to the algorithm
225  _pendingSecurityAdditions.Clear();
226 
227  // keep track of created securities so we don't create the same security twice, leads to bad things :)
228  pendingAdditions = new Dictionary<Symbol, Security>();
229  _pendingSecurityAdditions[dateTimeUtc] = pendingAdditions;
230  }
231 
232  // find new selections and add them to the algorithm
233  foreach (var symbol in universe.Selected)
234  {
235  if (universe.Securities.ContainsKey(symbol))
236  {
237  // if its already part of the universe no need to re add it
238  continue;
239  }
240 
241  Security underlying = null;
242  if (symbol.HasUnderlying)
243  {
244  underlying = GetOrCreateSecurity(pendingAdditions, symbol.Underlying, universe.UniverseSettings);
245  }
246  // create the new security, the algorithm thread will add this at the appropriate time
247  var security = GetOrCreateSecurity(pendingAdditions, symbol, universe.UniverseSettings, underlying);
248 
249  var addedSubscription = false;
250  var dataFeedAdded = false;
251  var internalFeed = true;
252  foreach (var request in universe.GetSubscriptionRequests(security, dateTimeUtc, algorithmEndDateUtc,
253  _algorithm.SubscriptionManager.SubscriptionDataConfigService))
254  {
255  if (security.Symbol == request.Configuration.Symbol // Just in case check its the same symbol, else AddData will throw.
256  && !security.Subscriptions.Contains(request.Configuration))
257  {
258  // For now this is required for retro compatibility with usages of security.Subscriptions
259  security.AddData(request.Configuration);
260  }
261 
262  var toRemove = _currencySubscriptionDataConfigManager.GetSubscriptionDataConfigToRemove(request.Configuration.Symbol);
263  if (toRemove != null)
264  {
265  Log.Trace($"UniverseSelection.ApplyUniverseSelection(): Removing internal currency data feed {toRemove}");
266  _dataManager.RemoveSubscription(toRemove);
267  }
268 
269  // 'dataFeedAdded' will help us notify the user for security changes only once per non internal subscription
270  // for example two universes adding the sample configuration, we don't want two notifications
271  dataFeedAdded = _dataManager.AddSubscription(request);
272 
273  // only update our security changes if we actually added data
274  if (!request.IsUniverseSubscription)
275  {
276  addedSubscription = true;
277  // if any config isn't internal then it's not internal
278  internalFeed &= request.Configuration.IsInternalFeed;
279  _internalSubscriptionManager.AddedSubscriptionRequest(request);
280  }
281  }
282 
283  if (addedSubscription)
284  {
285  var addedMember = universe.AddMember(dateTimeUtc, security, internalFeed);
286 
287  if (addedMember && dataFeedAdded)
288  {
289  _securityChangesConstructor.Add(security, internalFeed);
290  }
291  }
292  }
293 
294  var securityChanges = _securityChangesConstructor.Flush();
295 
296  // Add currency data feeds that weren't explicitly added in Initialize
297  if (securityChanges.AddedSecurities.Count > 0)
298  {
299  EnsureCurrencyDataFeeds(securityChanges);
300  }
301 
302  if (securityChanges != SecurityChanges.None && Log.DebuggingEnabled)
303  {
304  // for performance lets not create the message string if debugging is not enabled
305  // this can be executed many times and its in the algorithm thread
306  Log.Debug("UniverseSelection.ApplyUniverseSelection(): " + dateTimeUtc + ": " + securityChanges);
307  }
308 
309  return securityChanges;
310  }
311 
312  /// <summary>
313  /// Will add any pending internal currency subscriptions
314  /// </summary>
315  /// <param name="utcStart">The current date time in utc</param>
316  /// <returns>Will return true if any subscription was added</returns>
317  public bool AddPendingInternalDataFeeds(DateTime utcStart)
318  {
319  var added = false;
320  if (!_initializedSecurityBenchmark)
321  {
322  _initializedSecurityBenchmark = true;
323 
324  var securityBenchmark = _algorithm.Benchmark as SecurityBenchmark;
325  if (securityBenchmark != null)
326  {
327  var resolution = _algorithm.LiveMode ? Resolution.Minute : Resolution.Hour;
328 
329  // Check that the tradebar subscription we are using can support this resolution GH #5893
330  var subscriptionType = _algorithm.SubscriptionManager.SubscriptionDataConfigService.LookupSubscriptionConfigDataTypes(securityBenchmark.Security.Type, resolution, securityBenchmark.Security.Symbol.IsCanonical()).First();
331  var symbol = securityBenchmark.Security.Symbol;
332  var isCustomData = false;
333 
334  // Check if the benchmark security is a custom data in order to make sure we get the correct
335  // type
336  if (symbol.SecurityType == SecurityType.Base)
337  {
338  var symbolDataConfigs = _algorithm.SubscriptionManager.SubscriptionDataConfigService.GetSubscriptionDataConfigs(symbol);
339  if (symbolDataConfigs.Any())
340  {
341  subscriptionType = new Tuple<Type, TickType>(symbolDataConfigs.First().Type, TickType.Trade);
342  isCustomData = true;
343  }
344  }
345 
346  var baseInstance = subscriptionType.Item1.GetBaseDataInstance();
347  baseInstance.Symbol = securityBenchmark.Security.Symbol;
348  var supportedResolutions = baseInstance.SupportedResolutions();
349  if (!supportedResolutions.Contains(resolution))
350  {
351  resolution = supportedResolutions.OrderByDescending(x => x).First();
352  }
353 
354  var subscriptionList = new List<Tuple<Type, TickType>>() {subscriptionType};
355  var dataConfig = _algorithm.SubscriptionManager.SubscriptionDataConfigService.Add(
356  securityBenchmark.Security.Symbol,
357  resolution,
358  isInternalFeed: true,
359  fillForward: false,
360  isCustomData: isCustomData,
361  subscriptionDataTypes: subscriptionList
362  ).First();
363 
364  // we want to start from the previous tradable bar so the benchmark security
365  // never has 0 price
366  var previousTradableBar = Time.GetStartTimeForTradeBars(
367  securityBenchmark.Security.Exchange.Hours,
368  utcStart.ConvertFromUtc(securityBenchmark.Security.Exchange.TimeZone),
369  _algorithm.LiveMode ? Time.OneMinute : Time.OneDay,
370  1,
371  false,
372  dataConfig.DataTimeZone,
373  LeanData.UseStrictEndTime(_algorithm.Settings.DailyPreciseEndTime, securityBenchmark.Security.Symbol, _algorithm.LiveMode ? Time.OneMinute : Time.OneDay, securityBenchmark.Security.Exchange.Hours)
374  ).ConvertToUtc(securityBenchmark.Security.Exchange.TimeZone);
375 
376  if (dataConfig != null)
377  {
378  added |= _dataManager.AddSubscription(new SubscriptionRequest(
379  false,
380  null,
381  securityBenchmark.Security,
382  dataConfig,
383  previousTradableBar,
384  _algorithm.EndDate.ConvertToUtc(_algorithm.TimeZone)));
385 
386  Log.Trace($"UniverseSelection.AddPendingInternalDataFeeds(): Adding internal benchmark data feed {dataConfig}");
387  }
388  }
389  }
390 
391  if (_currencySubscriptionDataConfigManager.UpdatePendingSubscriptionDataConfigs(_algorithm.BrokerageModel))
392  {
393  foreach (var subscriptionDataConfig in _currencySubscriptionDataConfigManager
394  .GetPendingSubscriptionDataConfigs())
395  {
396  var security = _algorithm.Securities[subscriptionDataConfig.Symbol];
397  added |= _dataManager.AddSubscription(new SubscriptionRequest(
398  false,
399  null,
400  security,
401  subscriptionDataConfig,
402  utcStart,
403  _algorithm.EndDate.ConvertToUtc(_algorithm.TimeZone)));
404  }
405  }
406  return added;
407  }
408 
409  /// <summary>
410  /// Checks the current subscriptions and adds necessary currency pair feeds to provide real time conversion data
411  /// </summary>
412  public void EnsureCurrencyDataFeeds(SecurityChanges securityChanges)
413  {
414  _currencySubscriptionDataConfigManager.EnsureCurrencySubscriptionDataConfigs(securityChanges, _algorithm.BrokerageModel);
415  }
416 
417  /// <summary>
418  /// Handles the delisting process of the given data symbol from the algorithm securities
419  /// </summary>
420  public SecurityChanges HandleDelisting(BaseData data, bool isInternalFeed)
421  {
422  if (_algorithm.Securities.TryGetValue(data.Symbol, out var security))
423  {
424  // don't allow users to open a new position once delisted
425  security.IsDelisted = true;
426  security.IsTradable = false;
427 
428  // Add the security removal to the security changes but only if not pending for removal.
429  // If pending, the removed change event was already emitted for this security
430  if (_algorithm.Securities.Remove(data.Symbol) && !_pendingRemovalsManager.PendingRemovals.Values.Any(x => x.Any(y => y.Symbol == data.Symbol)))
431  {
432  _securityChangesConstructor.Remove(security, isInternalFeed);
433 
434  return _securityChangesConstructor.Flush();
435  }
436  }
437 
438  return SecurityChanges.None;
439  }
440 
441  private void RemoveSecurityFromUniverse(
442  List<PendingRemovalsManager.RemovedMember> removedMembers,
443  DateTime dateTimeUtc,
444  DateTime algorithmEndDateUtc)
445  {
446  if (removedMembers == null)
447  {
448  return;
449  }
450  foreach (var removedMember in removedMembers)
451  {
452  var universe = removedMember.Universe;
453  var member = removedMember.Security;
454 
455  // safe to remove the member from the universe
456  universe.RemoveMember(dateTimeUtc, member);
457 
458  var isActive = _algorithm.UniverseManager.ActiveSecurities.ContainsKey(member.Symbol);
459  foreach (var subscription in universe.GetSubscriptionRequests(member, dateTimeUtc, algorithmEndDateUtc,
460  _algorithm.SubscriptionManager.SubscriptionDataConfigService))
461  {
462  if (_dataManager.RemoveSubscription(subscription.Configuration, universe))
463  {
464  _internalSubscriptionManager.RemovedSubscriptionRequest(subscription);
465 
466  // if not used by any universe
467  if (!isActive)
468  {
469  member.IsTradable = false;
470  // We need to mark this security as untradeable while it has no data subscription
471  // it is expected that this function is called while in sync with the algo thread,
472  // so we can make direct edits to the security here.
473  // We only clear the cache once the subscription is removed from the data stack
474  member.Cache.Reset();
475 
476  _algorithm.Securities.Remove(member.Symbol);
477  }
478  }
479  }
480  }
481  }
482 
483  private Security GetOrCreateSecurity(Dictionary<Symbol, Security> pendingAdditions, Symbol symbol, UniverseSettings universeSettings, Security underlying = null)
484  {
485  // create the new security, the algorithm thread will add this at the appropriate time
486  Security security;
487  if (!pendingAdditions.TryGetValue(symbol, out security) && !_algorithm.Securities.TryGetValue(symbol, out security))
488  {
489  security = _securityService.CreateSecurity(symbol, new List<SubscriptionDataConfig>(), universeSettings.Leverage, symbol.ID.SecurityType.IsOption(), underlying);
490 
491  pendingAdditions.Add(symbol, security);
492  }
493 
494  return security;
495  }
496  }
497 }