Lean  $LEAN_TAG$
BrokerageTransactionHandler.cs
1 /*
2  * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
3  * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14 */
15 
16 using System;
17 using System.Collections.Concurrent;
18 using System.Collections.Generic;
19 using System.Linq;
20 using System.Runtime.CompilerServices;
21 using System.Threading;
26 using QuantConnect.Logging;
27 using QuantConnect.Orders;
31 using QuantConnect.Util;
32 
34 {
35  /// <summary>
36  /// Transaction handler for all brokerages
37  /// </summary>
39  {
40  private IAlgorithm _algorithm;
41  private IBrokerage _brokerage;
42  private bool _brokerageIsBacktesting;
43  private bool _loggedFeeAdjustmentWarning;
44 
45  // Counter to keep track of total amount of processed orders
46  private int _totalOrderCount;
47 
48  // this bool is used to check if the warning message for the rounding of order quantity has been displayed for the first time
49  private bool _firstRoundOffMessage = false;
50 
51  // this value is used for determining how confident we are in our cash balance update
52  private long _lastFillTimeTicks;
53 
54  private const int MaxCashSyncAttempts = 5;
55  private int _failedCashSyncAttempts;
56 
57  /// <summary>
58  /// OrderQueue holds the newly updated orders from the user algorithm waiting to be processed. Once
59  /// orders are processed they are moved into the Orders queue awaiting the brokerage response.
60  /// </summary>
62 
63  private Thread _processingThread;
64  private readonly CancellationTokenSource _cancellationTokenSource = new CancellationTokenSource();
65 
66  private readonly ConcurrentQueue<OrderEvent> _orderEvents = new ConcurrentQueue<OrderEvent>();
67 
68  /// <summary>
69  /// The _completeOrders dictionary holds all orders.
70  /// Once the transaction thread has worked on them they get put here while witing for fill updates.
71  /// </summary>
72  private readonly ConcurrentDictionary<int, Order> _completeOrders = new ConcurrentDictionary<int, Order>();
73 
74  /// <summary>
75  /// The orders dictionary holds orders which are open. Status: New, Submitted, PartiallyFilled, None, CancelPending
76  /// Once the transaction thread has worked on them they get put here while witing for fill updates.
77  /// </summary>
78  private readonly ConcurrentDictionary<int, Order> _openOrders = new ConcurrentDictionary<int, Order>();
79 
80  /// <summary>
81  /// The _openOrderTickets dictionary holds open order tickets that the algorithm can use to reference a specific order. This
82  /// includes invoking update and cancel commands. In the future, we can add more features to the ticket, such as events
83  /// and async events (such as run this code when this order fills)
84  /// </summary>
85  private readonly ConcurrentDictionary<int, OrderTicket> _openOrderTickets = new ConcurrentDictionary<int, OrderTicket>();
86 
87  /// <summary>
88  /// The _completeOrderTickets dictionary holds all order tickets that the algorithm can use to reference a specific order. This
89  /// includes invoking update and cancel commands. In the future, we can add more features to the ticket, such as events
90  /// and async events (such as run this code when this order fills)
91  /// </summary>
92  private readonly ConcurrentDictionary<int, OrderTicket> _completeOrderTickets = new ConcurrentDictionary<int, OrderTicket>();
93 
94  /// <summary>
95  /// Cache collection of price adjustment modes for each symbol
96  /// </summary>
97  private readonly Dictionary<Symbol, DataNormalizationMode> _priceAdjustmentModes = new Dictionary<Symbol, DataNormalizationMode>();
98 
99  /// <summary>
100  /// The _cancelPendingOrders instance will help to keep track of CancelPending orders and their Status
101  /// </summary>
103 
104  private IResultHandler _resultHandler;
105 
106  private readonly object _lockHandleOrderEvent = new object();
107 
108  /// <summary>
109  /// Event fired when there is a new <see cref="OrderEvent"/>
110  /// </summary>
111  public event EventHandler<OrderEvent> NewOrderEvent;
112 
113  /// <summary>
114  /// Gets the permanent storage for all orders
115  /// </summary>
116  public ConcurrentDictionary<int, Order> Orders
117  {
118  get
119  {
120  return _completeOrders;
121  }
122  }
123 
124  /// <summary>
125  /// Gets all order events
126  /// </summary>
127  public IEnumerable<OrderEvent> OrderEvents => _orderEvents;
128 
129  /// <summary>
130  /// Gets the permanent storage for all order tickets
131  /// </summary>
132  public ConcurrentDictionary<int, OrderTicket> OrderTickets
133  {
134  get
135  {
136  return _completeOrderTickets;
137  }
138  }
139 
140  /// <summary>
141  /// Gets the current number of orders that have been processed
142  /// </summary>
143  public int OrdersCount => _totalOrderCount;
144 
145  /// <summary>
146  /// Creates a new BrokerageTransactionHandler to process orders using the specified brokerage implementation
147  /// </summary>
148  /// <param name="algorithm">The algorithm instance</param>
149  /// <param name="brokerage">The brokerage implementation to process orders and fire fill events</param>
150  /// <param name="resultHandler"></param>
151  public virtual void Initialize(IAlgorithm algorithm, IBrokerage brokerage, IResultHandler resultHandler)
152  {
153  if (brokerage == null)
154  {
155  throw new ArgumentNullException(nameof(brokerage));
156  }
157  // multi threaded queue, used for live deployments
159  // we don't need to do this today because we just initialized/synced
160  _resultHandler = resultHandler;
161 
162  _brokerage = brokerage;
163  _brokerageIsBacktesting = brokerage is BacktestingBrokerage;
164 
165  _brokerage.OrdersStatusChanged += (sender, orderEvents) =>
166  {
167  HandleOrderEvents(orderEvents);
168  };
169 
170  _brokerage.AccountChanged += (sender, account) =>
171  {
172  HandleAccountChanged(account);
173  };
174 
175  _brokerage.OptionPositionAssigned += (sender, fill) =>
176  {
177  HandlePositionAssigned(fill);
178  };
179 
180  _brokerage.OptionNotification += (sender, e) =>
181  {
182  HandleOptionNotification(e);
183  };
184 
185  _brokerage.NewBrokerageOrderNotification += (sender, e) =>
186  {
187  HandleNewBrokerageSideOrder(e);
188  };
189 
190  _brokerage.DelistingNotification += (sender, e) =>
191  {
192  HandleDelistingNotification(e);
193  };
194 
195  _brokerage.OrderIdChanged += (sender, e) =>
196  {
197  HandlerBrokerageOrderIdChangedEvent(e);
198  };
199 
200  _brokerage.OrderUpdated += (sender, e) =>
201  {
202  HandleOrderUpdated(e);
203  };
204 
205  IsActive = true;
206 
207  _algorithm = algorithm;
209  }
210 
211  /// <summary>
212  /// Create and start the transaction thread, who will be in charge of processing
213  /// the order requests
214  /// </summary>
215  protected virtual void InitializeTransactionThread()
216  {
217  _processingThread = new Thread(Run) { IsBackground = true, Name = "Transaction Thread" };
218  _processingThread.Start();
219  }
220 
221  /// <summary>
222  /// Boolean flag indicating the Run thread method is busy.
223  /// False indicates it is completely finished processing and ready to be terminated.
224  /// </summary>
225  public bool IsActive { get; private set; }
226 
227  #region Order Request Processing
228 
229  /// <summary>
230  /// Adds the specified order to be processed
231  /// </summary>
232  /// <param name="request">The order to be processed</param>
234  {
235  if (_algorithm.LiveMode)
236  {
237  Log.Trace("BrokerageTransactionHandler.Process(): " + request);
238 
239  _algorithm.Portfolio.LogMarginInformation(request);
240  }
241 
242  switch (request.OrderRequestType)
243  {
244  case OrderRequestType.Submit:
245  return AddOrder((SubmitOrderRequest)request);
246 
247  case OrderRequestType.Update:
248  return UpdateOrder((UpdateOrderRequest)request);
249 
250  case OrderRequestType.Cancel:
251  return CancelOrder((CancelOrderRequest)request);
252 
253  default:
254  throw new ArgumentOutOfRangeException();
255  }
256  }
257 
258  /// <summary>
259  /// Add an order to collection and return the unique order id or negative if an error.
260  /// </summary>
261  /// <param name="request">A request detailing the order to be submitted</param>
262  /// <returns>New unique, increasing orderid</returns>
264  {
265  var response = !_algorithm.IsWarmingUp
266  ? OrderResponse.Success(request)
267  : OrderResponse.WarmingUp(request);
268 
269  var shortable = true;
270  if (request.Quantity < 0)
271  {
272  shortable = _algorithm.Shortable(request.Symbol, request.Quantity);
273  }
274 
275  if (!shortable)
276  {
277  var message = GetShortableErrorMessage(request.Symbol, request.Quantity);
278  if (_algorithm.LiveMode)
279  {
280  // in live mode we send a warning but we wont block the order being sent to the brokerage
281  _algorithm.Debug($"Warning: {message}");
282  }
283  else
284  {
285  response = OrderResponse.Error(request, OrderResponseErrorCode.ExceedsShortableQuantity, message);
286  }
287  }
288 
289  request.SetResponse(response);
290  var ticket = new OrderTicket(_algorithm.Transactions, request);
291 
292  Interlocked.Increment(ref _totalOrderCount);
293  // send the order to be processed after creating the ticket
294  if (response.IsSuccess)
295  {
296  _openOrderTickets.TryAdd(ticket.OrderId, ticket);
297  _completeOrderTickets.TryAdd(ticket.OrderId, ticket);
298  _orderRequestQueue.Add(request);
299 
300  // wait for the transaction handler to set the order reference into the new order ticket,
301  // so we can ensure the order has already been added to the open orders,
302  // before returning the ticket to the algorithm.
303  WaitForOrderSubmission(ticket);
304  }
305  else
306  {
307  // add it to the orders collection for recall later
308  var order = Order.CreateOrder(request);
309  var orderTag = response.ErrorCode == OrderResponseErrorCode.AlgorithmWarmingUp
310  ? "Algorithm warming up."
311  : response.ErrorMessage;
312 
313  // ensure the order is tagged with a currency
314  var security = _algorithm.Securities[order.Symbol];
315  order.PriceCurrency = security.SymbolProperties.QuoteCurrency;
316 
317  order.Status = OrderStatus.Invalid;
318  order.Tag = orderTag;
319  ticket.SetOrder(order);
320  _completeOrderTickets.TryAdd(ticket.OrderId, ticket);
321  _completeOrders.TryAdd(order.Id, order);
322 
323  HandleOrderEvent(new OrderEvent(order,
324  _algorithm.UtcTime,
325  OrderFee.Zero,
326  orderTag));
327  }
328  return ticket;
329  }
330 
331  /// <summary>
332  /// Wait for the order to be handled by the <see cref="_processingThread"/>
333  /// </summary>
334  /// <param name="ticket">The <see cref="OrderTicket"/> expecting to be submitted</param>
335  protected virtual void WaitForOrderSubmission(OrderTicket ticket)
336  {
337  var orderSetTimeout = Time.OneSecond;
338  if (!ticket.OrderSet.WaitOne(orderSetTimeout))
339  {
340  Log.Error("BrokerageTransactionHandler.WaitForOrderSubmission(): " +
341  $"The order request (Id={ticket.OrderId}) was not submitted within {orderSetTimeout.TotalSeconds} second(s).");
342  }
343  }
344 
345  /// <summary>
346  /// Update an order yet to be filled such as stop or limit orders.
347  /// </summary>
348  /// <param name="request">Request detailing how the order should be updated</param>
349  /// <remarks>Does not apply if the order is already fully filled</remarks>
351  {
352  OrderTicket ticket;
353  if (!_completeOrderTickets.TryGetValue(request.OrderId, out ticket))
354  {
355  return OrderTicket.InvalidUpdateOrderId(_algorithm.Transactions, request);
356  }
357 
358  ticket.AddUpdateRequest(request);
359 
360  try
361  {
362  //Update the order from the behaviour
363  var order = GetOrderByIdInternal(request.OrderId);
364  var orderQuantity = request.Quantity ?? ticket.Quantity;
365 
366  var shortable = true;
367  if (order?.Direction == OrderDirection.Sell || orderQuantity < 0)
368  {
369  shortable = _algorithm.Shortable(ticket.Symbol, orderQuantity, order.Id);
370 
371  if (_algorithm.LiveMode && !shortable)
372  {
373  // let's override and just send warning
374  shortable = true;
375 
376  _algorithm.Debug($"Warning: {GetShortableErrorMessage(ticket.Symbol, ticket.Quantity)}");
377  }
378  }
379 
380  if (order == null)
381  {
382  // can't update an order that doesn't exist!
383  Log.Error("BrokerageTransactionHandler.Update(): Cannot update a null order");
384  request.SetResponse(OrderResponse.UnableToFindOrder(request));
385  }
386  else if (order.Status == OrderStatus.New)
387  {
388  // can't update a pending submit order
389  Log.Error("BrokerageTransactionHandler.Update(): Cannot update a pending submit order with status " + order.Status);
390  request.SetResponse(OrderResponse.InvalidNewStatus(request, order));
391  }
392  else if (order.Status.IsClosed() && !request.IsAllowedForClosedOrder())
393  {
394  // can't update a completed order
395  Log.Error("BrokerageTransactionHandler.Update(): Cannot update closed order with status " + order.Status);
396  request.SetResponse(OrderResponse.InvalidStatus(request, order));
397  }
398  else if (request.Quantity.HasValue && request.Quantity.Value == 0)
399  {
400  request.SetResponse(OrderResponse.ZeroQuantity(request));
401  }
402  else if (_algorithm.IsWarmingUp)
403  {
404  request.SetResponse(OrderResponse.WarmingUp(request));
405  }
406  else if (!shortable)
407  {
408  var shortableResponse = OrderResponse.Error(request, OrderResponseErrorCode.ExceedsShortableQuantity,
409  GetShortableErrorMessage(ticket.Symbol, ticket.Quantity));
410 
411  request.SetResponse(shortableResponse);
412  }
413  else
414  {
415  request.SetResponse(OrderResponse.Success(request), OrderRequestStatus.Processing);
416  _orderRequestQueue.Add(request);
417  }
418  }
419  catch (Exception err)
420  {
421  Log.Error(err);
422  request.SetResponse(OrderResponse.Error(request, OrderResponseErrorCode.ProcessingError, err.Message));
423  }
424 
425  return ticket;
426  }
427 
428  /// <summary>
429  /// Remove this order from outstanding queue: user is requesting a cancel.
430  /// </summary>
431  /// <param name="request">Request containing the specific order id to remove</param>
433  {
434  OrderTicket ticket;
435  if (!_completeOrderTickets.TryGetValue(request.OrderId, out ticket))
436  {
437  Log.Error("BrokerageTransactionHandler.CancelOrder(): Unable to locate ticket for order.");
438  return OrderTicket.InvalidCancelOrderId(_algorithm.Transactions, request);
439  }
440 
441  try
442  {
443  // if we couldn't set this request as the cancellation then another thread/someone
444  // else is already doing it or it in fact has already been cancelled
445  if (!ticket.TrySetCancelRequest(request))
446  {
447  // the ticket has already been cancelled
448  request.SetResponse(OrderResponse.Error(request, OrderResponseErrorCode.InvalidRequest, "Cancellation is already in progress."));
449  return ticket;
450  }
451 
452  //Error check
453  var order = GetOrderByIdInternal(request.OrderId);
454  if (order != null && request.Tag != null)
455  {
456  order.Tag = request.Tag;
457  }
458  if (order == null)
459  {
460  Log.Error("BrokerageTransactionHandler.CancelOrder(): Cannot find this id.");
461  request.SetResponse(OrderResponse.UnableToFindOrder(request));
462  }
463  else if (order.Status == OrderStatus.New)
464  {
465  Log.Error("BrokerageTransactionHandler.CancelOrder(): Cannot cancel order with status: " + order.Status);
466  request.SetResponse(OrderResponse.InvalidNewStatus(request, order));
467  }
468  else if (order.Status.IsClosed())
469  {
470  Log.Error("BrokerageTransactionHandler.CancelOrder(): Cannot cancel order already " + order.Status);
471  request.SetResponse(OrderResponse.InvalidStatus(request, order));
472  }
473  else if (_algorithm.IsWarmingUp)
474  {
475  request.SetResponse(OrderResponse.WarmingUp(request));
476  }
477  else
478  {
479  _cancelPendingOrders.Set(order.Id, order.Status);
480  // update the order status
481  order.Status = OrderStatus.CancelPending;
482 
483  // notify the algorithm with an order event
484  HandleOrderEvent(new OrderEvent(order,
485  _algorithm.UtcTime,
486  OrderFee.Zero));
487 
488  // send the request to be processed
489  request.SetResponse(OrderResponse.Success(request), OrderRequestStatus.Processing);
490  _orderRequestQueue.Add(request);
491  }
492  }
493  catch (Exception err)
494  {
495  Log.Error(err);
496  request.SetResponse(OrderResponse.Error(request, OrderResponseErrorCode.ProcessingError, err.Message));
497  }
498 
499  return ticket;
500  }
501 
502  /// <summary>
503  /// Gets and enumerable of <see cref="OrderTicket"/> matching the specified <paramref name="filter"/>
504  /// </summary>
505  /// <param name="filter">The filter predicate used to find the required order tickets</param>
506  /// <returns>An enumerable of <see cref="OrderTicket"/> matching the specified <paramref name="filter"/></returns>
507  public IEnumerable<OrderTicket> GetOrderTickets(Func<OrderTicket, bool> filter = null)
508  {
509  return _completeOrderTickets.Select(x => x.Value).Where(filter ?? (x => true));
510  }
511 
512  /// <summary>
513  /// Gets and enumerable of opened <see cref="OrderTicket"/> matching the specified <paramref name="filter"/>
514  /// </summary>
515  /// <param name="filter">The filter predicate used to find the required order tickets</param>
516  /// <returns>An enumerable of opened <see cref="OrderTicket"/> matching the specified <paramref name="filter"/></returns>
517  public IEnumerable<OrderTicket> GetOpenOrderTickets(Func<OrderTicket, bool> filter = null)
518  {
519  return _openOrderTickets.Select(x => x.Value).Where(filter ?? (x => true));
520  }
521 
522  /// <summary>
523  /// Gets the order ticket for the specified order id. Returns null if not found
524  /// </summary>
525  /// <param name="orderId">The order's id</param>
526  /// <returns>The order ticket with the specified id, or null if not found</returns>
527  public OrderTicket GetOrderTicket(int orderId)
528  {
529  OrderTicket ticket;
530  _completeOrderTickets.TryGetValue(orderId, out ticket);
531  return ticket;
532  }
533 
534  #endregion
535 
536  /// <summary>
537  /// Get the order by its id
538  /// </summary>
539  /// <param name="orderId">Order id to fetch</param>
540  /// <returns>A clone of the order with the specified id, or null if no match is found</returns>
541  public Order GetOrderById(int orderId)
542  {
543  Order order = GetOrderByIdInternal(orderId);
544  return order?.Clone();
545  }
546 
547  private Order GetOrderByIdInternal(int orderId)
548  {
549  Order order;
550  return _completeOrders.TryGetValue(orderId, out order) ? order : null;
551  }
552 
553  /// <summary>
554  /// Gets the order by its brokerage id
555  /// </summary>
556  /// <param name="brokerageId">The brokerage id to fetch</param>
557  /// <returns>The first order matching the brokerage id, or null if no match is found</returns>
558  public List<Order> GetOrdersByBrokerageId(string brokerageId)
559  {
560  var openOrders = GetOrdersByBrokerageId(brokerageId, _openOrders);
561 
562  if (openOrders.Count > 0
563  // if it's part of a group, some leg could be filled already, not part of open orders
564  && (openOrders[0].GroupOrderManager == null || openOrders[0].GroupOrderManager.Count == openOrders.Count))
565  {
566  return openOrders;
567  }
568 
569  return GetOrdersByBrokerageId(brokerageId, _completeOrders);
570  }
571 
572  private static List<Order> GetOrdersByBrokerageId(string brokerageId, ConcurrentDictionary<int, Order> orders)
573  {
574  return orders
575  .Where(x => x.Value.BrokerId.Contains(brokerageId))
576  .Select(kvp => kvp.Value.Clone())
577  .ToList();
578  }
579 
580  /// <summary>
581  /// Gets all orders matching the specified filter. Specifying null will return an enumerable
582  /// of all orders.
583  /// </summary>
584  /// <param name="filter">Delegate used to filter the orders</param>
585  /// <returns>All orders this order provider currently holds by the specified filter</returns>
586  public IEnumerable<Order> GetOrders(Func<Order, bool> filter = null)
587  {
588  if (filter != null)
589  {
590  // return a clone to prevent object reference shenanigans, you must submit a request to change the order
591  return _completeOrders.Select(x => x.Value).Where(filter).Select(x => x.Clone());
592  }
593  return _completeOrders.Select(x => x.Value).Select(x => x.Clone());
594  }
595 
596  /// <summary>
597  /// Gets open orders matching the specified filter
598  /// </summary>
599  /// <param name="filter">Delegate used to filter the orders</param>
600  /// <returns>All open orders this order provider currently holds</returns>
601  public List<Order> GetOpenOrders(Func<Order, bool> filter = null)
602  {
603  if (filter != null)
604  {
605  // return a clone to prevent object reference shenanigans, you must submit a request to change the order
606  return _openOrders.Select(x => x.Value).Where(filter).Select(x => x.Clone()).ToList();
607  }
608  return _openOrders.Select(x => x.Value).Select(x => x.Clone()).ToList();
609  }
610 
611  /// <summary>
612  /// Primary thread entry point to launch the transaction thread.
613  /// </summary>
614  protected void Run()
615  {
616  try
617  {
618  foreach (var request in _orderRequestQueue.GetConsumingEnumerable(_cancellationTokenSource.Token))
619  {
620  HandleOrderRequest(request);
622  }
623  }
624  catch (Exception err)
625  {
626  // unexpected error, we need to close down shop
627  _algorithm.SetRuntimeError(err, "HandleOrderRequest");
628  }
629 
630  if (_processingThread != null)
631  {
632  Log.Trace("BrokerageTransactionHandler.Run(): Ending Thread...");
633  IsActive = false;
634  }
635  }
636 
637  /// <summary>
638  /// Processes asynchronous events on the transaction handler's thread
639  /// </summary>
640  public virtual void ProcessAsynchronousEvents()
641  {
642  // NOP
643  }
644 
645  /// <summary>
646  /// Processes all synchronous events that must take place before the next time loop for the algorithm
647  /// </summary>
648  public virtual void ProcessSynchronousEvents()
649  {
650  // how to do synchronous market orders for real brokerages?
651 
652  // in backtesting we need to wait for orders to be removed from the queue and finished processing
653  if (!_algorithm.LiveMode)
654  {
655  if (_orderRequestQueue.IsBusy && !_orderRequestQueue.WaitHandle.WaitOne(Time.OneSecond, _cancellationTokenSource.Token))
656  {
657  Log.Error("BrokerageTransactionHandler.ProcessSynchronousEvents(): Timed out waiting for request queue to finish processing.");
658  }
659  return;
660  }
661 
662  // check if the brokerage should perform cash sync now
663  if (!_algorithm.IsWarmingUp && _brokerage.ShouldPerformCashSync(CurrentTimeUtc))
664  {
665  // only perform cash syncs if we haven't had a fill for at least 10 seconds
666  if (TimeSinceLastFill > TimeSpan.FromSeconds(10))
667  {
668  if (!_brokerage.PerformCashSync(_algorithm, CurrentTimeUtc, () => TimeSinceLastFill))
669  {
670  if (++_failedCashSyncAttempts >= MaxCashSyncAttempts)
671  {
672  throw new Exception("The maximum number of attempts for brokerage cash sync has been reached.");
673  }
674  }
675  }
676  }
677 
678  // we want to remove orders older than 10k records, but only in live mode
679  const int maxOrdersToKeep = 10000;
680  if (_completeOrders.Count < maxOrdersToKeep + 1)
681  {
682  return;
683  }
684 
685  Log.Debug("BrokerageTransactionHandler.ProcessSynchronousEvents(): Start removing old orders...");
686  var max = _completeOrders.Max(x => x.Key);
687  var lowestOrderIdToKeep = max - maxOrdersToKeep;
688  foreach (var item in _completeOrders.Where(x => x.Key <= lowestOrderIdToKeep))
689  {
690  Order value;
691  OrderTicket ticket;
692  _completeOrders.TryRemove(item.Key, out value);
693  _completeOrderTickets.TryRemove(item.Key, out ticket);
694  }
695 
696  Log.Debug($"BrokerageTransactionHandler.ProcessSynchronousEvents(): New order count {_completeOrders.Count}. Exit");
697  }
698 
699  /// <summary>
700  /// Register an already open Order
701  /// </summary>
702  public void AddOpenOrder(Order order, IAlgorithm algorithm)
703  {
704  if (order.Status == OrderStatus.New || order.Status == OrderStatus.None)
705  {
706  // make sure we have a valid order status
707  order.Status = OrderStatus.Submitted;
708  }
709 
710  order.Id = algorithm.Transactions.GetIncrementOrderId();
711 
712  if (order.GroupOrderManager != null && order.GroupOrderManager.Id == 0)
713  {
715  }
716 
717  var orderTicket = order.ToOrderTicket(algorithm.Transactions);
718 
719  SetPriceAdjustmentMode(order, algorithm);
720 
721  _openOrders.AddOrUpdate(order.Id, order, (i, o) => order);
722  _completeOrders.AddOrUpdate(order.Id, order, (i, o) => order);
723  _openOrderTickets.AddOrUpdate(order.Id, orderTicket);
724  _completeOrderTickets.AddOrUpdate(order.Id, orderTicket);
725 
726  Interlocked.Increment(ref _totalOrderCount);
727  }
728 
729 
730  /// <summary>
731  /// Signal a end of thread request to stop monitoring the transactions.
732  /// </summary>
733  public void Exit()
734  {
735  var timeout = TimeSpan.FromSeconds(60);
736  if (_processingThread != null)
737  {
738  // only wait if the processing thread is running
739  if (_orderRequestQueue.IsBusy && !_orderRequestQueue.WaitHandle.WaitOne(timeout))
740  {
741  Log.Error("BrokerageTransactionHandler.Exit(): Exceed timeout: " + (int)(timeout.TotalSeconds) + " seconds.");
742  }
743  }
744 
745  _processingThread?.StopSafely(timeout, _cancellationTokenSource);
746  IsActive = false;
747  _cancellationTokenSource.DisposeSafely();
748  }
749 
750  /// <summary>
751  /// Handles a generic order request
752  /// </summary>
753  /// <param name="request"><see cref="OrderRequest"/> to be handled</param>
754  /// <returns><see cref="OrderResponse"/> for request</returns>
755  public void HandleOrderRequest(OrderRequest request)
756  {
757  OrderResponse response;
758  switch (request.OrderRequestType)
759  {
760  case OrderRequestType.Submit:
761  response = HandleSubmitOrderRequest((SubmitOrderRequest)request);
762  break;
763  case OrderRequestType.Update:
764  response = HandleUpdateOrderRequest((UpdateOrderRequest)request);
765  break;
766  case OrderRequestType.Cancel:
767  response = HandleCancelOrderRequest((CancelOrderRequest)request);
768  break;
769  default:
770  throw new ArgumentOutOfRangeException();
771  }
772 
773  // mark request as processed
774  request.SetResponse(response, OrderRequestStatus.Processed);
775  }
776 
777  /// <summary>
778  /// Handles a request to submit a new order
779  /// </summary>
780  private OrderResponse HandleSubmitOrderRequest(SubmitOrderRequest request)
781  {
782  OrderTicket ticket;
783  var order = Order.CreateOrder(request);
784 
785  // ensure the order is tagged with a currency
786  var security = _algorithm.Securities[order.Symbol];
787  order.PriceCurrency = security.SymbolProperties.QuoteCurrency;
788  if (string.IsNullOrEmpty(order.Tag))
789  {
790  order.Tag = order.GetDefaultTag();
791  }
792 
793  // rounds off the order towards 0 to the nearest multiple of lot size
794  order.Quantity = RoundOffOrder(order, security);
795 
796  if (!_openOrders.TryAdd(order.Id, order) || !_completeOrders.TryAdd(order.Id, order))
797  {
798  Log.Error("BrokerageTransactionHandler.HandleSubmitOrderRequest(): Unable to add new order, order not processed.");
799  return OrderResponse.Error(request, OrderResponseErrorCode.OrderAlreadyExists, "Cannot process submit request because order with id {0} already exists");
800  }
801  if (!_completeOrderTickets.TryGetValue(order.Id, out ticket))
802  {
803  Log.Error("BrokerageTransactionHandler.HandleSubmitOrderRequest(): Unable to retrieve order ticket, order not processed.");
804  return OrderResponse.UnableToFindOrder(request);
805  }
806 
807  var comboIsReady = order.TryGetGroupOrders(TryGetOrder, out var orders);
808  var comboSecuritiesFound = orders.TryGetGroupOrdersSecurities(_algorithm.Portfolio, out var securities);
809 
810  // rounds the order prices
811  RoundOrderPrices(order, security, comboIsReady, securities);
812 
813  // save current security prices
814  order.OrderSubmissionData = new OrderSubmissionData(security.BidPrice, security.AskPrice, security.Close);
815 
816  // Set order price adjustment mode
817  SetPriceAdjustmentMode(order, _algorithm);
818 
819  // update the ticket's internal storage with this new order reference
820  ticket.SetOrder(order);
821 
822  if (!comboIsReady)
823  {
824  // an Order of the group is missing
825  return OrderResponse.Success(request);
826  }
827 
828  if (orders.Any(o => o.Quantity == 0))
829  {
830  var response = OrderResponse.ZeroQuantity(request);
831  _algorithm.Error(response.ErrorMessage);
832 
833  InvalidateOrders(orders, response.ErrorMessage);
834  return response;
835  }
836 
837  if (!comboSecuritiesFound)
838  {
839  var response = OrderResponse.MissingSecurity(request);
840  _algorithm.Error(response.ErrorMessage);
841 
842  InvalidateOrders(orders, response.ErrorMessage);
843  return response;
844  }
845 
846  // check to see if we have enough money to place the order
847  HasSufficientBuyingPowerForOrderResult hasSufficientBuyingPowerResult;
848  try
849  {
850  hasSufficientBuyingPowerResult = _algorithm.Portfolio.HasSufficientBuyingPowerForOrder(orders);
851  }
852  catch (Exception err)
853  {
854  Log.Error(err);
855  _algorithm.Error($"Order Error: id: {order.Id.ToStringInvariant()}, Error executing margin models: {err.Message}");
856  HandleOrderEvent(new OrderEvent(order,
857  _algorithm.UtcTime,
858  OrderFee.Zero,
859  "Error executing margin models"));
860  return OrderResponse.Error(request, OrderResponseErrorCode.ProcessingError, "Error in GetSufficientCapitalForOrder");
861  }
862 
863  if (!hasSufficientBuyingPowerResult.IsSufficient)
864  {
865  var errorMessage = securities.GetErrorMessage(hasSufficientBuyingPowerResult);
866  _algorithm.Error(errorMessage);
867 
868  InvalidateOrders(orders, errorMessage);
869  return OrderResponse.Error(request, OrderResponseErrorCode.InsufficientBuyingPower, errorMessage);
870  }
871 
872  // verify that our current brokerage can actually take the order
873  foreach (var kvp in securities)
874  {
875  if (!_algorithm.BrokerageModel.CanSubmitOrder(kvp.Value, kvp.Key, out var message))
876  {
877  var errorMessage = $"BrokerageModel declared unable to submit order: [{string.Join(",", orders.Select(o => o.Id))}]";
878 
879  // if we couldn't actually process the order, mark it as invalid and bail
880  message ??= new BrokerageMessageEvent(BrokerageMessageType.Warning, "InvalidOrder", string.Empty);
881  var response = OrderResponse.Error(request, OrderResponseErrorCode.BrokerageModelRefusedToSubmitOrder, $"{errorMessage} {message}");
882 
883  InvalidateOrders(orders, response.ErrorMessage);
884  _algorithm.Error(response.ErrorMessage);
885  return response;
886  }
887  }
888 
889  // set the order status based on whether or not we successfully submitted the order to the market
890  bool orderPlaced;
891  try
892  {
893  orderPlaced = orders.All(o => _brokerage.PlaceOrder(o));
894  }
895  catch (Exception err)
896  {
897  Log.Error(err);
898  orderPlaced = false;
899  }
900 
901  if (!orderPlaced)
902  {
903  // we failed to submit the order, invalidate it
904  var errorMessage = $"Brokerage failed to place orders: [{string.Join(",", orders.Select(o => o.Id))}]";
905 
906  InvalidateOrders(orders, errorMessage);
907  _algorithm.Error(errorMessage);
908  return OrderResponse.Error(request, OrderResponseErrorCode.BrokerageFailedToSubmitOrder, errorMessage);
909  }
910 
911  return OrderResponse.Success(request);
912  }
913 
914  /// <summary>
915  /// Handles a request to update order properties
916  /// </summary>
917  private OrderResponse HandleUpdateOrderRequest(UpdateOrderRequest request)
918  {
919  Order order;
920  OrderTicket ticket;
921  if (!_completeOrders.TryGetValue(request.OrderId, out order) || !_completeOrderTickets.TryGetValue(request.OrderId, out ticket))
922  {
923  Log.Error("BrokerageTransactionHandler.HandleUpdateOrderRequest(): Unable to update order with ID " + request.OrderId);
924  return OrderResponse.UnableToFindOrder(request);
925  }
926 
927  if (order.Status == OrderStatus.New)
928  {
929  return OrderResponse.InvalidNewStatus(request, order);
930  }
931 
932  var isClosedOrderUpdate = false;
933 
934  if (order.Status.IsClosed())
935  {
936  if (!request.IsAllowedForClosedOrder())
937  {
938  return OrderResponse.InvalidStatus(request, order);
939  }
940 
941  isClosedOrderUpdate = true;
942  }
943 
944  // rounds off the order towards 0 to the nearest multiple of lot size
945  var security = _algorithm.Securities[order.Symbol];
946  order.Quantity = RoundOffOrder(order, security);
947 
948  // verify that our current brokerage can actually update the order
949  BrokerageMessageEvent message;
950  if (!_algorithm.LiveMode && !_algorithm.BrokerageModel.CanUpdateOrder(_algorithm.Securities[order.Symbol], order, request, out message))
951  {
952  if (message == null) message = new BrokerageMessageEvent(BrokerageMessageType.Warning, "InvalidRequest", "BrokerageModel declared unable to update order: " + order.Id);
953  var response = OrderResponse.Error(request, OrderResponseErrorCode.BrokerageModelRefusedToUpdateOrder, "OrderID: " + order.Id + " " + message);
954  _algorithm.Error(response.ErrorMessage);
955  HandleOrderEvent(new OrderEvent(order,
956  _algorithm.UtcTime,
957  OrderFee.Zero,
958  "BrokerageModel declared unable to update order"));
959  return response;
960  }
961 
962  // modify the values of the order object
963  order.ApplyUpdateOrderRequest(request);
964 
965  // rounds the order prices
966  RoundOrderPrices(order, security);
967 
968  ticket.SetOrder(order);
969 
970  bool orderUpdated;
971  if (isClosedOrderUpdate)
972  {
973  orderUpdated = true;
974  }
975  else
976  {
977  try
978  {
979  orderUpdated = _brokerage.UpdateOrder(order);
980  }
981  catch (Exception err)
982  {
983  Log.Error(err);
984  orderUpdated = false;
985  }
986  }
987 
988  if (!orderUpdated)
989  {
990  // we failed to update the order for some reason
991  var errorMessage = "Brokerage failed to update order with id " + request.OrderId;
992  _algorithm.Error(errorMessage);
993  HandleOrderEvent(new OrderEvent(order,
994  _algorithm.UtcTime,
995  OrderFee.Zero,
996  "Brokerage failed to update order"));
997  return OrderResponse.Error(request, OrderResponseErrorCode.BrokerageFailedToUpdateOrder, errorMessage);
998  }
999 
1000  return OrderResponse.Success(request);
1001  }
1002 
1003  /// <summary>
1004  /// Handles a request to cancel an order
1005  /// </summary>
1006  private OrderResponse HandleCancelOrderRequest(CancelOrderRequest request)
1007  {
1008  Order order;
1009  OrderTicket ticket;
1010  if (!_completeOrders.TryGetValue(request.OrderId, out order) || !_completeOrderTickets.TryGetValue(request.OrderId, out ticket))
1011  {
1012  Log.Error("BrokerageTransactionHandler.HandleCancelOrderRequest(): Unable to cancel order with ID " + request.OrderId + ".");
1014  return OrderResponse.UnableToFindOrder(request);
1015  }
1016 
1017  if (order.Status == OrderStatus.New)
1018  {
1020  return OrderResponse.InvalidNewStatus(request, order);
1021  }
1022 
1023  if (order.Status.IsClosed())
1024  {
1026  return OrderResponse.InvalidStatus(request, order);
1027  }
1028 
1029  ticket.SetOrder(order);
1030 
1031  bool orderCanceled;
1032  try
1033  {
1034  orderCanceled = _brokerage.CancelOrder(order);
1035  }
1036  catch (Exception err)
1037  {
1038  Log.Error(err);
1039  orderCanceled = false;
1040  }
1041 
1042  if (!orderCanceled)
1043  {
1044  // failed to cancel the order
1045  var message = "Brokerage failed to cancel order with id " + order.Id;
1046  _algorithm.Error(message);
1048  return OrderResponse.Error(request, OrderResponseErrorCode.BrokerageFailedToCancelOrder, message);
1049  }
1050 
1051  if (request.Tag != null)
1052  {
1053  // update the tag, useful for 'why' we canceled the order
1054  order.Tag = request.Tag;
1055  }
1056 
1057  return OrderResponse.Success(request);
1058  }
1059 
1060  private void HandleOrderEvents(List<OrderEvent> orderEvents)
1061  {
1062  lock (_lockHandleOrderEvent)
1063  {
1064  // Get orders and tickets
1065  var orders = new List<Order>(orderEvents.Count);
1066 
1067  for (var i = 0; i < orderEvents.Count; i++)
1068  {
1069  var orderEvent = orderEvents[i];
1070 
1071  if (orderEvent.Status.IsClosed() && _openOrders.TryRemove(orderEvent.OrderId, out var order))
1072  {
1073  _completeOrders[orderEvent.OrderId] = order;
1074  }
1075  else if (!_completeOrders.TryGetValue(orderEvent.OrderId, out order))
1076  {
1077  Log.Error("BrokerageTransactionHandler.HandleOrderEvents(): Unable to locate open Combo Order with id " + orderEvent.OrderId);
1078  LogOrderEvent(orderEvent);
1079  return;
1080  }
1081  orders.Add(order);
1082 
1083  if (orderEvent.Status.IsClosed() && _openOrderTickets.TryRemove(orderEvent.OrderId, out var ticket))
1084  {
1085  _completeOrderTickets[orderEvent.OrderId] = ticket;
1086  }
1087  else if (!_completeOrderTickets.TryGetValue(orderEvent.OrderId, out ticket))
1088  {
1089  Log.Error("BrokerageTransactionHandler.HandleOrderEvents(): Unable to resolve open ticket: " + orderEvent.OrderId);
1090  LogOrderEvent(orderEvent);
1091  return;
1092  }
1093  orderEvent.Ticket = ticket;
1094  }
1095 
1096  var fillsToProcess = new List<OrderEvent>(orderEvents.Count);
1097 
1098  // now lets update the orders
1099  for (var i = 0; i < orderEvents.Count; i++)
1100  {
1101  var orderEvent = orderEvents[i];
1102  var order = orders[i];
1103  var ticket = orderEvent.Ticket;
1104 
1105  _cancelPendingOrders.UpdateOrRemove(order.Id, orderEvent.Status);
1106  // set the status of our order object based on the fill event except if the order status is filled/cancelled and the event is invalid
1107  // in live trading it can happen that we submit an update which get's rejected by the brokerage because the order is already filled
1108  // so we don't want the invalid update event to set the order status to invalid if it's already filled
1109  if (order.Status != OrderStatus.Filled && order.Status != OrderStatus.Canceled || orderEvent.Status != OrderStatus.Invalid)
1110  {
1111  order.Status = orderEvent.Status;
1112  }
1113 
1114  orderEvent.Id = order.GetNewId();
1115 
1116  // set the modified time of the order to the fill's timestamp
1117  switch (orderEvent.Status)
1118  {
1119  case OrderStatus.Canceled:
1120  order.CanceledTime = orderEvent.UtcTime;
1121  break;
1122 
1123  case OrderStatus.PartiallyFilled:
1124  case OrderStatus.Filled:
1125  order.LastFillTime = orderEvent.UtcTime;
1126 
1127  // append fill message to order tag, for additional information
1128  if (orderEvent.Status == OrderStatus.Filled && !string.IsNullOrWhiteSpace(orderEvent.Message))
1129  {
1130  if (string.IsNullOrWhiteSpace(order.Tag))
1131  {
1132  order.Tag = orderEvent.Message;
1133  }
1134  else
1135  {
1136  order.Tag += " - " + orderEvent.Message;
1137  }
1138  }
1139  break;
1140 
1141  case OrderStatus.UpdateSubmitted:
1142  case OrderStatus.Submitted:
1143  // submit events after the initial submission are all order updates
1144  if (ticket.UpdateRequests.Count > 0)
1145  {
1146  order.LastUpdateTime = orderEvent.UtcTime;
1147  }
1148  break;
1149  }
1150 
1151  // lets always set current Quantity, Limit and Stop prices in the order event so that it's easier for consumers
1152  // to know the current state and detect any update
1153  orderEvent.Quantity = order.Quantity;
1154  switch (order.Type)
1155  {
1156  case OrderType.Limit:
1157  var limit = order as LimitOrder;
1158  orderEvent.LimitPrice = limit.LimitPrice;
1159  break;
1160  case OrderType.ComboLegLimit:
1161  var legLimitOrder = order as ComboLegLimitOrder;
1162  orderEvent.LimitPrice = legLimitOrder.LimitPrice;
1163  break;
1164  case OrderType.StopMarket:
1165  var marketOrder = order as StopMarketOrder;
1166  orderEvent.StopPrice = marketOrder.StopPrice;
1167  break;
1168  case OrderType.StopLimit:
1169  var stopLimitOrder = order as StopLimitOrder;
1170  orderEvent.LimitPrice = stopLimitOrder.LimitPrice;
1171  orderEvent.StopPrice = stopLimitOrder.StopPrice;
1172  break;
1173  case OrderType.TrailingStop:
1174  var trailingStopOrder = order as TrailingStopOrder;
1175  orderEvent.StopPrice = trailingStopOrder.StopPrice;
1176  orderEvent.TrailingAmount = trailingStopOrder.TrailingAmount;
1177  break;
1178  case OrderType.LimitIfTouched:
1179  var limitIfTouchedOrder = order as LimitIfTouchedOrder;
1180  orderEvent.LimitPrice = limitIfTouchedOrder.LimitPrice;
1181  orderEvent.TriggerPrice = limitIfTouchedOrder.TriggerPrice;
1182  break;
1183  }
1184 
1185  // check if the fill currency and the order currency match the symbol currency
1186  if (orderEvent.Status == OrderStatus.Filled || orderEvent.Status == OrderStatus.PartiallyFilled)
1187  {
1188  fillsToProcess.Add(orderEvent);
1189  Interlocked.Exchange(ref _lastFillTimeTicks, CurrentTimeUtc.Ticks);
1190 
1191  var security = _algorithm.Securities[orderEvent.Symbol];
1192 
1193  if (orderEvent.Symbol.SecurityType == SecurityType.Crypto
1194  && order.Direction == OrderDirection.Buy
1195  && CurrencyPairUtil.TryDecomposeCurrencyPair(orderEvent.Symbol, out var baseCurrency, out var quoteCurrency)
1196  && orderEvent.OrderFee.Value.Currency == baseCurrency)
1197  {
1198  // fees are in the base currency, so we have to subtract them from the filled quantity
1199  // else the virtual position will bigger than the real size and we might no be able to liquidate
1200  orderEvent.FillQuantity -= orderEvent.OrderFee.Value.Amount;
1201  orderEvent.OrderFee = new ModifiedFillQuantityOrderFee(orderEvent.OrderFee.Value, quoteCurrency, security.SymbolProperties.ContractMultiplier);
1202 
1203  if (!_loggedFeeAdjustmentWarning)
1204  {
1205  _loggedFeeAdjustmentWarning = true;
1206  const string message = "When buying currency pairs, using Cash account types, fees in base currency" +
1207  " will be deducted from the filled quantity so virtual positions reflect actual holdings.";
1208  Log.Trace($"BrokerageTransactionHandler.HandleOrderEvent(): {message}");
1209  _algorithm.Debug(message);
1210  }
1211  }
1212  }
1213  }
1214 
1215  //Apply the filled orders to our portfolio:
1216  try
1217  {
1218  _algorithm.Portfolio.ProcessFills(fillsToProcess);
1219  }
1220  catch (Exception err)
1221  {
1222  Log.Error(err);
1223  _algorithm.Error($"Fill error: error in TradeBuilder.ProcessFill: {err.Message}");
1224  }
1225 
1226  // Apply the filled orders to the trade builder
1227  for (var i = 0; i < orderEvents.Count; i++)
1228  {
1229  var orderEvent = orderEvents[i];
1230 
1231  if (orderEvent.Status == OrderStatus.Filled || orderEvent.Status == OrderStatus.PartiallyFilled)
1232  {
1233  var security = _algorithm.Securities[orderEvent.Symbol];
1234 
1235  var multiplier = security.SymbolProperties.ContractMultiplier;
1236  var securityConversionRate = security.QuoteCurrency.ConversionRate;
1237  var feeInAccountCurrency = _algorithm.Portfolio.CashBook
1238  .ConvertToAccountCurrency(orderEvent.OrderFee.Value).Amount;
1239 
1240  try
1241  {
1242  _algorithm.TradeBuilder.ProcessFill(
1243  orderEvent,
1244  securityConversionRate,
1245  feeInAccountCurrency,
1246  multiplier);
1247  }
1248  catch (Exception err)
1249  {
1250  Log.Error(err);
1251  }
1252  }
1253 
1254  // update the ticket after we've processed the fill, but before the event, this way everything is ready for user code
1255  orderEvent.Ticket.AddOrderEvent(orderEvent);
1256  }
1257  }
1258 
1259  //We have the events! :) Orders filled, send them in to be handled by algorithm portfolio.
1260  for (var i = 0; i < orderEvents.Count; i++)
1261  {
1262  var orderEvent = orderEvents[i];
1263 
1264  if (orderEvent.Status != OrderStatus.None) //order.Status != OrderStatus.Submitted
1265  {
1266  _orderEvents.Enqueue(orderEvent);
1267 
1268  //Create new order event:
1269  _resultHandler.OrderEvent(orderEvent);
1270 
1271  NewOrderEvent?.Invoke(this, orderEvent);
1272 
1273  try
1274  {
1275  //Trigger our order event handler
1276  _algorithm.OnOrderEvent(orderEvent);
1277  }
1278  catch (Exception err)
1279  {
1280  // unexpected error, we need to close down shop
1281  _algorithm.SetRuntimeError(err, "Order Event Handler");
1282  }
1283  }
1284 
1285  LogOrderEvent(orderEvent);
1286  }
1287  }
1288 
1289  private void HandleOrderEvent(OrderEvent orderEvent)
1290  {
1291  HandleOrderEvents(new List<OrderEvent> { orderEvent });
1292  }
1293 
1294  private void HandleOrderUpdated(OrderUpdateEvent e)
1295  {
1296  if (!_completeOrders.TryGetValue(e.OrderId, out var order))
1297  {
1298  Log.Error("BrokerageTransactionHandler.HandleOrderUpdated(): Unable to locate open order with id " + e.OrderId);
1299  return;
1300  }
1301 
1302  switch (order.Type)
1303  {
1304  case OrderType.TrailingStop:
1305  ((TrailingStopOrder)order).StopPrice = e.TrailingStopPrice;
1306  break;
1307 
1308  case OrderType.StopLimit:
1309  ((StopLimitOrder)order).StopTriggered = e.StopTriggered;
1310  break;
1311  }
1312  }
1313 
1314  /// <summary>
1315  /// Gets the price adjustment mode for the specified symbol from its subscription configurations
1316  /// </summary>
1317  private void SetPriceAdjustmentMode(Order order, IAlgorithm algorithm)
1318  {
1319  if (algorithm.LiveMode)
1320  {
1321  // live trading always uses raw prices
1323  return;
1324  }
1325 
1326  if (!_priceAdjustmentModes.TryGetValue(order.Symbol, out var mode))
1327  {
1328  var configs = algorithm.SubscriptionManager.SubscriptionDataConfigService
1329  .GetSubscriptionDataConfigs(order.Symbol, includeInternalConfigs: true);
1330  if (configs.Count == 0)
1331  {
1332  throw new InvalidOperationException($"Unable to locate subscription data config for {order.Symbol}");
1333  }
1334 
1335  mode = configs[0].DataNormalizationMode;
1336  _priceAdjustmentModes[order.Symbol] = mode;
1337  }
1338 
1339  order.PriceAdjustmentMode = mode;
1340  }
1341 
1342  /// <summary>
1343  /// Debug logging helper method, called after HandleOrderEvent has finished updating status, price and quantity
1344  /// </summary>
1345  /// <param name="e">The order event</param>
1346  private static void LogOrderEvent(OrderEvent e)
1347  {
1348  if (Log.DebuggingEnabled)
1349  {
1350  Log.Debug("BrokerageTransactionHandler.LogOrderEvent(): " + e);
1351  }
1352  }
1353 
1354  /// <summary>
1355  /// Brokerages can send account updates, this include cash balance updates. Since it is of
1356  /// utmost important to always have an accurate picture of reality, we'll trust this information
1357  /// as truth
1358  /// </summary>
1359  private void HandleAccountChanged(AccountEvent account)
1360  {
1361  // how close are we?
1362  var existingCashBalance = _algorithm.Portfolio.CashBook[account.CurrencySymbol].Amount;
1363  if (existingCashBalance != account.CashBalance)
1364  {
1365  Log.Trace($"BrokerageTransactionHandler.HandleAccountChanged(): {account.CurrencySymbol} Cash Lean: {existingCashBalance} Brokerage: {account.CashBalance}. Will update: {_brokerage.AccountInstantlyUpdated}");
1366  }
1367 
1368  // maybe we don't actually want to do this, this data can be delayed. Must be explicitly supported by brokerage
1369  if (_brokerage.AccountInstantlyUpdated)
1370  {
1371  // override the current cash value so we're always guaranteed to be in sync with the brokerage's push updates
1372  _algorithm.Portfolio.CashBook[account.CurrencySymbol].SetAmount(account.CashBalance);
1373  }
1374  }
1375 
1376  /// <summary>
1377  /// Brokerage order id change is applied to the target order
1378  /// </summary>
1379  private void HandlerBrokerageOrderIdChangedEvent(BrokerageOrderIdChangedEvent brokerageOrderIdChangedEvent)
1380  {
1381  var originalOrder = GetOrderByIdInternal(brokerageOrderIdChangedEvent.OrderId);
1382 
1383  if (originalOrder == null)
1384  {
1385  // shouldn't happen but let's be careful
1386  Log.Error($"BrokerageTransactionHandler.HandlerBrokerageOrderIdChangedEvent(): Lean order id {brokerageOrderIdChangedEvent.OrderId} not found");
1387  return;
1388  }
1389 
1390  // we replace the whole collection
1391  originalOrder.BrokerId = brokerageOrderIdChangedEvent.BrokerId;
1392  }
1393 
1394  /// <summary>
1395  /// Option assignment/exercise event is received and propagated to the user algo
1396  /// </summary>
1397  private void HandlePositionAssigned(OrderEvent fill)
1398  {
1399  // informing user algorithm that option position has been assigned
1400  _algorithm.OnAssignmentOrderEvent(fill);
1401  }
1402 
1403  private void HandleDelistingNotification(DelistingNotificationEventArgs e)
1404  {
1405  if (_algorithm.Securities.TryGetValue(e.Symbol, out var security))
1406  {
1407  // only log always in live trading, in backtesting log if not 0 holdings
1408  if (_algorithm.LiveMode || security.Holdings.Quantity != 0)
1409  {
1410  Log.Trace(
1411  $"BrokerageTransactionHandler.HandleDelistingNotification(): UtcTime: {CurrentTimeUtc} clearing position for delisted holding: " +
1412  $"Symbol: {e.Symbol.Value}, " +
1413  $"Quantity: {security.Holdings.Quantity}");
1414  }
1415 
1416  // Only submit an order if we have holdings
1417  var quantity = -security.Holdings.Quantity;
1418  if (quantity != 0)
1419  {
1420  var tag = "Liquidate from delisting";
1421 
1422  // Create our order and add it
1423  var order = new MarketOrder(security.Symbol, quantity, _algorithm.UtcTime, tag);
1424  AddOpenOrder(order, _algorithm);
1425 
1426  // Create our fill with the latest price
1427  var fill = new OrderEvent(order, _algorithm.UtcTime, OrderFee.Zero)
1428  {
1429  FillPrice = security.Price,
1430  Status = OrderStatus.Filled,
1431  FillQuantity = order.Quantity
1432  };
1433 
1434  // Process this order event
1435  HandleOrderEvent(fill);
1436  }
1437  }
1438  }
1439 
1440  /// <summary>
1441  /// Option notification event is received and new order events are generated
1442  /// </summary>
1443  private void HandleOptionNotification(OptionNotificationEventArgs e)
1444  {
1445  if (_algorithm.Securities.TryGetValue(e.Symbol, out var security))
1446  {
1447  // let's take the order event lock, we will be looking at orders and security holdings
1448  // and we don't want them changing mid processing because of an order event coming in at the same time
1449  // for example: DateTime/decimal order attributes are not thread safe by nature!
1450  lock (_lockHandleOrderEvent)
1451  {
1453  {
1454  if (e.Position == 0)
1455  {
1456  // only log always in live trading, in backtesting log if not 0 holdings
1457  if (_algorithm.LiveMode || security.Holdings.Quantity != 0)
1458  {
1459  Log.Trace(
1460  $"BrokerageTransactionHandler.HandleOptionNotification(): UtcTime: {CurrentTimeUtc} clearing position for expired option holding: " +
1461  $"Symbol: {e.Symbol.Value}, " +
1462  $"Holdings: {security.Holdings.Quantity}");
1463  }
1464 
1465  var quantity = -security.Holdings.Quantity;
1466 
1467  // If the quantity is already 0 for Lean and the brokerage there is nothing else todo here
1468  if (quantity != 0)
1469  {
1470  var exerciseOrder = GenerateOptionExerciseOrder(security, quantity, e.Tag);
1471 
1472  EmitOptionNotificationEvents(security, exerciseOrder);
1473  }
1474  }
1475  else
1476  {
1477  Log.Error("BrokerageTransactionHandler.HandleOptionNotification(): " +
1478  $"unexpected position ({e.Position} instead of zero) " +
1479  $"for expired option contract: {e.Symbol.Value}");
1480  }
1481  }
1482  else
1483  {
1484  // if position is reduced, could be an early exercise or early assignment
1485  if (Math.Abs(e.Position) < security.Holdings.AbsoluteQuantity)
1486  {
1487  Log.Trace("BrokerageTransactionHandler.HandleOptionNotification(): " +
1488  $"Symbol {e.Symbol.Value} EventQuantity {e.Position} Holdings {security.Holdings.Quantity}");
1489 
1490  // if we are long the option and there is an open order, assume it's an early exercise
1491  if (security.Holdings.IsLong)
1492  {
1493  // we only care about open option exercise orders, if it's closed it means we already
1494  // processed it and we wouldn't have a need to handle it here
1495  if (GetOpenOrders(x =>
1496  x.Symbol == e.Symbol &&
1497  x.Type == OrderType.OptionExercise)
1498  .FirstOrDefault() is OptionExerciseOrder exerciseOrder)
1499  {
1500  EmitOptionNotificationEvents(security, exerciseOrder);
1501  }
1502  }
1503 
1504  // if we are short the option and there are no buy orders (open or recently closed), assume it's an early assignment
1505  else if (security.Holdings.IsShort)
1506  {
1507  var nowUtc = CurrentTimeUtc;
1508  // for some brokerages (like IB) there might be a race condition between getting an option
1509  // notification event and lean processing an order event. So if we are here, there are these options:
1510  // A) holdings -10 position 5
1511  // 1) WE just BOUGHT 15 and Lean doesn't know yet
1512  // 2) WE just SOLD 15 and this notification is old
1513  // B) holdings -10 position -5
1514  // 1) WE just BOUGHT 5 and Lean doesn't know yet
1515  // 2) WE just SOLD 5 more and this notification is old
1516  // - Seen this in production already
1517  // 3) Brokerage triggered an early assignment
1518 
1519  // so we get ALL orders for this symbol that were placed or got an update in the last 'orderWindowSeconds'
1520 
1521  const int orderWindowSeconds = 10;
1522  // NOTE: We do this checks for actual live trading only to handle the race condition stated above
1523  // for actual brokerages (excluding paper trading with PaperBrokerage).
1524  // TODO: If we confirm this race condition applies for IB only, we could move this to the brokerage itself.
1525  if (_brokerageIsBacktesting ||
1526  !GetOrders(x =>
1527  x.Symbol == e.Symbol
1528  && (x.Status.IsOpen() || x.Status.IsFill() &&
1529  (Math.Abs((x.Time - nowUtc).TotalSeconds) < orderWindowSeconds
1530  || (x.LastUpdateTime.HasValue && Math.Abs((x.LastUpdateTime.Value - nowUtc).TotalSeconds) < orderWindowSeconds)
1531  || (x.LastFillTime.HasValue && Math.Abs((x.LastFillTime.Value - nowUtc).TotalSeconds) < orderWindowSeconds)))).Any())
1532  {
1533  var quantity = e.Position - security.Holdings.Quantity;
1534 
1535  var exerciseOrder = GenerateOptionExerciseOrder(security, quantity, e.Tag);
1536 
1537  EmitOptionNotificationEvents(security, exerciseOrder);
1538  }
1539  }
1540  }
1541  }
1542  }
1543  }
1544  }
1545 
1546  /// <summary>
1547  /// New brokerage-side order event handler
1548  /// </summary>
1549  private void HandleNewBrokerageSideOrder(NewBrokerageOrderNotificationEventArgs e)
1550  {
1551  void onError(IReadOnlyCollection<SecurityType> supportedSecurityTypes) =>
1552  _algorithm.Debug($"Warning: New brokerage-side order could not be processed due to " +
1553  $"it's security not being supported. Supported security types are {string.Join(", ", supportedSecurityTypes)}");
1554 
1555  if (_algorithm.BrokerageMessageHandler.HandleOrder(e) &&
1556  _algorithm.GetOrAddUnrequestedSecurity(e.Order.Symbol, out _, onError))
1557  {
1558  AddOpenOrder(e.Order, _algorithm);
1559  }
1560  }
1561 
1562  private OptionExerciseOrder GenerateOptionExerciseOrder(Security security, decimal quantity, string tag)
1563  {
1564  // generate new exercise order and ticket for the option
1565  var order = new OptionExerciseOrder(security.Symbol, quantity, CurrentTimeUtc, tag);
1566 
1567  // save current security prices
1568  order.OrderSubmissionData = new OrderSubmissionData(security.BidPrice, security.AskPrice, security.Close);
1569  order.PriceCurrency = security.SymbolProperties.QuoteCurrency;
1570 
1571  AddOpenOrder(order, _algorithm);
1572  return order;
1573  }
1574 
1575  private void EmitOptionNotificationEvents(Security security, OptionExerciseOrder order)
1576  {
1577  // generate the order events reusing the option exercise model
1578  var option = (Option)security;
1579  var orderEvents = option.OptionExerciseModel.OptionExercise(option, order);
1580 
1581  foreach (var orderEvent in orderEvents)
1582  {
1583  HandleOrderEvent(orderEvent);
1584 
1585  if (orderEvent.IsAssignment)
1586  {
1587  orderEvent.Message = order.Tag;
1588  HandlePositionAssigned(orderEvent);
1589  }
1590  }
1591  }
1592 
1593  /// <summary>
1594  /// Gets the amount of time since the last call to algorithm.Portfolio.ProcessFill(fill)
1595  /// </summary>
1596  protected virtual TimeSpan TimeSinceLastFill =>
1597  CurrentTimeUtc - new DateTime(Interlocked.Read(ref _lastFillTimeTicks));
1598 
1599  /// <summary>
1600  /// Gets current time UTC. This is here to facilitate testing
1601  /// </summary>
1602  protected virtual DateTime CurrentTimeUtc => DateTime.UtcNow;
1603 
1604  /// <summary>
1605  /// Rounds off the order towards 0 to the nearest multiple of Lot Size
1606  /// </summary>
1607  public decimal RoundOffOrder(Order order, Security security)
1608  {
1609  var orderLotMod = order.Quantity % security.SymbolProperties.LotSize;
1610 
1611  if (orderLotMod != 0)
1612  {
1613  order.Quantity = order.Quantity - orderLotMod;
1614 
1615  if (!_firstRoundOffMessage)
1616  {
1617  _algorithm.Error("Warning: Due to brokerage limitations, orders will be rounded to " +
1618  $"the nearest lot size of {security.SymbolProperties.LotSize.ToStringInvariant()}"
1619  );
1620  _firstRoundOffMessage = true;
1621  }
1622  return order.Quantity;
1623  }
1624  else
1625  {
1626  return order.Quantity;
1627  }
1628  }
1629 
1630  /// <summary>
1631  /// Rounds the order prices to its security minimum price variation.
1632  /// <remarks>
1633  /// This procedure is needed to meet brokerage precision requirements.
1634  /// </remarks>
1635  /// </summary>
1636  protected void RoundOrderPrices(Order order, Security security)
1637  {
1638  var comboIsReady = order.TryGetGroupOrders(TryGetOrder, out var orders);
1639  orders.TryGetGroupOrdersSecurities(_algorithm.Portfolio, out var securities);
1640 
1641  RoundOrderPrices(order, security, comboIsReady, securities);
1642  }
1643 
1644  /// <summary>
1645  /// Rounds the order prices to its security minimum price variation.
1646  /// <remarks>
1647  /// This procedure is needed to meet brokerage precision requirements.
1648  /// </remarks>
1649  /// </summary>
1650  protected void RoundOrderPrices(Order order, Security security, bool comboIsReady, Dictionary<Order, Security> orders)
1651  {
1652  switch (order.Type)
1653  {
1654  case OrderType.Limit:
1655  {
1656  var limitOrder = (LimitOrder)order;
1657  RoundOrderPrice(security, limitOrder.LimitPrice, "LimitPrice", (roundedPrice) => limitOrder.LimitPrice = roundedPrice);
1658  }
1659  break;
1660 
1661  case OrderType.StopMarket:
1662  {
1663  var stopMarketOrder = (StopMarketOrder)order;
1664  RoundOrderPrice(security, stopMarketOrder.StopPrice, "StopPrice", (roundedPrice) => stopMarketOrder.StopPrice = roundedPrice);
1665  }
1666  break;
1667 
1668  case OrderType.StopLimit:
1669  {
1670  var stopLimitOrder = (StopLimitOrder)order;
1671  RoundOrderPrice(security, stopLimitOrder.LimitPrice, "LimitPrice", (roundedPrice) => stopLimitOrder.LimitPrice = roundedPrice);
1672  RoundOrderPrice(security, stopLimitOrder.StopPrice, "StopPrice", (roundedPrice) => stopLimitOrder.StopPrice = roundedPrice);
1673  }
1674  break;
1675 
1676  case OrderType.TrailingStop:
1677  {
1678  var trailingStopOrder = (TrailingStopOrder)order;
1679  RoundOrderPrice(security, trailingStopOrder.StopPrice, "StopPrice",
1680  (roundedPrice) => trailingStopOrder.StopPrice = roundedPrice);
1681 
1682  if (!trailingStopOrder.TrailingAsPercentage)
1683  {
1684  RoundOrderPrice(security, trailingStopOrder.TrailingAmount, "TrailingAmount",
1685  (roundedAmount) => trailingStopOrder.TrailingAmount = roundedAmount);
1686  }
1687  }
1688  break;
1689 
1690  case OrderType.LimitIfTouched:
1691  {
1692  var limitIfTouchedOrder = (LimitIfTouchedOrder)order;
1693  RoundOrderPrice(security, limitIfTouchedOrder.LimitPrice, "LimitPrice",
1694  (roundedPrice) => limitIfTouchedOrder.LimitPrice = roundedPrice);
1695  RoundOrderPrice(security, limitIfTouchedOrder.TriggerPrice, "TriggerPrice",
1696  (roundedPrice) => limitIfTouchedOrder.TriggerPrice = roundedPrice);
1697  }
1698  break;
1699 
1700  case OrderType.ComboLegLimit:
1701  {
1702  var comboLegOrder = (ComboLegLimitOrder)order;
1703  RoundOrderPrice(security, comboLegOrder.LimitPrice, "LimitPrice",
1704  (roundedPrice) => comboLegOrder.LimitPrice = roundedPrice);
1705  }
1706  break;
1707 
1708  case OrderType.ComboLimit:
1709  {
1710  if (comboIsReady)
1711  {
1712  // all orders in the combo have been received.
1713  // we can now round the limit price of the group order,
1714  // for which we need to find the smallest price variation from each leg security
1715  var groupOrderManager = order.GroupOrderManager;
1716  var increment = 0m;
1717  foreach (var (legOrder, legSecurity) in orders)
1718  {
1719  var legIncrement = legSecurity.PriceVariationModel.GetMinimumPriceVariation(
1720  new GetMinimumPriceVariationParameters(legSecurity, legOrder.Price));
1721  if (legIncrement > 0 && (increment == 0 || legIncrement < increment))
1722  {
1723  increment = legIncrement;
1724  }
1725  }
1726 
1727  RoundOrderPrice(groupOrderManager.LimitPrice, increment, "LimitPrice",
1728  (roundedPrice) => groupOrderManager.LimitPrice = roundedPrice);
1729  }
1730 
1731  }
1732  break;
1733  }
1734  }
1735 
1736  private void RoundOrderPrice(Security security, decimal price, string priceType, Action<decimal> setPrice)
1737  {
1738  var increment = security.PriceVariationModel.GetMinimumPriceVariation(new GetMinimumPriceVariationParameters(security, price));
1739  RoundOrderPrice(price, increment, priceType, setPrice);
1740  }
1741 
1742  [MethodImpl(MethodImplOptions.AggressiveInlining)]
1743  private void RoundOrderPrice(decimal price, decimal increment, string priceType, Action<decimal> setPrice)
1744  {
1745  if (increment > 0)
1746  {
1747  var roundedPrice = Math.Round(price / increment) * increment;
1748  setPrice(roundedPrice);
1749  SendWarningOnPriceChange(priceType, roundedPrice, price);
1750  }
1751  }
1752 
1753  private Order TryGetOrder(int orderId)
1754  {
1755  _completeOrders.TryGetValue(orderId, out var order);
1756  return order;
1757  }
1758 
1759  private void InvalidateOrders(List<Order> orders, string message)
1760  {
1761  for (var i = 0; i < orders.Count; i++)
1762  {
1763  var orderInGroup = orders[i];
1764  if (!orderInGroup.Status.IsClosed())
1765  {
1766  orderInGroup.Status = OrderStatus.Invalid;
1767  }
1768  HandleOrderEvents(new List<OrderEvent> { new OrderEvent(orderInGroup, _algorithm.UtcTime, OrderFee.Zero, message) });
1769  }
1770  }
1771 
1772  private void SendWarningOnPriceChange(string priceType, decimal priceRound, decimal priceOriginal)
1773  {
1774  if (!priceOriginal.Equals(priceRound))
1775  {
1776  _algorithm.Error(
1777  $"Warning: To meet brokerage precision requirements, order {priceType.ToStringInvariant()} was rounded to {priceRound.ToStringInvariant()} from {priceOriginal.ToStringInvariant()}"
1778  );
1779  }
1780  }
1781 
1782  private string GetShortableErrorMessage(Symbol symbol, decimal quantity)
1783  {
1784  var shortableQuantity = _algorithm.ShortableQuantity(symbol);
1785  return $"Order exceeds shortable quantity {shortableQuantity} for Symbol {symbol} requested {quantity})";
1786  }
1787  }
1788 }
1789