25 using System.Collections.Generic;
39 private bool _initialized;
45 private List<IHistoryProvider> _historyProviders =
new();
58 _brokerage = brokerage;
70 throw new InvalidOperationException(
"BrokerageHistoryProvider can only be initialized once");
74 var dataProvidersList = parameters.
Job?.HistoryProvider.DeserializeList() ??
new List<string>();
75 if (dataProvidersList.IsNullOrEmpty())
77 dataProvidersList.AddRange(
Config.
Get(
"history-provider",
"SubscriptionDataReaderHistoryProvider").DeserializeList());
81 foreach (var historyProviderName
in dataProvidersList)
88 if (dataQueueHandler ==
null)
93 dataQueueHandler.
SetJob((Packets.LiveNodePacket)parameters.Job);
94 Log.
Trace($
"HistoryProviderManager.Initialize(): Created and wrapped '{brokerageName}' as '{typeof(BrokerageHistoryProvider).Name}'");
98 Log.
Trace($
"HistoryProviderManager.Initialize(): Wrapping '{brokerageName}' instance as '{typeof(BrokerageHistoryProvider).Name}'");
103 brokerageHistoryProvider.SetBrokerage((
IBrokerage)dataQueueHandler);
104 historyProvider = brokerageHistoryProvider;
120 _historyProviders.Add(historyProvider);
123 Log.
Trace($
"HistoryProviderManager.Initialize(): history providers [{string.Join(",
", _historyProviders.Select(x => x.GetType().Name))}]");
132 public override IEnumerable<Slice>
GetHistory(IEnumerable<HistoryRequest> requests, DateTimeZone sliceTimeZone)
134 List<IEnumerator<Slice>> historyEnumerators =
new(_historyProviders.Count);
136 var historyRequets =
new List<HistoryRequest>();
137 foreach (var request
in requests)
139 var config = request.ToSubscriptionDataConfig();
140 _dataPermissionManager?.
AssertConfiguration(config, request.StartTimeLocal, request.EndTimeLocal);
141 historyRequets.Add(request);
144 foreach (var historyProvider
in _historyProviders)
148 var history = historyProvider.GetHistory(historyRequets, sliceTimeZone);
154 historyEnumerators.Add(history.GetEnumerator());
162 Slice latestMergeSlice =
null;
163 while (synchronizer.MoveNext())
165 if (synchronizer.Current ==
null)
169 if (latestMergeSlice ==
null)
171 latestMergeSlice = synchronizer.Current;
174 if (synchronizer.Current.UtcTime > latestMergeSlice.
UtcTime)
178 yield
return latestMergeSlice;
179 latestMergeSlice = synchronizer.Current;
184 latestMergeSlice.
MergeSlice(synchronizer.Current);
187 if (latestMergeSlice !=
null)
189 yield
return latestMergeSlice;
193 private int GetDataPointCount()
195 var dataPointCount = 0;
196 foreach (var historyProvider
in _historyProviders)
198 dataPointCount += historyProvider.DataPointCount;
200 return dataPointCount;