Lean  $LEAN_TAG$
CollectionSubscriptionDataSourceReader.cs
1 /*
2  * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
3  * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14 */
15 using System;
16 using QuantConnect.Data;
17 using QuantConnect.Util;
19 using System.Collections.Generic;
21 
23 {
24  /// <summary>
25  /// Collection Subscription Factory takes a BaseDataCollection from BaseData factories
26  /// and yields it one point at a time to the algorithm
27  /// </summary>
29  {
30  private readonly DateTime _date; // date supplied at construction; forwarded unchanged to every _factory.Reader call
31  private readonly BaseData _factory; // instance obtained from the config via GetBaseDataInstance(); its Reader parses each line
32  private readonly SubscriptionDataConfig _config; // subscription configuration handed to _factory.Reader for each line
33 
/// <summary>
/// Creates a new <see cref="CollectionSubscriptionDataSourceReader"/> bound to a subscription and a date
/// </summary>
/// <param name="dataCacheProvider">Data cache provider, forwarded to the base reader</param>
/// <param name="config">The subscription's configuration; also used to create the data instance whose Reader parses each line</param>
/// <param name="date">The date passed along to each Reader call</param>
/// <param name="isLiveMode">True if we're in live mode, false for backtesting</param>
/// <param name="objectStore">Object store, forwarded to the base reader</param>
public CollectionSubscriptionDataSourceReader(
    IDataCacheProvider dataCacheProvider,
    SubscriptionDataConfig config,
    DateTime date,
    bool isLiveMode,
    IObjectStore objectStore)
    : base(dataCacheProvider, isLiveMode, objectStore)
{
    _config = config;
    _date = date;

    // the data instance is created once and reused for every line read
    _factory = config.GetBaseDataInstance();
}
48 
49  /// <summary>
50  /// Event fired when an exception is thrown while fetching a line from the stream or during the
51  /// <see cref="BaseData.Reader(SubscriptionDataConfig, string, DateTime, bool)"/> call that parses it
52  /// </summary>
53  public event EventHandler<ReaderErrorEventArgs> ReaderError;
54 
/// <summary>
/// Reads the specified <paramref name="source"/> line by line, passing each line to the
/// data instance's Reader and expecting a <see cref="BaseDataCollection"/> in return
/// </summary>
/// <remarks>
/// In live mode the collection is yielded as a single item; otherwise its entries are yielded
/// one at a time, skipping null entries and entries whose EndTime is the default value.
/// </remarks>
/// <param name="source">The source to be read</param>
/// <returns>An <see cref="IEnumerable{BaseData}"/> that contains the data in the source</returns>
public override IEnumerable<BaseData> Read(SubscriptionDataSource source)
{
    IStreamReader streamReader = null;
    try
    {
        streamReader = CreateStreamReader(source);
        if (streamReader != null)
        {
            // declared outside the loop so the catch block can report the latest line read
            var line = "";
            while (!streamReader.EndOfStream)
            {
                BaseDataCollection collection = null;
                try
                {
                    line = streamReader.ReadLine();
                    collection = _factory.Reader(_config, line, _date, IsLiveMode) as BaseDataCollection;
                    if (collection == null && !streamReader.ShouldBeRateLimited)
                    {
                        OnInvalidSource(source, new Exception("Reader must generate a BaseDataCollection with the FileFormat.Collection"));
                        continue;
                    }
                }
                catch (Exception exception)
                {
                    OnReaderError(line, exception);
                    if (!streamReader.ShouldBeRateLimited)
                    {
                        continue;
                    }
                }

                // the second condition shouldn't happen, rest reader is the only one to be
                // rate limited and in live mode, but just in case...
                var emitCollectionAsIs = IsLiveMode || (collection == null && streamReader.ShouldBeRateLimited);
                if (emitCollectionAsIs)
                {
                    // in live trading these data points will be unrolled at the
                    // 'LiveCustomDataSubscriptionEnumeratorFactory' level
                    yield return collection;
                    continue;
                }

                foreach (var point in collection.Data)
                {
                    var isEmittable = point != null && point.EndTime != default(DateTime);
                    if (isEmittable)
                    {
                        yield return point;
                    }
                }
            }
        }
    }
    finally
    {
        // runs even when no reader could be created; the call is made on a possibly null reference
        streamReader.DisposeSafely();
    }
}
123 
/// <summary>
/// Raises the <see cref="ReaderError"/> event when at least one handler is subscribed
/// </summary>
/// <param name="line">The raw line associated with the failure</param>
/// <param name="exception">The exception that was caught</param>
private void OnReaderError(string line, Exception exception)
{
    // null-conditional invoke snapshots the delegate and only builds the args when there are subscribers
    ReaderError?.Invoke(this, new ReaderErrorEventArgs(line, exception));
}
134  }
135 }