Lean  $LEAN_TAG$
KaikoCryptoReader.cs
1 /*
2  * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
3  * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  *
8  * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15 */
16 
17 using QuantConnect.Data;
19 using QuantConnect.Logging;
20 using System;
21 using System.Collections.Generic;
22 using System.Globalization;
23 using System.IO;
24 using System.IO.Compression;
25 using System.Linq;
26 using ZipEntry = Ionic.Zip.ZipEntry;
27 
29 {
/// <summary>
/// Reads a single gzip-compressed entry of a Kaiko crypto raw data zip file and
/// converts its CSV content into Lean ticks.
/// </summary>
public class KaikoDataReader
{
    private readonly Symbol _symbol;
    private readonly TickType _tickType;

    /// <summary>
    /// Initializes a new instance of the <see cref="KaikoDataReader"/> class.
    /// </summary>
    /// <param name="symbol">The symbol assigned to every tick produced by this reader.</param>
    /// <param name="tickType">
    /// Type of the tick; <see cref="TickType.Trade"/> selects the trade file parser,
    /// any other value selects the quote (order book) file parser.
    /// </param>
    public KaikoDataReader(Symbol symbol, TickType tickType)
    {
        _symbol = symbol;
        _tickType = tickType;
    }

    /// <summary>
    /// Gets the ticks from Kaiko file zip entry.
    /// </summary>
    /// <param name="zipEntry">The zip entry.</param>
    /// <returns>
    /// A lazily evaluated sequence of ticks; the entry is only opened and decompressed
    /// once the sequence is enumerated.
    /// </returns>
    /// <exception cref="ArgumentNullException"><paramref name="zipEntry"/> is null.</exception>
    public IEnumerable<BaseData> GetTicksFromZipEntry(ZipEntry zipEntry)
    {
        // Fail eagerly: the iterators below are lazy, so without this check a null entry
        // would only surface later, when the caller starts enumerating.
        if (zipEntry == null)
        {
            throw new ArgumentNullException(nameof(zipEntry));
        }

        var rawData = GetRawDataStreamFromEntry(zipEntry);
        return _tickType == TickType.Trade ? ParseKaikoTradeFile(rawData) : ParseKaikoQuoteFile(rawData);
    }

    /// <summary>
    /// Gets the raw data from entry.
    /// </summary>
    /// <param name="zipEntry">The zip entry, whose content is itself gzip compressed.</param>
    /// <returns>IEnumerable with the zip entry content, one item per text line.</returns>
    private IEnumerable<string> GetRawDataStreamFromEntry(ZipEntry zipEntry)
    {
        // Disposing the readers in reverse order also closes the underlying entry stream.
        using (var entryStream = zipEntry.OpenReader())
        using (var gzipStream = new GZipStream(entryStream, CompressionMode.Decompress))
        using (var reader = new StreamReader(gzipStream))
        {
            string line;
            while ((line = reader.ReadLine()) != null)
            {
                yield return line;
            }
        }
    }

    /// <summary>
    /// Parse order book information for Kaiko data files
    /// </summary>
    /// <param name="rawDataLines">The raw data lines; the first line is the CSV header.</param>
    /// <returns>
    /// IEnumerable of ticks representing the Kaiko data, one quote tick per distinct
    /// consecutive timestamp found in the file
    /// </returns>
    private IEnumerable<Tick> ParseKaikoQuoteFile(IEnumerable<string> rawDataLines)
    {
        // A single enumerator is used on purpose: rawDataLines is a lazy iterator, so
        // First() followed by Skip(1) would open and decompress the zip entry twice.
        using (var lines = rawDataLines.GetEnumerator())
        {
            if (!lines.MoveNext())
            {
                // Empty entry: no header, nothing to parse
                yield break;
            }

            var headerCsv = lines.Current.ToCsv();
            var typeColumn = headerCsv.FindIndex(x => x == "type");
            var dateColumn = headerCsv.FindIndex(x => x == "date");
            var priceColumn = headerCsv.FindIndex(x => x == "price");
            var quantityColumn = headerCsv.FindIndex(x => x == "amount");

            long currentEpoch = 0;
            var currentEpochTicks = new List<KaikoTick>();

            while (lines.MoveNext())
            {
                var line = lines.Current;
                if (string.IsNullOrEmpty(line)) continue;

                var lineParts = line.Split(',');

                long tickEpoch;
                DateTime tickTime;
                string orderDirection;
                decimal quantity;
                decimal price;

                try
                {
                    // Every field is read inside the try so that a malformed or truncated
                    // line is logged and skipped instead of aborting the whole file.
                    tickEpoch = Parse.Long(lineParts[dateColumn]);
                    tickTime = Time.UnixMillisecondTimeStampToDateTime(tickEpoch);
                    orderDirection = lineParts[typeColumn];
                    quantity = ParseScientificNotationToDecimal(lineParts, quantityColumn);
                    price = ParseScientificNotationToDecimal(lineParts, priceColumn);
                }
                catch (Exception ex)
                {
                    Log.Error($"KaikoDataConverter.ParseKaikoQuoteFile(): Raw data corrupted. Line {string.Join(" ", lineParts)}, Exception {ex}");
                    continue;
                }

                if (currentEpoch != tickEpoch)
                {
                    // Timestamp changed: the accumulated snapshot is complete, emit it.
                    // On the very first data line the list is empty and nothing is emitted.
                    var quoteTick = CreateQuoteTick(Time.UnixMillisecondTimeStampToDateTime(currentEpoch), currentEpochTicks);

                    if (quoteTick != null) yield return quoteTick;

                    currentEpochTicks.Clear();
                    currentEpoch = tickEpoch;
                }

                currentEpochTicks.Add(new KaikoTick
                {
                    TickType = TickType.Quote,
                    Time = tickTime,
                    Quantity = quantity,
                    Value = price,
                    OrderDirection = orderDirection
                });
            }

            // Flush the snapshot of the last timestamp; no further timestamp change
            // will trigger it, so without this the final quote would be lost.
            var lastQuoteTick = CreateQuoteTick(Time.UnixMillisecondTimeStampToDateTime(currentEpoch), currentEpochTicks);

            if (lastQuoteTick != null) yield return lastQuoteTick;
        }
    }

    /// <summary>
    /// Take the order book entries sharing one timestamp and make a single Lean quote tick
    /// out of the best (highest) bid and the best (lowest) ask
    /// </summary>
    /// <param name="date">The timestamp of the snapshot being processed</param>
    /// <param name="currentEpochTicks">The snapshot of bid/ask Kaiko data</param>
    /// <returns>
    /// A single Lean quote tick, or null when the snapshot holds neither a bid ("b")
    /// nor an ask ("a") entry
    /// </returns>
    private Tick CreateQuoteTick(DateTime date, List<KaikoTick> currentEpochTicks)
    {
        // lowest ask
        var bestAsk = currentEpochTicks.Where(x => x.OrderDirection == "a")
            .OrderBy(x => x.Value)
            .FirstOrDefault();

        // highest bid
        var bestBid = currentEpochTicks.Where(x => x.OrderDirection == "b")
            .OrderByDescending(x => x.Value)
            .FirstOrDefault();

        if (bestAsk == null && bestBid == null)
        {
            // Did not have enough data to create a tick
            return null;
        }

        var tick = new Tick()
        {
            Symbol = _symbol,
            Time = date,
            TickType = TickType.Quote
        };

        // One-sided snapshots are allowed: the missing side keeps its default values
        if (bestBid != null)
        {
            tick.BidPrice = bestBid.Price;
            tick.BidSize = bestBid.Quantity;
        }

        if (bestAsk != null)
        {
            tick.AskPrice = bestAsk.Price;
            tick.AskSize = bestAsk.Quantity;
        }

        return tick;
    }

    /// <summary>
    /// Parse a kaiko trade file
    /// </summary>
    /// <param name="rawDataLines">The raw data lines; the first line is the CSV header.</param>
    /// <returns>Lean Ticks in the Kaiko file, one trade tick per valid data line</returns>
    private IEnumerable<Tick> ParseKaikoTradeFile(IEnumerable<string> rawDataLines)
    {
        // A single enumerator is used on purpose: rawDataLines is a lazy iterator, so
        // First() followed by Skip(1) would open and decompress the zip entry twice.
        using (var lines = rawDataLines.GetEnumerator())
        {
            if (!lines.MoveNext())
            {
                // Empty entry: no header, nothing to parse
                yield break;
            }

            var headerCsv = lines.Current.ToCsv();
            var dateColumn = headerCsv.FindIndex(x => x == "date");
            var priceColumn = headerCsv.FindIndex(x => x == "price");
            var quantityColumn = headerCsv.FindIndex(x => x == "amount");

            while (lines.MoveNext())
            {
                var line = lines.Current;
                if (string.IsNullOrEmpty(line)) continue;

                var lineParts = line.Split(',');

                DateTime tickTime;
                decimal quantity;
                decimal price;

                try
                {
                    // The timestamp is parsed here as well, so a malformed date skips
                    // only this line rather than terminating the enumeration.
                    tickTime = Time.UnixMillisecondTimeStampToDateTime(Parse.Long(lineParts[dateColumn]));
                    quantity = ParseScientificNotationToDecimal(lineParts, quantityColumn);
                    price = ParseScientificNotationToDecimal(lineParts, priceColumn);
                }
                catch (Exception ex)
                {
                    Log.Error($"KaikoDataConverter.ParseKaikoTradeFile(): Raw data corrupted. Line {string.Join(" ", lineParts)}, Exception {ex}");
                    continue;
                }

                yield return new Tick
                {
                    Symbol = _symbol,
                    TickType = TickType.Trade,
                    Time = tickTime,
                    Quantity = quantity,
                    Value = price
                };
            }
        }
    }

    /// <summary>
    /// Parse a numeric field of the kaiko ticks - can sometimes be expressed in scientific notation
    /// </summary>
    /// <param name="lineParts">The line from the Kaiko file, already split into columns</param>
    /// <param name="column">The index of the column to parse</param>
    /// <returns>The field value as a decimal</returns>
    private static decimal ParseScientificNotationToDecimal(string[] lineParts, int column)
    {
        var value = lineParts[column];

        // The exponent marker may be written in either case ("1e-05" or "1E-05");
        // both need NumberStyles.Float to be accepted.
        if (value.Contains('e', StringComparison.OrdinalIgnoreCase))
        {
            return Parse.Decimal(value, NumberStyles.Float);
        }

        return value.ConvertInvariant<decimal>();
    }

    /// <summary>
    /// Simple class to add order direction to Tick
    /// used for aggregating Kaiko order book snapshots
    /// </summary>
    private class KaikoTick : Tick
    {
        /// <summary>
        /// Side of the order book entry as found in the "type" column;
        /// "a" is treated as an ask and "b" as a bid.
        /// </summary>
        public string OrderDirection { get; set; }
    }
}
259 }