Lean  $LEAN_TAG$
SubscriptionCollection.cs
1 /*
2  * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
3  * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  *
15 */
16 
17 using System;
18 using System.Collections;
19 using System.Collections.Concurrent;
20 using System.Collections.Generic;
21 using System.Linq;
22 using QuantConnect.Data;
23 using QuantConnect.Util;
24 
26 {
27  /// <summary>
28  /// Provides a collection for holding subscriptions.
29  /// </summary>
30  public class SubscriptionCollection : IEnumerable<Subscription>
31  {
32  private readonly ConcurrentDictionary<SubscriptionDataConfig, Subscription> _subscriptions;
33  private bool _sortingSubscriptionRequired;
34  private bool _frozen;
35  private readonly Ref<TimeSpan> _fillForwardResolution;
36 
37  // some asset types (options, futures, crypto) have multiple subscriptions for different tick types,
38  // we keep a sorted list of subscriptions so we can return them in a deterministic order
39  private List<Subscription> _subscriptionsByTickType;
40 
41  /// <summary>
42  /// Event fired when the fill forward resolution changes
43  /// </summary>
44  public event EventHandler<FillForwardResolutionChangedEvent> FillForwardResolutionChanged;
45 
46  /// <summary>
47  /// Initializes a new instance of the <see cref="SubscriptionCollection"/> class
48  /// </summary>
50  {
51  _subscriptions = new ConcurrentDictionary<SubscriptionDataConfig, Subscription>();
52  _subscriptionsByTickType = new List<Subscription>();
53  var ffres = Time.OneMinute;
54  _fillForwardResolution = Ref.Create(() => ffres, res => ffres = res);
55  }
56 
57  /// <summary>
58  /// Checks the collection for the specified subscription configuration
59  /// </summary>
60  /// <param name="configuration">The subscription configuration to check for</param>
61  /// <returns>True if a subscription with the specified configuration is found in this collection, false otherwise</returns>
62  public bool Contains(SubscriptionDataConfig configuration)
63  {
64  return _subscriptions.ContainsKey(configuration);
65  }
66 
67  /// <summary>
68  /// Attempts to add the specified subscription to the collection. If another subscription
69  /// exists with the same configuration then it won't be added.
70  /// </summary>
71  /// <param name="subscription">The subscription to add</param>
72  /// <returns>True if the subscription is successfully added, false otherwise</returns>
73  public bool TryAdd(Subscription subscription)
74  {
75  if (_subscriptions.TryAdd(subscription.Configuration, subscription))
76  {
77  UpdateFillForwardResolution(FillForwardResolutionOperation.AfterAdd, subscription.Configuration);
78  _sortingSubscriptionRequired = true;
79  return true;
80  }
81  return false;
82  }
83 
84  /// <summary>
85  /// Attempts to retrieve the subscription with the specified configuration
86  /// </summary>
87  /// <param name="configuration">The subscription's configuration</param>
88  /// <param name="subscription">The subscription matching the configuration, null if not found</param>
89  /// <returns>True if the subscription is successfully retrieved, false otherwise</returns>
90  public bool TryGetValue(SubscriptionDataConfig configuration, out Subscription subscription)
91  {
92  return _subscriptions.TryGetValue(configuration, out subscription);
93  }
94 
95  /// <summary>
96  /// Attempts to remove the subscription with the specified configuraton from the collection.
97  /// </summary>
98  /// <param name="configuration">The configuration of the subscription to remove</param>
99  /// <param name="subscription">The removed subscription, null if not found.</param>
100  /// <returns>True if the subscription is successfully removed, false otherwise</returns>
101  public bool TryRemove(SubscriptionDataConfig configuration, out Subscription subscription)
102  {
103  if (_subscriptions.TryRemove(configuration, out subscription))
104  {
105  // for user friendlyness only look at removals triggerd by the user not those that are due to a data feed ending because of no more data,
106  // let's try to respect the users original FF enumerator request
107  if (!subscription.EndOfStream)
108  {
109  UpdateFillForwardResolution(FillForwardResolutionOperation.AfterRemove, configuration);
110  }
111  _sortingSubscriptionRequired = true;
112  return true;
113  }
114  return false;
115  }
116 
117  /// <summary>
118  /// Returns an enumerator that iterates through the collection.
119  /// </summary>
120  /// <returns>
121  /// An enumerator that can be used to iterate through the collection.
122  /// </returns>
123  public IEnumerator<Subscription> GetEnumerator()
124  {
125  SortSubscriptions();
126  return _subscriptionsByTickType.GetEnumerator();
127  }
128 
129  /// <summary>
130  /// Returns an enumerator that iterates through a collection.
131  /// </summary>
132  /// <returns>
133  /// An <see cref="T:System.Collections.IEnumerator"/> object that can be used to iterate through the collection.
134  /// </returns>
135  IEnumerator IEnumerable.GetEnumerator()
136  {
137  return GetEnumerator();
138  }
139 
140  /// <summary>
141  /// Gets and updates the fill forward resolution by checking specified subscription configurations and
142  /// selecting the smallest resoluton not equal to tick
143  /// </summary>
145  {
146  if (configuration != null)
147  {
148  UpdateFillForwardResolution(FillForwardResolutionOperation.BeforeAdd, configuration);
149  }
150  return _fillForwardResolution;
151  }
152 
153  /// <summary>
154  /// Will disable or enable fill forward resolution updates
155  /// </summary>
156  public void FreezeFillForwardResolution(bool freeze)
157  {
158  _frozen = freeze;
159  }
160 
161  /// <summary>
162  /// Helper method to validate a configuration to be included in the fill forward calculation
163  /// </summary>
164  private static bool ValidateFillForwardResolution(SubscriptionDataConfig configuration)
165  {
166  return !configuration.IsInternalFeed && configuration.Resolution != Resolution.Tick;
167  }
168  /// <summary>
169  /// Gets and updates the fill forward resolution by checking specified subscription configurations and
170  /// selecting the smallest resoluton not equal to tick
171  /// </summary>
172  private void UpdateFillForwardResolution(FillForwardResolutionOperation operation, SubscriptionDataConfig configuration)
173  {
174  if(_frozen)
175  {
176  return;
177  }
178 
179  // Due to performance implications let's be jealous in updating the _fillForwardResolution
180  if (ValidateFillForwardResolution(configuration) &&
181  (
182  ((FillForwardResolutionOperation.BeforeAdd == operation || FillForwardResolutionOperation.AfterAdd == operation)
183  && configuration.Increment != _fillForwardResolution.Value) // check if the new Increment is different
184  ||
185  (operation == FillForwardResolutionOperation.AfterRemove // We are removing
186  && configuration.Increment == _fillForwardResolution.Value // True: We are removing the resolution we were using
187  // False: there is at least another one equal, no need to update, but we only look at those valid configuration which are the ones which set the FF resolution
188  && _subscriptions.Keys.All(x => !ValidateFillForwardResolution(x) || x.Resolution != configuration.Resolution)))
189  )
190  {
191  var configurations = (operation == FillForwardResolutionOperation.BeforeAdd)
192  ? _subscriptions.Keys.Concat(new[] { configuration }) : _subscriptions.Keys;
193 
194  var eventArgs = new FillForwardResolutionChangedEvent { Old = _fillForwardResolution.Value };
195  _fillForwardResolution.Value = configurations.Where(ValidateFillForwardResolution)
196  .Select(x => x.Resolution)
197  .Distinct()
198  .DefaultIfEmpty(Resolution.Minute)
199  .Min().ToTimeSpan();
200  if (_fillForwardResolution.Value != eventArgs.Old)
201  {
202  eventArgs.New = _fillForwardResolution.Value;
203  // notify consumers if any
204  FillForwardResolutionChanged?.Invoke(this, eventArgs);
205  }
206  }
207  }
208 
209  /// <summary>
210  /// Sorts subscriptions so that equity subscriptions are enumerated before option
211  /// securities to ensure the underlying data is available when we process the options data
212  /// </summary>
213  private void SortSubscriptions()
214  {
215  if (_sortingSubscriptionRequired)
216  {
217  _sortingSubscriptionRequired = false;
218  // it's important that we enumerate underlying securities before derivatives to this end,
219  // we order by security type so that equity subscriptions are enumerated before option
220  // securities to ensure the underlying data is available when we process the options data
221  _subscriptionsByTickType = _subscriptions
222  .Select(x => x.Value)
223  .OrderBy(x => x.Configuration.SecurityType)
224  .ThenBy(x => x.Configuration.TickType)
225  .ThenBy(x => x.Configuration.Symbol)
226  .ToList();
227  }
228  }
229 
230  private enum FillForwardResolutionOperation
231  {
232  AfterRemove,
233  BeforeAdd,
234  AfterAdd
235  }
236  }
237 }