18 using System.Collections.Generic;
20 using System.Threading;
// Stores the supplied universeSelection in the _universeSelection field; Sync later calls
// HandleDelisting, ApplyUniverseSelection and AddPendingInternalDataFeeds on it.
// NOTE(review): the enclosing member's header is not shown here - presumably the constructor; confirm.
49 _universeSelection = universeSelection;
// One-shot guard: if _timeProvider has already been assigned, a second call throws
// instead of replacing the existing provider.
58 if (_timeProvider !=
null)
60 throw new Exception(
"SubscriptionSynchronizer.SetTimeProvider(): can only be called once");
// First call: keep the provider. Sync reads its frontier time from this field via GetUtcNow().
62 _timeProvider = timeProvider;
// One-shot guard: if _timeSliceFactory has already been assigned, a second call throws
// instead of replacing the existing factory.
72 if (_timeSliceFactory !=
null)
74 throw new Exception(
"SubscriptionSynchronizer.SetTimeSliceFactory(): can only be called once");
// First call: keep the factory. Sync uses this field to Create() each time slice it yields.
76 _timeSliceFactory = timeSliceFactory;
// Iterator that produces TimeSlice objects from the given subscriptions. It loops until
// cancellation is requested on cancellationToken; each pass reads a frontier time from
// _timeProvider, walks every subscription emitting data whose EmitTimeUtc is at or before
// that frontier, applies universe selection for any collected universe data, and yields
// the slice built by _timeSliceFactory.
// NOTE(review): this listing omits braces and several statements of the method, so the
// comments below describe only what the visible lines show.
85 public IEnumerable<TimeSlice>
Sync(IEnumerable<Subscription> subscriptions,
86 CancellationToken cancellationToken)
// Subscriptions whose MoveNext() returned false while their data was being emitted are
// parked here and only dequeued after the time slice has been created (see the drain loop below).
88 var delayedSubscriptionFinished =
new Queue<Subscription>();
// Main loop: one pass per time slice, until cancellation is requested.
90 while (!cancellationToken.IsCancellationRequested)
// Packets for this pass (initial capacity 1); handed to the time slice factory below.
93 var data =
new List<DataFeedPacket>(1);
// Universe data gathered during this pass. Starts as null and is only allocated
// the first time an entry has to be stored.
95 Dictionary<Universe, BaseDataCollection> universeData =
null;
// Filled while universe selection is applied further down, then passed to the factory's Create call.
96 var universeDataForTimeSliceCreate =
new Dictionary<Universe, BaseDataCollection>();
// The frontier: data with EmitTimeUtc at or before this instant is emitted in this pass.
98 var frontierUtc = _timeProvider.
GetUtcNow();
105 foreach (var subscription
in subscriptions)
// Subscriptions flagged EndOfStream are special-cased (the branch body is not shown in this listing).
107 if (subscription.EndOfStream)
// No current item yet: try to advance the subscription. What happens when MoveNext()
// returns false is not shown in this listing.
114 if (subscription.Current ==
null)
116 if (!subscription.MoveNext())
// Consume every item of this subscription that is due at the current frontier.
125 while (subscription.Current !=
null && subscription.Current.EmitTimeUtc <= frontierUtc)
// Arguments of a call whose opening is not shown - presumably the construction of
// `packet` for this subscription; TODO confirm against the full file.
131 subscription.Security,
132 subscription.Configuration,
133 subscription.RemovedFromUniverse
// Auxiliary data that is a Delisting gets extra handling before being added to the packet.
145 if (subscription.Current.Data.DataType ==
MarketDataType.Auxiliary && subscription.Current.Data is
Delisting delisting)
// For a universe selection subscription, dispose its universe. Single() throws unless
// the subscription has exactly one universe.
147 if(subscription.IsUniverseSelectionSubscription)
149 subscription.Universes.Single().Dispose();
// Accumulate the result of HandleDelisting into `changes`. The condition guarding this
// statement is not shown in this listing.
153 changes += _universeSelection.HandleDelisting(subscription.Current.Data, subscription.Configuration.IsInternalFeed);
// Add the current data point to the packet.
157 packet.
Add(subscription.Current.Data);
// Advance; if the subscription is exhausted, defer its "finished" handling until after
// the time slice has been created.
159 if (!subscription.MoveNext())
161 delayedSubscriptionFinished.Enqueue(subscription);
// Only continue if a packet exists and holds at least one data point.
166 if (packet?.Count > 0)
// Branch on whether this is a universe selection subscription (the body of this branch is
// not shown; the universe-data handling below presumably belongs to the other branch - confirm).
169 if (!subscription.IsUniverseSelectionSubscription)
// Choose the data to store: when packetBaseDataCollection is non-null its Data is used;
// the alternative operand of the conditional is not shown in this listing.
178 var packetData = packetBaseDataCollection ==
null
180 : packetBaseDataCollection.Data;
// Look for a collection already stored for this subscription's universe in this pass.
183 if (universeData !=
null
184 && universeData.TryGetValue(subscription.Universes.Single(), out collection))
// Build a new collection using the frontier for both time arguments, carrying over
// Underlying and FilteredContracts when packetBaseDataCollection is non-null.
190 collection =
new BaseDataCollection(frontierUtc, frontierUtc, subscription.Configuration.Symbol, packetData, packetBaseDataCollection?.Underlying, packetBaseDataCollection?.FilteredContracts);
// Allocate the dictionary on first use, then store the collection under the universe.
191 if (universeData ==
null)
193 universeData =
new Dictionary<Universe, BaseDataCollection>();
195 universeData[subscription.Universes.Single()] = collection;
// A universe selection subscription whose universe has DisposeRequested set must have an
// entry in universeData; if none is present, one is added below.
200 if (subscription.IsUniverseSelectionSubscription
201 && subscription.Universes.Single().DisposeRequested)
203 var universe = subscription.Universes.Single();
// No entry for this universe yet: add a BaseDataCollection built from only the frontier
// time and the subscription's symbol, allocating the dictionary if needed.
205 if (universeData ==
null || !universeData.ContainsKey(universe))
207 if (universeData ==
null)
209 universeData =
new Dictionary<Universe, BaseDataCollection>();
212 universeData[universe] =
new BaseDataCollection(frontierUtc, subscription.Configuration.Symbol);
// Apply universe selection when at least one universe has an entry for this pass.
220 if (universeData !=
null && universeData.Count > 0)
// Process universes ordered by configuration resolution, then by symbol ID.
228 foreach (var kvp
in universeData.OrderBy(x => x.Key.Configuration.Resolution).ThenBy(x => x.Key.Symbol.ID))
230 var universe = kvp.Key;
231 var baseDataCollection = kvp.Value;
// Record the data for the time slice, then apply selection and accumulate its result.
232 universeDataForTimeSliceCreate[universe] = baseDataCollection;
233 newChanges += _universeSelection.ApplyUniverseSelection(universe, frontierUtc, baseDataCollection);
// Entries have been processed; empty the dictionary.
235 universeData.Clear();
// Fold the selection results into the overall changes.
238 changes += newChanges;
// Tail of a condition whose beginning is not shown in this listing; it also calls
// AddPendingInternalDataFeeds with the frontier time.
241 || _universeSelection.AddPendingInternalDataFeeds(frontierUtc));
// Build the time slice for this frontier from the collected data, changes and universe data.
243 var timeSlice = _timeSliceFactory.
Create(frontierUtc, data, changes, universeDataForTimeSliceCreate);
// Drain the subscriptions that ran out of data during this pass (what is done with each
// dequeued subscription is not shown in this listing).
245 while (delayedSubscriptionFinished.Count > 0)
250 var subscription = delayedSubscriptionFinished.Dequeue();
// Hand the slice to the consumer; execution resumes here on the next enumeration step.
254 yield
return timeSlice;
// Returns the current UTC time as reported by _frontierTimeProvider.
// NOTE(review): the enclosing member's header is not shown in this listing; confirm its name and signature.
271 return _frontierTimeProvider.
GetUtcNow();