Lean  $LEAN_TAG$
ConsolidatorDataProcessor.cs
1 /*
2  * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
3  * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14 */
15 
16 using System;
17 using System.Collections.Generic;
18 using QuantConnect.Data;
20 
21 namespace QuantConnect.ToolBox
22 {
23  /// <summary>
24  /// Provides an implementation of <see cref="IDataProcessor"/> that consolidates the data
25  /// stream and forwards the consolidated data to other processors
26  /// </summary>
28  {
29  private DateTime _frontier; // latest EndTime already used to scan the consolidators; forced to DateTime.MaxValue by Dispose
30  private readonly IDataProcessor _destination; // receives every consolidated data point and is disposed together with this instance
31  private readonly Func<IBaseData, IDataConsolidator> _createConsolidator; // invoked by Process with the first data point seen for a symbol
32  private readonly Dictionary<Symbol, IDataConsolidator> _consolidators; // one consolidator per symbol, created on demand by Process
33 
/// <summary>
/// Initializes a new instance of the <see cref="ConsolidatorDataProcessor"/> class
/// </summary>
/// <param name="destination">The receiver of the consolidated data; it is disposed when this instance is disposed</param>
/// <param name="createConsolidator">Function used to create a consolidator, invoked with the first data point seen for each symbol</param>
/// <exception cref="ArgumentNullException">
/// Thrown when <paramref name="destination"/> or <paramref name="createConsolidator"/> is null
/// </exception>
public ConsolidatorDataProcessor(IDataProcessor destination, Func<IBaseData, IDataConsolidator> createConsolidator)
{
    // fail fast: a null argument would otherwise only surface later as a
    // NullReferenceException inside Process, OnDataConsolidated or Dispose
    if (destination == null)
    {
        throw new ArgumentNullException("destination");
    }
    if (createConsolidator == null)
    {
        throw new ArgumentNullException("createConsolidator");
    }

    _destination = destination;
    _createConsolidator = createConsolidator;
    _consolidators = new Dictionary<Symbol, IDataConsolidator>();
}
45 
/// <summary>
/// Routes a single data point to the consolidator registered for its symbol,
/// creating and wiring up that consolidator the first time the symbol is seen
/// </summary>
/// <param name="data">The data to be processed</param>
public void Process(IBaseData data)
{
    var symbol = data.Symbol;

    IDataConsolidator symbolConsolidator;
    var alreadyRegistered = _consolidators.TryGetValue(symbol, out symbolConsolidator);
    if (!alreadyRegistered)
    {
        // first data point for this symbol: build its consolidator and
        // listen for the consolidated output before remembering it
        symbolConsolidator = _createConsolidator(data);
        symbolConsolidator.DataConsolidated += OnDataConsolidated;
        _consolidators[symbol] = symbolConsolidator;
    }

    // feed the data point into the symbol's consolidator
    symbolConsolidator.Update(data);
}
63 
/// <summary>
/// Scans every consolidator one last time at <see cref="DateTime.MaxValue"/> so each gets
/// the chance to emit its working bar, detaches this instance from the consolidators,
/// and then disposes the destination processor
/// </summary>
public void Dispose()
{
    // push the frontier to the maximum first: OnDataConsolidated returns early when the
    // frontier is not behind the emitted bar, so the scans below do not trigger nested scans
    _frontier = DateTime.MaxValue;

    try
    {
        // give every consolidator the chance to emit its working bar
        foreach (var consolidator in _consolidators.Values)
        {
            consolidator.Scan(_frontier);
        }
    }
    finally
    {
        // detach the handler added in Process so consolidators that are still referenced
        // elsewhere neither keep this instance alive nor call into it after disposal
        foreach (var consolidator in _consolidators.Values)
        {
            consolidator.DataConsolidated -= OnDataConsolidated;
        }

        // release the destination even when one of the scans above threw
        _destination.Dispose();
        _consolidators.Clear();
    }
}
80 
/// <summary>
/// Handles the <see cref="IDataConsolidator.DataConsolidated"/> event by forwarding the
/// consolidated data to the destination and, when the data advances the frontier,
/// scanning every registered consolidator
/// </summary>
/// <param name="sender">The source of the event (not used)</param>
/// <param name="args">The consolidated data point</param>
private void OnDataConsolidated(object sender, IBaseData args)
{
    // always forward the consolidated data first
    _destination.Process(args);

    if (args.EndTime <= _frontier)
    {
        // this end time was already handled, no need to scan again
        return;
    }

    // advance the frontier before scanning so that events raised by the
    // scans below for the same end time stop at the guard above
    _frontier = args.EndTime;

    foreach (var registered in _consolidators.Values)
    {
        // scan one tick before the end time so that data stamped
        // with exactly the same time can still come through
        var scanTime = args.EndTime.AddTicks(-1);
        registered.Scan(scanTime);
    }
}
100  }
101 }