Lean  $LEAN_TAG$
RemoteFileSubscriptionStreamReader.cs
1 /*
2  * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
3  * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  *
15 */
16 
17 using System;
18 using System.Collections.Generic;
19 using System.IO;
21 using QuantConnect.Util;
22 
24 {
25  /// <summary>
26  /// Represents a stream reader capabable of downloading a remote file and then
27  /// reading it from disk
28  /// </summary>
30  {
31  private readonly IStreamReader _streamReader;
32  private static IDownloadProvider _downloader;
33  // lock for multi thread scenarios where we are sharing the same cached file
34  private static readonly object _fileSystemLock = new object();
35 
36  /// <summary>
37  /// Gets whether or not this stream reader should be rate limited
38  /// </summary>
39  public bool ShouldBeRateLimited => false;
40 
41  /// <summary>
42  /// Direct access to the StreamReader instance
43  /// </summary>
44  public StreamReader StreamReader => _streamReader.StreamReader;
45 
46  /// <summary>
47  /// The local file name of the downloaded file
48  /// </summary>
49  public string LocalFileName { get; }
50 
51  /// <summary>
52  /// Initializes a new instance of the <see cref="RemoteFileSubscriptionStreamReader"/> class.
53  /// </summary>
54  /// <param name="dataCacheProvider">The <see cref="IDataCacheProvider"/> used to retrieve a stream of data</param>
55  /// <param name="source">The remote url to be downloaded via web client</param>
56  /// <param name="downloadDirectory">The local directory and destination of the download</param>
57  /// <param name="headers">Defines header values to add to the request</param>
58  public RemoteFileSubscriptionStreamReader(IDataCacheProvider dataCacheProvider, string source, string downloadDirectory, IEnumerable<KeyValuePair<string, string>> headers)
59  {
60  // don't use cache if data is ephemeral
61  // will be false for live history requests and live subscriptions
62  var useCache = !dataCacheProvider.IsDataEphemeral;
63 
64  // create a hash for a new filename
65  string baseFileName = string.Empty;
66  string extension = string.Empty;
67  string entryName = string.Empty;
68  try
69  {
70  var uri = new Uri(source);
71  baseFileName = uri.OriginalString;
72  if (!string.IsNullOrEmpty(uri.Fragment))
73  {
74  baseFileName = baseFileName.Replace(uri.Fragment, "", StringComparison.InvariantCulture);
75  }
76  extension = uri.AbsolutePath.GetExtension();
77  entryName = uri.Fragment;
78  }
79  catch
80  {
81  LeanData.ParseKey(source, out baseFileName, out entryName);
82  extension = Path.GetExtension(baseFileName);
83  }
84 
85  var cacheFileName = (useCache ? baseFileName.ToMD5() : Guid.NewGuid().ToString()) + extension;
86  LocalFileName = Path.Combine(downloadDirectory, cacheFileName);
87 
88  byte[] bytes = null;
89  if (useCache)
90  {
91  lock (_fileSystemLock)
92  {
93  if (!File.Exists(LocalFileName))
94  {
95  bytes = _downloader.DownloadBytes(source, headers, null, null);
96  }
97  }
98  }
99  else
100  {
101  bytes = _downloader.DownloadBytes(source, headers, null, null);
102  }
103 
104  if (bytes != null)
105  {
106  File.WriteAllBytes(LocalFileName, bytes);
107 
108  // Send the file to the dataCacheProvider so it is available when the streamReader asks for it
109  dataCacheProvider.Store(LocalFileName, bytes);
110  }
111 
112  // now we can just use the local file reader.
113  // add the entry name to the local file name so the correct entry is read
114  var fileNameWithEntry = LocalFileName;
115  if (!string.IsNullOrEmpty(entryName))
116  {
117  fileNameWithEntry += entryName;
118  }
119  _streamReader = new LocalFileSubscriptionStreamReader(dataCacheProvider, fileNameWithEntry);
120  }
121 
122  /// <summary>
123  /// Gets <see cref="SubscriptionTransportMedium.RemoteFile"/>
124  /// </summary>
126  {
127  get { return SubscriptionTransportMedium.RemoteFile; }
128  }
129 
130  /// <summary>
131  /// Gets whether or not there's more data to be read in the stream
132  /// </summary>
133  public bool EndOfStream
134  {
135  get { return _streamReader.EndOfStream; }
136  }
137 
138  /// <summary>
139  /// Gets the next line/batch of content from the stream
140  /// </summary>
141  public string ReadLine()
142  {
143  return _streamReader.ReadLine();
144  }
145 
146  /// <summary>
147  /// Disposes of the stream
148  /// </summary>
149  public void Dispose()
150  {
151  _streamReader.Dispose();
152  }
153 
154  /// <summary>
155  /// Save reference to the download system.
156  /// </summary>
157  /// <param name="downloader">Downloader provider for the remote file fetching.</param>
158  public static void SetDownloadProvider(IDownloadProvider downloader)
159  {
160  _downloader = downloader;
161  }
162  }
163 }