18 using System.Collections.Generic;
// NOTE(review): fragment of a method whose signature line and braces are not visible here.
// Visible behavior: a null enumerator returns GetEndedSubscription(request) immediately;
// otherwise a Subscription is built from request, dataEnumerator and timeZoneOffsetProvider.
43 IEnumerator<BaseData> enumerator,
44 bool dailyStrictEndTimeEnabled)
// Guard: when no enumerator was supplied, hand back the result of GetEndedSubscription(request).
46 if (enumerator ==
null)
48 return GetEndedSubscription(request);
// Exchange hours are read from the security attached to the request.
50 var exchangeHours = request.
Security.Exchange.Hours;
// NOTE(review): the call that receives the two arguments below is not visible in this view;
// presumably it constructs dataEnumerator — confirm against the full file.
55 timeZoneOffsetProvider,
58 dailyStrictEndTimeEnabled
// The subscription is created over dataEnumerator (declared on a line not shown here).
60 return new Subscription(request, dataEnumerator, timeZoneOffsetProvider);
// NOTE(review): fragment of a method whose signature line and braces are not visible here.
// Visible behavior: a null enumerator returns GetEndedSubscription(request); otherwise a
// Subscription is created over `enqueueable` and a `produce` delegate is defined that pulls
// items from `enumerator`, optionally refreshes the price scale factor, and enqueues data.
74 IEnumerator<BaseData> enumerator,
76 bool enablePriceScale,
77 bool dailyStrictEndTimeEnabled)
// Guard: when no enumerator was supplied, hand back the result of GetEndedSubscription(request).
79 if(enumerator ==
null)
81 return GetEndedSubscription(request);
// Exchange hours are read from the security attached to the request.
83 var exchangeHours = request.
Security.Exchange.Hours;
// The subscription wraps `enqueueable` (declared on a line not shown here).
86 var subscription =
new Subscription(request, enqueueable, timeZoneOffsetProvider);
87 var config = subscription.Configuration;
// Price scaling stays enabled only if the caller asked for it AND the configuration
// reports that its prices should be scaled.
88 enablePriceScale = enablePriceScale && config.PricesShouldBeScaled();
// DateTime.MinValue acts as the "nothing recorded yet" sentinel tested in the
// price-scale condition further down.
89 var lastTradableDate = DateTime.MinValue;
// Delegate taking a work batch size and returning bool; it drains `enumerator`.
// NOTE(review): the meaning of the bool result is not visible in this view.
91 Func<int, bool> produce = (workBatchSize) =>
96 while (enumerator.MoveNext())
// Once the queue reports it has finished, the source enumerator is disposed.
// NOTE(review): what follows the dispose (return/break) is not visible here.
99 if (enqueueable.HasFinished)
101 enumerator.DisposeSafely();
105 var data = enumerator.Current;
118 var requestMode = config.DataNormalizationMode;
// Only the date component of the data point's price-scale frontier is compared.
124 var priceScaleFrontierDate = data.GetUpdatePriceScaleFrontier().Date;
// Refresh the price scale factor only when all of these hold:
//  - price scaling is enabled,
//  - the frontier date is later than the last recorded date,
//  - the data point is not auxiliary data,
//  - the point is not fill-forward, or no date has been recorded yet.
128 if (enablePriceScale && priceScaleFrontierDate > lastTradableDate && data.DataType !=
MarketDataType.Auxiliary && (!data.IsFillForward || lastTradableDate == DateTime.MinValue))
// Record the new date and store the factor looked up from factorFile on the request's
// configuration. NOTE(review): factorFile is declared outside this view.
131 lastTradableDate = priceScaleFrontierDate;
132 request.
Configuration.
PriceScaleFactor = factorFile.GetPriceScale(lastTradableDate, requestMode, config.ContractDepthOffset, config.DataMappingMode);
// NOTE(review): argument of a call not visible here — presumably the creation of
// subscriptionData; confirm against the full file.
138 subscription.OffsetProvider,
144 enqueueable.Enqueue(subscriptionData);
// NOTE(review): the branch body is not visible; presumably the batch ends once more than
// workBatchSize items have been processed — confirm.
148 if (count > workBatchSize)
// Exceptions are logged together with the request configuration.
// NOTE(review): whether the exception is rethrown or swallowed is not visible here.
154 catch (Exception exception)
156 Log.
Error(exception, $
"Subscription worker task exception {request.Configuration}.");
// The source enumerator is disposed on this path as well.
162 enumerator.DisposeSafely();
// NOTE(review): the enclosing construct for the two lines below is not visible; they test
// enqueueable.HasFinished and return the number of queued items (enqueueable.Count).
170 if (enqueueable.HasFinished)
174 return enqueueable.Count;