Lean  $LEAN_TAG$
AlgoSeekFuturesConverter.cs
1 /*
2  * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
3  * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14 */
15 
16 using System;
17 using System.Collections.Generic;
18 using System.Diagnostics;
19 using System.IO;
20 using System.IO.Compression;
21 using System.Linq;
22 using System.Threading;
23 using System.Threading.Tasks;
25 using QuantConnect.Logging;
26 
28 {
29  using Processors = Dictionary<Symbol, List<List<AlgoSeekFuturesProcessor>>>;
30 
31  /// <summary>
32  /// Process a directory of algoseek futures files into separate resolutions.
33  /// </summary>
35  {
36  private readonly DirectoryInfo _source;
37  private readonly DirectoryInfo _remote;
38  private readonly string _destination;
39  private readonly List<Resolution> _resolutions;
40  private readonly DateTime _referenceDate;
41  private readonly HashSet<string> _symbolFilter;
42 
43  /// <summary>
44  /// Create a new instance of the AlgoSeekFutures Converter. Parse a single input directory into an output.
45  /// </summary>
46  /// <param name="resolutions">Convert this resolution</param>
47  /// <param name="referenceDate">Datetime to be added to the milliseconds since midnight. Algoseek data is stored in channel files (XX.bz2) and in a source directory</param>
48  /// <param name="remote">Remote directory of the .bz algoseek files</param>
49  /// <param name="source">Source directory of the .csv algoseek files</param>
50  /// <param name="destination">Destination directory of the processed future files</param>
51  /// <param name="symbolFilter">Collection of underlying ticker to process.</param>
52  public AlgoSeekFuturesConverter(List<Resolution> resolutions, DateTime referenceDate, string remote, string source, string destination, HashSet<string> symbolFilter = null)
53  {
54  _source = new DirectoryInfo(source);
55  _remote = new DirectoryInfo(remote);
56  _referenceDate = referenceDate;
57  _destination = destination;
58  _resolutions = resolutions;
59  _symbolFilter = symbolFilter;
60  }
61 
62  /// <summary>
63  /// Give the reference date and source directory, convert the algoseek data into n-resolutions LEAN format.
64  /// </summary>
65  public void Convert()
66  {
67  Log.Trace("AlgoSeekFuturesConverter.Convert(): Copying remote raw data files locally.");
68  //Get the list of available raw files, copy from its remote location to a local folder and then for each file open a separate streamer.
69 
70  var files = GetFilesInRawFolder()
71  .Where(f => (f.Extension == ".gz" || f.Extension == ".bz2") && !f.Name.Contains("option"))
72  .Select(remote => remote.CopyTo(Path.Combine(Path.GetTempPath(), remote.Name), true))
73  .ToList();
74 
75  Log.Trace("AlgoSeekFuturesConverter.Convert(): Loading {0} AlgoSeekFuturesReader for {1} ", files.Count, _referenceDate);
76 
77  //Initialize parameters
78  var totalLinesProcessed = 0L;
79  var totalFiles = files.Count;
80  var totalFilesProcessed = 0;
81  var start = DateTime.MinValue;
82 
83  var symbolMultipliers = LoadSymbolMultipliers();
84 
85  //Extract each file massively in parallel.
86  Parallel.ForEach(files, file =>
87  {
88  try
89  {
90  Log.Trace("Remote File :" + file);
91 
92  var csvFile = Path.Combine(_source.FullName, Path.GetFileNameWithoutExtension(file.Name));
93 
94  Log.Trace("Source File :" + csvFile);
95 
96  if (!File.Exists(csvFile))
97  {
98  // create the directory first or else 7z will fail
99  var csvFileInfo = new FileInfo(csvFile);
100  Directory.CreateDirectory(csvFileInfo.DirectoryName);
101 
102  Log.Trace("AlgoSeekFuturesConverter.Convert(): Extracting " + file);
103 
104  // Never time out extracting an archive; they can be pretty big
105  // and take a while to extract depending on the computer running this application
106  Compression.Extract7ZipArchive(file.FullName, _source.FullName, -1);
107  }
108 
109  // setting up local processors
110  var processors = new Processors();
111 
112  var reader = new AlgoSeekFuturesReader(csvFile, symbolMultipliers, _symbolFilter);
113  if (start == DateTime.MinValue)
114  {
115  start = DateTime.Now;
116  }
117 
118  if (reader.Current != null) // reader contains the data
119  {
120  do
121  {
122  var tick = reader.Current as Tick;
123 
124  if (tick.Symbol.ID.Symbol == "VX" && (
125  tick.BidPrice >= 998m || tick.AskPrice >= 998m))
126  {
127  // Invalid value for VX futures. Invalid prices in raw data are 998/999
128  continue;
129  }
130  //Add or create the consolidator-flush mechanism for symbol:
131  List<List<AlgoSeekFuturesProcessor>> symbolProcessors;
132  if (!processors.TryGetValue(tick.Symbol, out symbolProcessors))
133  {
134  symbolProcessors = new List<List<AlgoSeekFuturesProcessor>>(3)
135  {
136  { _resolutions.Select(x => new AlgoSeekFuturesProcessor(tick.Symbol, _referenceDate, TickType.Trade, x, _destination)).ToList() },
137  { _resolutions.Select(x => new AlgoSeekFuturesProcessor(tick.Symbol, _referenceDate, TickType.Quote, x, _destination)).ToList() },
138  { _resolutions.Select(x => new AlgoSeekFuturesProcessor(tick.Symbol, _referenceDate, TickType.OpenInterest, x, _destination)).ToList() }
139  };
140 
141  processors[tick.Symbol] = symbolProcessors;
142  }
143 
144  // Pass current tick into processor: enum 0 = trade; 1 = quote, 2 = oi
145  foreach (var processor in symbolProcessors[(int)tick.TickType])
146  {
147  processor.Process(tick);
148  }
149 
150  if (Interlocked.Increment(ref totalLinesProcessed) % 1000000m == 0)
151  {
152  var pro = (double)processors.Values.SelectMany(p => p.SelectMany(x => x)).Count();
153  var symbols = (double)processors.Keys.Count;
154  Log.Trace("AlgoSeekFuturesConverter.Convert(): Processed {0,3}M ticks( {1}k / sec); Memory in use: {2} MB; Total progress: {3}%, Processor per symbol {4}", Math.Round(totalLinesProcessed / 1000000m, 2), Math.Round(totalLinesProcessed / 1000L / (DateTime.Now - start).TotalSeconds), Process.GetCurrentProcess().WorkingSet64 / (1024 * 1024), 100 * totalFilesProcessed / totalFiles, pro / symbols);
155  }
156 
157  }
158  while (reader.MoveNext());
159 
160  Log.Trace("AlgoSeekFuturesConverter.Convert(): Performing final flush to disk... ");
161  Flush(processors, DateTime.MaxValue, true);
162  }
163 
164  processors = null;
165  GC.Collect();
166  GC.WaitForPendingFinalizers();
167 
168  Log.Trace("AlgoSeekFuturesConverter.Convert(): Finished processing file: " + file);
169  Interlocked.Increment(ref totalFilesProcessed);
170  }
171  catch(Exception err)
172  {
173  Log.Error("Exception caught! File: {0} Err: {1} Source {2} Stack {3}", file, err.Message, err.Source, err.StackTrace);
174  }
175  });
176 
177 
178  }
179 
180  /// <summary>
181  /// Gets the files in raw folder.
182  /// </summary>
183  /// <returns>List of files in source folder</returns>
184  private IEnumerable<FileInfo> GetFilesInRawFolder()
185  {
186  var files = new List<FileInfo>();
187 
188  var command = OS.IsLinux ? "ls" : "cmd.exe";
189  var arguments = OS.IsWindows ? "/c dir /b /a-d" : string.Empty;
190 
191  var processStartInfo = new ProcessStartInfo(command, arguments)
192  {
193  CreateNoWindow = true,
194  WindowStyle = ProcessWindowStyle.Hidden,
195  UseShellExecute = false,
196  RedirectStandardOutput = true,
197  WorkingDirectory = _remote.FullName
198  };
199 
200  using (var process = new Process())
201  {
202 
203  process.StartInfo = processStartInfo;
204  process.Start();
205 
206  while (!process.StandardOutput.EndOfStream)
207  {
208  var line = process.StandardOutput.ReadLine();
209  if (line != null)
210  {
211  files.Add(new FileInfo(Path.Combine(_remote.FullName, line)));
212  }
213  }
214  process.WaitForExit();
215  }
216 
217  return files;
218 
219  }
220 
221  /// <summary>
222  /// Private method loads symbol multipliers from algoseek csv file
223  /// </summary>
224  /// <returns></returns>
225  private Dictionary<string, decimal> LoadSymbolMultipliers()
226  {
227  const int columnUnderlying = 0;
228  const int columnMultipleFactor = 2;
229 
230  return File.ReadAllLines("AlgoSeekFuturesConverter/AlgoSeek.US.Futures.PriceMultipliers.1.1.csv")
231  .Select(line => line.ToCsvData())
232  // skipping empty fields
233  .Where(line => !string.IsNullOrEmpty(line[columnUnderlying]) &&
234  !string.IsNullOrEmpty(line[columnMultipleFactor]))
235  // skipping header
236  .Skip(1)
237  .ToDictionary(line => line[columnUnderlying],
238  line => line[columnMultipleFactor].ConvertInvariant<decimal>());
239  }
240 
241  private void Flush(Processors processors, DateTime time, bool final)
242  {
243  foreach (var symbol in processors.Keys)
244  {
245  processors[symbol].ForEach(p => p.ForEach(x => x.FlushBuffer(time, final)));
246  }
247  }
248 
249  /// <summary>
250  /// Compress the queue buffers directly to a zip file. Lightening fast as streaming ram-> compressed zip.
251  /// </summary>
252  public void Package(DateTime date)
253  {
254  var zipper = OS.IsWindows ? "C:/Program Files/7-Zip/7z.exe" : "7z";
255 
256  Log.Trace("AlgoSeekFuturesConverter.Package(): Zipping all files ...");
257 
258  var destination = Path.Combine(_destination, "future");
259  Directory.CreateDirectory(destination);
260  var dateMask = date.ToStringInvariant(DateFormat.EightCharacter);
261 
262  var files =
263  Directory.EnumerateFiles(destination, dateMask + "*.csv", SearchOption.AllDirectories)
264  .GroupBy(x => Directory.GetParent(x).FullName)
265  .ToList();
266 
267  // Zip each file massively in parallel
268  Parallel.ForEach(files, file =>
269  //foreach (var file in files)
270  {
271  try
272  {
273  var outputFileName = file.Key + ".zip";
274 
275  // Create and open a new ZIP file
276  var filesToCompress = Directory.GetFiles(file.Key, "*.csv", SearchOption.AllDirectories);
277  var zip = ZipFile.Open(outputFileName, ZipArchiveMode.Create);
278 
279  foreach (var fileToCompress in filesToCompress)
280  {
281  // Add the entry for each file
282  zip.CreateEntryFromFile(fileToCompress, Path.GetFileName(fileToCompress), CompressionLevel.Optimal);
283  }
284 
285  // Dispose of the object when we are done
286  zip.Dispose();
287 
288  try
289  {
290  Directory.Delete(file.Key, true);
291  }
292  catch (Exception err)
293  {
294  Log.Error("Directory.Delete returned error: " + err.Message);
295  }
296  }
297  catch (Exception err)
298  {
299  Log.Error("File: {0} Err: {1} Source {2} Stack {3}", file, err.Message, err.Source, err.StackTrace);
300  }
301  });
302  }
303 
304  }
305 }