Lean  $LEAN_TAG$
CsvDataProcessor.cs
1 /*
2  * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
3  * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14 */
15 
16 using System;
17 using System.Collections.Generic;
18 using System.IO;
19 using System.Threading.Tasks;
20 using QuantConnect.Data;
21 using QuantConnect.Util;
22 
23 namespace QuantConnect.ToolBox
24 {
25  /// <summary>
26  /// Provides an implementation of <see cref="IDataProcessor"/> that writes the incoming
/// stream of data to csv files, keeping one open file per symbol.
28  /// </summary>
30  {
// each writer is flushed after every TicksPerFlush data points (see Process)
private const int TicksPerFlush = 50;

// static so that directory creation in CreateTextWriter is serialized across all processor instances
private static readonly object DirectoryCreateSync = new object();

// one open csv writer per symbol, created lazily the first time that symbol is processed
private readonly Dictionary<Symbol, Writer> _writers = new Dictionary<Symbol, Writer>();
private readonly string _dataDirectory;
private readonly Resolution _resolution;
private readonly TickType _tickType;

/// <summary>
/// Creates a processor that writes each incoming data point as a csv line to a file beneath
/// <paramref name="dataDirectory"/>
/// </summary>
/// <param name="dataDirectory">The root data directory, /Data</param>
/// <param name="resolution">The resolution of the data handed to <see cref="Process"/>; used to build both the output paths and the csv lines</param>
/// <param name="tickType">The tick type, trade or quote; used to build the output paths</param>
public CsvDataProcessor(string dataDirectory, Resolution resolution, TickType tickType)
{
    _tickType = tickType;
    _resolution = resolution;
    _dataDirectory = dataDirectory;
}
52 
/// <summary>
/// Appends one csv line for <paramref name="data"/> to the file belonging to its symbol,
/// opening that file the first time the symbol is seen
/// </summary>
/// <param name="data">The data point to write</param>
/// <remarks>
/// NOTE(review): writers are cached by <see cref="Symbol"/> alone, while CreateTextWriter derives the
/// file path from the date of the first data point seen for that symbol. If the generated path depends on
/// the date, data for later dates would be appended to the first date's file. Confirm that callers feed a
/// single date per processor instance.
/// </remarks>
public void Process(IBaseData data)
{
    var symbol = data.Symbol;

    Writer writer;
    if (!_writers.TryGetValue(symbol, out writer))
    {
        // first data point for this symbol: open its file and keep the writer for later points
        writer = CreateTextWriter(data);
        _writers[symbol] = writer;
    }

    writer.ProcessCount++;
    var isFlushDue = writer.ProcessCount % TicksPerFlush == 0;
    if (isFlushDue)
    {
        // periodically push buffered lines to the file; this happens before the current point's line is written
        writer.TextWriter.Flush();
    }

    var csvLine = LeanData.GenerateLine(data, symbol.ID.SecurityType, _resolution);
    writer.TextWriter.WriteLine(csvLine);
}
75 
/// <summary>
/// Disposes every csv writer opened by this processor, releasing the underlying files.
/// </summary>
/// <remarks>
/// Every writer is disposed even when an earlier one fails (for example when its final flush throws).
/// Previously a single failure aborted the loop and left the remaining files unflushed and open.
/// The first failure is rethrown once all writers have been attempted, with its original type and
/// stack trace so existing catch blocks keep working. Any later failures are not reported.
/// </remarks>
public void Dispose()
{
    System.Runtime.ExceptionServices.ExceptionDispatchInfo firstFailure = null;

    foreach (var writer in _writers.Values)
    {
        try
        {
            writer.TextWriter.Dispose();
        }
        catch (Exception exception)
        {
            // remember the first error but keep going so the other files still get closed
            if (firstFailure == null)
            {
                firstFailure = System.Runtime.ExceptionServices.ExceptionDispatchInfo.Capture(exception);
            }
        }
    }

    if (firstFailure != null)
    {
        firstFailure.Throw();
    }
}
86 
/// <summary>
/// Opens a new csv <see cref="TextWriter"/> for the symbol and date of <paramref name="data"/>,
/// creating its directory under the data root first when it does not exist yet
/// </summary>
/// <param name="data">The data point whose symbol and date determine the output file</param>
/// <returns>A <see cref="Writer"/> pairing the target path with a new <see cref="StreamWriter"/> opened on it</returns>
/// <remarks>
/// The relative zip path is turned into a directory by removing every ".zip" occurrence from it, and the
/// zip entry name becomes the csv file name inside that directory. <see cref="StreamWriter(string)"/>
/// overwrites any file that already exists at that path.
/// </remarks>
private Writer CreateTextWriter(IBaseData data)
{
    var symbol = data.Symbol;
    var date = data.Time.Date;

    var fileName = LeanData.GenerateZipEntryName(symbol, date, _resolution, _tickType);
    var zipFolder = LeanData.GenerateRelativeZipFilePath(symbol, date, _resolution, _tickType)
        .Replace(".zip", string.Empty);
    var filePath = Path.Combine(_dataDirectory, zipFolder, fileName);

    var folderOnDisk = new FileInfo(filePath).DirectoryName;
    if (!Directory.Exists(folderOnDisk))
    {
        lock (DirectoryCreateSync)
        {
            // re-check under the lock: another processor may have created it in the meantime
            if (!Directory.Exists(folderOnDisk))
            {
                Directory.CreateDirectory(folderOnDisk);
            }
        }
    }

    return new Writer(filePath, new StreamWriter(filePath));
}
105 
106 
/// <summary>
/// Bundles an open csv <see cref="System.IO.TextWriter"/> with the file path it targets and a count
/// of the data points routed through it
/// </summary>
private sealed class Writer
{
    /// <summary>Path of the csv file being written</summary>
    public readonly string Path;

    /// <summary>The writer that receives the csv lines</summary>
    public readonly TextWriter TextWriter;

    /// <summary>Number of data points processed through this writer so far; used to decide when to flush</summary>
    public int ProcessCount;

    /// <summary>Creates a new <see cref="Writer"/> for <paramref name="path"/> backed by <paramref name="textWriter"/></summary>
    public Writer(string path, TextWriter textWriter)
    {
        TextWriter = textWriter;
        Path = path;
    }
}
118  }
119 }