Lean  $LEAN_TAG$
BaseDataCollectionAggregatorReader.cs
1 /*
2  * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
3  * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  *
15 */
16 
17 using System;
18 using QuantConnect.Data;
19 using QuantConnect.Util;
21 using System.Collections.Generic;
23 
25 {
26  /// <summary>
27  /// Data source reader that will aggregate data points into a base data collection
28  /// </summary>
30  {
31  private readonly Type _collectionType;
32  private BaseDataCollection _collection;
33 
34  /// <summary>
35  /// Initializes a new instance of the <see cref="TextSubscriptionDataSourceReader"/> class
36  /// </summary>
37  /// <param name="dataCacheProvider">This provider caches files if needed</param>
38  /// <param name="config">The subscription's configuration</param>
39  /// <param name="date">The date this factory was produced to read data for</param>
40  /// <param name="isLiveMode">True if we're in live mode, false for backtesting</param>
41  /// <param name="objectStore">The object storage for data persistence</param>
42  public BaseDataCollectionAggregatorReader(IDataCacheProvider dataCacheProvider, SubscriptionDataConfig config, DateTime date,
43  bool isLiveMode, IObjectStore objectStore)
44  : base(dataCacheProvider, config, date, isLiveMode, objectStore)
45  {
46  // if the type is not a BaseDataCollection, we'll default to BaseDataCollection.
47  // e.g. custom Python dynamic folding collections need to be aggregated into a BaseDataCollection,
48  // but they implement PythonData, so casting an instance of PythonData to BaseDataCollection will fail.
49  _collectionType = config.Type.IsAssignableTo(typeof(BaseDataCollection)) ? config.Type : typeof(BaseDataCollection);
50  }
51 
52  /// <summary>
53  /// Reads the specified <paramref name="source"/>
54  /// </summary>
55  /// <param name="source">The source to be read</param>
56  /// <returns>An <see cref="IEnumerable{BaseData}"/> that contains the data in the source</returns>
57  public override IEnumerable<BaseData> Read(SubscriptionDataSource source)
58  {
59  foreach (var point in base.Read(source))
60  {
61  if (point is BaseDataCollection collection && !collection.Data.IsNullOrEmpty())
62  {
63  // if underlying already is returning an aggregated collection let it through as is
64  yield return point;
65  }
66  else
67  {
68  if (_collection != null && _collection.EndTime != point.EndTime)
69  {
70  // when we get a new time we flush current collection instance, if any
71  yield return _collection;
72  _collection = null;
73  }
74 
75  if (_collection == null)
76  {
77  _collection = (BaseDataCollection)Activator.CreateInstance(_collectionType);
78  _collection.Time = point.Time;
79  _collection.Symbol = Config.Symbol;
80  _collection.EndTime = point.EndTime;
81  }
82  // aggregate the data points
83  _collection.Add(point);
84  }
85  }
86 
87  // underlying reader ended, flush current collection instance if any
88  if (_collection != null)
89  {
90  yield return _collection;
91  _collection = null;
92  }
93  }
94  }
95 }