Lean  $LEAN_TAG$
PeriodCountConsolidatorBase.cs
1 /*
2  * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
3  * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  *
15 */
16 
17 using System;
18 using System.Runtime.CompilerServices;
20 using Python.Runtime;
22 
24 {
25  /// <summary>
26  /// Provides a base class for consolidators that emit data based on the passing of a period of time
27  /// or after seeing a max count of data points.
28  /// </summary>
29  /// <typeparam name="T">The input type of the consolidator</typeparam>
30  /// <typeparam name="TConsolidated">The output type of the consolidator</typeparam>
31  public abstract class PeriodCountConsolidatorBase<T, TConsolidated> : DataConsolidator<T>
32  where T : IBaseData
33  where TConsolidated : BaseData
34  {
35  // The SecurityIdentifier that we are consolidating for.
36  private SecurityIdentifier _securityIdentifier;
37  private bool _securityIdentifierIsSet;
38  //The number of data updates between creating new bars.
39  private int? _maxCount;
40  //
41  private IPeriodSpecification _periodSpecification;
42  //The minimum timespan between creating new bars.
43  private TimeSpan? _period;
44  //The number of pieces of data we've accumulated since our last emit
45  private int _currentCount;
46  //The working bar used for aggregating the data
47  private TConsolidated _workingBar;
48  //The last time we emitted a consolidated bar
49  private DateTime? _lastEmit;
50  private bool _validateTimeSpan;
51 
52  private PeriodCountConsolidatorBase(IPeriodSpecification periodSpecification)
53  {
54  _periodSpecification = periodSpecification;
55  _period = _periodSpecification.Period;
56  }
57 
58  /// <summary>
59  /// Creates a consolidator to produce a new <typeparamref name="TConsolidated"/> instance representing the period
60  /// </summary>
61  /// <param name="period">The minimum span of time before emitting a consolidated bar</param>
62  protected PeriodCountConsolidatorBase(TimeSpan period)
63  : this(new TimeSpanPeriodSpecification(period))
64  {
65  _period = _periodSpecification.Period;
66  }
67 
68  /// <summary>
69  /// Creates a consolidator to produce a new <typeparamref name="TConsolidated"/> instance representing the last count pieces of data
70  /// </summary>
71  /// <param name="maxCount">The number of pieces to accept before emiting a consolidated bar</param>
72  protected PeriodCountConsolidatorBase(int maxCount)
73  : this(new BarCountPeriodSpecification())
74  {
75  _maxCount = maxCount;
76  }
77 
78  /// <summary>
79  /// Creates a consolidator to produce a new <typeparamref name="TConsolidated"/> instance representing the last count pieces of data or the period, whichever comes first
80  /// </summary>
81  /// <param name="maxCount">The number of pieces to accept before emiting a consolidated bar</param>
82  /// <param name="period">The minimum span of time before emitting a consolidated bar</param>
83  protected PeriodCountConsolidatorBase(int maxCount, TimeSpan period)
84  : this(new MixedModePeriodSpecification(period))
85  {
86  _maxCount = maxCount;
87  _period = _periodSpecification.Period;
88  }
89 
90  /// <summary>
91  /// Creates a consolidator to produce a new <typeparamref name="TConsolidated"/> instance representing the last count pieces of data or the period, whichever comes first
92  /// </summary>
93  /// <param name="func">Func that defines the start time of a consolidated data</param>
94  protected PeriodCountConsolidatorBase(Func<DateTime, CalendarInfo> func)
95  : this(new FuncPeriodSpecification(func))
96  {
97  _period = Time.OneSecond;
98  }
99 
100  /// <summary>
101  /// Creates a consolidator to produce a new <typeparamref name="TConsolidated"/> instance representing the last count pieces of data or the period, whichever comes first
102  /// </summary>
103  /// <param name="pyObject">Python object that defines either a function object that defines the start time of a consolidated data or a timespan</param>
104  protected PeriodCountConsolidatorBase(PyObject pyObject)
105  : this(GetPeriodSpecificationFromPyObject(pyObject))
106  {
107  }
108 
109  /// <summary>
110  /// Gets the type produced by this consolidator
111  /// </summary>
112  public override Type OutputType => typeof(TConsolidated);
113 
114  /// <summary>
115  /// Gets a clone of the data being currently consolidated
116  /// </summary>
117  public override IBaseData WorkingData => _workingBar?.Clone();
118 
119  /// <summary>
120  /// Event handler that fires when a new piece of data is produced. We define this as a 'new'
121  /// event so we can expose it as a <typeparamref name="TConsolidated"/> instead of a <see cref="BaseData"/> instance
122  /// </summary>
123  public new event EventHandler<TConsolidated> DataConsolidated;
124 
125  /// <summary>
126  /// Updates this consolidator with the specified data. This method is
127  /// responsible for raising the DataConsolidated event
128  /// In time span mode, the bar range is closed on the left and open on the right: [T, T+TimeSpan).
129  /// For example, if time span is 1 minute, we have [10:00, 10:01): so data at 10:01 is not
130  /// included in the bar starting at 10:00.
131  /// </summary>
132  /// <exception cref="InvalidOperationException">Thrown when multiple symbols are being consolidated.</exception>
133  /// <param name="data">The new data for the consolidator</param>
134  public override void Update(T data)
135  {
136  if (!_securityIdentifierIsSet)
137  {
138  _securityIdentifierIsSet = true;
139  _securityIdentifier = data.Symbol.ID;
140  }
141  else if (!data.Symbol.ID.Equals(_securityIdentifier))
142  {
143  throw new InvalidOperationException($"Consolidators can only be used with a single symbol. The previous consolidated SecurityIdentifier ({_securityIdentifier}) is not the same as in the current data ({data.Symbol.ID}).");
144  }
145 
146  if (!ShouldProcess(data))
147  {
148  // first allow the base class a chance to filter out data it doesn't want
149  // before we start incrementing counts and what not
150  return;
151  }
152 
153  if (!_validateTimeSpan && _period.HasValue && _periodSpecification is TimeSpanPeriodSpecification)
154  {
155  // only do this check once
156  _validateTimeSpan = true;
157  var dataLength = data.EndTime - data.Time;
158  if (dataLength > _period)
159  {
160  throw new ArgumentException($"For Symbol {data.Symbol} can not consolidate bars of period: {_period}, using data of the same or higher period: {data.EndTime - data.Time}");
161  }
162  }
163 
164  //Decide to fire the event
165  var fireDataConsolidated = false;
166 
167  // decide to aggregate data before or after firing OnDataConsolidated event
168  // always aggregate before firing in counting mode
169  bool aggregateBeforeFire = _maxCount.HasValue;
170 
171  if (_maxCount.HasValue)
172  {
173  // we're in count mode
174  _currentCount++;
175  if (_currentCount >= _maxCount.Value)
176  {
177  _currentCount = 0;
178  fireDataConsolidated = true;
179  }
180  }
181 
182  if (!_lastEmit.HasValue)
183  {
184  // initialize this value for period computations
185  _lastEmit = IsTimeBased ? DateTime.MinValue : data.Time;
186  }
187 
188  if (_period.HasValue)
189  {
190  // we're in time span mode and initialized
191  if (_workingBar != null && data.Time - _workingBar.Time >= _period.Value && GetRoundedBarTime(data) > _lastEmit)
192  {
193  fireDataConsolidated = true;
194  }
195 
196  // special case: always aggregate before event trigger when TimeSpan is zero
197  if (_period.Value == TimeSpan.Zero)
198  {
199  fireDataConsolidated = true;
200  aggregateBeforeFire = true;
201  }
202  }
203 
204  if (aggregateBeforeFire)
205  {
206  if (data.Time >= _lastEmit)
207  {
208  AggregateBar(ref _workingBar, data);
209  }
210  }
211 
212  //Fire the event
213  if (fireDataConsolidated)
214  {
215  var workingTradeBar = _workingBar as TradeBar;
216  if (workingTradeBar != null)
217  {
218  // we kind of are cheating here...
219  if (_period.HasValue)
220  {
221  workingTradeBar.Period = _period.Value;
222  }
223  // since trade bar has period it aggregates this properly
224  else if (!(data is TradeBar))
225  {
226  workingTradeBar.Period = data.Time - _lastEmit.Value;
227  }
228  }
229 
230  // Set _lastEmit first because OnDataConsolidated will set _workingBar to null
231  _lastEmit = IsTimeBased && _workingBar != null ? _workingBar.EndTime : data.Time;
232  OnDataConsolidated(_workingBar);
233  }
234 
235  if (!aggregateBeforeFire)
236  {
237  if (data.Time >= _lastEmit)
238  {
239  AggregateBar(ref _workingBar, data);
240  }
241  }
242  }
243 
244  /// <summary>
245  /// Scans this consolidator to see if it should emit a bar due to time passing
246  /// </summary>
247  /// <param name="currentLocalTime">The current time in the local time zone (same as <see cref="BaseData.Time"/>)</param>
248  public override void Scan(DateTime currentLocalTime)
249  {
250  if (_workingBar != null && _period.HasValue && _period.Value != TimeSpan.Zero
251  && currentLocalTime - _workingBar.Time >= _period.Value && GetRoundedBarTime(currentLocalTime) > _lastEmit)
252  {
253  _lastEmit = _workingBar.EndTime;
254  OnDataConsolidated(_workingBar);
255  }
256  }
257 
258  /// <summary>
259  /// Resets the consolidator
260  /// </summary>
261  public override void Reset()
262  {
263  base.Reset();
264  _securityIdentifier = null;
265  _securityIdentifierIsSet = false;
266  _currentCount = 0;
267  _workingBar = null;
268  _lastEmit = null;
269  _validateTimeSpan = false;
270  }
271 
272  /// <summary>
273  /// Returns true if this consolidator is time-based, false otherwise
274  /// </summary>
275  protected bool IsTimeBased => !_maxCount.HasValue;
276 
277  /// <summary>
278  /// Gets the time period for this consolidator
279  /// </summary>
280  protected TimeSpan? Period => _period;
281 
282  /// <summary>
283  /// Determines whether or not the specified data should be processed
284  /// </summary>
285  /// <param name="data">The data to check</param>
286  /// <returns>True if the consolidator should process this data, false otherwise</returns>
287  protected virtual bool ShouldProcess(T data) => true;
288 
289  /// <summary>
290  /// Aggregates the new 'data' into the 'workingBar'. The 'workingBar' will be
291  /// null following the event firing
292  /// </summary>
293  /// <param name="workingBar">The bar we're building, null if the event was just fired and we're starting a new consolidated bar</param>
294  /// <param name="data">The new data</param>
295  protected abstract void AggregateBar(ref TConsolidated workingBar, T data);
296 
297  /// <summary>
298  /// Gets a rounded-down bar time. Called by AggregateBar in derived classes.
299  /// </summary>
300  /// <param name="time">The bar time to be rounded down</param>
301  /// <returns>The rounded bar time</returns>
302  [MethodImpl(MethodImplOptions.AggressiveInlining)]
303  protected DateTime GetRoundedBarTime(DateTime time)
304  {
305  var startTime = _periodSpecification.GetRoundedBarTime(time);
306 
307  // In the case of a new bar, define the period defined at opening time
308  if (_workingBar == null)
309  {
310  _period = _periodSpecification.Period;
311  }
312 
313  return startTime;
314  }
315 
316  /// <summary>
317  /// Gets a rounded-down bar start time. Called by AggregateBar in derived classes.
318  /// </summary>
319  /// <param name="inputData">The input data point</param>
320  /// <returns>The rounded bar start time</returns>
321  [MethodImpl(MethodImplOptions.AggressiveInlining)]
322  protected DateTime GetRoundedBarTime(IBaseData inputData)
323  {
324  var potentialStartTime = GetRoundedBarTime(inputData.Time);
325  if (_period.HasValue && potentialStartTime + _period < inputData.EndTime)
326  {
327  // US equity hour bars from the database starts at 9am but the exchange opens at 9:30am. Thus, the method
328  // GetRoundedBarTime(inputData.Time) returns the market open of the previous day, which is not consistent
329  // with the given end time. For that reason we need to handle this case specifically, by calling
330  // GetRoundedBarTime(inputData.EndTime) as it will return our expected start time: 9:30am
331  if (inputData.EndTime - inputData.Time == Time.OneHour && potentialStartTime.Date < inputData.Time.Date)
332  {
333  potentialStartTime = GetRoundedBarTime(inputData.EndTime);
334  }
335  else
336  {
337  // whops! the end time we were giving is beyond our potential end time, so let's use the giving bars star time instead
338  potentialStartTime = inputData.Time;
339  }
340  }
341 
342  return potentialStartTime;
343  }
344 
345  /// <summary>
346  /// Event invocator for the <see cref="DataConsolidated"/> event
347  /// </summary>
348  /// <param name="e">The consolidated data</param>
349  protected virtual void OnDataConsolidated(TConsolidated e)
350  {
351  base.OnDataConsolidated(e);
352  DataConsolidated?.Invoke(this, e);
353 
354  _workingBar = null;
355  }
356 
357  /// <summary>
358  /// Gets the period specification from the PyObject that can either represent a function object that defines the start time of a consolidated data or a timespan.
359  /// </summary>
360  /// <param name="pyObject">Python object that defines either a function object that defines the start time of a consolidated data or a timespan</param>
361  /// <returns>IPeriodSpecification that represents the PyObject</returns>
362  private static IPeriodSpecification GetPeriodSpecificationFromPyObject(PyObject pyObject)
363  {
364  Func<DateTime, CalendarInfo> expiryFunc;
365  if (pyObject.TryConvertToDelegate(out expiryFunc))
366  {
367  return new FuncPeriodSpecification(expiryFunc);
368  }
369 
370  using (Py.GIL())
371  {
372  return new TimeSpanPeriodSpecification(pyObject.As<TimeSpan>());
373  }
374  }
375 
376  /// <summary>
377  /// Distinguishes between the different ways a consolidated data start time can be specified
378  /// </summary>
379  private interface IPeriodSpecification
380  {
381  TimeSpan? Period { get; }
382  DateTime GetRoundedBarTime(DateTime time);
383  }
384 
385  /// <summary>
386  /// User defined the bars period using a counter
387  /// </summary>
388  private class BarCountPeriodSpecification : IPeriodSpecification
389  {
390  public TimeSpan? Period { get; } = null;
391 
392  public DateTime GetRoundedBarTime(DateTime time) => time;
393  }
394 
395  /// <summary>
396  /// User defined the bars period using a counter and a period (mixed mode)
397  /// </summary>
398  private class MixedModePeriodSpecification : IPeriodSpecification
399  {
400  public TimeSpan? Period { get; }
401 
402  public MixedModePeriodSpecification(TimeSpan period)
403  {
404  Period = period;
405  }
406 
407  public DateTime GetRoundedBarTime(DateTime time) => time;
408  }
409 
410  /// <summary>
411  /// User defined the bars period using a time span
412  /// </summary>
413  private class TimeSpanPeriodSpecification : IPeriodSpecification
414  {
415  public TimeSpan? Period { get; }
416 
417  public TimeSpanPeriodSpecification(TimeSpan period)
418  {
419  Period = period;
420  }
421 
422  public DateTime GetRoundedBarTime(DateTime time) =>
423  Period.Value > Time.OneDay
424  ? time // #4915 For periods larger than a day, don't use a rounding schedule.
425  : time.RoundDown(Period.Value);
426  }
427 
428  /// <summary>
429  /// Special case for bars where the open time is defined by a function.
430  /// We assert on construction that the function returns a date time in the past or equal to the given time instant.
431  /// </summary>
432  private class FuncPeriodSpecification : IPeriodSpecification
433  {
434  private static readonly DateTime _verificationDate = new DateTime(2022, 01, 03, 10, 10, 10);
435  public TimeSpan? Period { get; private set; }
436 
437  public readonly Func<DateTime, CalendarInfo> _calendarInfoFunc;
438 
439  public FuncPeriodSpecification(Func<DateTime, CalendarInfo> expiryFunc)
440  {
441  if (expiryFunc(_verificationDate).Start > _verificationDate)
442  {
443  throw new ArgumentException($"{nameof(FuncPeriodSpecification)}: Please use a function that computes the start of the bar associated with the given date time. Should never return a time later than the one passed in.");
444  }
445  _calendarInfoFunc = expiryFunc;
446  }
447 
448  public DateTime GetRoundedBarTime(DateTime time)
449  {
450  var calendarInfo = _calendarInfoFunc(time);
451  Period = calendarInfo.Period;
452  return calendarInfo.Start;
453  }
454  }
455  }
456 }