21 using System.Collections;
22 using System.Collections.Concurrent;
23 using System.Collections.Generic;
// Set to true by DataConsolidatedHandler; reset to false next to the failed-dequeue check
// that also tests _isPeriodBase.
34 private bool _consolidated;
// Copied from the constructor's isPeriodBased argument; consulted when a dequeue attempt fails.
// NOTE(review): the field name drops the trailing 'd' of the parameter name - confirm intended.
35 private bool _isPeriodBase;
// True when the consolidator's input type is anything other than BaseData; gates the
// exact-type comparison performed on incoming data.
36 private bool _validateInputType;
// The consolidator's InputType, captured at construction.
37 private Type _consolidatorInputType;
// Time zone used to convert the time provider's UTC "now" before calling Scan on the consolidator.
// NOTE(review): its assignment is not visible in this excerpt.
38 private readonly DateTimeZone _timeZone;
// Holds items added through Enqueue until they are dequeued into _current.
39 private readonly ConcurrentQueue<T> _queue;
// Handler supplied at construction; replaced with a no-op lambda when null is passed,
// so this field is never null.
41 private readonly EventHandler _newDataAvailableHandler;
/// <summary>
/// Explicit non-generic <see cref="IEnumerator"/> implementation; forwards to the
/// <c>Current</c> member of this type.
/// </summary>
59 object IEnumerator.Current =>
Current;
72 _timeProvider = timeProvider;
73 _consolidator = consolidator;
74 _isPeriodBase = isPeriodBased;
75 _queue =
new ConcurrentQueue<T>();
76 _consolidatorInputType = consolidator.
InputType;
77 _validateInputType = _consolidatorInputType != typeof(
BaseData);
78 _newDataAvailableHandler = newDataAvailableHandler ?? ((s, e) => { });
80 _consolidator.DataConsolidated += DataConsolidatedHandler;
90 if (_validateInputType && data.GetType() != _consolidatorInputType)
100 _consolidator.Update(data);
105 _consolidator.Update(data);
/// <summary>
/// Appends <paramref name="data"/> to the internal concurrent queue.
/// </summary>
/// <param name="data">The item to add to the queue.</param>
// NOTE(review): only the enqueue call is visible in this excerpt; any further steps
// (for example notifying _newDataAvailableHandler) cannot be confirmed from here.
113 private void Enqueue(T data)
115 _queue.Enqueue(data);
127 if (!_queue.TryDequeue(out _current) && _isPeriodBase)
129 _consolidated =
false;
133 var localTime = _timeProvider.GetUtcNow().ConvertFromUtc(_timeZone);
134 _consolidator.Scan(localTime);
139 _queue.TryDequeue(out _current);
163 _consolidator.DataConsolidated -= DataConsolidatedHandler;
/// <summary>
/// Handler attached to the consolidator's <c>DataConsolidated</c> event (subscribed with
/// <c>+=</c> and later removed with <c>-=</c> elsewhere in this class).
/// </summary>
/// <param name="sender">The event source; not used by the visible statements.</param>
/// <param name="data">The consolidated data passed with the event.</param>
166 private void DataConsolidatedHandler(
object sender,
IBaseData data)
// 'as' yields null when data is not a T.
// NOTE(review): no null check is visible in this excerpt - confirm how a null dataPoint is handled.
168 var dataPoint = data as T;
// Records that the consolidator has emitted data.
169 _consolidated =
true;