18 using System.Collections.Concurrent;
19 using System.Collections.Generic;
20 using System.Threading;
21 using System.Threading.Tasks;
35 private static readonly Lazy<WeightedWorkScheduler> _instance =
new Lazy<WeightedWorkScheduler>(() =>
new WeightedWorkScheduler());
49 private readonly ConcurrentQueue<WorkItem> _newWork;
50 private readonly AutoResetEvent _newWorkEvent;
51 private Task _initializationTask;
52 private readonly List<WeightedWorkQueue> _workerQueues;
61 _newWork =
new ConcurrentQueue<WorkItem>();
62 _newWorkEvent =
new AutoResetEvent(
false);
63 _workerQueues =
new List<WeightedWorkQueue>(
WorkersCount);
65 _initializationTask = Task.Run(() =>
67 MaxWorkWeight = Configuration.Config.GetInt(
"data-feed-max-work-weight", 400);
68 Logging.Log.Trace($
"WeightedWorkScheduler(): will use {WorkersCount} workers and MaxWorkWeight is {MaxWorkWeight}");
72 var workQueue =
new WeightedWorkQueue();
73 _workerQueues.Add(workQueue);
74 var thread =
new Thread(() => workQueue.WorkerThread(_newWork, _newWorkEvent))
77 Priority = workQueue.ThreadPriority,
78 Name = $
"WeightedWorkThread{i}"
92 public override void QueueWork(
Symbol symbol, Func<int, bool> workFunc, Func<int> weightFunc)
94 _newWork.Enqueue(
new WorkItem(workFunc, weightFunc));
103 if (!_initializationTask.Wait(TimeSpan.FromSeconds(10)))
105 throw new TimeoutException(
"Timeout waiting for worker threads to start");
108 for (var i = 0; i < _workerQueues.Count; i++)
110 _workerQueues[i].AddSingleCall(action);