Lean  $LEAN_TAG$
InternalSubscriptionManager.cs
1 /*
2  * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
3  * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14 */
15 
16 using System;
17 using System.Collections.Generic;
18 using System.Linq;
19 using QuantConnect.Data;
22 
24 {
25  /// <summary>
26  /// Class in charge of handling Leans internal subscriptions
27  /// </summary>
29  {
30  private readonly Dictionary<Symbol, List<SubscriptionRequest>> _subscriptionRequests;
31  private readonly Resolution _resolution;
32  private readonly IAlgorithm _algorithm;
33 
34  /// <summary>
35  /// Settable delegate invoked with each newly created internal subscription request so that it can be added
36  /// </summary>
37  public EventHandler<SubscriptionRequest> Added { get; set; }
38 
39  /// <summary>
40  /// Settable delegate invoked with each tracked internal subscription request that should be removed
41  /// </summary>
42  public EventHandler<SubscriptionRequest> Removed { get; set; }
43 
/// <summary>
/// Initializes a new instance of the <see cref="InternalSubscriptionManager"/> class
/// </summary>
/// <param name="algorithm">The algorithm this manager is associated with</param>
/// <param name="resolution">The resolution applied to the internal subscriptions this manager creates</param>
public InternalSubscriptionManager(IAlgorithm algorithm, Resolution resolution)
{
    // internal requests are tracked per symbol; nothing is tracked initially
    _subscriptionRequests = new Dictionary<Symbol, List<SubscriptionRequest>>();
    _resolution = resolution;
    _algorithm = algorithm;
}
55 
56  /// <summary>
57  /// Notifies about an added subscription request
58  /// </summary>
59  /// <param name="request">The added subscription request</param>
{
    // Reacts to a user subscription request: a request above Resolution.Minute gets a companion
    // internal subscription, while a request at Resolution.Minute or below drops the internal
    // subscriptions tracked for its symbol
    if (!PreFilter(request))
    {
        return;
    }

    var userConfig = request.Configuration;
    var isLowResolution = userConfig.Resolution > Resolution.Minute;

    List<SubscriptionRequest> tracked;
    var isTracked = _subscriptionRequests.TryGetValue(userConfig.Symbol, out tracked);

    // an internal request only counts as a match when both its data type and tick type agree
    var hasMatchingInternal = isTracked
        && tracked.Any(candidate => candidate.Configuration.Type == userConfig.Type
            && userConfig.TickType == candidate.Configuration.TickType);

    if (isLowResolution)
    {
        if (hasMatchingInternal)
        {
            // an equivalent internal subscription is already in place
            return;
        }

        // low resolution subscription: add an internal one using the configured resolution
        var internalConfig = new SubscriptionDataConfig(userConfig, resolution: _resolution, isInternalFeed: true, extendedHours: true, isFilteredSubscription: false);

        // during warmup in live trading the internal subscription must not start before the algorithm does;
        // it only provides realtime prices for low resolution subscriptions, which warmup does not need
        var startTimeUtc = _algorithm.IsWarmingUp ? DateTime.UtcNow : request.StartTimeUtc;

        var internalRequest = new SubscriptionRequest(false, null, request.Security, internalConfig, startTimeUtc, request.EndTimeUtc);
        if (isTracked)
        {
            tracked.Add(internalRequest);
        }
        else
        {
            _subscriptionRequests[userConfig.Symbol] = new List<SubscriptionRequest> { internalRequest };
        }

        Added?.Invoke(this, internalRequest);
        return;
    }

    if (hasMatchingInternal)
    {
        // the user added a higher resolution configuration, so the internal ones are no longer needed:
        // stop tracking the symbol first, then notify about every request that was tracked for it
        _subscriptionRequests.Remove(userConfig.Symbol);
        foreach (var obsoleteRequest in tracked)
        {
            Removed?.Invoke(this, obsoleteRequest);
        }
    }
}
104 
106  /// <summary>
107  /// Notifies about a removed subscription request
108  /// </summary>
109  /// <param name="request">The removed subscription request</param>
{
    // Only requests that pass the pre filter and whose symbol has tracked internal requests matter here
    if (!PreFilter(request) || !_subscriptionRequests.ContainsKey(request.Configuration.Symbol))
    {
        return;
    }

    var symbol = request.Configuration.Symbol;
    var currentConfigs = _algorithm.SubscriptionManager.SubscriptionDataConfigService
        .GetSubscriptionDataConfigs(symbol).ToList();

    // the internal subscriptions stay only while configs exist and all of them are above Resolution.Minute
    var internalStillNeeded = currentConfigs.Count != 0
        && currentConfigs.All(config => config.Resolution > Resolution.Minute);
    if (internalStillNeeded)
    {
        return;
    }

    // either no config is left for the symbol or one at Resolution.Minute or below exists:
    // stop tracking the symbol first, then notify about every internal request that was tracked
    var tracked = _subscriptionRequests[symbol];
    _subscriptionRequests.Remove(symbol);
    foreach (var obsoleteRequest in tracked)
    {
        Removed?.Invoke(this, obsoleteRequest);
    }
}
128 
/// <summary>
/// Determines whether a request is relevant for this manager: true only in live trading and only
/// for requests that are not internal feeds, not universe subscriptions and not custom data
/// </summary>
/// <param name="request">The subscription request to inspect</param>
/// <returns>True when the request should be processed, false otherwise</returns>
private bool PreFilter(SubscriptionRequest request)
{
    if (!_algorithm.LiveMode)
    {
        return false;
    }

    if (request.Configuration.IsInternalFeed || request.IsUniverseSubscription)
    {
        return false;
    }

    return !request.Configuration.IsCustomData;
}
136  }
137 }