18 using System.Collections.Generic;
/// <summary>
/// Interval that is combined with the subscription increment (the smaller of the two wins)
/// to throttle how often the source is refreshed. The constructor falls back to 30 minutes
/// when no value is supplied.
/// </summary>
33 private readonly TimeSpan _minimumIntervalCheck;
/// <summary>
/// Optional hook applied to the exchange-local date before the source is requested.
/// May be null, in which case the unadjusted local date is used.
/// </summary>
35 private readonly Func<DateTime, DateTime> _dateAdjustment;
46 Func<DateTime, DateTime> dateAdjustment =
null, TimeSpan? minimumIntervalCheck =
null)
48 _timeProvider = timeProvider;
49 _dateAdjustment = dateAdjustment;
50 _minimumIntervalCheck = minimumIntervalCheck ?? TimeSpan.FromMinutes(30);
51 _objectStore = objectStore;
67 var lastSourceRefreshTime = DateTime.MinValue;
68 var sourceFactory = config.GetBaseDataInstance();
74 var utcNow = _timeProvider.GetUtcNow();
75 var minimumTimeBetweenCalls = GetMinimumTimeBetweenCalls(config.Increment, _minimumIntervalCheck);
76 if (utcNow - lastSourceRefreshTime < minimumTimeBetweenCalls)
78 return Enumerable.Empty<
BaseData>().GetEnumerator();
81 lastSourceRefreshTime = utcNow;
82 var localDate = _dateAdjustment?.Invoke(utcNow.ConvertFromUtc(config.ExchangeTimeZone).Date) ?? utcNow.ConvertFromUtc(config.ExchangeTimeZone).Date;
83 var source = sourceFactory.GetSource(config, localDate,
true);
86 var enumerator = EnumerateDataSourceReader(config, dataProvider, frontier, source, localDate, sourceFactory);
88 if (SourceRequiresFastForward(source))
98 enumerator =
new FastForwardEnumerator(enumerator, _timeProvider, config.ExchangeTimeZone, maximumDataAge);
106 if (source.Format ==
FileFormat.UnfoldingCollection)
109 enumerator = enumerator.SelectMany(data =>
112 IEnumerator<BaseData> collectionEnumerator;
113 if (collection !=
null)
118 collectionEnumerator = collection.Data
119 .Where(baseData => baseData.EndTime > frontier.Value)
124 collectionEnumerator = collection.Data.GetEnumerator();
129 collectionEnumerator =
new List<BaseData> { data }.
GetEnumerator();
131 return collectionEnumerator;
145 var newLocalFrontier = localFrontier.
Value;
148 foreach (var datum
in subscriptionEnumerator)
153 if (datum !=
null && datum.EndTime > localFrontier.
Value)
157 else if (!SourceRequiresFastForward(source))
170 newLocalFrontier =
Time.
Max(datum.EndTime, newLocalFrontier);
172 if (!SourceRequiresFastForward(source))
177 localFrontier.
Value = newLocalFrontier;
182 localFrontier.
Value = newLocalFrontier;
/// <summary>
/// Picks the wait time that must elapse between two consecutive source refreshes.
/// </summary>
/// <param name="increment">The data increment of the subscription</param>
/// <param name="minimumInterval">The configured minimum check interval</param>
/// <returns>Whichever of the two time spans is shorter</returns>
private static TimeSpan GetMinimumTimeBetweenCalls(TimeSpan increment, TimeSpan minimumInterval)
{
    // TimeSpan comparison operators compare the underlying tick counts
    return increment < minimumInterval ? increment : minimumInterval;
}
/// <summary>
/// Computes a maximum data age from the subscription increment, never going below five seconds.
/// </summary>
/// <param name="increment">The data increment of the subscription</param>
/// <returns>The increment itself, or five seconds when the increment is shorter than that</returns>
private static TimeSpan GetMaximumDataAge(TimeSpan increment)
{
    var floor = TimeSpan.FromSeconds(5);

    // TimeSpan comparison operators compare the underlying tick counts
    return increment > floor ? increment : floor;
}