Lean  $LEAN_TAG$
LeakyBucket.cs
1 /*
2  * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
3  * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14 */
15 
16 using System;
17 using System.Threading;
18 
20 {
21  /// <summary>
22  /// Provides an implementation of <see cref="ITokenBucket"/> that implements the leaky bucket algorithm
23  /// See: https://en.wikipedia.org/wiki/Leaky_bucket
24  /// </summary>
25  public class LeakyBucket : ITokenBucket
26  {
27  private readonly object _sync = new object();
28 
29  private long _available;
30  private readonly ISleepStrategy _sleep;
31  private readonly IRefillStrategy _refill;
32  private readonly ITimeProvider _timeProvider;
33 
34  /// <summary>
35  /// Gets the maximum capacity of tokens this bucket can hold.
36  /// </summary>
37  public long Capacity { get; }
38 
39  /// <summary>
40  /// Gets the total number of currently available tokens for consumption
41  /// </summary>
42  public long AvailableTokens
43  {
44  // synchronized read w/ the modification of available tokens in TryConsume
45  get { lock (_sync) return _available; }
46  }
47 
48  /// <summary>
49  /// Initializes a new instance of the <see cref="LeakyBucket"/> class.
50  /// This constructor initializes the bucket using the <see cref="ThreadSleepStrategy.Sleep"/> with a 1 millisecond
51  /// sleep to prevent being CPU intensive and uses the <see cref="FixedIntervalRefillStrategy"/> to refill bucket
52  /// tokens according to the <paramref name="refillAmount"/> and <paramref name="refillInterval"/> parameters.
53  /// </summary>
54  /// <param name="capacity">The maximum number of tokens this bucket can hold</param>
55  /// <param name="refillAmount">The number of tokens to add to the bucket each <paramref name="refillInterval"/></param>
56  /// <param name="refillInterval">The interval which after passing more tokens are added to the bucket</param>
57  public LeakyBucket(long capacity, long refillAmount, TimeSpan refillInterval)
58  : this(capacity, ThreadSleepStrategy.Sleeping(1),
59  new FixedIntervalRefillStrategy(RealTimeProvider.Instance, refillAmount, refillInterval)
60  )
61  {
62  }
63 
64  /// <summary>
65  /// Initializes a new instance of the <see cref="LeakyBucket"/> class
66  /// </summary>
67  /// <param name="capacity">The maximum number of tokens this bucket can hold</param>
68  /// <param name="sleep">Defines the <see cref="ISleepStrategy"/> used when <see cref="Consume"/> is invoked
69  /// but the bucket does not have enough tokens yet</param>
70  /// <param name="refill">Defines the <see cref="IRefillStrategy"/> that computes how many tokens to add
71  /// back to the bucket each time consumption is attempted</param>
72  /// <param name="timeProvider">Defines the <see cref="ITimeProvider"/> used to enforce timeouts when
73  /// invoking <see cref="Consume"/></param>
74  public LeakyBucket(long capacity, ISleepStrategy sleep, IRefillStrategy refill, ITimeProvider timeProvider = null)
75  {
76  _sleep = sleep;
77  _refill = refill;
78  Capacity = capacity;
79  _available = capacity;
80  _timeProvider = timeProvider ?? RealTimeProvider.Instance;
81  }
82 
83  /// <summary>
84  /// Blocks until the specified number of tokens are available for consumption
85  /// and then consumes that number of tokens.
86  /// </summary>
87  /// <param name="tokens">The number of tokens to consume</param>
88  /// <param name="timeout">The maximum amount of time, in milliseconds, to block. An exception is
89  /// throw in the event it takes longer than the stated timeout to consume the requested number
90  /// of tokens</param>
91  public void Consume(long tokens, long timeout = Timeout.Infinite)
92  {
93  if (timeout < Timeout.Infinite)
94  {
95  throw new ArgumentOutOfRangeException(nameof(timeout),
96  "Invalid timeout. Use -1 for no timeout, 0 for immediate timeout and a positive number " +
97  "of milliseconds to indicate a timeout. All other values are out of range."
98  );
99  }
100 
101  var startTime = _timeProvider.GetUtcNow();
102 
103  while (true)
104  {
105  if (TryConsume(tokens))
106  {
107  break;
108  }
109 
110  if (timeout != Timeout.Infinite)
111  {
112  // determine if the requested timeout has elapsed
113  var currentTime = _timeProvider.GetUtcNow();
114  var elapsedMilliseconds = (currentTime - startTime).TotalMilliseconds;
115  if (elapsedMilliseconds > timeout)
116  {
117  throw new TimeoutException("The operation timed out while waiting for the rate limit to be lifted.");
118  }
119  }
120 
121  _sleep.Sleep();
122  }
123  }
124 
125  /// <summary>
126  /// Attempts to consume the specified number of tokens from the bucket. If the
127  /// requested number of tokens are not immediately available, then this method
128  /// will return false to indicate that zero tokens have been consumed.
129  /// </summary>
130  public bool TryConsume(long tokens)
131  {
132  if (tokens <= 0)
133  {
134  throw new ArgumentOutOfRangeException(nameof(tokens),
135  "Number of tokens to consume must be positive"
136  );
137  }
138 
139  if (tokens > Capacity)
140  {
141  throw new ArgumentOutOfRangeException(nameof(tokens),
142  "Number of tokens to consume must be less than or equal to the capacity"
143  );
144  }
145 
146  lock (_sync)
147  {
148  // determine how many units have become available since last invocation
149  var refilled = Math.Max(0, _refill.Refill());
150 
151  // the number of tokens to add, the max of which is the difference between capacity and currently available
152  var deltaTokens = Math.Min(Capacity - _available, refilled);
153 
154  // update the available number of units with the new tokens
155  _available += deltaTokens;
156 
157  if (tokens > _available)
158  {
159  // we don't have enough tokens yet
160  Logging.Log.Trace($"LeakyBucket.TryConsume({tokens}): Failed to consumed tokens. Available: {_available}");
161  return false;
162  }
163 
164  // subtract the number of tokens consumed
165  _available = _available - tokens;
166  Logging.Log.Trace($"LeakyBucket.TryConsume({tokens}): Successfully consumed tokens. Available: {_available}");
167  return true;
168  }
169  }
170 
171  }
172 }