Lean  $LEAN_TAG$
WeightedWorkScheduler.cs
1 /*
2  * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
3  * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  *
15 */
16 
17 using System;
18 using System.Collections.Concurrent;
19 using System.Collections.Generic;
20 using System.Threading;
21 using System.Threading.Tasks;
22 
24 {
25  /// <summary>
26  /// This singleton class will create a thread pool to processes work
27  /// that will be prioritized based on it's weight
28  /// </summary>
29  /// <remarks>The threads in the pool will take ownership of the
30  /// <see cref="WorkItem"/> and not share it with another thread.
31  /// This is required because the data enumerator stack yields, which state
32  /// depends on the thread id</remarks>
34  {
35  private static readonly Lazy<WeightedWorkScheduler> _instance = new Lazy<WeightedWorkScheduler>(() => new WeightedWorkScheduler());
36 
37  /// <summary>
38  /// This is the size of each work sprint
39  /// </summary>
40  public const int WorkBatchSize = 50;
41 
42  /// <summary>
43  /// This is the maximum size a work item can weigh,
44  /// if reached, it will be ignored and not executed until its less
45  /// </summary>
46  /// <remarks>This is useful to limit RAM and CPU usage</remarks>
47  public static int MaxWorkWeight;
48 
49  private readonly ConcurrentQueue<WorkItem> _newWork;
50  private readonly AutoResetEvent _newWorkEvent;
51  private Task _initializationTask;
52  private readonly List<WeightedWorkQueue> _workerQueues;
53 
54  /// <summary>
55  /// Singleton instance
56  /// </summary>
57  public static WeightedWorkScheduler Instance => _instance.Value;
58 
59  private WeightedWorkScheduler()
60  {
61  _newWork = new ConcurrentQueue<WorkItem>();
62  _newWorkEvent = new AutoResetEvent(false);
63  _workerQueues = new List<WeightedWorkQueue>(WorkersCount);
64 
65  _initializationTask = Task.Run(() =>
66  {
67  MaxWorkWeight = Configuration.Config.GetInt("data-feed-max-work-weight", 400);
68  Logging.Log.Trace($"WeightedWorkScheduler(): will use {WorkersCount} workers and MaxWorkWeight is {MaxWorkWeight}");
69 
70  for (var i = 0; i < WorkersCount; i++)
71  {
72  var workQueue = new WeightedWorkQueue();
73  _workerQueues.Add(workQueue);
74  var thread = new Thread(() => workQueue.WorkerThread(_newWork, _newWorkEvent))
75  {
76  IsBackground = true,
77  Priority = workQueue.ThreadPriority,
78  Name = $"WeightedWorkThread{i}"
79  };
80  thread.Start();
81  }
82  });
83  }
84 
85  /// <summary>
86  /// Add a new work item to the queue
87  /// </summary>
88  /// <param name="symbol">The symbol associated with this work</param>
89  /// <param name="workFunc">The work function to run</param>
90  /// <param name="weightFunc">The weight function.
91  /// Work will be sorted in ascending order based on this weight</param>
92  public override void QueueWork(Symbol symbol, Func<int, bool> workFunc, Func<int> weightFunc)
93  {
94  _newWork.Enqueue(new WorkItem(workFunc, weightFunc));
95  _newWorkEvent.Set();
96  }
97 
98  /// <summary>
99  /// Execute the given action in all workers once
100  /// </summary>
101  public void AddSingleCallForAll(Action action)
102  {
103  if (!_initializationTask.Wait(TimeSpan.FromSeconds(10)))
104  {
105  throw new TimeoutException("Timeout waiting for worker threads to start");
106  }
107 
108  for (var i = 0; i < _workerQueues.Count; i++)
109  {
110  _workerQueues[i].AddSingleCall(action);
111  }
112  }
113  }
114 }