18 using System.Threading;
23 using System.Collections.Generic;
24 using System.Collections.Concurrent;
// Thread that runs ConsumeEnumerators; assigned where the Thread is constructed further down.
33 private Thread _thread;
// Backing field for the sleep-interval accessor; starts at 1.
34 private uint _sleepInterval = 1;
// Predicate consulted in ConsumeEnumerators' catch block to decide whether an exception is fatal.
35 private Func<Exception, bool> _isFatalError;
// Cancellation source whose token is polled by the ConsumeEnumerators loop and which is handed to StopSafely.
36 private readonly CancellationTokenSource _cancellationTokenSource;
// NOTE(review): presumably backs the Name used in the trace messages — confirm.
38 private readonly
string _name;
// Set after a handler is stored in _enumerators; reset at the top of each ConsumeEnumerators pass.
39 private ManualResetEventSlim _manualResetEventSlim;
// Registered handlers keyed by Symbol; iterated by ConsumeEnumerators.
40 private ConcurrentDictionary<Symbol, EnumeratorHandler> _enumerators;
// Getter: returns the backing field unchanged.
47 get => _sleepInterval;
// NOTE(review): the condition guarding this throw is not visible here; going by the
// message it presumably rejects a value of 0 — confirm.
52 throw new ArgumentException(
"Sleep interval should be bigger than 0");
// Store the supplied value in the backing field.
54 _sleepInterval = value;
// Initial state. _cancellationTokenSource is a readonly field, so these assignments
// run during construction.
// Default error predicate: no exception is treated as fatal.
73 _isFatalError = x =>
false;
74 _cancellationTokenSource =
new CancellationTokenSource();
// The event starts unsignaled (initialState = false).
75 _manualResetEventSlim =
new ManualResetEventSlim(
false);
// Starts with no registered handlers.
76 _enumerators =
new ConcurrentDictionary<Symbol, EnumeratorHandler>();
// Store the handler under its Symbol; the indexer assignment replaces any entry
// already registered for that Symbol.
// NOTE(review): the enclosing member's signature is not visible here; `handler` is
// presumably its EnumeratorHandler parameter — confirm.
86 _enumerators[handler.
Symbol] = handler;
// Signal the event that ConsumeEnumerators resets at the top of each pass.
87 _manualResetEventSlim.Set();
// Builds an EnumeratorHandler from the symbol, the enumerator and the optional
// shouldMoveNext / handleData callbacks, which are forwarded to the handler's
// constructor unchanged (null is allowed for both).
// When enumeratorFinished is non-null it is subscribed to the handler's
// EnumeratorFinished event.
// NOTE(review): the statement that registers the new handler is not visible in this
// span — presumably it is handed to the handler-based registration path; confirm.
100 public void AddEnumerator(
Symbol symbol, IEnumerator<BaseData> enumerator, Func<bool> shouldMoveNext =
null, Action<EnumeratorHandler> enumeratorFinished =
null, Action<BaseData> handleData =
null)
102 var enumeratorHandler =
new EnumeratorHandler(symbol, enumerator, shouldMoveNext, handleData);
// Only subscribe when a callback was actually supplied.
103 if (enumeratorFinished !=
null)
// The event's sender is ignored; only the event argument is passed to the callback.
105 enumeratorHandler.EnumeratorFinished += (sender, args) => enumeratorFinished(args);
// Install the supplied predicate; a null argument falls back to one that never
// reports an error as fatal (the same default assigned during construction).
122 _isFatalError = isFatalError ?? (x =>
false);
// Remove the entry for `symbol`; the branch is taken only when an entry existed,
// in which case `handler` holds the removed value.
132 if (_enumerators.TryRemove(symbol, out handler))
// Release the removed handler's enumerator.
// NOTE(review): other statements of this branch are not visible here.
135 handler.Enumerator.Dispose();
// Local event created unsignaled for the start-up sequence and disposed at the end
// via DisposeSafely.
// NOTE(review): where it is set and waited on is not visible here — presumably it
// signals that the new thread has begun running; confirm.
146 var manualEvent =
new ManualResetEventSlim(
false);
// Background thread named after Name; its body traces a start message and then
// runs ConsumeEnumerators.
147 _thread =
new Thread(() =>
150 Log.
Trace($
"BaseDataExchange({Name}) Starting...");
151 ConsumeEnumerators();
152 }) { IsBackground =
true,
Name =
Name };
156 manualEvent.DisposeSafely();
// Hands the worker thread to StopSafely together with a 5 second TimeSpan and the
// cancellation source.
// NOTE(review): StopSafely is defined elsewhere; presumably it cancels the source and
// waits up to the timeout for the thread to exit — confirm.
164 _thread.StopSafely(TimeSpan.FromSeconds(5), _cancellationTokenSource);
// Worker loop: until cancellation is requested, walks every registered handler,
// advances its enumerator when the handler allows it, and forwards each non-null
// item to the handler.
// NOTE(review): the trace messages below say ConsumeQueue() although this method is
// named ConsumeEnumerators.
169 private void ConsumeEnumerators()
171 while (!_cancellationTokenSource.Token.IsCancellationRequested)
// Clear the event at the start of each pass; it is set again when a handler is
// stored in _enumerators.
176 _manualResetEventSlim.Reset();
178 foreach (var kvp
in _enumerators)
// Re-check cancellation before touching each enumerator.
180 if (_cancellationTokenSource.Token.IsCancellationRequested)
182 Log.
Trace($
"BaseDataExchange({Name}).ConsumeQueue(): Exiting...");
185 var enumeratorHandler = kvp.Value;
186 var enumerator = enumeratorHandler.Enumerator;
// Skip this enumerator for the current pass when its handler declines to advance.
189 if (!enumeratorHandler.ShouldMoveNext())
continue;
// MoveNext returned false: call OnEnumeratorFinished, dispose the enumerator and
// drop the entry. TryRemove's out argument overwrites the local enumeratorHandler.
191 if (!enumerator.MoveNext())
193 enumeratorHandler.OnEnumeratorFinished();
194 enumeratorHandler.Enumerator.Dispose();
195 _enumerators.TryRemove(enumeratorHandler.Symbol, out enumeratorHandler);
// A null Current is not forwarded to the handler.
199 if (enumerator.Current ==
null)
continue;
202 enumeratorHandler.HandleData(enumerator.Current);
// NOTE(review): the body of this handler and the call that can raise the
// cancellation exception are not visible here.
212 catch (OperationCanceledException)
216 catch (Exception err)
// Ask the configured predicate whether this error is fatal.
// NOTE(review): presumably the method exits after this trace — confirm.
219 if (_isFatalError(err))
221 Log.
Trace($
"BaseDataExchange({Name}).ConsumeQueue(): Fatal error encountered. Exiting...");
// Trace emitted once the while loop has ended.
227 Log.
Trace($
"BaseDataExchange({Name}).ConsumeQueue(): Exiting...");
// Callback that decides whether the enumerator should be advanced; the constructor
// substitutes an always-true delegate when none is supplied.
235 private readonly Func<bool> _shouldMoveNext;
// Optional callback that receives each data item; may be null, in which case the
// null-conditional Invoke on it does nothing.
236 private readonly Action<BaseData> _handleData;
// Constructor. Stores handleData as given (null allowed) and shouldMoveNext, falling
// back to a delegate that always returns true when shouldMoveNext is null.
// NOTE(review): the statements that store `symbol` and `enumerator` are not visible
// in this span.
261 public EnumeratorHandler(
Symbol symbol, IEnumerator<BaseData> enumerator, Func<bool> shouldMoveNext =
null, Action<BaseData> handleData =
null)
266 _handleData = handleData;
267 _shouldMoveNext = shouldMoveNext ?? (() =>
true);
// Delegates the decision to the stored callback, which the constructor guarantees
// is non-null.
283 return _shouldMoveNext();
// Forward the item to the data callback when one was supplied; does nothing otherwise.
292 _handleData?.Invoke(data);