18 using System.Collections.Generic;
34 private readonly PriorityQueue<ConsolidatorWrapper, ConsolidatorScanPriority> _consolidatorsSortedByScanTime;
35 private readonly Dictionary<IDataConsolidator, ConsolidatorWrapper> _consolidators;
36 private List<Tuple<ConsolidatorWrapper, ConsolidatorScanPriority>> _consolidatorsToAdd;
37 private readonly
object _threadSafeCollectionLock;
67 _consolidators =
new();
68 _timeKeeper = timeKeeper;
69 _consolidatorsSortedByScanTime =
new(1000);
70 _threadSafeCollectionLock =
new object();
92 DateTimeZone timeZone,
93 DateTimeZone exchangeTimeZone,
94 bool isCustomData =
false,
95 bool fillForward =
true,
96 bool extendedMarketHours =
false
103 dataType = typeof(
Tick);
107 return Add(dataType, tickType, symbol, resolution, timeZone, exchangeTimeZone, isCustomData, fillForward,
108 extendedMarketHours);
143 DateTimeZone dataTimeZone,
144 DateTimeZone exchangeTimeZone,
146 bool fillForward =
true,
147 bool extendedMarketHours =
false,
148 bool isInternalFeed =
false,
149 bool isFilteredSubscription =
true,
154 extendedMarketHours, isFilteredSubscription, isInternalFeed, isCustomData,
155 new List<Tuple<Type, TickType>> {
new Tuple<Type, TickType>(dataType, tickType) },
156 dataNormalizationMode).First();
168 var subscriptions =
Subscriptions.Where(x => x.Symbol == symbol).ToList();
170 if (subscriptions.Count == 0)
173 throw new ArgumentException(
"Please subscribe to this symbol before adding a consolidator for it. Symbol: " +
177 foreach (var subscription
in subscriptions)
182 subscription.Consolidators.Add(consolidator);
184 var wrapper = _consolidators[consolidator] =
185 new ConsolidatorWrapper(consolidator, subscription.Increment, _timeKeeper, _timeKeeper.GetLocalTimeKeeper(subscription.ExchangeTimeZone));
187 lock (_threadSafeCollectionLock)
189 _consolidatorsToAdd ??=
new();
190 _consolidatorsToAdd.Add(
new(wrapper, wrapper.Priority));
196 string tickTypeException =
null;
197 if (tickType !=
null && !subscriptions.Where(x => x.TickType == tickType).Any())
199 tickTypeException = $
"No subscription with the requested Tick Type {tickType} was found. Available Tick Types: {string.Join(",
", subscriptions.Select(x => x.TickType))}";
202 throw new ArgumentException(tickTypeException ?? (
"Type mismatch found between consolidator and symbol. " +
203 $
"Symbol: {symbol.Value} does not support input type: {consolidator.InputType.Name}. " +
204 $
"Supported types: {string.Join(",
", subscriptions.Select(x => x.Type.Name))}."));
236 subscription.Consolidators.Remove(consolidator);
238 if (_consolidators.Remove(consolidator, out var consolidatorsToScan))
240 consolidatorsToScan.Dispose();
245 consolidator.DisposeSafely();
270 if (_consolidatorsToAdd !=
null)
272 lock (_threadSafeCollectionLock)
274 _consolidatorsToAdd.DoForEach(x => _consolidatorsSortedByScanTime.Enqueue(x.Item1, x.Item2));
275 _consolidatorsToAdd =
null;
279 while (_consolidatorsSortedByScanTime.TryPeek(out _, out var priority) && priority.UtcScanTime < newUtcTime)
281 var consolidatorToScan = _consolidatorsSortedByScanTime.Dequeue();
282 if (consolidatorToScan.Disposed)
288 if (priority.UtcScanTime != algorithm.
UtcTime)
294 if (consolidatorToScan.UtcScanTime <= priority.UtcScanTime)
297 consolidatorToScan.Scan();
300 _consolidatorsSortedByScanTime.Enqueue(consolidatorToScan, consolidatorToScan.Priority);
309 return new Dictionary<SecurityType, List<TickType>>
355 _subscriptionManager = subscriptionManager;
367 if (subscription.
Type == typeof(
Tick) &&
370 if (desiredTickType ==
null)
376 return subscription.
TickType == tickType;
378 else if (subscription.
TickType != desiredTickType)
384 return consolidator.
InputType.IsAssignableFrom(subscription.
Type);
394 internal static bool IsDefaultDataType(
BaseData data)