Lean  $LEAN_TAG$
ConsolidatorWrapper.cs
1 /*
2  * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
3  * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14 */
15 
16 using System;
17 using System.Collections.Generic;
18 using System.Threading;
21 
22 namespace QuantConnect.Data
23 {
/// <summary>
/// Wraps a data consolidator and tracks the next utc time at which it should be scanned
/// </summary>
internal class ConsolidatorWrapper : IDisposable
{
    // shared sequence: each wrapper takes the next value as its id, which gives
    // a deterministic ordering by creation when scan times are equal
    private static long _counter;

    private readonly IDataConsolidator _consolidator;
    private readonly LocalTimeKeeper _localTimeKeeper;
    private readonly TimeSpan _minimumIncrement;
    private readonly ITimeKeeper _timeKeeper;
    private readonly long _id;

    // span of the bars this consolidator emits; unknown until the first bar is consolidated
    private TimeSpan? _barSpan;

    /// <summary>
    /// True once <see cref="Dispose"/> has been called on this wrapper
    /// </summary>
    public bool Disposed { get; private set; }

    /// <summary>
    /// The utc time at which the wrapped consolidator should next be scanned
    /// </summary>
    public DateTime UtcScanTime { get; private set; }

    /// <summary>
    /// Builds a new priority key from the current <see cref="UtcScanTime"/> and this wrapper's id
    /// </summary>
    public ConsolidatorScanPriority Priority => new ConsolidatorScanPriority(UtcScanTime, _id);

    /// <summary>
    /// Creates a new instance, subscribes to the consolidator's output and computes the first scan time
    /// </summary>
    /// <param name="consolidator">The consolidator to wrap</param>
    /// <param name="configIncrement">The configured increment; values below one second are raised to one second</param>
    /// <param name="timeKeeper">Provides the current utc time</param>
    /// <param name="localTimeKeeper">Provides the local time used for scanning and the time zone used for utc conversions</param>
    public ConsolidatorWrapper(IDataConsolidator consolidator, TimeSpan configIncrement, ITimeKeeper timeKeeper, LocalTimeKeeper localTimeKeeper)
    {
        _id = Interlocked.Increment(ref _counter);

        _timeKeeper = timeKeeper;
        _consolidator = consolidator;
        _localTimeKeeper = localTimeKeeper;

        // never schedule scans more often than once per second
        if (configIncrement < Time.OneSecond)
        {
            _minimumIncrement = Time.OneSecond;
        }
        else
        {
            _minimumIncrement = configIncrement;
        }

        _consolidator.DataConsolidated += AdvanceScanTime;
        AdvanceScanTime();
    }

    /// <summary>
    /// Scans the wrapped consolidator using the current local time
    /// </summary>
    public void Scan()
    {
        _consolidator.Scan(_localTimeKeeper.LocalTime);

        // The scan may not have emitted anything (no data arrived, or no bar is expected,
        // for example over a weekend), so the next scan time still has to be moved forward here.
        AdvanceScanTime();
    }

    /// <summary>
    /// Marks this wrapper as disposed and stops listening to the consolidator's output
    /// </summary>
    public void Dispose()
    {
        Disposed = true;
        _consolidator.DataConsolidated -= AdvanceScanTime;
    }

    /// <summary>
    /// Sets the next scan time. Used both as the consolidator's data consolidated handler
    /// and, with no arguments, after construction and after every scan.
    /// </summary>
    /// <param name="_">Event sender, unused</param>
    /// <param name="consolidated">The bar that was just emitted, or null when no bar triggered the call</param>
    private void AdvanceScanTime(object _ = null, IBaseData consolidated = null)
    {
        if (consolidated != null)
        {
            if (!_barSpan.HasValue)
            {
                // first emitted bar: learn its span, bounded below by the minimum increment
                var observedSpan = consolidated.EndTime - consolidated.Time;
                _barSpan = observedSpan < _minimumIncrement ? _minimumIncrement : observedSpan;
            }

            // the next bar is expected one span after the end of the one just emitted
            UtcScanTime = consolidated.EndTime.ConvertToUtc(_localTimeKeeper.TimeZone) + _barSpan.Value;
            return;
        }

        if (UtcScanTime > _timeKeeper.UtcTime)
        {
            // the scheduled scan is still in the future, keep it
            return;
        }

        if (_barSpan.HasValue)
        {
            UtcScanTime = _timeKeeper.UtcTime + _barSpan.Value;
            return;
        }

        // span still unknown: use the end of the working bar if it lies in the future
        var workingData = _consolidator.WorkingData;
        if (workingData != null)
        {
            var potentialEndTime = workingData.EndTime.ConvertToUtc(_localTimeKeeper.TimeZone);
            if (potentialEndTime > _timeKeeper.UtcTime)
            {
                UtcScanTime = potentialEndTime;
                return;
            }
        }

        // no usable reference, try again after the minimum increment
        UtcScanTime = _timeKeeper.UtcTime + _minimumIncrement;
    }
}
141 
/// <summary>
/// Ordering key for a consolidator scan: a utc scan time plus the consolidator's id
/// </summary>
internal class ConsolidatorScanPriority
{
    /// <summary>
    /// Comparer that orders by <see cref="UtcScanTime"/> and, when those are equal, by <see cref="Id"/>.
    /// A null instance sorts before any non-null instance.
    /// </summary>
    public static IComparer<ConsolidatorScanPriority> Comparer { get; } =
        new UtcScanTimeIdRelationalComparer();

    /// <summary>
    /// The next utc scan time
    /// </summary>
    public DateTime UtcScanTime { get; }

    /// <summary>
    /// Unique Id of the associated consolidator
    /// </summary>
    public long Id { get; }

    /// <summary>
    /// Creates a new priority key
    /// </summary>
    /// <param name="utcScanTime">The utc scan time</param>
    /// <param name="id">The id of the associated consolidator</param>
    public ConsolidatorScanPriority(DateTime utcScanTime, long id)
    {
        UtcScanTime = utcScanTime;
        Id = id;
    }

    /// <summary>
    /// Relational comparer on scan time first, id second
    /// </summary>
    private sealed class UtcScanTimeIdRelationalComparer : IComparer<ConsolidatorScanPriority>
    {
        public int Compare(ConsolidatorScanPriority? x, ConsolidatorScanPriority? y)
        {
            if (ReferenceEquals(x, y))
            {
                return 0;
            }

            if (y is null)
            {
                return 1;
            }

            if (x is null)
            {
                return -1;
            }

            // the id only breaks ties between equal scan times
            var byScanTime = x.UtcScanTime.CompareTo(y.UtcScanTime);
            return byScanTime != 0 ? byScanTime : x.Id.CompareTo(y.Id);
        }
    }
}
176 }