Lean  $LEAN_TAG$
SynchronizingHistoryProvider.cs
1 /*
2  * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
3  * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14 */
15 
16 using System;
17 using System.Collections.Generic;
18 using System.Linq;
19 using System.Threading;
20 using NodaTime;
21 using QuantConnect.Data;
28 using QuantConnect.Util;
29 
31 {
32  /// <summary>
33  /// Provides an abstract implementation of <see cref="IHistoryProvider"/>
34  /// which provides synchronization of multiple history results
35  /// </summary>
37  {
  // Backing counter for DataPointCount. It is bumped with Interlocked.Increment each time
  // CreateSliceEnumerableFromSubscriptions adds a data point to a packet.
38  private int _dataPointCount;
39 
40  /// <summary>
41  /// The algorithm settings instance to use
42  /// </summary>
44 
/// <summary>
/// Total number of data points this history provider has emitted so far
/// </summary>
public override int DataPointCount
{
    get
    {
        // plain read of the counter that the slice enumeration increments
        return _dataPointCount;
    }
}
49 
/// <summary>
/// Enumerates the subscriptions into slices, advancing all of them together along a shared frontier time
/// </summary>
/// <param name="subscriptions">The subscriptions to pull data from. Every subscription is disposed once the
/// enumeration finishes, fails, or is abandoned by disposing the enumerator</param>
/// <param name="sliceTimeZone">The time zone passed to the <see cref="TimeSliceFactory"/> that builds each slice</param>
/// <returns>A lazily evaluated sequence of slices; one slice is yielded for each frontier
/// at which at least one subscription contributed data</returns>
protected IEnumerable<Slice> CreateSliceEnumerableFromSubscriptions(List<Subscription> subscriptions, DateTimeZone sliceTimeZone)
{
    // required by TimeSlice.Create, but we don't need its behavior
    var frontier = DateTime.MinValue;
    // never changes, there's no selection during a history request
    var universeSelectionData = new Dictionary<Universe, BaseDataCollection>();
    var timeSliceFactory = new TimeSliceFactory(sliceTimeZone);

    try
    {
        while (true)
        {
            var earlyBirdTicks = long.MaxValue;
            var data = new List<DataFeedPacket>();
            foreach (var subscription in subscriptions.Where(s => !s.EndOfStream))
            {
                if (subscription.Current == null && !subscription.MoveNext())
                {
                    // initial pump. We do it here and not when creating the subscriptions so
                    // that parallel workers can all start as fast as possible
                    continue;
                }

                DataFeedPacket packet = null;
                // drain everything this subscription has ready at or before the current frontier
                while (subscription.Current.EmitTimeUtc <= frontier)
                {
                    if (packet == null)
                    {
                        // for performance, lets be selfish about creating a new instance
                        packet = new DataFeedPacket(subscription.Security, subscription.Configuration);

                        // only add if we have data
                        data.Add(packet);
                    }

                    packet.Add(subscription.Current.Data);
                    Interlocked.Increment(ref _dataPointCount);
                    if (!subscription.MoveNext())
                    {
                        break;
                    }
                }

                // update our early bird ticks (next frontier time)
                if (subscription.Current != null)
                {
                    // keep the earliest upcoming emit time seen across the subscriptions
                    earlyBirdTicks = Math.Min(earlyBirdTicks, subscription.Current.EmitTimeUtc.Ticks);
                }
            }

            if (data.Count != 0)
            {
                // reuse the slice construction code from TimeSlice.Create
                yield return timeSliceFactory.Create(frontier, data, SecurityChanges.None, universeSelectionData).Slice;
            }

            // end of subscriptions, after we emit, else we might drop a data point
            if (earlyBirdTicks == long.MaxValue)
            {
                break;
            }

            // the frontier never moves backwards
            frontier = new DateTime(Math.Max(earlyBirdTicks, frontier.Ticks), DateTimeKind.Utc);
        }
    }
    finally
    {
        // make sure we clean up after ourselves. Being in a finally block, this also runs when the
        // consumer stops enumerating early (the enumerator gets disposed) or when an exception
        // escapes the loop, not only when all subscriptions have been exhausted
        foreach (var subscription in subscriptions)
        {
            subscription.Dispose();
        }
    }
}
118 
119  /// <summary>
120  /// Creates a subscription to process the history request
121  /// </summary>
  /// <param name="request">The history request; its exchange hours, UTC start/end times, data type,
  /// fill forward resolution and extended market hours flag are read here</param>
  /// <param name="history">The data to enumerate; its enumerator is optionally wrapped before being
  /// handed to the subscription</param>
  /// <returns>The subscription produced by <see cref="SubscriptionUtils.Create"/> for the resulting enumerator</returns>
122  protected Subscription CreateSubscription(HistoryRequest request, IEnumerable<BaseData> history)
123  {
124  var config = request.ToSubscriptionDataConfig();
  // security built only for this request, using a null-currency cash instance and a fresh cache
  // NOTE(review): the listing's own numbering jumps from 128 to 132 here, so some constructor
  // arguments appear to be missing from this view - confirm against the full file
125  var security = new Security(
126  request.ExchangeHours,
127  config,
128  new Cash(Currencies.NullCurrency, 0, 1m),
132  new SecurityCache()
133  );
134 
135  var reader = history.GetEnumerator();
136 
  // wrap the reader when LeanData reports that strict daily end times apply to this request
137  var useDailyStrictEndTimes = LeanData.UseDailyStrictEndTimes(AlgorithmSettings, request, config.Symbol, config.Increment);
138  if (useDailyStrictEndTimes)
139  {
140  reader = new StrictDailyEndTimesEnumerator(reader, request.ExchangeHours, request.StartTimeLocal);
141  }
142 
143  // optionally apply fill forward behavior
144  if (request.FillForwardResolution.HasValue)
145  {
146  // FillForwardEnumerator expects these values in local times
  // NOTE(review): 'start' is not referenced again in this method as shown; only 'end' is passed on
147  var start = request.StartTimeUtc.ConvertFromUtc(request.ExchangeHours.TimeZone);
148  var end = request.EndTimeUtc.ConvertFromUtc(request.ExchangeHours.TimeZone);
149 
150  // copy forward Bid/Ask bars for QuoteBars
  // (applied before the general fill forward wrapper below, so it sits closer to the raw reader)
151  if (request.DataType == typeof(QuoteBar))
152  {
153  reader = new QuoteBarFillForwardEnumerator(reader);
154  }
155 
  // the fill forward resolution is exposed through a read-only reference evaluated on demand
156  var readOnlyRef = Ref.CreateReadOnly(() => request.FillForwardResolution.Value.ToTimeSpan());
157  reader = new FillForwardEnumerator(reader, security.Exchange, readOnlyRef, request.IncludeExtendedMarketHours, end, config.Increment, config.DataTimeZone, useDailyStrictEndTimes, request.DataType);
158  }
159 
  // NOTE(review): the leading 'false, null' arguments presumably mark this as a non-universe
  // subscription with no universe attached - confirm against the SubscriptionRequest constructor
160  var subscriptionRequest = new SubscriptionRequest(false, null, security, config, request.StartTimeUtc, request.EndTimeUtc);
161 
162  return SubscriptionUtils.Create(subscriptionRequest, reader, AlgorithmSettings.DailyPreciseEndTime);
163  }
164  }
165 }