Lean  $LEAN_TAG$
ObjectStoreSubscriptionStreamReader.cs
1 /*
2  * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
3  * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  *
15 */
16 
17 using System;
18 using System.IO;
19 using Ionic.Zip;
21 using QuantConnect.Util;
22 
24 {
25  /// <summary>
26  /// Represents a stream reader capable of reading lines from the object store
27  /// </summary>
29  {
30  private IObjectStore _objectStore;
31  private string _key;
32  private StreamReader _streamReader;
33 
/// <summary>
/// Indicates whether reads from this stream reader should be rate limited.
/// This reader always reports <c>false</c>.
/// </summary>
public bool ShouldBeRateLimited
{
    get { return false; }
}
38 
39  /// <summary>
40  /// Direct access to the StreamReader instance
41  /// </summary>
43  {
get
{
    // The reader is created lazily and cached; reuse it once it exists.
    if (_streamReader != null)
    {
        return _streamReader;
    }

    // No key, or the store does not hold it: there is nothing to open,
    // so the (still null) cached reader is returned.
    if (string.IsNullOrEmpty(_key) || !_objectStore.ContainsKey(_key))
    {
        return _streamReader;
    }

    var sourceStream = new MemoryStream(_objectStore.ReadBytes(_key));

    if (!_key.EndsWith(".zip", StringComparison.InvariantCulture))
    {
        // Plain payload: read lines straight from the stored bytes.
        _streamReader = new StreamReader(sourceStream);
        return _streamReader;
    }

    using (var archive = ZipFile.Read(sourceStream))
    {
        // we only support single file zip files for now
        var firstEntry = archive[0];
        var extracted = new MemoryStream();
        firstEntry.Extract(extracted);

        // Rewind so the reader starts at the beginning of the extracted content.
        extracted.Position = 0;
        _streamReader = new StreamReader(extracted);

        // The compressed bytes are no longer needed once the entry is extracted.
        sourceStream.DisposeSafely();
    }

    return _streamReader;
}
71  }
72 
/// <summary>
/// Creates a new <see cref="ObjectStoreSubscriptionStreamReader"/> bound to an object store key.
/// The constructor only stores its arguments; no data is read here.
/// </summary>
/// <param name="objectStore">The <see cref="IObjectStore"/> the data will be read from</param>
/// <param name="key">The object store key identifying the data to read</param>
public ObjectStoreSubscriptionStreamReader(IObjectStore objectStore, string key)
{
    (_objectStore, _key) = (objectStore, key);
}
83 
84  /// <summary>
85  /// Gets <see cref="SubscriptionTransportMedium.ObjectStore"/>
86  /// </summary>
88  {
get => SubscriptionTransportMedium.ObjectStore;
90  }
91 
/// <summary>
/// Gets whether the stream has no more data to read. Reports <c>true</c> when no
/// underlying reader is available or when the underlying reader is at its end.
/// </summary>
public bool EndOfStream
{
    get
    {
        var reader = StreamReader;
        return reader == null || reader.EndOfStream;
    }
}
99 
/// <summary>
/// Gets the next line/batch of content from the stream
/// </summary>
/// <returns>
/// The next line of content, or <c>null</c> when no underlying reader is available
/// or the underlying reader has no more lines
/// </returns>
public string ReadLine()
{
    // StreamReader is null when the key is empty or not present in the object store.
    // EndOfStream already treats that as "no data"; mirror it here by returning null
    // instead of throwing a NullReferenceException.
    return StreamReader?.ReadLine();
}
107 
/// <summary>
/// Releases the underlying reader, if one was created, and clears the cached reference
/// </summary>
public void Dispose()
{
    _streamReader?.Dispose();
    _streamReader = null;
}
119  }
120 }