Lean
$LEAN_TAG$
BaseDataCollectionAggregatorReader.cs
1
/*
2
* QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
3
* Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
4
*
5
* Licensed under the Apache License, Version 2.0 (the "License");
6
* you may not use this file except in compliance with the License.
7
* You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
8
*
9
* Unless required by applicable law or agreed to in writing, software
10
* distributed under the License is distributed on an "AS IS" BASIS,
11
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
* See the License for the specific language governing permissions and
13
* limitations under the License.
14
*
15
*/
16
17
using
System;
18
using
QuantConnect
.
Data
;
19
using
QuantConnect
.
Util
;
20
using
QuantConnect
.
Interfaces
;
21
using
System.Collections.Generic;
22
using
QuantConnect
.
Data
.
UniverseSelection
;
23
24
namespace
QuantConnect.Lean.Engine.DataFeeds
25
{
26
/// <summary>
/// Data source reader that groups the data points produced by the underlying text reader
/// into <see cref="BaseDataCollection"/> instances, one collection per distinct end time
/// </summary>
public class BaseDataCollectionAggregatorReader : TextSubscriptionDataSourceReader
{
    // concrete collection type to instantiate, taken from the subscription configuration
    private readonly Type _collectionType;

    // collection currently being filled; null when nothing is pending
    private BaseDataCollection _collection;

    /// <summary>
    /// Initializes a new instance of the <see cref="BaseDataCollectionAggregatorReader"/> class
    /// </summary>
    /// <param name="dataCacheProvider">This provider caches files if needed</param>
    /// <param name="config">The subscription's configuration; its type is the collection type that gets instantiated</param>
    /// <param name="date">The date this factory was produced to read data for</param>
    /// <param name="isLiveMode">True if we're in live mode, false for backtesting</param>
    /// <param name="objectStore">The object store instance, forwarded to the base reader</param>
    public BaseDataCollectionAggregatorReader(
        IDataCacheProvider dataCacheProvider,
        SubscriptionDataConfig config,
        DateTime date,
        bool isLiveMode,
        IObjectStore objectStore)
        : base(dataCacheProvider, config, date, isLiveMode, objectStore)
    {
        _collectionType = config.Type;
    }

    /// <summary>
    /// Reads the specified <paramref name="source"/>, aggregating consecutive data points
    /// that share the same end time into a single collection
    /// </summary>
    /// <param name="source">The source to be read</param>
    /// <returns>
    /// An <see cref="IEnumerable{BaseData}"/> holding the non empty collections the base reader
    /// already produced, passed through untouched, and the collections built here from every other point
    /// </returns>
    public override IEnumerable<BaseData> Read(SubscriptionDataSource source)
    {
        foreach (var dataPoint in base.Read(source))
        {
            // the underlying reader already aggregated this one, hand it over as is
            if (dataPoint is BaseDataCollection aggregated && !aggregated.Data.IsNullOrEmpty())
            {
                yield return dataPoint;
                continue;
            }

            // a different end time closes the pending collection, so emit it before starting a new one
            var endTimeChanged = _collection != null && _collection.EndTime != dataPoint.EndTime;
            if (endTimeChanged)
            {
                yield return _collection;
                _collection = null;
            }

            if (_collection == null)
            {
                BeginCollection(dataPoint);
            }

            // accumulate the point into the pending collection
            _collection.Add(dataPoint);
        }

        // the underlying reader is exhausted: nothing else to emit unless a collection is still pending
        if (_collection == null)
        {
            yield break;
        }

        yield return _collection;
        _collection = null;
    }

    /// <summary>
    /// Starts a new pending collection of the configured type, stamped with the times
    /// of <paramref name="firstPoint"/> and the symbol of this reader's configuration
    /// </summary>
    /// <param name="firstPoint">The data point that opens the new collection</param>
    private void BeginCollection(BaseData firstPoint)
    {
        _collection = (BaseDataCollection)Activator.CreateInstance(_collectionType);
        _collection.Time = firstPoint.Time;
        _collection.Symbol = Config.Symbol;
        _collection.EndTime = firstPoint.EndTime;
    }
}
91
}
Engine
DataFeeds
BaseDataCollectionAggregatorReader.cs
Generated by
1.8.17