Lean
$LEAN_TAG$
BaseDataCollectionAggregatorReader.cs
1
/*
2
* QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
3
* Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
4
*
5
* Licensed under the Apache License, Version 2.0 (the "License");
6
* you may not use this file except in compliance with the License.
7
* You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
8
*
9
* Unless required by applicable law or agreed to in writing, software
10
* distributed under the License is distributed on an "AS IS" BASIS,
11
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
* See the License for the specific language governing permissions and
13
* limitations under the License.
14
*
15
*/
16
17
using
System;
18
using
QuantConnect
.
Data
;
19
using
QuantConnect
.
Util
;
20
using
QuantConnect
.
Interfaces
;
21
using
System.Collections.Generic;
22
using
QuantConnect
.
Data
.
UniverseSelection
;
23
24
namespace
QuantConnect.Lean.Engine.DataFeeds
25
{
26
/// <summary>
27
/// Data source reader that will aggregate data points into a base data collection
28
/// </summary>
29
public
class
BaseDataCollectionAggregatorReader
:
TextSubscriptionDataSourceReader
30
{
31
private
readonly Type _collectionType;
32
private
BaseDataCollection
_collection;
33
34
/// <summary>
35
/// Initializes a new instance of the <see cref="TextSubscriptionDataSourceReader"/> class
36
/// </summary>
37
/// <param name="dataCacheProvider">This provider caches files if needed</param>
38
/// <param name="config">The subscription's configuration</param>
39
/// <param name="date">The date this factory was produced to read data for</param>
40
/// <param name="isLiveMode">True if we're in live mode, false for backtesting</param>
41
/// <param name="objectStore">The object storage for data persistence</param>
42
public
BaseDataCollectionAggregatorReader
(
IDataCacheProvider
dataCacheProvider,
SubscriptionDataConfig
config, DateTime date,
43
bool
isLiveMode,
IObjectStore
objectStore)
44
: base(dataCacheProvider, config, date, isLiveMode, objectStore)
45
{
46
// if the type is not a BaseDataCollection, we'll default to BaseDataCollection.
47
// e.g. custom Python dynamic folding collections need to be aggregated into a BaseDataCollection,
48
// but they implement PythonData, so casting an instance of PythonData to BaseDataCollection will fail.
49
_collectionType = config.
Type
.IsAssignableTo(typeof(
BaseDataCollection
)) ? config.
Type
: typeof(
BaseDataCollection
);
50
}
51
52
/// <summary>
53
/// Reads the specified <paramref name="source"/>
54
/// </summary>
55
/// <param name="source">The source to be read</param>
56
/// <returns>An <see cref="IEnumerable{BaseData}"/> that contains the data in the source</returns>
57
public
override
IEnumerable<BaseData>
Read
(
SubscriptionDataSource
source)
58
{
59
foreach
(var point
in
base.Read(source))
60
{
61
if
(point is
BaseDataCollection
collection && !collection.
Data
.IsNullOrEmpty())
62
{
63
// if underlying already is returning an aggregated collection let it through as is
64
yield
return
point;
65
}
66
else
67
{
68
if
(_collection !=
null
&& _collection.
EndTime
!= point.EndTime)
69
{
70
// when we get a new time we flush current collection instance, if any
71
yield
return
_collection;
72
_collection =
null
;
73
}
74
75
if
(_collection ==
null
)
76
{
77
_collection = (
BaseDataCollection
)Activator.CreateInstance(_collectionType);
78
_collection.
Time
= point.Time;
79
_collection.
Symbol
=
Config
.
Symbol
;
80
_collection.
EndTime
= point.EndTime;
81
}
82
// aggregate the data points
83
_collection.
Add
(point);
84
}
85
}
86
87
// underlying reader ended, flush current collection instance if any
88
if
(_collection !=
null
)
89
{
90
yield
return
_collection;
91
_collection =
null
;
92
}
93
}
94
}
95
}
Engine
DataFeeds
BaseDataCollectionAggregatorReader.cs
Generated by
1.8.17