Lean  $LEAN_TAG$
BaseDataCollectionAggregatorReader.cs
1 /*
2  * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
3  * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  *
15 */
16 
17 using System;
18 using QuantConnect.Data;
19 using QuantConnect.Util;
21 using System.Collections.Generic;
23 
25 {
26  /// <summary>
27  /// Data source reader that will aggregate data points into a base data collection
28  /// </summary>
30  {
31  private readonly Type _collectionType;
32  private BaseDataCollection _collection;
33 
34  /// <summary>
35  /// Initializes a new instance of the <see cref="TextSubscriptionDataSourceReader"/> class
36  /// </summary>
37  /// <param name="dataCacheProvider">This provider caches files if needed</param>
38  /// <param name="config">The subscription's configuration</param>
39  /// <param name="date">The date this factory was produced to read data for</param>
40  /// <param name="isLiveMode">True if we're in live mode, false for backtesting</param>
41  public BaseDataCollectionAggregatorReader(IDataCacheProvider dataCacheProvider, SubscriptionDataConfig config, DateTime date,
42  bool isLiveMode, IObjectStore objectStore)
43  : base(dataCacheProvider, config, date, isLiveMode, objectStore)
44  {
45  _collectionType = config.Type;
46  }
47 
48  /// <summary>
49  /// Reads the specified <paramref name="source"/>
50  /// </summary>
51  /// <param name="source">The source to be read</param>
52  /// <returns>An <see cref="IEnumerable{BaseData}"/> that contains the data in the source</returns>
53  public override IEnumerable<BaseData> Read(SubscriptionDataSource source)
54  {
55  foreach (var point in base.Read(source))
56  {
57  if (point is BaseDataCollection collection && !collection.Data.IsNullOrEmpty())
58  {
59  // if underlying already is returning an aggregated collection let it through as is
60  yield return point;
61  }
62  else
63  {
64  if (_collection != null && _collection.EndTime != point.EndTime)
65  {
66  // when we get a new time we flush current collection instance, if any
67  yield return _collection;
68  _collection = null;
69  }
70 
71  if (_collection == null)
72  {
73  _collection = (BaseDataCollection)Activator.CreateInstance(_collectionType);
74  _collection.Time = point.Time;
75  _collection.Symbol = Config.Symbol;
76  _collection.EndTime = point.EndTime;
77  }
78  // aggregate the data points
79  _collection.Add(point);
80  }
81  }
82 
83  // underlying reader ended, flush current collection instance if any
84  if (_collection != null)
85  {
86  yield return _collection;
87  _collection = null;
88  }
89  }
90  }
91 }