Lean  $LEAN_TAG$
LiveSubscriptionEnumerator.cs
1 /*
2  * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
3  * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  *
15 */
16 
17 using System;
18 using QuantConnect.Data;
19 using QuantConnect.Util;
20 using System.Collections;
21 using QuantConnect.Logging;
23 using System.Collections.Generic;
24 
26 {
/// <summary>
/// Enumerator that subscribes through the provided data queue handler and re-subscribes
/// every time the requested configuration raises its <c>NewSymbol</c> (mapping) event.
/// Data points emitted under a different symbol are cloned and re-labelled with the
/// originally requested symbol before being exposed through <see cref="Current"/>.
/// </summary>
public class LiveSubscriptionEnumerator : IEnumerator<BaseData>
{
    private BaseData _current;
    private readonly Symbol _requestedSymbol;
    private readonly SubscriptionDataConfig _dataConfig;
    private readonly IDataQueueHandler _dataQueueHandler;
    private readonly EventHandler _handler;
    private readonly Func<SubscriptionDataConfig, bool> _isExpired;
    private SubscriptionDataConfig _currentConfig;
    private IEnumerator<BaseData> _previousEnumerator;
    private IEnumerator<BaseData> _underlyingEnumerator;

    /// <summary>
    /// The current data object instance, or null when the last <see cref="MoveNext"/> returned false
    /// </summary>
    public BaseData Current => _current;

    /// <summary>
    /// The current data object instance
    /// </summary>
    object IEnumerator.Current => Current;

    /// <summary>
    /// Creates a new instance, performing the initial subscription and registering for mapping events
    /// </summary>
    /// <param name="dataConfig">The requested configuration; its symbol is the one stamped on emitted data
    /// and its <c>NewSymbol</c> event triggers a re-subscription</param>
    /// <param name="dataQueueHandler">The data queue handler used to subscribe and unsubscribe</param>
    /// <param name="handler">Event handler forwarded as is to <c>SubscribeWithMapping</c></param>
    /// <param name="isExpired">Expiration predicate forwarded as is to <c>SubscribeWithMapping</c></param>
    public LiveSubscriptionEnumerator(SubscriptionDataConfig dataConfig, IDataQueueHandler dataQueueHandler, EventHandler handler, Func<SubscriptionDataConfig, bool> isExpired)
    {
        _dataConfig = dataConfig;
        _dataQueueHandler = dataQueueHandler;
        _handler = handler;
        _isExpired = isExpired;

        _requestedSymbol = dataConfig.Symbol;
        _underlyingEnumerator = dataQueueHandler.SubscribeWithMapping(dataConfig, handler, isExpired, out _currentConfig);

        // for any mapping event we will re subscribe; a named method (instead of an anonymous lambda)
        // is used so that Dispose can detach exactly this handler again
        dataConfig.NewSymbol += OnNewSymbol;
    }

    /// <summary>
    /// Advances the enumerator to the next element.
    /// </summary>
    /// <returns>The result of advancing the current underlying enumerator</returns>
    public bool MoveNext()
    {
        if (_previousEnumerator != null)
        {
            // a re-subscription replaced the enumerator: the old one is disposed here,
            // by the consumer, rather than inside the mapping event handler
            _previousEnumerator.DisposeSafely();
            _previousEnumerator = null;
        }

        var result = _underlyingEnumerator.MoveNext();
        _current = result ? _underlyingEnumerator.Current : null;

        if (_current != null && _current.Symbol != _requestedSymbol)
        {
            // if we've done some mapping at this layer let's clone the underlying and set the requested symbol,
            // don't trust the IDQH implementations for data uniqueness, since the configuration could be shared
            _current = _current.Clone();
            _current.Symbol = _requestedSymbol;
        }

        return result;
    }

    /// <summary>
    /// Resets the current underlying enumerator
    /// </summary>
    public void Reset()
    {
        _underlyingEnumerator.Reset();
    }

    /// <summary>
    /// Detaches from the configuration's mapping event and disposes of the used enumerators
    /// </summary>
    public void Dispose()
    {
        // detach first: otherwise the configuration keeps this instance alive and a mapping event
        // raised after disposal would create a new subscription that nobody consumes or disposes
        _dataConfig.NewSymbol -= OnNewSymbol;

        // the previous enumerator is only set while a re-subscription is pending consumption
        _previousEnumerator?.DisposeSafely();
        _underlyingEnumerator.DisposeSafely();
    }

    /// <summary>
    /// Mapping event handler: drops the current subscription and subscribes again through the
    /// data queue handler, keeping the replaced enumerator so <see cref="MoveNext"/> can dispose it
    /// </summary>
    /// <param name="sender">The event source, unused</param>
    /// <param name="e">The event arguments, unused</param>
    // NOTE(review): declared with the base (object, EventArgs) shape so the method group converts to the
    // event's delegate type; this assumes NewSymbol follows the standard EventHandler pattern with
    // arguments deriving from EventArgs - confirm against SubscriptionDataConfig
    private void OnNewSymbol(object sender, EventArgs e)
    {
        _dataQueueHandler.Unsubscribe(_currentConfig);
        _previousEnumerator = _underlyingEnumerator;

        var oldSymbol = _currentConfig.Symbol;
        _underlyingEnumerator = _dataQueueHandler.SubscribeWithMapping(_dataConfig, _handler, _isExpired, out _currentConfig);

        Log.Trace($"LiveSubscriptionEnumerator({_requestedSymbol}): " +
            $"resubscribing old: '{oldSymbol.Value}' new '{_currentConfig.Symbol.Value}'");
    }
}
120 }