Lean  $LEAN_TAG$
AlgoSeekFuturesProcessor.cs
1 /*
2  * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
3  * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14 */
15 
16 using System;
17 using System.IO;
18 using System.Linq;
19 using System.Threading;
22 using QuantConnect.Logging;
23 using QuantConnect.Util;
24 
26 {
27  /// <summary>
28  /// Processor for consolidating ticks for a single symbol and tick type,
29  /// then flushing the consolidated data to disk when triggered.
30  /// </summary>
32  {
// Number of processors whose output file is currently open; shared by all instances and updated via Interlocked
private static int _curFileCount;
private string _zipPath;
private string _entryPath;
private readonly Symbol _symbol;
private TickType _tickType;
private readonly Resolution _resolution;
private LazyStreamWriter _streamWriter;
private readonly string _dataDirectory;
private readonly IDataConsolidator _consolidator;
private readonly DateTime _referenceDate;

// Windows reserved device names that Safe/SafeName guard against when building paths
private static readonly string[] _windowsRestrictedNames =
{
    "con", "prn", "aux", "nul"
};
47 
/// <summary>
/// Zip entry name for this futures contract's data. Unless assigned explicitly, it is generated
/// on first read from the symbol, reference date, resolution and tick type, then cached.
/// </summary>
public string EntryPath
{
    get
    {
        // Assignment expression caches the generated name; a null value triggers regeneration
        return _entryPath ?? (_entryPath = SafeName(LeanData.GenerateZipEntryName(_symbol, _referenceDate, _resolution, _tickType)));
    }
    set
    {
        _entryPath = value;
    }
}
63 
/// <summary>
/// Absolute zip file path for the futures contract collection. Unless assigned explicitly, it is
/// generated on first read under the data directory, then cached.
/// </summary>
public string ZipPath
{
    get
    {
        if (_zipPath != null)
        {
            return _zipPath;
        }

        // Sanitize the relative path with every ".zip" removed, then re-append the extension
        var relativeZip = LeanData.GenerateRelativeZipFilePath(Safe(_symbol), _referenceDate, _resolution, _tickType);
        var relativeWithoutExtension = relativeZip.Replace(".zip", string.Empty);
        _zipPath = Path.Combine(_dataDirectory, SafeName(relativeWithoutExtension)) + ".zip";
        return _zipPath;
    }
    set
    {
        _zipPath = value;
    }
}
79 
/// <summary>
/// Symbol handled by this processor, as stored by the constructor after Windows reserved-name sanitizing
/// </summary>
public Symbol Symbol => _symbol;
87 
/// <summary>
/// Resolution the incoming ticks are consolidated to
/// </summary>
public Resolution Resolution => _resolution;
95 
/// <summary>
/// Tick type handled by this processor; <see cref="Process"/> ignores ticks of any other type.
/// ASOP's are grouped trade type for file writing.
/// </summary>
/// <remarks>
/// NOTE(review): the consolidator is chosen in the constructor from its tickType argument,
/// so assigning this property afterwards only changes which ticks are accepted.
/// </remarks>
public TickType TickType
{
    get => _tickType;
    set => _tickType = value;
}
105 
/// <summary>
/// Reports whether there is consolidated output worth writing; nothing is written to disk otherwise.
/// </summary>
/// <returns>True when the consolidator's <c>Consolidated</c> value is non-null; otherwise false</returns>
public bool ShouldWriteToDisk() => _consolidator.Consolidated != null;
113 
/// <summary>
/// Create a new AlgoSeekFuturesProcessor that consolidates ticks for one symbol and tick type
/// and writes each consolidated item as a line to its output file.
/// </summary>
/// <param name="symbol">Symbol for the processor; passed through <c>Safe</c> for Windows reserved names</param>
/// <param name="date">Reference date used to build the output paths</param>
/// <param name="tickType">Trade, Quote or OpenInterest data to generate</param>
/// <param name="resolution">Resolution to consolidate to; Tick uses an identity consolidator</param>
/// <param name="dataDirectory">Data directory for LEAN</param>
/// <exception cref="ArgumentOutOfRangeException">
/// Thrown for a non-tick resolution combined with a tick type that has no consolidator
/// </exception>
public AlgoSeekFuturesProcessor(Symbol symbol, DateTime date, TickType tickType, Resolution resolution, string dataDirectory)
{
    _symbol = Safe(symbol);
    _tickType = tickType;
    _referenceDate = date;
    _resolution = resolution;
    _dataDirectory = dataDirectory;

    // Fail fast for unsupported tick types instead of hitting a null consolidator further down
    _consolidator = CreateConsolidator(tickType, resolution);

    // Output goes into a directory named after the zip path with ".zip" removed
    // NOTE(review): Replace removes every ".zip" occurrence, including any inside the data directory
    var path = ZipPath.Replace(".zip", string.Empty);
    Directory.CreateDirectory(path);

    var file = Path.Combine(path, EntryPath);
    _streamWriter = OpenStreamWriter(file);

    // Every consolidated item is written straight to the stream writer
    _consolidator.DataConsolidated += (sender, consolidated) =>
    {
        _streamWriter.WriteLine(LeanData.GenerateLine(consolidated, SecurityType.Future, Resolution));
    };

    // Use Interlocked's return value: re-reading the shared field could observe another thread's update
    var openFiles = Interlocked.Increment(ref _curFileCount);
    if (openFiles % 1000 == 0)
    {
        Log.Trace("Opened more files: {0}", openFiles);
    }
}

/// <summary>
/// Builds the consolidator matching the requested tick type and resolution
/// </summary>
private static IDataConsolidator CreateConsolidator(TickType tickType, Resolution resolution)
{
    if (resolution == Resolution.Tick)
    {
        return new IdentityDataConsolidator<Tick>();
    }

    switch (tickType)
    {
        case TickType.Trade:
            return new TickConsolidator(resolution.ToTimeSpan());
        case TickType.Quote:
            return new TickQuoteBarConsolidator(resolution.ToTimeSpan());
        case TickType.OpenInterest:
            return new OpenInterestConsolidator(resolution.ToTimeSpan());
        default:
            throw new ArgumentOutOfRangeException(nameof(tickType), tickType, "Unsupported tick type for consolidation");
    }
}

/// <summary>
/// Opens the writer for the output file. If that fails, retries once under a hash-suffixed ".csv" name.
/// </summary>
private static LazyStreamWriter OpenStreamWriter(string file)
{
    try
    {
        return new LazyStreamWriter(file);
    }
    catch (Exception err)
    {
        // we are unable to open new file - it is already opened due to bug in algoseek data
        Log.Error("File: {0} Err: {1} Source: {2} Stack: {3}", file, err.Message, err.Source, err.StackTrace);

        // Widen to long first: Math.Abs(int.MinValue) throws OverflowException
        var hashSuffix = Math.Abs((long)file.GetHashCode()).ToStringInvariant();
        var newRandomizedName = (file + "-" + hashSuffix).Replace(".csv", string.Empty) + ".csv";

        // we store the information under different (randomized) name
        Log.Trace("Changing name from {0} to {1}", file, newRandomizedName);
        return new LazyStreamWriter(newRandomizedName);
    }
}
183 
/// <summary>
/// Feeds a tick to the consolidator. Ticks whose type differs from this processor's tick type are ignored.
/// </summary>
/// <param name="data">Tick to consolidate</param>
public void Process(Tick data)
{
    if (data.TickType == _tickType)
    {
        _consolidator.Update(data);
    }
}
197 
/// <summary>
/// Scans the consolidator up to the frontier time. On the final flush, it also writes any
/// partially built item and closes the output file.
/// </summary>
/// <param name="frontierTime">Current foremost tick time</param>
/// <param name="finalFlush">Indicates if this is the final push to disk at the end of the data</param>
public void FlushBuffer(DateTime frontierTime, bool finalFlush)
{
    // Force the consolidation if time has passed the bar
    _consolidator.Scan(frontierTime);

    if (!finalFlush)
    {
        return;
    }

    try
    {
        // Persist the in-progress item so the last interval is not dropped
        if (_consolidator.WorkingData != null)
        {
            _streamWriter.WriteLine(LeanData.GenerateLine(_consolidator.WorkingData, SecurityType.Future, Resolution));
        }

        _streamWriter.Flush();
    }
    finally
    {
        // Release the file and the open-file count even if the final write or flush throws
        _streamWriter.Close();
        _streamWriter = null;

        // Use Interlocked's return value: re-reading the shared field could observe another thread's update
        var openFiles = Interlocked.Decrement(ref _curFileCount);
        if (openFiles % 1000 == 0)
        {
            Log.Trace("Closed some files: {0}", openFiles);
        }
    }
}
227 
/// <summary>
/// On Windows, rebuilds a futures symbol whose value matches (case-insensitively) one of the
/// restricted names. In every other case the symbol is returned unchanged.
/// </summary>
/// <param name="symbol">Symbol to check</param>
/// <returns>The original symbol, or a rebuilt future using the sanitized underlying value, market and date</returns>
/// <remarks>
/// NOTE(review): SafeName only rewrites names that follow a directory separator, so a bare
/// underlying ticker likely passes through unchanged. Also confirm that symbol.Underlying is non-null here.
/// </remarks>
private static Symbol Safe(Symbol symbol)
{
    if (!OS.IsWindows || !_windowsRestrictedNames.Contains(symbol.Value.ToLowerInvariant()))
    {
        return symbol;
    }

    var sanitizedUnderlying = SafeName(symbol.Underlying.Value);
    return Symbol.CreateFuture(sanitizedUnderlying, symbol.ID.Market, symbol.ID.Date);
}
/// <summary>
/// On Windows, prefixes an underscore to each restricted name that directly follows a directory separator in the path.
/// </summary>
/// <param name="fileName">Path or file name to sanitize</param>
/// <returns>The sanitized path; unchanged on other platforms</returns>
private static string SafeName(string fileName)
{
    if (!OS.IsWindows)
    {
        return fileName;
    }

    // Anchoring on the separator keeps 'con' from corrupting paths like 'seCONed';
    // only path segments that begin with a restricted name are rewritten
    return _windowsRestrictedNames.Aggregate(
        fileName,
        (current, reserved) => current.Replace(
            Path.DirectorySeparatorChar + reserved,
            Path.DirectorySeparatorChar + "_" + reserved));
}
258  }
259 }