Lean  $LEAN_TAG$
All Classes Namespaces Functions Variables Enumerations Enumerator Properties Events Pages
FakeDataQueue.cs
1 /*
2  * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
3  * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  *
15 */
16 
17 using System;
18 using QuantConnect.Data;
19 using QuantConnect.Util;
20 using QuantConnect.Logging;
21 using QuantConnect.Packets;
25 using System.Collections.Generic;
26 using Timer = System.Timers.Timer;
28 
30 {
31  /// <summary>
32  /// This is an implementation of <see cref="IDataQueueHandler"/> used for testing. See also <see cref="FakeHistoryProvider"/>.
33  /// </summary>
35  {
// Running total of ticks handed to the aggregator; the timer callback reads it to report throughput
private int _count;
// Source of the random trade quantities and quote sizes
private readonly Random _random = new Random();
// Number of tick-generation iterations run per symbol on every timer pass; only assigned in the constructor
private readonly int _dataPointsPerSecondPerSymbol;

// One-shot timer whose callback generates the ticks and then re-arms the timer
private readonly Timer _timer;
// Cache provider handed to the option chain provider; disposed together with this instance
private readonly IDataCacheProvider _dataCacheProvider;
// Resolves the option contract lists returned by LookupSymbols
private readonly IOptionChainProvider _optionChainProvider;
// Keeps track of the subscribed configurations and exposes the subscribed symbols
private readonly EventBasedDataQueueHandlerSubscriptionManager _subscriptionManager;
// Receives every generated tick
private readonly IDataAggregator _aggregator;
// Used to look up the exchange time zone of each symbol
private readonly MarketHoursDatabase _marketHoursDatabase;
// Per-symbol cache of the UTC -> exchange time converters
private readonly Dictionary<Symbol, TimeZoneOffsetProvider> _symbolExchangeTimeZones;

/// <summary>
/// Continuous UTC time provider used to timestamp the generated ticks
/// </summary>
protected virtual ITimeProvider TimeProvider { get; } = RealTimeProvider.Instance;
52 
53 
/// <summary>
/// Creates a new <see cref="FakeDataQueue"/> that randomly emits data for each symbol,
/// using the <see cref="AggregationManager"/> instance resolved by name through the <see cref="Composer"/>
/// </summary>
public FakeDataQueue()
    : this(Composer.Instance.GetExportedValueByTypeName<IDataAggregator>(nameof(AggregationManager)))
{
    // all the setup happens in the chained constructor
}
62 
/// <summary>
/// Initializes a new instance of the <see cref="FakeDataQueue"/> class to randomly emit data for each symbol
/// </summary>
/// <param name="dataAggregator">The aggregator that will receive every generated tick</param>
/// <param name="dataPointsPerSecondPerSymbol">Number of tick-generation iterations to run for each
/// subscribed symbol on every pass of the one second timer</param>
public FakeDataQueue(IDataAggregator dataAggregator, int dataPointsPerSecondPerSymbol = 500000)
{
    _aggregator = dataAggregator;
    _dataPointsPerSecondPerSymbol = dataPointsPerSecondPerSymbol;
    _dataCacheProvider = new ZipDataCacheProvider(new DefaultDataProvider(), true);
    var mapFileProvider = Composer.Instance.GetPart<IMapFileProvider>();
    _optionChainProvider = new LiveOptionChainProvider(_dataCacheProvider, mapFileProvider);
    _marketHoursDatabase = MarketHoursDatabase.FromDataFolder();
    _symbolExchangeTimeZones = new Dictionary<Symbol, TimeZoneOffsetProvider>();
    _subscriptionManager = new EventBasedDataQueueHandlerSubscriptionManager();
    // there is no real feed behind this queue, so every (un)subscription request is simply accepted
    _subscriptionManager.SubscribeImpl += (s, t) => true;
    _subscriptionManager.UnsubscribeImpl += (s, t) => true;

    // one-shot timer: the callback re-arms it once a pass is complete, so passes never overlap.
    // It is started only after the handler below is attached and the interval is in place.
    _timer = new Timer
    {
        AutoReset = false,
        Interval = 1000,
    };

    var lastCount = 0;
    var lastTime = DateTime.UtcNow;
    _timer.Elapsed += (sender, args) =>
    {
        var elapsed = (DateTime.UtcNow - lastTime);
        var ticksPerSecond = (_count - lastCount)/elapsed.TotalSeconds;
        Log.Trace("TICKS PER SECOND:: " + ticksPerSecond.ToStringInvariant("000000.0") + " ITEMS IN QUEUE:: " + 0);
        lastCount = _count;
        lastTime = DateTime.UtcNow;

        try
        {
            PopulateQueue();
        }
        catch (Exception exception)
        {
            // System.Timers.Timer suppresses exceptions thrown by Elapsed handlers; without this the
            // timer would never be re-armed and the queue would silently stop emitting data
            Log.Error(exception);
        }

        try
        {
            _timer.Reset();
        }
        catch (ObjectDisposedException)
        {
            // the queue was disposed while this pass was running, nothing left to re-arm
        }
    };

    _timer.Start();
}
106 
/// <summary>
/// Registers the given configuration with the aggregator and the subscription manager
/// </summary>
/// <param name="dataConfig">The configuration describing the data feed to subscribe to</param>
/// <param name="newDataAvailableHandler">Handler passed on to the aggregator for new data notifications</param>
/// <returns>The enumerator the aggregator created for this subscription</returns>
public IEnumerator<BaseData> Subscribe(SubscriptionDataConfig dataConfig, EventHandler newDataAvailableHandler)
{
    // the aggregator is registered first, then the subscription is recorded
    var aggregatedData = _aggregator.Add(dataConfig, newDataAvailableHandler);
    _subscriptionManager.Subscribe(dataConfig);
    return aggregatedData;
}
120 
/// <summary>
/// Sets the job this queue is subscribing for. This implementation ignores the job.
/// </summary>
/// <param name="job">The live job packet</param>
public void SetJob(LiveNodePacket job)
{
    // intentionally empty
}
128 
/// <summary>
/// Drops the given configuration from the subscription manager and from the aggregator
/// </summary>
/// <param name="dataConfig">The subscription configuration to remove</param>
public void Unsubscribe(SubscriptionDataConfig dataConfig)
{
    // forget the subscription first, then remove it from the aggregator
    _subscriptionManager.Unsubscribe(dataConfig);
    _aggregator.Remove(dataConfig);
}
138 
/// <summary>
/// Gets whether the data provider is connected; this queue always reports itself as connected
/// </summary>
/// <returns>Always true</returns>
public bool IsConnected
{
    get { return true; }
}
144 
/// <summary>
/// Stops the tick generation timer and releases the timer and the data cache provider
/// </summary>
public void Dispose()
{
    // stop the timer before releasing it
    _timer.Stop();
    _timer.DisposeSafely();

    // the cache provider is created by this instance, so it is released here too
    _dataCacheProvider.DisposeSafely();
}
154 
/// <summary>
/// Pumps a batch of generated ticks into the aggregator for every subscribed symbol.
/// Canonical symbols and symbols containing "UNIVERSE" are skipped.
/// </summary>
private void PopulateQueue()
{
    var symbols = _subscriptionManager.GetSubscribedSymbols();

    // both values are the same for the whole pass, so they are resolved once
    // instead of once per symbol / once per generated tick
    var defaultDataTypes = SubscriptionManager.DefaultDataTypes();
    var maxSize = (int)_timer.Interval;

    foreach (var symbol in symbols)
    {
        if (symbol.IsCanonical() || symbol.Contains("UNIVERSE"))
        {
            continue;
        }
        var offsetProvider = GetTimeZoneOffsetProvider(symbol);

        // only emit the tick types listed as defaults for this security type
        var tickTypes = defaultDataTypes[symbol.SecurityType];
        var trades = tickTypes.Contains(TickType.Trade);
        var quotes = tickTypes.Contains(TickType.Quote);

        // each iteration emits up to two ticks (one trade and one quote), so with the default
        // of 500000 iterations a symbol receives up to one million ticks per pass
        for (var i = 0; i < _dataPointsPerSecondPerSymbol; i++)
        {
            var now = TimeProvider.GetUtcNow();
            var exchangeTime = offsetProvider.ConvertFromUtc(now);
            // price stays within [100, 101], derived from the current time of day
            var lastTrade = 100 + (decimal)Math.Abs(Math.Sin(now.TimeOfDay.TotalMilliseconds));
            if (trades)
            {
                _count++;
                _aggregator.Update(new Tick
                {
                    Time = exchangeTime,
                    Symbol = symbol,
                    Value = lastTrade,
                    TickType = TickType.Trade,
                    Quantity = _random.Next(10, maxSize)
                });
            }

            if (quotes)
            {
                _count++;
                // quote is placed 5% either side of the trade price
                var bidPrice = lastTrade * 0.95m;
                var askPrice = lastTrade * 1.05m;
                var bidSize = _random.Next(10, maxSize);
                var askSize = _random.Next(10, maxSize);
                _aggregator.Update(new Tick(exchangeTime, symbol, "", "", bidSize: bidSize, bidPrice: bidPrice, askPrice: askPrice, askSize: askSize));
            }
        }
    }
}
204 
/// <summary>
/// Gets the UTC to exchange time converter for the given symbol, creating and caching it on first use
/// </summary>
/// <param name="symbol">The symbol whose exchange time zone is required</param>
/// <returns>The cached or newly created offset provider</returns>
private TimeZoneOffsetProvider GetTimeZoneOffsetProvider(Symbol symbol)
{
    TimeZoneOffsetProvider cached;
    if (_symbolExchangeTimeZones.TryGetValue(symbol, out cached))
    {
        return cached;
    }

    // first request for this symbol: read its exchange time zone from the market hours database
    var exchangeHours = _marketHoursDatabase.GetExchangeHours(symbol.ID.Market, symbol, symbol.SecurityType);
    var created = new TimeZoneOffsetProvider(exchangeHours.TimeZone, TimeProvider.GetUtcNow(), Time.EndOfTime);
    _symbolExchangeTimeZones[symbol] = created;
    return created;
}
216 
/// <summary>
/// Returns the symbols associated with the given symbol. For option, index option and future option
/// symbols this is the contract list reported by the option chain provider for the current UTC date;
/// for every other security type the result is empty.
/// </summary>
/// <param name="symbol">Symbol to lookup</param>
/// <param name="includeExpired">Include expired contracts (not used by this implementation)</param>
/// <param name="securityCurrency">Expected security currency, if any (not used by this implementation)</param>
/// <returns>Enumerable of Symbols that are associated with the provided Symbol</returns>
public IEnumerable<Symbol> LookupSymbols(Symbol symbol, bool includeExpired, string securityCurrency = null)
{
    var securityType = symbol.SecurityType;
    var isOptionLike = securityType == SecurityType.Option
        || securityType == SecurityType.IndexOption
        || securityType == SecurityType.FutureOption;

    if (!isOptionLike)
    {
        yield break;
    }

    foreach (var contract in _optionChainProvider.GetOptionContractList(symbol, DateTime.UtcNow.Date))
    {
        yield return contract;
    }
}
240 
/// <summary>
/// Tells whether the FakeDataQueue can perform selection
/// </summary>
/// <returns>Always true</returns>
public bool CanPerformSelection() => true;
248  }
249 }