Lean  $LEAN_TAG$
IndexSubscriptionDataSourceReader.cs
1 /*
2  * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
3  * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  *
15 */
16 
17 using System;
18 using QuantConnect.Data;
19 using QuantConnect.Util;
21 using System.Collections.Generic;
22 
24 {
25  /// <summary>
26  /// This <see cref="ISubscriptionDataSourceReader"/> implementation supports
27  /// the <see cref="FileFormat.Index"/> and <see cref="IndexedBaseData"/> types.
28  /// Handles the layer of indirection for the index data source and forwards
29  /// the target source to the corresponding <see cref="ISubscriptionDataSourceReader"/>
30  /// </summary>
32  {
33  private readonly SubscriptionDataConfig _config;
34  private readonly DateTime _date;
35  private IDataProvider _dataProvider;
36  private readonly IndexedBaseData _factory;
37 
38  /// <summary>
39  /// Creates a new instance of this <see cref="ISubscriptionDataSourceReader"/>
40  /// </summary>
43  DateTime date,
44  bool isLiveMode,
45  IDataProvider dataProvider,
46  IObjectStore objectStore)
47  : base(dataCacheProvider, isLiveMode, objectStore)
48  {
49  _config = config;
50  _date = date;
51  _dataProvider = dataProvider;
52  _factory = config.Type.GetBaseDataInstance() as IndexedBaseData;
53  if (_factory == null)
54  {
55  throw new ArgumentException($"{nameof(IndexSubscriptionDataSourceReader)} should be used" +
56  $"with a data type which implements {nameof(IndexedBaseData)}");
57  }
58  }
59 
/// <summary>
/// Reads the specified index <paramref name="source"/> line by line. Every non-empty
/// line is handed to the indexed factory to resolve the target data source, and the
/// data produced by the reader created for that target source is streamed back.
/// </summary>
/// <param name="source">The index source to be read</param>
/// <returns>An <see cref="IEnumerable{BaseData}"/> that contains the data in the source</returns>
public override IEnumerable<BaseData> Read(SubscriptionDataSource source)
{
    // handles zip or text files
    using (var reader = CreateStreamReader(source))
    {
        // if the reader doesn't have data then we're done with this subscription
        if (reader == null || reader.EndOfStream)
        {
            // plain '{...}' interpolation: a leading '$' would be emitted literally in the message
            OnInvalidSource(source, new Exception($"The reader was empty for source: {source.Source}"));
            yield break;
        }

        // while the reader has data
        while (!reader.EndOfStream)
        {
            // read an index line and let the factory resolve it into the target source
            var line = reader.ReadLine();
            if (line.IsNullOrEmpty())
            {
                continue;
            }

            SubscriptionDataSource dataSource;
            try
            {
                dataSource = _factory.GetSourceForAnIndex(_config, _date, line, IsLiveMode);
            }
            catch (Exception exception)
            {
                // keep the factory failure as the inner exception so the root cause is not lost
                OnInvalidSource(source, new Exception("Factory.GetSourceForAnIndex() failed to return a valid source", exception));
                yield break;
            }

            // the factory may decline to provide a source for this index line
            if (dataSource == null)
            {
                continue;
            }

            // NOTE(review): the embedded listing numbers skip from 101 to 103 inside this call,
            // so an argument line may have been lost when this listing was extracted -
            // confirm the argument list against the ForSource signature.
            var dataReader = SubscriptionDataSourceReader.ForSource(
                dataSource,
                _config,
                _date,
                IsLiveMode,
                _factory,
                _dataProvider,
                ObjectStore);

            var enumerator = dataReader.Read(dataSource).GetEnumerator();
            try
            {
                while (enumerator.MoveNext())
                {
                    yield return enumerator.Current;
                }
            }
            finally
            {
                // also runs when the consumer stops enumerating early or the inner reader throws
                enumerator.DisposeSafely();
            }
        }
    }
}
120  }
121 }