23 using System.Collections.Concurrent;
24 using System.Collections.Generic;
/// <summary>
/// Maps a <see cref="SecurityIdentifier"/> to the list of
/// (subscription data config, scannable enumerator) pairs held for that identifier.
/// In the visible code an entry's list is replaced with a newly built list
/// (via <c>ToList()</c>) when a pair is appended to an existing entry or filtered out,
/// rather than being mutated in place.
/// </summary>
35 private readonly ConcurrentDictionary<SecurityIdentifier, List<KeyValuePair<SubscriptionDataConfig, ScannableEnumerator<BaseData>>>> _enumerators
36 =
new ConcurrentDictionary<SecurityIdentifier, List<KeyValuePair<SubscriptionDataConfig, ScannableEnumerator<BaseData>>>>();
/// <summary>
/// Flag reported as "daily strict end times" by the Initialize() trace log message.
/// NOTE(review): where this field is assigned and consumed is not visible in this excerpt - confirm.
/// </summary>
37 private bool _dailyStrictEndTimeEnabled;
51 Log.
Trace($
"AggregationManager.Initialize(): daily strict end times: {_dailyStrictEndTimeEnabled}");
63 var isPeriodBased = (dataConfig.
Type.Name == nameof(
QuoteBar) ||
69 _enumerators.AddOrUpdate(
72 (k, v) => {
return v.Concat(
new[] {
new KeyValuePair<SubscriptionDataConfig, ScannableEnumerator<BaseData>>(dataConfig, enumerator) }).ToList(); });
83 List<KeyValuePair<SubscriptionDataConfig, ScannableEnumerator<BaseData>>> enumerators;
84 if (_enumerators.TryGetValue(dataConfig.
Symbol.
ID, out enumerators))
86 if (enumerators.Count == 1)
88 List<KeyValuePair<SubscriptionDataConfig, ScannableEnumerator<BaseData>>> output;
89 return _enumerators.TryRemove(dataConfig.
Symbol.
ID, out output);
93 _enumerators[dataConfig.
Symbol.
ID] = enumerators.Where(pair => pair.Key != dataConfig).ToList();
99 Log.
Debug($
"AggregationManager.Update(): IDataConsolidator for symbol ({dataConfig.Symbol.Value}) was not found.");
112 List<KeyValuePair<SubscriptionDataConfig, ScannableEnumerator<BaseData>>> enumerators;
113 if (_enumerators.TryGetValue(input.
Symbol.
ID, out enumerators))
115 for (var i = 0; i < enumerators.Count; i++)
117 var kvp = enumerators[i];
122 var tick = input as
Tick;
123 if (tick !=
null && tick.Suspicious)
129 kvp.
Value.Update(input);
133 catch (Exception exception)