Lean  $LEAN_TAG$
WeightedWorkQueue.cs
1 /*
2  * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
3  * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  *
15 */
16 
17 using System;
18 using System.Threading;
19 using System.Collections.Generic;
20 using System.Collections.Concurrent;
21 using System.Runtime.CompilerServices;
22 
24 {
25  internal class WeightedWorkQueue
26  {
27  private int _pointer;
28  private bool _removed;
29  private Action _singleCallWork;
30  private readonly List<WorkItem> _workQueue;
31 
32  /// <summary>
33  /// Event used to notify there is work ready to execute in this queue
34  /// </summary>
35  private AutoResetEvent _workAvailableEvent;
36 
37  /// <summary>
38  /// Returns the thread priority to use for this work queue
39  /// </summary>
40  public ThreadPriority ThreadPriority => ThreadPriority.Lowest;
41 
42  /// <summary>
43  /// Creates a new instance
44  /// </summary>
45  public WeightedWorkQueue()
46  {
47  _workQueue = new List<WorkItem>();
48  _workAvailableEvent = new AutoResetEvent(false);
49  }
50 
51  /// <summary>
52  /// This is the worker thread loop.
53  /// It will first try to take a work item from the new work queue else will check his own queue.
54  /// </summary>
55  public void WorkerThread(ConcurrentQueue<WorkItem> newWork, AutoResetEvent newWorkEvent)
56  {
57  var waitHandles = new WaitHandle[] { _workAvailableEvent, newWorkEvent };
58  var waitedPreviousLoop = 0;
59  while (true)
60  {
61  WorkItem workItem;
62  if (!newWork.TryDequeue(out workItem))
63  {
64  workItem = Get();
65  if (workItem == null)
66  {
67  if(_singleCallWork != null)
68  {
69  try
70  {
71  _singleCallWork();
72  }
73  catch (Exception exception)
74  {
75  // this shouldn't happen but just in case
76  Logging.Log.Error(exception);
77  }
78  // we execute this once only and clear it's reference
79  _singleCallWork = null;
80  }
81 
82  // no work to do, lets sleep and try again
83  WaitHandle.WaitAny(waitHandles, Math.Min(1 + (waitedPreviousLoop * 10), 250));
84  waitedPreviousLoop++;
85  continue;
86  }
87  }
88  else
89  {
90  Add(workItem);
91  }
92 
93  try
94  {
95  waitedPreviousLoop = 0;
96  if (!workItem.Work(WeightedWorkScheduler.WorkBatchSize))
97  {
98  Remove(workItem);
99  }
100  }
101  catch (Exception exception)
102  {
103  Remove(workItem);
104  Logging.Log.Error(exception);
105  }
106  }
107  }
108 
109  /// <summary>
110  /// Adds a new item to this work queue
111  /// </summary>
112  /// <param name="work">The work to add</param>
113  private void Add(WorkItem work)
114  {
115  _workQueue.Add(work);
116  }
117 
118  /// <summary>
119  /// Adds a new item to this work queue
120  /// </summary>
121  /// <param name="work">The work to add</param>
122  public void AddSingleCall(Action work)
123  {
124  _singleCallWork = work;
125  _workAvailableEvent.Set();
126  }
127 
128  /// <summary>
129  /// Removes an item from the work queue
130  /// </summary>
131  /// <param name="workItem">The work item to remove</param>
132  private void Remove(WorkItem workItem)
133  {
134  _workQueue.Remove(workItem);
135  _removed = true;
136  }
137 
138  /// <summary>
139  /// Gets the next work item to process
140  /// </summary>
141  /// <returns>The work item to process, null if none available</returns>
142  [MethodImpl(MethodImplOptions.AggressiveInlining)]
143  protected WorkItem Get()
144  {
145  var count = _workQueue.Count;
146  if (count == 0)
147  {
148  return null;
149  }
150  var countFactor = (10 + 10 / count) / 10;
151 
152  if (_removed)
153  {
154  // if we removed an item don't really trust the pointer any more
155  _removed = false;
156  _pointer = Math.Min(_pointer, count - 1);
157  }
158 
159  var initial = _pointer;
160  do
161  {
162  var item = _workQueue[_pointer++];
163  if (_pointer >= count)
164  {
165  _pointer = 0;
166 
167  // this will only really make a difference if there are many work items
168  if (25 > count)
169  {
170  // if we looped around let's sort the queue leave the jobs with less points at the start
171  _workQueue.Sort(WorkItem.Compare);
172  }
173  }
174 
175  if (item.UpdateWeight() < WeightedWorkScheduler.MaxWorkWeight * countFactor)
176  {
177  return item;
178  }
179  } while (initial != _pointer);
180 
181  // no work item is ready, pointer still will keep it's same value
182  return null;
183  }
184  }
185 }