Lean  $LEAN_TAG$
IDataProcessor.cs
1 /*
2  * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
3  * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14 */
15 
16 using System;
17 using System.Collections.Generic;
18 using System.Linq;
19 using QuantConnect.Data;
22 
23 namespace QuantConnect.ToolBox
24 {
25  /// <summary>
26  /// Specifies a piece of processing that should be performed against a source file
27  /// </summary>
28  public interface IDataProcessor : IDisposable
29  {
30  /// <summary>
31  /// Invoked for each piece of data from the source file
32  /// </summary>
33  /// <param name="data">The data to be processed</param>
34  void Process(IBaseData data);
35  }
36 
37  /// <summary>
38  /// Provides methods for creating data processor stacks
39  /// </summary>
40  public static class DataProcessor
41  {
42  /// <summary>
43  /// Creates a new data processor that will filter in input data before piping it into the specified processor
44  /// </summary>
45  public static IDataProcessor FilteredBy(this IDataProcessor processor, Func<IBaseData, bool> predicate)
46  {
47  return new FilteredDataProcessor(processor, predicate);
48  }
49 
50  /// <summary>
51  /// Creates a data processor that will aggregate and zip the requested resolutions of data
52  /// </summary>
53  public static IDataProcessor Zip(string dataDirectory, IEnumerable<Resolution> resolutions, TickType tickType, bool sourceIsTick)
54  {
55  var set = resolutions.ToHashSet();
56 
57  var root = new PipeDataProcessor();
58 
59  // only filter tick sources
60  var stack = !sourceIsTick ? root
61  : (IDataProcessor) new FilteredDataProcessor(root, x => ((Tick) x).TickType == tickType);
62 
63  if (set.Contains(Resolution.Tick))
64  {
65  // tick is filtered via trade/quote
66  var tick = new CsvDataProcessor(dataDirectory, Resolution.Tick, tickType);
67  root.PipeTo(tick);
68  }
69  if (set.Contains(Resolution.Second))
70  {
71  root = AddResolution(dataDirectory, tickType, root, Resolution.Second, sourceIsTick);
72  sourceIsTick = false;
73  }
74  if (set.Contains(Resolution.Minute))
75  {
76  root = AddResolution(dataDirectory, tickType, root, Resolution.Minute, sourceIsTick);
77  sourceIsTick = false;
78  }
79  if (set.Contains(Resolution.Hour))
80  {
81  root = AddResolution(dataDirectory, tickType, root, Resolution.Hour, sourceIsTick);
82  sourceIsTick = false;
83  }
84  if (set.Contains(Resolution.Daily))
85  {
86  AddResolution(dataDirectory, tickType, root, Resolution.Daily, sourceIsTick);
87  }
88  return stack;
89  }
90 
91  private static PipeDataProcessor AddResolution(string dataDirectory, TickType tickType, PipeDataProcessor root, Resolution resolution, bool sourceIsTick)
92  {
93  var second = new CsvDataProcessor(dataDirectory, resolution, tickType);
94  var secondRoot = new PipeDataProcessor(second);
95  var aggregator = new ConsolidatorDataProcessor(secondRoot, data => CreateConsolidator(resolution, tickType, data, sourceIsTick));
96  root.PipeTo(aggregator);
97  return secondRoot;
98  }
99 
100  private static IDataConsolidator CreateConsolidator(Resolution resolution, TickType tickType, IBaseData data, bool sourceIsTick)
101  {
102  var securityType = data.Symbol.ID.SecurityType;
103  switch (securityType)
104  {
105  case SecurityType.Base:
106  case SecurityType.Equity:
107  case SecurityType.Cfd:
108  case SecurityType.Forex:
109  return new TickConsolidator(resolution.ToTimeSpan());
110 
111  case SecurityType.Option:
112  if (tickType == TickType.Trade)
113  {
114  return sourceIsTick
115  ? new TickConsolidator(resolution.ToTimeSpan())
116  : (IDataConsolidator) new TradeBarConsolidator(resolution.ToTimeSpan());
117  }
118  if (tickType == TickType.Quote)
119  {
120  return sourceIsTick
121  ? new TickQuoteBarConsolidator(resolution.ToTimeSpan())
122  : (IDataConsolidator) new QuoteBarConsolidator(resolution.ToTimeSpan());
123  }
124  break;
125  }
126  throw new NotImplementedException("Consolidator creation is not defined for " + securityType + " " + tickType);
127  }
128  }
129 }