18 using System.Collections.Generic;
19 using System.Threading;
// Signal used to wake the StreamData loop. It is constructed unsignaled (false).
// Within this file it is Set() from the _realTimeScheduleEventService.NewEvent handler
// and from other members, and it is Wait()-ed on and Reset() inside StreamData.
41 private readonly ManualResetEventSlim _newLiveDataEmitted =
new ManualResetEventSlim(
false);
55 base.Initialize(algorithm, dataFeedSubscriptionManager);
78 _realTimeScheduleEventService.
NewEvent += (sender, args) => _newLiveDataEmitted.Set();
/// <summary>
/// Lazily yields <see cref="TimeSlice"/> instances pulled from an enumerator, looping
/// until cancellation is requested or the loop breaks on a null slice.
/// </summary>
/// <param name="cancellationToken">Checked at the top of every loop iteration and again
/// before yielding; when cancellation is requested the loop stops.</param>
/// <returns>A sequence of time slices produced as the loop advances.</returns>
84 public override IEnumerable<TimeSlice>
StreamData(CancellationToken cancellationToken)
// Flag raised on the error path below; when set, one additional slice is yielded after the loop.
88 var shouldSendExtraEmptyPacket =
false;
// Compared against frontierUtc as one of the conditions for emitting a slice.
89 var nextEmit = DateTime.MinValue;
// Compared against the current time's Second component to decide whether to wait.
90 var lastLoopStart = DateTime.UtcNow;
// Tracks whether the previously processed slice was a time pulse; gates the wait/reset logic.
96 var previousWasTimePulse =
false;
97 while (!cancellationToken.IsCancellationRequested)
99 var now = DateTime.UtcNow;
100 if (!previousWasTimePulse)
// Only consider blocking when the event has not already been signaled.
// NOTE(review): the rest of this condition is not visible in this excerpt.
102 if (!_newLiveDataEmitted.IsSet
// Still within the same wall-clock second as the last loop start.
108 if (lastLoopStart.Second == now.Second)
// Blocks with no timeout and no cancellation token; it is released only when the event
// is Set() elsewhere (e.g. by the NewEvent handler registered in this file).
111 _newLiveDataEmitted.Wait();
// Clear the signal so a later iteration can wait for a fresh one.
114 _newLiveDataEmitted.Reset();
122 if (!enumerator.MoveNext())
128 timeSlice = enumerator.Current;
130 catch (Exception err)
// Report the failure to the algorithm, tagged with the "LiveSynchronizer" context string.
133 Algorithm.SetRuntimeError(err,
"LiveSynchronizer");
// NOTE(review): presumably still inside the catch block — confirm against the full file.
134 shouldSendExtraEmptyPacket =
true;
// Leave the loop when there is no slice or cancellation has been requested.
139 if (timeSlice ==
null || cancellationToken.IsCancellationRequested)
break;
145 || timeSlice.
Data.Count != 0
146 || frontierUtc >= nextEmit)
// Emit the slice when any of the conditions above holds (the first operand is not visible here).
149 yield
return timeSlice;
152 if (!timeSlice.IsTimePulse)
161 if (shouldSendExtraEmptyPacket)
167 if (!cancellationToken.IsCancellationRequested)
// Arguments to a construction whose head is not visible here: an empty packet list
// and an empty universe-to-data dictionary.
171 new List<DataFeedPacket>(),
173 new Dictionary<Universe, BaseDataCollection>());
// Yield the extra slice built above.
174 yield
return timeSlice;
// Clean up the enumerator and log that the streaming loop has finished.
178 enumerator.DisposeSafely();
179 Log.
Trace(
"LiveSynchronizer.GetEnumerator(): Exited thread.");
187 _newLiveDataEmitted.Set();
188 _newLiveDataEmitted?.DisposeSafely();
189 _realTimeScheduleEventService?.DisposeSafely();
207 base.PostInitialize();
208 _frontierTimeProvider.
Initialize(base.GetTimeProvider());
227 _newLiveDataEmitted.Set();