Lean  $LEAN_TAG$
AlgoSeekFuturesReader.cs
1 /*
2  * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
3  * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14 */
15 
16 using System;
17 using System.Collections;
18 using System.Collections.Generic;
19 using System.Globalization;
20 using System.IO;
21 using System.Linq;
23 using QuantConnect.Logging;
26 using QuantConnect.Util;
27 
29 {
/// <summary>
/// Enumerator for converting AlgoSeek futures files into Ticks.
/// </summary>
/// <remarks>
/// The constructor reads the header line and immediately advances to the first
/// parseable tick, so <see cref="Current"/> is already populated (or null when the
/// file yields no ticks) before any external call to <see cref="MoveNext"/>.
/// Consume <see cref="Current"/> first and only then call <see cref="MoveNext"/>,
/// otherwise the first tick is skipped.
/// </remarks>
public class AlgoSeekFuturesReader : IEnumerator<Tick>
{
    // The low four bits of the "Type" column carry the message type.
    private const int MessageTypeMask = 15;
    private const int QuoteMask = 1;
    private const int TradeMask = 2;
    private const int OpenInterestMask = 11;

    // Tickers containing any of these characters are options or spreads and are skipped.
    private static readonly char[] OptionOrSpreadMarkers = { ' ', '-' };

    private readonly Stream _stream;
    private readonly StreamReader _streamReader;
    private readonly HashSet<string> _symbolFilter;
    private readonly Dictionary<string, decimal> _symbolMultipliers;
    // Loaded in the constructor; not read anywhere else in this class.
    private readonly SymbolPropertiesDatabase _symbolProperties;

    // Zero-based column positions detected from the header line; -1 means "not found".
    private readonly int _columnTimestamp = -1;
    private readonly int _columnSecID = -1;
    private readonly int _columnTicker = -1;
    private readonly int _columnType = -1;
    private readonly int _columnSide = -1;
    private readonly int _columnQuantity = -1;
    private readonly int _columnPrice = -1;
    // Despite the name this is the highest detected column index, not a count.
    private readonly int _columnsCount = -1;

    // True when every column that Parse reads unconditionally was found in the header.
    private readonly bool _hasRequiredColumns;

    /// <summary>
    /// Enumerate through the lines of the algoseek files.
    /// </summary>
    /// <param name="file">BZ File for AlgoSeek</param>
    /// <param name="symbolMultipliers">Symbol price multiplier</param>
    /// <param name="symbolFilter">Symbol filter to apply, if any</param>
    public AlgoSeekFuturesReader(string file, Dictionary<string, decimal> symbolMultipliers, HashSet<string> symbolFilter = null)
    {
        var streamProvider = StreamProvider.ForExtension(Path.GetExtension(file));
        _stream = streamProvider.Open(file).First();
        _streamReader = new StreamReader(_stream);

        // Copy the filter into a set that uses the comparer the lookup needs, so the
        // per-line membership test is a hash lookup instead of a linear scan.
        _symbolFilter = symbolFilter == null
            ? null
            : new HashSet<string>(symbolFilter, StringComparer.InvariantCultureIgnoreCase);
        _symbolMultipliers = symbolMultipliers.ToDictionary();
        _symbolProperties = SymbolPropertiesDatabase.FromDataFolder();

        // detecting column order in the file
        var headerLine = _streamReader.ReadLine();
        if (!string.IsNullOrEmpty(headerLine))
        {
            var header = headerLine.ToCsv();
            _columnTimestamp = header.FindIndex(x => x == "Timestamp");
            _columnTicker = header.FindIndex(x => x == "Ticker");
            _columnType = header.FindIndex(x => x == "Type");
            _columnSide = header.FindIndex(x => x == "Side");
            _columnSecID = header.FindIndex(x => x == "SecurityID");
            _columnQuantity = header.FindIndex(x => x == "Quantity");
            _columnPrice = header.FindIndex(x => x == "Price");

            _columnsCount = new[] { _columnTimestamp, _columnTicker, _columnType, _columnSide, _columnSecID, _columnQuantity, _columnPrice }.Max();
        }

        // Parse reads these five columns for every tick type. If any is missing no
        // line can ever be converted, so report it once here instead of failing
        // (and logging) on every single line of the file.
        _hasRequiredColumns = _columnTimestamp >= 0
            && _columnTicker >= 0
            && _columnType >= 0
            && _columnQuantity >= 0
            && _columnPrice >= 0;

        if (!_hasRequiredColumns)
        {
            Log.Error($"AlgoSeekFuturesReader(): header of '{file}' is missing one or more required columns (Timestamp, Ticker, Type, Quantity, Price); no ticks will be produced.");
        }

        //Prime the data pump, set the current.
        Current = null;
        MoveNext();
    }

    /// <summary>
    /// Parse the next line of the algoseek future file.
    /// </summary>
    /// <returns>True when <see cref="Current"/> holds a new tick, false once the file is exhausted</returns>
    public bool MoveNext()
    {
        Tick tick = null;

        if (_hasRequiredColumns)
        {
            string line;
            while (tick == null && (line = _streamReader.ReadLine()) != null)
            {
                // If line is invalid continue looping to find next valid line.
                tick = Parse(line);
            }
        }

        Current = tick;
        return Current != null;
    }

    /// <summary>
    /// Current top of the tick file.
    /// </summary>
    public Tick Current { get; private set; }

    /// <summary>
    /// Gets the current element in the collection.
    /// </summary>
    /// <returns>
    /// The current element in the collection.
    /// </returns>
    object IEnumerator.Current => Current;

    /// <summary>
    /// Reset the enumerator for the AlgoSeekFuturesReader
    /// </summary>
    /// <exception cref="NotImplementedException">Always thrown; resetting is not implemented.</exception>
    public void Reset()
    {
        throw new NotImplementedException("Reset not implemented for AlgoSeekFuturesReader.");
    }

    /// <summary>
    /// Dispose of the underlying AlgoSeekFuturesReader
    /// </summary>
    public void Dispose()
    {
        // Dispose the reader before the stream it wraps; both calls are idempotent.
        _streamReader.Dispose();
        _stream.Dispose();
    }

    /// <summary>
    /// Parse a string line into a future tick.
    /// </summary>
    /// <param name="line">One CSV data line from the file</param>
    /// <returns>
    /// The parsed tick, or null when the line is too short, is an option or spread,
    /// refers to a symbol without a multiplier or outside the filter, has an
    /// unrecognised message type or quote side, or fails to parse.
    /// </returns>
    private Tick Parse(string line)
    {
        try
        {
            // parse csv check column count
            var csv = line.ToCsv();
            if (csv.Count - 1 < _columnsCount)
            {
                return null;
            }

            var ticker = csv[_columnTicker];

            // we filter out options and spreads
            if (ticker.IndexOfAny(OptionOrSpreadMarkers) != -1)
            {
                return null;
            }

            ticker = ticker.Trim('"');

            if (string.IsNullOrEmpty(ticker))
            {
                return null;
            }

            // ignoring time zones completely -- this is all in the 'data-time-zone'
            var timeString = csv[_columnTimestamp];
            var time = DateTime.ParseExact(timeString, "yyyyMMddHHmmssFFF", CultureInfo.InvariantCulture);

            var symbol = SymbolRepresentation.ParseFutureSymbol(ticker, time.Year);
            if (symbol == null)
            {
                return null;
            }

            var underlying = symbol.ID.Symbol;

            // Single lookup: existence check and multiplier retrieval in one call.
            decimal multiplier;
            if (!_symbolMultipliers.TryGetValue(underlying, out multiplier))
            {
                return null;
            }

            // The filter set was built with a case-insensitive comparer in the constructor.
            if (_symbolFilter != null && !_symbolFilter.Contains(underlying))
            {
                return null;
            }

            // detecting tick type (trade or quote)
            TickType tickType;
            var isAsk = false;

            var messageType = csv[_columnType].ConvertInvariant<int>() & MessageTypeMask;
            if (messageType == TradeMask)
            {
                tickType = TickType.Trade;
            }
            else if (messageType == OpenInterestMask)
            {
                tickType = TickType.OpenInterest;
            }
            else if (messageType == QuoteMask)
            {
                tickType = TickType.Quote;

                switch (csv[_columnSide])
                {
                    case "B":
                        isAsk = false;
                        break;
                    case "S":
                        isAsk = true;
                        break;
                    default:
                        return null;
                }
            }
            else
            {
                return null;
            }

            // All futures but VIX are delivered with a scale factor of 10000000000.
            var scaleFactor = underlying == "VX" ? decimal.One : 10000000000m;

            var price = csv[_columnPrice].ToDecimal() / scaleFactor;
            var quantity = csv[_columnQuantity].ToInt32();

            price *= multiplier;

            switch (tickType)
            {
                case TickType.Quote:

                    var quote = new Tick
                    {
                        Symbol = symbol,
                        Time = time,
                        TickType = tickType,
                        Value = price
                    };

                    // Each quote line carries one side of the book only.
                    if (isAsk)
                    {
                        quote.AskPrice = price;
                        quote.AskSize = quantity;
                    }
                    else
                    {
                        quote.BidPrice = price;
                        quote.BidSize = quantity;
                    }

                    return quote;

                case TickType.Trade:

                    return new Tick
                    {
                        Symbol = symbol,
                        Time = time,
                        TickType = tickType,
                        Value = price,
                        Quantity = quantity
                    };

                case TickType.OpenInterest:

                    // For open interest the tick value is the quantity column, not the price.
                    return new Tick
                    {
                        Symbol = symbol,
                        Time = time,
                        TickType = tickType,
                        Exchange = symbol.ID.Market,
                        Value = quantity
                    };
            }

            return null;
        }
        catch (Exception err)
        {
            // Best effort: a malformed line is logged and skipped, never fatal.
            Log.Error(err);
            Log.Trace("Line: {0}", line);
            return null;
        }
    }
}
287 }