Lean  $LEAN_TAG$
TimeTriggeredUniverseSubscriptionEnumeratorFactory.cs
1 /*
2  * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
3  * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  *
15 */
16 
17 using System;
18 using System.Collections;
19 using System.Collections.Generic;
20 using System.Collections.Specialized;
21 using System.Linq;
22 using QuantConnect.Data;
27 
29 {
30  /// <summary>
31  /// Provides an implementation of <see cref="ISubscriptionEnumeratorFactory"/> to emit
32  /// ticks based on <see cref="UserDefinedUniverse.GetTriggerTimes"/>, allowing universe
33  /// selection to fire at planned times.
34  /// </summary>
36  {
37  private readonly ITimeProvider _timeProvider;
38  private readonly ITimeTriggeredUniverse _universe;
39  private readonly MarketHoursDatabase _marketHoursDatabase;
40 
41  /// <summary>
42  /// Initializes a new instance of the <see cref="TimeTriggeredUniverseSubscriptionEnumeratorFactory"/> class
43  /// </summary>
44  /// <param name="universe">The user defined universe</param>
45  /// <param name="marketHoursDatabase">The market hours database</param>
46  /// <param name="timeProvider">The time provider</param>
48  {
49  _universe = universe;
50  _timeProvider = timeProvider;
51  _marketHoursDatabase = marketHoursDatabase;
52  }
53 
54  /// <summary>
55  /// Creates an enumerator to read the specified request
56  /// </summary>
57  /// <param name="request">The subscription request to be read</param>
58  /// <param name="dataProvider">Provider used to get data when it is not present on disk</param>
59  /// <returns>An enumerator reading the subscription request</returns>
public IEnumerator<BaseData> CreateEnumerator(SubscriptionRequest request, IDataProvider dataProvider)
{
    // Build the scheduled stream: one tick per trigger time reported by the universe,
    // each stamped with the symbol of the requested configuration.
    var triggerTimes = _universe.GetTriggerTimes(request.StartTimeUtc, request.EndTimeUtc, _marketHoursDatabase);
    IEnumerator<BaseData> scheduledTicks = triggerTimes
        .Select(triggerTime => new Tick { Time = triggerTime, Symbol = request.Configuration.Symbol })
        .GetEnumerator();

    // Only user defined universes get the injection wrapper; everything else
    // just receives the scheduled stream as-is.
    var userDefinedUniverse = request.Universe as UserDefinedUniverse;
    if (userDefinedUniverse == null)
    {
        return scheduledTicks;
    }

    var injector = new InjectionEnumerator(scheduledTicks);

    // Trigger universe selection when a security is added/removed after Initialize
    userDefinedUniverse.CollectionChanged += (sender, args) =>
    {
        // For an add the time is set 1 tick ahead to properly sync data with the next
        // time slice and avoid emitting 'now' twice; for a remove the time is 'now'.
        // The same is done in the 'DataManager' when handling FF resolution changes.
        IList changedItems;
        DateTime pointTime;
        switch (args.Action)
        {
            case NotifyCollectionChangedAction.Add:
                changedItems = args.NewItems;
                pointTime = _timeProvider.GetUtcNow().AddTicks(1);
                break;

            case NotifyCollectionChangedAction.Remove:
                changedItems = args.OldItems;
                pointTime = _timeProvider.GetUtcNow();
                break;

            default:
                // any other collection action does not produce a data point
                return;
        }

        // Nothing to inject without both an item list and a usable time
        if (changedItems == null || pointTime == DateTime.MinValue)
        {
            return;
        }

        // Only the first symbol found among the changed items is used
        var changedSymbol = changedItems.OfType<Symbol>().FirstOrDefault();
        if (changedSymbol == null)
        {
            return;
        }

        // the data point time should always be in exchange timezone
        pointTime = pointTime.ConvertFromUtc(request.Configuration.ExchangeTimeZone);

        injector.InjectDataPoint(new BaseDataCollection(pointTime, changedSymbol));
    };

    return injector;
}
112 
/// <summary>
/// Wraps another <see cref="IEnumerator{T}"/> of <see cref="BaseData"/> and allows a single
/// data point to be injected so that it is surfaced by the next call to <see cref="MoveNext"/>
/// instead of the wrapped enumerator's value.
/// </summary>
private class InjectionEnumerator : IEnumerator<BaseData>
{
    // The wrapped enumerator; also used as the lock object guarding injection vs. MoveNext
    private readonly IEnumerator<BaseData> _inner;

    // True while an injected data point is waiting to be consumed by MoveNext
    private volatile bool _hasPendingInjection;

    /// <summary>
    /// Gets the injected data point, or the wrapped enumerator's current value
    /// as captured by the most recent <see cref="MoveNext"/>
    /// </summary>
    public BaseData Current { get; private set; }

    object IEnumerator.Current
    {
        get { return Current; }
    }

    /// <summary>
    /// Initializes a new instance of the <see cref="InjectionEnumerator"/> class
    /// </summary>
    /// <param name="underlyingEnumerator">The enumerator to wrap</param>
    public InjectionEnumerator(IEnumerator<BaseData> underlyingEnumerator)
    {
        _inner = underlyingEnumerator;
    }

    /// <summary>
    /// Sets <see cref="Current"/> to the provided data point and flags it so the
    /// next <see cref="MoveNext"/> call does not advance the wrapped enumerator
    /// </summary>
    /// <param name="baseData">The data point to inject</param>
    public void InjectDataPoint(BaseData baseData)
    {
        // we use a lock because the main algorithm thread is the one injecting
        // and the base exchange is the thread pulling MoveNext()
        lock (_inner)
        {
            _hasPendingInjection = true;
            Current = baseData;
        }
    }

    /// <summary>
    /// Disposes the wrapped enumerator
    /// </summary>
    public void Dispose() => _inner.Dispose();

    /// <summary>
    /// Surfaces a pending injected data point if there is one; otherwise advances the
    /// wrapped enumerator and copies its current value into <see cref="Current"/>
    /// </summary>
    /// <returns>Always true, regardless of what the wrapped enumerator reports</returns>
    public bool MoveNext()
    {
        lock (_inner)
        {
            if (!_hasPendingInjection)
            {
                // the wrapped enumerator's own MoveNext result is not used
                _inner.MoveNext();
                Current = _inner.Current;
            }
            else
            {
                // Current already holds the injected point; just consume the flag
                _hasPendingInjection = false;
            }

            return true;
        }
    }

    /// <summary>
    /// Resets the wrapped enumerator
    /// </summary>
    public void Reset() => _inner.Reset();
}
162  }
163 }