Lean  $LEAN_TAG$
KaikoDataConverterProgram.cs
1 /*
2  * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
3  * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  *
8  * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15 */
16 
17 using System;
18 using System.IO;
19 using System.Linq;
20 using QuantConnect.Data;
21 using System.Diagnostics;
22 using QuantConnect.Logging;
23 using System.Collections.Generic;
24 using ZipFile = Ionic.Zip.ZipFile;
25 
27 {
/// <summary>
/// Console application for converting a single day of Kaiko data into Lean data format for high resolutions (tick, second and minute).
/// </summary>
public static class KaikoDataConverterProgram
{
    /// <summary>
    /// Kaiko data converter entry point.
    /// </summary>
    /// <param name="sourceDirectory">The source directory where all Kaiko zipped files are stored.</param>
    /// <param name="date">The date to process, parsed using <c>DateFormat.EightCharacter</c>.</param>
    /// <param name="exchange">The exchange to process; if not defined, all exchanges will be processed. The value "gdax" (any casing) is replaced by "coinbase" before matching against the zip file names.</param>
    /// <exception cref="ArgumentException">Source folder does not exist.</exception>
    /// <remarks>
    /// This converter will process automatically data for every exchange and for both tick types if the raw data files are available in the sourceDirectory.
    /// Zip entries whose file name does not have at least three underscore-separated parts are logged and skipped.
    /// Errors raised while consolidating an entry are logged and the conversion continues with the next entry;
    /// errors raised while reading an entry or writing its tick data are not caught here and propagate to the caller.
    /// </remarks>
    public static void KaikoDataConverter(string sourceDirectory, string date, string exchange = "")
    {
        var timer = Stopwatch.StartNew();

        var folderPath = new DirectoryInfo(sourceDirectory);
        if (!folderPath.Exists)
        {
            throw new ArgumentException($"Source folder {folderPath.FullName} not found");
        }

        // The "gdax" alias is mapped to "coinbase" before it is matched against the zip file names.
        if (string.Equals(exchange, "gdax", StringComparison.OrdinalIgnoreCase))
        {
            exchange = "coinbase";
        }

        // Lower-case the filter once instead of once per file; null means "process every exchange".
        var exchangeFilter = string.IsNullOrEmpty(exchange) ? null : exchange.ToLowerInvariant();

        var processingDate = Parse.DateTimeExact(date, DateFormat.EightCharacter);

        // Culture-invariant key used both to locate the entries of the requested day and in log messages,
        // so what is logged always matches what was searched for.
        var dateKey = processingDate.ToStringInvariant("yyyy_MM_dd");

        foreach (var filePath in folderPath.EnumerateFiles("*.zip"))
        {
            // Do not process exchanges other than the one defined.
            if (exchangeFilter != null && !filePath.Name.ToLowerInvariant().Contains(exchangeFilter))
            {
                continue;
            }

            Log.Trace($"KaikoDataConverter(): Starting data conversion from source {filePath.Name} for date {dateKey}... ");
            using (var zip = new ZipFile(filePath.FullName))
            {
                var targetDayEntries = zip.Entries.Where(e => e.FileName.Contains(dateKey)).ToList();

                if (targetDayEntries.Count == 0)
                {
                    Log.Error($"KaikoDataConverter(): Date {dateKey} not found in source file {filePath.FullName}.");
                    continue;
                }

                foreach (var zipEntry in targetDayEntries)
                {
                    // The last path segment is split on '_' and read as: [0] market, [1] ticker, [2] tick type.
                    var nameParts = zipEntry.FileName.Split('/').Last().Split('_');
                    if (nameParts.Length < 3)
                    {
                        // Without this guard a single unexpected entry name would abort the whole conversion.
                        Log.Error($"KaikoDataConverter(): Skipping entry {zipEntry.FileName}, unexpected file name format.");
                        continue;
                    }

                    var market = nameParts[0] == "Coinbase" ? "GDAX" : nameParts[0];
                    var ticker = nameParts[1];
                    // Anything other than "trades" is treated as quote data.
                    var tickType = nameParts[2] == "trades" ? TickType.Trade : TickType.Quote;
                    var symbol = Symbol.Create(ticker, SecurityType.Crypto, market);

                    Log.Trace($"KaikoDataConverter(): Processing {symbol.Value} {tickType}");

                    // Generate ticks from raw data and write them to disk
                    var reader = new KaikoDataReader(symbol, tickType);
                    var ticks = reader.GetTicksFromZipEntry(zipEntry);

                    var tickWriter = new LeanDataWriter(Resolution.Tick, symbol, Globals.DataFolder, tickType);
                    tickWriter.Write(ticks);

                    try
                    {
                        Log.Trace($"KaikoDataConverter(): Starting consolidation for {symbol.Value} {tickType}");
                        var consolidators = CreateAggregators(tickType);

                        // NOTE(review): ticks is enumerated a second time here (the first pass is the tick writer above);
                        // if GetTicksFromZipEntry is lazy the entry is read twice - confirm this is acceptable.
                        foreach (var tick in ticks)
                        {
                            foreach (var consolidator in consolidators)
                            {
                                consolidator.Update(tick);
                            }
                        }

                        foreach (var consolidator in consolidators)
                        {
                            var aggregateWriter = new LeanDataWriter(consolidator.Resolution, symbol, Globals.DataFolder, tickType);
                            aggregateWriter.Write(consolidator.Flush());
                        }
                    }
                    catch (Exception e)
                    {
                        Log.Error($"KaikoDataConverter(): Error processing entry {zipEntry.FileName}. Exception {e}");
                    }
                }
            }
        }

        Log.Trace($"KaikoDataConverter(): Finished in {timer.Elapsed}");
    }

    /// <summary>
    /// Creates the second and minute resolution aggregators matching the given tick type.
    /// </summary>
    /// <param name="tickType">The tick type being processed; any value other than <c>TickType.Trade</c> gets quote aggregators.</param>
    /// <returns>A list holding one second-resolution and one minute-resolution aggregator.</returns>
    private static List<TickAggregator> CreateAggregators(TickType tickType)
    {
        if (tickType == TickType.Trade)
        {
            return new List<TickAggregator>
            {
                new TradeTickAggregator(Resolution.Second),
                new TradeTickAggregator(Resolution.Minute)
            };
        }

        return new List<TickAggregator>
        {
            new QuoteTickAggregator(Resolution.Second),
            new QuoteTickAggregator(Resolution.Minute)
        };
    }
}
133 }