18 using System.Collections.Generic;
19 using System.Threading;
32 private DateTimeZone _dateTimeZone;
80 public virtual IEnumerable<TimeSlice>
StreamData(CancellationToken cancellationToken)
88 var previousEmitTime = DateTime.MaxValue;
93 var previousWasTimePulse =
false;
96 while (!cancellationToken.IsCancellationRequested)
101 if (!enumerator.MoveNext())
106 timeSlice = enumerator.Current;
108 catch (Exception err)
111 Algorithm.SetRuntimeError(err,
"Synchronizer");
116 if (timeSlice ==
null || cancellationToken.IsCancellationRequested)
break;
126 if (timeSlice.
Time != previousEmitTime || previousWasTimePulse || timeSlice.
UniverseData.Count != 0)
128 previousEmitTime = timeSlice.
Time;
132 yield
return timeSlice;
139 if (!timeSlice.
Slice.HasData || retried)
148 enumerator.DisposeSafely();
149 Log.
Trace(
"Synchronizer.GetEnumerator(): Exited thread.");
162 Log.
Debug(
"Synchronizer.SubscriptionFinished(): Finished subscription:" +
163 $
"{subscription.Configuration} at {FrontierTimeProvider.GetUtcNow()} UTC");
183 private DateTime GetInitialFrontierTime()
185 var frontier = DateTime.MaxValue;
188 var current = subscription.Current;
201 if (current.EmitTimeUtc < frontier)
203 frontier = current.EmitTimeUtc;
207 if (frontier == DateTime.MaxValue)