Lean  $LEAN_TAG$
DataQueueHandlerSubscriptionManager.cs
1 /*
2  * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
3  * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14 */
15 
17 using QuantConnect.Logging;
18 using System;
19 using System.Collections.Concurrent;
20 using System.Collections.Generic;
21 using System.Linq;
22 
namespace QuantConnect.Data
{
    /// <summary>
    /// Counts the number of subscribers for each channel, where a channel is the pair of a
    /// channel name (derived from a <see cref="TickType"/>) and a <see cref="Symbol"/>.
    /// The underlying data feed is only asked to subscribe when a channel gets its first
    /// subscriber and only asked to unsubscribe when its last subscriber leaves.
    /// </summary>
    public abstract class DataQueueHandlerSubscriptionManager : IDisposable
    {
        /// <summary>
        /// Number of subscribers currently counted for each channel
        /// </summary>
        protected ConcurrentDictionary<Channel, int> SubscribersByChannel { get; init; } = new ConcurrentDictionary<Channel, int>();

        /// <summary>
        /// Increments the number of subscribers for the channel described by <paramref name="dataConfig"/>.
        /// If the channel has no subscribers yet, the underlying subscription is requested first and the
        /// channel is only recorded (with a count of one) when that request reports success.
        /// </summary>
        /// <param name="dataConfig">defines the subscription configuration data.</param>
        /// <remarks>Any exception is logged through <see cref="Log"/> and then rethrown.</remarks>
        public void Subscribe(SubscriptionDataConfig dataConfig)
        {
            try
            {
                var channel = GetChannel(dataConfig);

                // TryUpdate is a compare-and-swap: it fails when the stored count changed after we
                // read it. Re-read and retry in that case instead of silently losing the increment.
                while (SubscribersByChannel.TryGetValue(channel, out var count))
                {
                    if (SubscribersByChannel.TryUpdate(channel, count + 1, count))
                    {
                        return;
                    }
                }

                // First subscriber for this channel: only start counting if the feed accepted it.
                // NOTE(review): this first-subscriber step is not atomic with respect to other
                // callers subscribing the same channel at the same moment.
                if (Subscribe(new[] { dataConfig.Symbol }, dataConfig.TickType))
                {
                    SubscribersByChannel.AddOrUpdate(channel, 1);
                }
            }
            catch (Exception exception)
            {
                Log.Error(exception);
                throw;
            }
        }

        /// <summary>
        /// Decrements the number of subscribers for the channel described by <paramref name="dataConfig"/>.
        /// When the last subscriber leaves, the underlying unsubscription is requested and the channel is
        /// removed only if that request reports success. Unknown channels are ignored.
        /// </summary>
        /// <param name="dataConfig">defines the subscription configuration data.</param>
        /// <remarks>Any exception is logged through <see cref="Log"/> and then rethrown.</remarks>
        public void Unsubscribe(SubscriptionDataConfig dataConfig)
        {
            try
            {
                var channel = GetChannel(dataConfig);

                while (SubscribersByChannel.TryGetValue(channel, out var count))
                {
                    if (count > 1)
                    {
                        if (SubscribersByChannel.TryUpdate(channel, count - 1, count))
                        {
                            return;
                        }

                        // The count changed under us; re-read it so the decrement is not lost.
                        continue;
                    }

                    // Last subscriber: stop tracking the channel only if the feed released it.
                    if (Unsubscribe(new[] { dataConfig.Symbol }, dataConfig.TickType))
                    {
                        SubscribersByChannel.TryRemove(channel, out _);
                    }

                    return;
                }
            }
            catch (Exception exception)
            {
                Log.Error(exception);
                throw;
            }
        }

        /// <summary>
        /// Returns subscribed symbols
        /// </summary>
        /// <returns>distinct <see cref="Symbol"/> instances taken from all tracked channels</returns>
        public IEnumerable<Symbol> GetSubscribedSymbols()
        {
            return SubscribersByChannel.Keys
                .Select(c => c.Symbol)
                .Distinct();
        }

        /// <summary>
        /// Retrieves the list of unique <see cref="Symbol"/> instances that are currently subscribed for a specific <see cref="TickType"/>.
        /// </summary>
        /// <param name="tickType">The type of tick data to filter subscriptions by.</param>
        /// <returns>A collection of unique <see cref="Symbol"/> objects that match the specified <paramref name="tickType"/>.</returns>
        public IEnumerable<Symbol> GetSubscribedSymbols(TickType tickType)
        {
            var channelName = ChannelNameFromTickType(tickType);

            // Channel names are matched case-insensitively using the invariant culture.
#pragma warning disable CA1309
            return SubscribersByChannel.Keys.Where(x => x.Name.Equals(channelName, StringComparison.InvariantCultureIgnoreCase))
#pragma warning restore CA1309
                .Select(c => c.Symbol)
                .Distinct();
        }

        /// <summary>
        /// Checks if there is an existing subscriber for the channel identified by
        /// <paramref name="symbol"/> and <paramref name="tickType"/>
        /// </summary>
        /// <param name="symbol">Symbol</param>
        /// <param name="tickType">Type of tick data</param>
        /// <returns>true if the channel is currently tracked; otherwise false</returns>
        public bool IsSubscribed(Symbol symbol, TickType tickType)
        {
            return SubscribersByChannel.ContainsKey(GetChannel(symbol, tickType));
        }

        /// <summary>
        /// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
        /// The base implementation does nothing; derived classes may override it.
        /// </summary>
        public virtual void Dispose()
        {
        }

        /// <summary>
        /// Describes the way <see cref="IDataQueueHandler"/> implements subscription
        /// </summary>
        /// <param name="symbols">Symbols to subscribe</param>
        /// <param name="tickType">Type of tick data</param>
        /// <returns>Returns true if subscribed; otherwise false</returns>
        protected abstract bool Subscribe(IEnumerable<Symbol> symbols, TickType tickType);

        /// <summary>
        /// Describes the way <see cref="IDataQueueHandler"/> implements unsubscription
        /// </summary>
        /// <param name="symbols">Symbols to unsubscribe</param>
        /// <param name="tickType">Type of tick data</param>
        /// <returns>Returns true if unsubscribed; otherwise false</returns>
        protected abstract bool Unsubscribe(IEnumerable<Symbol> symbols, TickType tickType);

        /// <summary>
        /// Brokerage maps <see cref="TickType"/> to real socket/api channel
        /// </summary>
        /// <param name="tickType">Type of tick data</param>
        /// <returns>The channel name used as the name part of the <see cref="Channel"/> key</returns>
        protected abstract string ChannelNameFromTickType(TickType tickType);

        /// <summary>
        /// Builds the channel key for the symbol and tick type of <paramref name="dataConfig"/>
        /// </summary>
        private Channel GetChannel(SubscriptionDataConfig dataConfig) => GetChannel(dataConfig.Symbol, dataConfig.TickType);

        /// <summary>
        /// Builds the channel key from the channel name mapped for <paramref name="tickType"/> and the <paramref name="symbol"/>
        /// </summary>
        private Channel GetChannel(Symbol symbol, TickType tickType)
        {
            return new Channel(
                ChannelNameFromTickType(tickType),
                symbol);
        }
    }
}