@@ -75,6 +75,8 @@ typedef unsigned char byte_t;
 
 #define FOREACH_IFACE(i) for (size_t i = 0; i < CANARD_IFACE_COUNT; i++)
 
+#define TREE_NULL (canard_tree_t){ NULL, { NULL, NULL }, 0 }
+
 typedef enum transfer_kind_t
 {
     transfer_kind_message = 0,
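A note on the new macro: TREE_NULL zeroes the embedded AVL node that this change adds to each transfer. The three-field initializer suggests the usual cavl node layout; the sketch below is an assumption for orientation only, not the actual definition from the headers.

// Presumed layout of canard_tree_t, inferred from the TREE_NULL initializer above (illustrative only):
typedef struct canard_tree_t
{
    struct canard_tree_t* up;     // Parent node; NULL when the node is the root (or not inserted).
    struct canard_tree_t* lr[2];  // Children: lr[0] = lesser subtree, lr[1] = greater subtree.
    int8_t                bf;     // AVL balance factor, kept in {-1, 0, +1}.
} canard_tree_t;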
@@ -521,7 +523,8 @@ struct canard_txfer_t
     /// Memory-efficient but not very fast indexes optimized for a low number of pending transfers.
     canard_listed_t list_pending[CANARD_IFACE_COUNT];
     canard_listed_t list_delayed;
-    canard_listed_t list_oldest;
+    canard_listed_t list_agewise;
+    canard_tree_t   index_reliable;  ///< Ordered by (topic hash, transfer-ID); contains ALL pending reliable transfers.
 
     /// Mutable transmission state. All other fields, except for the index handles, are immutable.
     ///
@@ -576,8 +579,9 @@ static canard_txfer_t* txfer_new(const canard_mem_t mem,
     FOREACH_IFACE(i) {
         tr->list_pending[i] = LIST_NULL;
     }
-    tr->list_delayed = LIST_NULL;
-    tr->list_oldest  = LIST_NULL;
+    tr->list_delayed   = LIST_NULL;
+    tr->list_agewise   = LIST_NULL;
+    tr->index_reliable = TREE_NULL;
     //
     FOREACH_IFACE(i) {
         tr->head[i] = tr->cursor[i] = NULL;
@@ -686,8 +690,8 @@ static void txfer_retire(canard_t* const self, canard_txfer_t* const tr, const b
     const bool backlogged = txfer_is_backlogged(tr);
 
     // Delist everywhere. Remember that delisting a non-listed entity is a safe no-op.
-    if (self->tx.iterator[reliable] == tr) {
-        self->tx.iterator[reliable] = LIST_NEXT(tr, canard_txfer_t, list_oldest);  // May be NULL, is OK.
+    if (self->tx.iter == tr) {
+        self->tx.iter = LIST_NEXT(tr, canard_txfer_t, list_agewise);  // May be NULL, is OK.
     }
     FOREACH_IFACE(i) {
         if ((tr->iface_bitmap & (1U << i)) != 0) {
@@ -698,8 +702,12 @@ static void txfer_retire(canard_t* const self, canard_txfer_t* const tr, const b
             }
         }
     }
-    delist(&self->tx.oldest[reliable], &tr->list_oldest);
     delist(&self->tx.delayed[shard], &tr->list_delayed);
+    delist(&self->tx.agewise, &tr->list_agewise);
+    if (reliable) {
+        CANARD_ASSERT(cavl2_is_inserted(self->tx.reliable, &tr->index_reliable));
+        cavl2_remove(&self->tx.reliable, &tr->index_reliable);
+    }
 
     // If the transfer was not backlogged, it may have been blocking other transfers, but only if it is reliable.
     // Best-effort transfers do not block anything because in the presence of a stalled interface they may take a
@@ -894,20 +902,32 @@ static tx_frame_t* tx_spool_v0(canard_t* const self,
     return head;
 }
 
+/// To check all transfers under a specific topic hash, use a lower-bound lookup with transfer_id=0,
+/// then iterate until the topic hash changes.
+/// This is O(log n) because the number of distinct transfer-IDs in Cyphal/CAN is bounded at a low value.
+typedef struct
+{
+    uint64_t topic_hash;
+    byte_t   transfer_id;
+} txfer_key_t;
+
+static int32_t tx_cavl_compare_reliable(const void* const user, const canard_tree_t* const node)
+{
+    const txfer_key_t* const    key = (const txfer_key_t*)user;
+    const canard_txfer_t* const tr  = CAVL2_TO_OWNER(node, canard_txfer_t, index_reliable);  // clang-format off
+    if (key->topic_hash < tr->topic_hash) { return -1; }
+    if (key->topic_hash > tr->topic_hash) { return +1; }  // clang-format on
+    return (int32_t)key->transfer_id - (int32_t)tr->remote_transfer_id;
+}
+
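The comment above describes the intended access pattern: seek to the lower bound of a topic with transfer_id=0, then walk forward while the topic hash still matches. A minimal sketch of that scan follows, assuming a hypothetical helper and visit callback (the cavl2 calls and CAVL2_TO_OWNER are the same ones used elsewhere in this diff):

// Hypothetical helper illustrating the bounded per-topic scan; not part of this change.
static void for_each_reliable_on_topic(canard_t* const self,
                                       const uint64_t topic_hash,
                                       void (*const visit)(canard_txfer_t*))
{
    const txfer_key_t key = { .topic_hash = topic_hash, .transfer_id = 0 };  // Smallest key under this topic.
    canard_txfer_t* tr = CAVL2_TO_OWNER(cavl2_lower_bound(self->tx.reliable, &key, tx_cavl_compare_reliable),
                                        canard_txfer_t,
                                        index_reliable);
    while ((tr != NULL) && (tr->topic_hash == topic_hash)) {  // Stop once the topic hash changes.
        visit(tr);  // Few iterations in practice: the number of distinct transfer-IDs is small.
        tr = CAVL2_TO_OWNER(cavl2_next_greater(&tr->index_reliable), canard_txfer_t, index_reliable);
    }
}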
 /// When the queue is exhausted, finds a transfer to sacrifice using simple heuristics and returns it.
 /// Will return NULL if there are no transfers worth sacrificing (no queue space can be reclaimed).
 /// We cannot simply stop accepting new transfers when the queue is full, because it may be caused by a single
 /// stalled interface holding back progress for all transfers.
 /// The heuristics are subject to review and improvement.
 static canard_txfer_t* tx_sacrifice(const canard_t* const self)
 {
-    // A best-effort transfer can be stuck for a long time if there is a stalled redundant interface.
-    // A single stalled interface does not affect reliable transfers because the first ack will free them.
-    canard_txfer_t* tr = LIST_HEAD(self->tx.oldest[0], canard_txfer_t, list_oldest);  // best-effort first
-    if (tr == NULL) {
-        tr = LIST_HEAD(self->tx.oldest[1], canard_txfer_t, list_oldest);
-    }
-    return tr;
+    return LIST_HEAD(self->tx.agewise, canard_txfer_t, list_agewise);
 }
 
 /// True on success, false if not possible to reclaim enough space.
@@ -978,6 +998,21 @@ static bool tx_push(canard_t* const self,
     // if they have the same arbitration priority as the new transfer, they should get a chance to go first.
     tx_promote_delayed(self, now);
 
+    // Ensure there are no duplicate reliable transfers with the same (topic hash, transfer-ID).
+    // Insert at the same time to avoid a double tree walk.
+    if (tr->reliable) {
+        const txfer_key_t key = { .topic_hash = tr->topic_hash, .transfer_id = tr->transfer_id };
+        canard_txfer_t* const rel = CAVL2_TO_OWNER(
+            cavl2_find_or_insert(&self->tx.reliable, &key, tx_cavl_compare_reliable, tr, cavl2_trivial_factory),
+            canard_txfer_t,
+            index_reliable);
+        if (rel != tr) {  // Duplicate found.
+            mem_free(self->mem.tx_transfer, sizeof(canard_txfer_t), tr);
+            self->err.tx_duplicate++;
+            return false;
+        }
+    }
+
     // Ensure the queue has enough space. v0 transfers always use Classic CAN regardless of tr->fd.
     const size_t mtu = tr->fd ? CANARD_MTU_CAN_FD : CANARD_MTU_CAN_CLASSIC;
     const size_t size = bytes_chain_size(payload);  // TODO: pass the precomputed size into spool functions
@@ -986,6 +1021,10 @@ static bool tx_push(canard_t* const self,
     if (!tx_ensure_queue_space(self, n_frames)) {
         mem_free(self->mem.tx_transfer, sizeof(canard_txfer_t), tr);
         self->err.tx_capacity++;
+        if (tr->reliable) {
+            CANARD_ASSERT(cavl2_is_inserted(self->tx.reliable, &tr->index_reliable));
+            cavl2_remove(&self->tx.reliable, &tr->index_reliable);
+        }
         return false;
     }
 
@@ -996,6 +1035,10 @@ static bool tx_push(canard_t* const self,
     if (spool == NULL) {
         mem_free(self->mem.tx_transfer, sizeof(canard_txfer_t), tr);
         self->err.oom++;
+        if (tr->reliable) {
+            CANARD_ASSERT(cavl2_is_inserted(self->tx.reliable, &tr->index_reliable));
+            cavl2_remove(&self->tx.reliable, &tr->index_reliable);
+        }
         return false;
     }
     CANARD_ASSERT((self->tx.queue_size - queue_size_before) == n_frames);
@@ -1011,8 +1054,8 @@ static bool tx_push(canard_t* const self,
         }
     }
 
-    // Insert into the oldest list.
-    enlist_tail(&self->tx.oldest[tr->reliable], &tr->list_oldest);
+    // Register the transfer.
+    enlist_tail(&self->tx.agewise, &tr->list_agewise);
 
     // We need to ensure that transfers emitted at the same priority level are sent strictly in push order.
     // For best-effort transfers this is trivial as all we need to do is to enqueue them as-is.
@@ -1028,29 +1071,23 @@ static bool tx_push(canard_t* const self,
     // If there are no such transfers, it can be enqueued immediately; observe that there may still be some reliable
     // transfers waiting for ack, but if such transfers are neither pending nor delayed, it means that they have
    // completed the last transmission attempt, no further attempts will be made, and thus no reordering can occur.
-    const byte_t shard = txfer_shard(tr);
-    bool has_preceding_reliable = false;
-    FOREACH_IFACE(i) {
-        LIST_FIND_FIRST(self->tx.pending[shard][i],
-                        canard_txfer_t,
-                        list_pending[i],  // check if this is standard-compliant
-                        match,
-                        (match->topic_hash == tr->topic_hash) && match->reliable);
-        if (match != NULL) {
-            has_preceding_reliable = true;
-            break;
+    bool has_preceding_reliable = false;
+    {
+        const txfer_key_t key = { .topic_hash = tr->topic_hash, .transfer_id = 0 };
+        canard_txfer_t* rel = CAVL2_TO_OWNER(
+            cavl2_lower_bound(self->tx.reliable, &key, tx_cavl_compare_reliable), canard_txfer_t, index_reliable);
+        while ((rel != NULL) && (rel->topic_hash == tr->topic_hash)) {
+            // Ignore ourselves and transfers that will no longer be retransmitted.
+            if ((rel != tr) && (rel->delayed_until > BIG_BANG)) {
+                has_preceding_reliable = true;
+                break;
+            }
+            rel = CAVL2_TO_OWNER(cavl2_next_greater(&rel->index_reliable), canard_txfer_t, index_reliable);
         }
     }
-    if (!has_preceding_reliable) {
-        LIST_FIND_FIRST(self->tx.delayed[shard],
-                        canard_txfer_t,
-                        list_delayed,
-                        match,
-                        (match->topic_hash == tr->topic_hash) && match->reliable);
-        has_preceding_reliable = (match != NULL);
-    }
 
     // Schedule for transmission or backlog depending on the findings above.
+    const byte_t shard = txfer_shard(tr);
     if (has_preceding_reliable) {  // into the backlog you go, buddy
         tr->delayed_until = HEAT_DEATH;
         enlist_tail(&self->tx.delayed[shard], &tr->list_delayed);  // FIFO, newest at the tail.
@@ -1069,20 +1106,22 @@ static bool tx_push(canard_t* const self,
 }
 
 /// Handle an ACK received from a remote node.
-static void tx_receive_ack(canard_t* const self, const uint64_t topic_hash, const byte_t transfer_id)
+static void tx_receive_ack(canard_t* const self, const uint64_t topic_hash_lower_bound, const byte_t transfer_id)
 {
-    // Scan from oldest because they are the most likely to match.
-    // We are not expected to hold a large number of pending reliable transfers, so a linear search is acceptable --
-    // up to a couple dozen transfers should be at least on par with a BST lookup.
-    // If this ever becomes a problem, we can add a separate index for pending reliable transfers keyed by topic hash.
-    LIST_FIND_FIRST(self->tx.oldest[1],  // Search reliable only; best-effort transfers are not in this list.
-                    canard_txfer_t,
-                    list_oldest,
-                    tr,  // Backlogged transfers may possibly have conflicting transfer-ID.
-                    (tr->topic_hash == topic_hash) && (tr->transfer_id == transfer_id) && !txfer_is_backlogged(tr));
-    if (tr != NULL) {
-        CANARD_ASSERT(tr->reliable && (tr->topic_hash == topic_hash) && (tr->transfer_id == transfer_id));
-        txfer_retire(self, tr, true);
+    const txfer_key_t key = { .topic_hash = topic_hash_lower_bound, .transfer_id = 0 };
+    canard_txfer_t* tr = CAVL2_TO_OWNER(cavl2_lower_bound(self->tx.reliable, &key, tx_cavl_compare_reliable),  //
+                                        canard_txfer_t,
+                                        index_reliable);
+    if ((tr == NULL) || ((tr->topic_hash & CANARD_P2P_TOPIC_HASH_LOWER_BOUND_MASK) != topic_hash_lower_bound)) {
+        return;
+    }
+    // Linear scan to find the matching transfer-ID. This is bounded by the max transfer-ID count (32).
+    while ((tr != NULL) && ((tr->topic_hash & CANARD_P2P_TOPIC_HASH_LOWER_BOUND_MASK) == topic_hash_lower_bound)) {
+        if (tr->remote_transfer_id == transfer_id) {  // Found!
+            txfer_retire(self, tr, true);
+            break;
+        }
+        tr = CAVL2_TO_OWNER(cavl2_next_greater(&tr->index_reliable), canard_txfer_t, index_reliable);
     }
 }
 
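For context, the renamed parameter indicates that the ack does not carry the full topic hash, only its masked lower bound. A hypothetical call site might therefore look like the sketch below (on_ack_frame and hash_bits_from_frame are illustrative names, not part of this change):

// Hypothetical call site (illustrative): mask the hash bits recovered from the ack frame before the lookup.
static void on_ack_frame(canard_t* const self, const uint64_t hash_bits_from_frame, const byte_t transfer_id)
{
    tx_receive_ack(self, hash_bits_from_frame & CANARD_P2P_TOPIC_HASH_LOWER_BOUND_MASK, transfer_id);
}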