From 326d719a491b0bdeeca51b003de9787303480b4c Mon Sep 17 00:00:00 2001 From: Mark Qvist Date: Sun, 28 Dec 2025 23:46:39 +0100 Subject: [PATCH] Force synchronous processing for entire announce logic flow --- RNS/Transport.py | 581 ++++++++++++++++++++++++----------------------- 1 file changed, 295 insertions(+), 286 deletions(-) diff --git a/RNS/Transport.py b/RNS/Transport.py index cf2bb78..ee2ec80 100755 --- a/RNS/Transport.py +++ b/RNS/Transport.py @@ -37,6 +37,7 @@ import struct import inspect import threading from time import sleep +from threading import Lock from .vendor import umsgpack as umsgpack from RNS.Interfaces.BackboneInterface import BackboneInterface @@ -154,6 +155,7 @@ class Transport: tables_cull_interval = 5.0 interface_last_jobs = 0.0 interface_jobs_interval = 5.0 + inbound_announce_lock = Lock() traffic_rxb = 0 traffic_txb = 0 @@ -1524,325 +1526,332 @@ class Transport: random_blob = packet.data[RNS.Identity.KEYSIZE//8+RNS.Identity.NAME_HASH_LENGTH//8:RNS.Identity.KEYSIZE//8+RNS.Identity.NAME_HASH_LENGTH//8+10] random_blobs = [] - if packet.destination_hash in Transport.path_table: - random_blobs = Transport.path_table[packet.destination_hash][IDX_PT_RANDBLOBS] + with Transport.inbound_announce_lock: + if packet.destination_hash in Transport.path_table: + random_blobs = Transport.path_table[packet.destination_hash][IDX_PT_RANDBLOBS] - # If we already have a path to the announced - # destination, but the hop count is equal or - # less, we'll update our tables. - if packet.hops <= Transport.path_table[packet.destination_hash][IDX_PT_HOPS]: - # Make sure we haven't heard the random - # blob before, so announces can't be - # replayed to forge paths. - # TODO: Check whether this approach works - # under all circumstances - path_timebase = Transport.timebase_from_random_blobs(random_blobs) - if not random_blob in random_blobs and announce_emitted > path_timebase: - Transport.mark_path_unknown_state(packet.destination_hash) - should_add = True - else: - should_add = False - else: - # If an announce arrives with a larger hop - # count than we already have in the table, - # ignore it, unless the path is expired, or - # the emission timestamp is more recent. - now = time.time() - path_expires = Transport.path_table[packet.destination_hash][IDX_PT_EXPIRES] - - path_announce_emitted = 0 - for path_random_blob in random_blobs: - path_announce_emitted = max(path_announce_emitted, int.from_bytes(path_random_blob[5:10], "big")) - if path_announce_emitted >= announce_emitted: - break - - # If the path has expired, consider this - # announce for adding to the path table. - if (now >= path_expires): - # We check that the announce is - # different from ones we've already heard, - # to avoid loops in the network - if not random_blob in random_blobs: - # TODO: Check that this ^ approach actually - # works under all circumstances - RNS.log("Replacing destination table entry for "+str(RNS.prettyhexrep(packet.destination_hash))+" with new announce due to expired path", RNS.LOG_DEBUG) + # If we already have a path to the announced + # destination, but the hop count is equal or + # less, we'll update our tables. + if packet.hops <= Transport.path_table[packet.destination_hash][IDX_PT_HOPS]: + # Make sure we haven't heard the random + # blob before, so announces can't be + # replayed to forge paths. + # TODO: Check whether this approach works + # under all circumstances + path_timebase = Transport.timebase_from_random_blobs(random_blobs) + if not random_blob in random_blobs and announce_emitted > path_timebase: Transport.mark_path_unknown_state(packet.destination_hash) should_add = True else: should_add = False else: - # If the path is not expired, but the emission - # is more recent, and we haven't already heard - # this announce before, update the path table. - if (announce_emitted > path_announce_emitted): + # If an announce arrives with a larger hop + # count than we already have in the table, + # ignore it, unless the path is expired, or + # the emission timestamp is more recent. + now = time.time() + path_expires = Transport.path_table[packet.destination_hash][IDX_PT_EXPIRES] + + path_announce_emitted = 0 + for path_random_blob in random_blobs: + path_announce_emitted = max(path_announce_emitted, int.from_bytes(path_random_blob[5:10], "big")) + if path_announce_emitted >= announce_emitted: + break + + # If the path has expired, consider this + # announce for adding to the path table. + if (now >= path_expires): + # We check that the announce is + # different from ones we've already heard, + # to avoid loops in the network if not random_blob in random_blobs: - RNS.log("Replacing destination table entry for "+str(RNS.prettyhexrep(packet.destination_hash))+" with new announce, since it was more recently emitted", RNS.LOG_DEBUG) + # TODO: Check that this ^ approach actually + # works under all circumstances + RNS.log("Replacing destination table entry for "+str(RNS.prettyhexrep(packet.destination_hash))+" with new announce due to expired path", RNS.LOG_DEBUG) Transport.mark_path_unknown_state(packet.destination_hash) should_add = True else: should_add = False - - # If we have already heard this announce before, - # but the path has been marked as unresponsive - # by a failed communications attempt or similar, - # allow updating the path table to this one. - elif announce_emitted == path_announce_emitted: - if Transport.path_is_unresponsive(packet.destination_hash): - RNS.log("Replacing destination table entry for "+str(RNS.prettyhexrep(packet.destination_hash))+" with new announce, since previously tried path was unresponsive", RNS.LOG_DEBUG) - should_add = True - else: - should_add = False + else: + # If the path is not expired, but the emission + # is more recent, and we haven't already heard + # this announce before, update the path table. + if (announce_emitted > path_announce_emitted): + if not random_blob in random_blobs: + RNS.log("Replacing destination table entry for "+str(RNS.prettyhexrep(packet.destination_hash))+" with new announce, since it was more recently emitted", RNS.LOG_DEBUG) + Transport.mark_path_unknown_state(packet.destination_hash) + should_add = True + else: + should_add = False + + # If we have already heard this announce before, + # but the path has been marked as unresponsive + # by a failed communications attempt or similar, + # allow updating the path table to this one. + elif announce_emitted == path_announce_emitted: + if Transport.path_is_unresponsive(packet.destination_hash): + RNS.log("Replacing destination table entry for "+str(RNS.prettyhexrep(packet.destination_hash))+" with new announce, since previously tried path was unresponsive", RNS.LOG_DEBUG) + should_add = True + else: + should_add = False - else: - # If this destination is unknown in our table - # we should add it - should_add = True + else: + # If this destination is unknown in our table + # we should add it + should_add = True - if should_add: - now = time.time() + if should_add: + now = time.time() - rate_blocked = False - if packet.context != RNS.Packet.PATH_RESPONSE and packet.receiving_interface.announce_rate_target != None: - if not packet.destination_hash in Transport.announce_rate_table: - rate_entry = { "last": now, "rate_violations": 0, "blocked_until": 0, "timestamps": [now]} - Transport.announce_rate_table[packet.destination_hash] = rate_entry - - else: - rate_entry = Transport.announce_rate_table[packet.destination_hash] - rate_entry["timestamps"].append(now) - - while len(rate_entry["timestamps"]) > Transport.MAX_RATE_TIMESTAMPS: - rate_entry["timestamps"].pop(0) - - current_rate = now - rate_entry["last"] - - if now > rate_entry["blocked_until"]: - if current_rate < packet.receiving_interface.announce_rate_target: rate_entry["rate_violations"] += 1 - else: rate_entry["rate_violations"] = max(0, rate_entry["rate_violations"]-1) - - if rate_entry["rate_violations"] > packet.receiving_interface.announce_rate_grace: - rate_target = packet.receiving_interface.announce_rate_target - rate_penalty = packet.receiving_interface.announce_rate_penalty - rate_entry["blocked_until"] = rate_entry["last"] + rate_target + rate_penalty - rate_blocked = True - else: - rate_entry["last"] = now + rate_blocked = False + if packet.context != RNS.Packet.PATH_RESPONSE and packet.receiving_interface.announce_rate_target != None: + if not packet.destination_hash in Transport.announce_rate_table: + rate_entry = { "last": now, "rate_violations": 0, "blocked_until": 0, "timestamps": [now]} + Transport.announce_rate_table[packet.destination_hash] = rate_entry else: - rate_blocked = True - + rate_entry = Transport.announce_rate_table[packet.destination_hash] + rate_entry["timestamps"].append(now) - retries = 0 - announce_hops = packet.hops - local_rebroadcasts = 0 - block_rebroadcasts = False - attached_interface = None - - retransmit_timeout = now + (RNS.rand() * Transport.PATHFINDER_RW) + while len(rate_entry["timestamps"]) > Transport.MAX_RATE_TIMESTAMPS: + rate_entry["timestamps"].pop(0) - if hasattr(packet.receiving_interface, "mode") and packet.receiving_interface.mode == RNS.Interfaces.Interface.Interface.MODE_ACCESS_POINT: - expires = now + Transport.AP_PATH_TIME - elif hasattr(packet.receiving_interface, "mode") and packet.receiving_interface.mode == RNS.Interfaces.Interface.Interface.MODE_ROAMING: - expires = now + Transport.ROAMING_PATH_TIME - else: - expires = now + Transport.PATHFINDER_E - - if not random_blob in random_blobs: - random_blobs.append(random_blob) - random_blobs = random_blobs[-Transport.MAX_RANDOM_BLOBS:] + current_rate = now - rate_entry["last"] - if (RNS.Reticulum.transport_enabled() or Transport.from_local_client(packet)) and packet.context != RNS.Packet.PATH_RESPONSE: - # Insert announce into announce table for retransmission + if now > rate_entry["blocked_until"]: + if current_rate < packet.receiving_interface.announce_rate_target: rate_entry["rate_violations"] += 1 + else: rate_entry["rate_violations"] = max(0, rate_entry["rate_violations"]-1) - if rate_blocked: - RNS.log("Blocking rebroadcast of announce from "+RNS.prettyhexrep(packet.destination_hash)+" due to excessive announce rate", RNS.LOG_DEBUG) + if rate_entry["rate_violations"] > packet.receiving_interface.announce_rate_grace: + rate_target = packet.receiving_interface.announce_rate_target + rate_penalty = packet.receiving_interface.announce_rate_penalty + rate_entry["blocked_until"] = rate_entry["last"] + rate_target + rate_penalty + rate_blocked = True + else: + rate_entry["last"] = now + + else: + rate_blocked = True + + + retries = 0 + announce_hops = packet.hops + local_rebroadcasts = 0 + block_rebroadcasts = False + attached_interface = None + retransmit_timeout = now + (RNS.rand() * Transport.PATHFINDER_RW) + + if hasattr(packet.receiving_interface, "mode") and packet.receiving_interface.mode == RNS.Interfaces.Interface.Interface.MODE_ACCESS_POINT: + expires = now + Transport.AP_PATH_TIME + elif hasattr(packet.receiving_interface, "mode") and packet.receiving_interface.mode == RNS.Interfaces.Interface.Interface.MODE_ROAMING: + expires = now + Transport.ROAMING_PATH_TIME else: - if Transport.from_local_client(packet): - # If the announce is from a local client, - # it is announced immediately, but only one time. + expires = now + Transport.PATHFINDER_E + + if not random_blob in random_blobs: + random_blobs.append(random_blob) + random_blobs = random_blobs[-Transport.MAX_RANDOM_BLOBS:] + + if (RNS.Reticulum.transport_enabled() or Transport.from_local_client(packet)) and packet.context != RNS.Packet.PATH_RESPONSE: + # Insert announce into announce table for retransmission + + if rate_blocked: + RNS.log("Blocking rebroadcast of announce from "+RNS.prettyhexrep(packet.destination_hash)+" due to excessive announce rate", RNS.LOG_DEBUG) + + else: + if Transport.from_local_client(packet): + # If the announce is from a local client, + # it is announced immediately, but only one time. + retransmit_timeout = now + retries = Transport.PATHFINDER_R + + Transport.announce_table[packet.destination_hash] = [ + now, # 0: IDX_AT_TIMESTAMP + retransmit_timeout, # 1: IDX_AT_RTRNS_TMO + retries, # 2: IDX_AT_RETRIES + received_from, # 3: IDX_AT_RCVD_IF + announce_hops, # 4: IDX_AT_HOPS + packet, # 5: IDX_AT_PACKET + local_rebroadcasts, # 6: IDX_AT_LCL_RBRD + block_rebroadcasts, # 7: IDX_AT_BLCK_RBRD + attached_interface, # 8: IDX_AT_ATTCHD_IF + ] + + # TODO: Check from_local_client once and store result + elif Transport.from_local_client(packet) and packet.context == RNS.Packet.PATH_RESPONSE: + # If this is a path response from a local client, + # check if any external interfaces have pending + # path requests. + if packet.destination_hash in Transport.pending_local_path_requests: + desiring_interface = Transport.pending_local_path_requests.pop(packet.destination_hash) retransmit_timeout = now retries = Transport.PATHFINDER_R - Transport.announce_table[packet.destination_hash] = [ - now, # 0: IDX_AT_TIMESTAMP - retransmit_timeout, # 1: IDX_AT_RTRNS_TMO - retries, # 2: IDX_AT_RETRIES - received_from, # 3: IDX_AT_RCVD_IF - announce_hops, # 4: IDX_AT_HOPS - packet, # 5: IDX_AT_PACKET - local_rebroadcasts, # 6: IDX_AT_LCL_RBRD - block_rebroadcasts, # 7: IDX_AT_BLCK_RBRD - attached_interface, # 8: IDX_AT_ATTCHD_IF - ] + Transport.announce_table[packet.destination_hash] = [ + now, + retransmit_timeout, + retries, + received_from, + announce_hops, + packet, + local_rebroadcasts, + block_rebroadcasts, + attached_interface + ] - # TODO: Check from_local_client once and store result - elif Transport.from_local_client(packet) and packet.context == RNS.Packet.PATH_RESPONSE: - # If this is a path response from a local client, - # check if any external interfaces have pending - # path requests. - if packet.destination_hash in Transport.pending_local_path_requests: - desiring_interface = Transport.pending_local_path_requests.pop(packet.destination_hash) - retransmit_timeout = now - retries = Transport.PATHFINDER_R - - Transport.announce_table[packet.destination_hash] = [ - now, - retransmit_timeout, - retries, - received_from, - announce_hops, - packet, - local_rebroadcasts, - block_rebroadcasts, - attached_interface - ] - - # If we have any local clients connected, we re- - # transmit the announce to them immediately - if (len(Transport.local_client_interfaces)): - announce_identity = RNS.Identity.recall(packet.destination_hash) - announce_destination = RNS.Destination(announce_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, "unknown", "unknown"); - announce_destination.hash = packet.destination_hash - announce_destination.hexhash = announce_destination.hash.hex() - announce_context = RNS.Packet.NONE - announce_data = packet.data - - # TODO: Shouldn't the context be PATH_RESPONSE in the first case here? - if Transport.from_local_client(packet) and packet.context == RNS.Packet.PATH_RESPONSE: - for local_interface in Transport.local_client_interfaces: - if packet.receiving_interface != local_interface: - new_announce = RNS.Packet( - announce_destination, - announce_data, - RNS.Packet.ANNOUNCE, - context = announce_context, - header_type = RNS.Packet.HEADER_2, - transport_type = Transport.TRANSPORT, - transport_id = Transport.identity.hash, - attached_interface = local_interface, - context_flag = packet.context_flag, - ) - - new_announce.hops = packet.hops - new_announce.send() - - else: - for local_interface in Transport.local_client_interfaces: - if packet.receiving_interface != local_interface: - new_announce = RNS.Packet( - announce_destination, - announce_data, - RNS.Packet.ANNOUNCE, - context = announce_context, - header_type = RNS.Packet.HEADER_2, - transport_type = Transport.TRANSPORT, - transport_id = Transport.identity.hash, - attached_interface = local_interface, - context_flag = packet.context_flag, - ) - - new_announce.hops = packet.hops - new_announce.send() - - # If we have any waiting discovery path requests - # for this destination, we retransmit to that - # interface immediately - if packet.destination_hash in Transport.discovery_path_requests: - pr_entry = Transport.discovery_path_requests[packet.destination_hash] - attached_interface = pr_entry["requesting_interface"] - - interface_str = " on "+str(attached_interface) - - RNS.log("Got matching announce, answering waiting discovery path request for "+RNS.prettyhexrep(packet.destination_hash)+interface_str, RNS.LOG_DEBUG) - announce_identity = RNS.Identity.recall(packet.destination_hash) - announce_destination = RNS.Destination(announce_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, "unknown", "unknown"); - announce_destination.hash = packet.destination_hash - announce_destination.hexhash = announce_destination.hash.hex() - announce_context = RNS.Packet.NONE - announce_data = packet.data - - new_announce = RNS.Packet( - announce_destination, - announce_data, - RNS.Packet.ANNOUNCE, - context = RNS.Packet.PATH_RESPONSE, - header_type = RNS.Packet.HEADER_2, - transport_type = Transport.TRANSPORT, - transport_id = Transport.identity.hash, - attached_interface = attached_interface, - context_flag = packet.context_flag, - ) - - new_announce.hops = packet.hops - new_announce.send() - - if not Transport.owner.is_connected_to_shared_instance: Transport.cache(packet, force_cache=True, packet_type="announce") - path_table_entry = [now, received_from, announce_hops, expires, random_blobs, packet.receiving_interface, packet.packet_hash] - Transport.path_table[packet.destination_hash] = path_table_entry - RNS.log("Destination "+RNS.prettyhexrep(packet.destination_hash)+" is now "+str(announce_hops)+" hops away via "+RNS.prettyhexrep(received_from)+" on "+str(packet.receiving_interface), RNS.LOG_DEBUG) - - # If the receiving interface is a tunnel, we add the - # announce to the tunnels table - if hasattr(packet.receiving_interface, "tunnel_id") and packet.receiving_interface.tunnel_id != None: - tunnel_entry = Transport.tunnels[packet.receiving_interface.tunnel_id] - paths = tunnel_entry[IDX_TT_PATHS] - paths[packet.destination_hash] = [now, received_from, announce_hops, expires, random_blobs, None, packet.packet_hash] - expires = time.time() + Transport.DESTINATION_TIMEOUT - tunnel_entry[IDX_TT_EXPIRES] = expires - RNS.log("Path to "+RNS.prettyhexrep(packet.destination_hash)+" associated with tunnel "+RNS.prettyhexrep(packet.receiving_interface.tunnel_id), RNS.LOG_DEBUG) - - # Call externally registered callbacks from apps - # wanting to know when an announce arrives - for handler in Transport.announce_handlers: - try: - # Check that the announced destination matches - # the handlers aspect filter - execute_callback = False + # If we have any local clients connected, we re- + # transmit the announce to them immediately + if (len(Transport.local_client_interfaces)): announce_identity = RNS.Identity.recall(packet.destination_hash) - if handler.aspect_filter == None: - # If the handlers aspect filter is set to - # None, we execute the callback in all cases - execute_callback = True + announce_destination = RNS.Destination(announce_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, "unknown", "unknown"); + announce_destination.hash = packet.destination_hash + announce_destination.hexhash = announce_destination.hash.hex() + announce_context = RNS.Packet.NONE + announce_data = packet.data + + # TODO: Shouldn't the context be PATH_RESPONSE in the first case here? + if Transport.from_local_client(packet) and packet.context == RNS.Packet.PATH_RESPONSE: + for local_interface in Transport.local_client_interfaces: + if packet.receiving_interface != local_interface: + new_announce = RNS.Packet( + announce_destination, + announce_data, + RNS.Packet.ANNOUNCE, + context = announce_context, + header_type = RNS.Packet.HEADER_2, + transport_type = Transport.TRANSPORT, + transport_id = Transport.identity.hash, + attached_interface = local_interface, + context_flag = packet.context_flag, + ) + + new_announce.hops = packet.hops + new_announce.send() + else: - handler_expected_hash = RNS.Destination.hash_from_name_and_identity(handler.aspect_filter, announce_identity) - if packet.destination_hash == handler_expected_hash: + for local_interface in Transport.local_client_interfaces: + if packet.receiving_interface != local_interface: + new_announce = RNS.Packet( + announce_destination, + announce_data, + RNS.Packet.ANNOUNCE, + context = announce_context, + header_type = RNS.Packet.HEADER_2, + transport_type = Transport.TRANSPORT, + transport_id = Transport.identity.hash, + attached_interface = local_interface, + context_flag = packet.context_flag, + ) + + new_announce.hops = packet.hops + new_announce.send() + + # If we have any waiting discovery path requests + # for this destination, we retransmit to that + # interface immediately + if packet.destination_hash in Transport.discovery_path_requests: + pr_entry = Transport.discovery_path_requests[packet.destination_hash] + attached_interface = pr_entry["requesting_interface"] + + interface_str = " on "+str(attached_interface) + + RNS.log("Got matching announce, answering waiting discovery path request for "+RNS.prettyhexrep(packet.destination_hash)+interface_str, RNS.LOG_DEBUG) + announce_identity = RNS.Identity.recall(packet.destination_hash) + announce_destination = RNS.Destination(announce_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, "unknown", "unknown"); + announce_destination.hash = packet.destination_hash + announce_destination.hexhash = announce_destination.hash.hex() + announce_context = RNS.Packet.NONE + announce_data = packet.data + + new_announce = RNS.Packet( + announce_destination, + announce_data, + RNS.Packet.ANNOUNCE, + context = RNS.Packet.PATH_RESPONSE, + header_type = RNS.Packet.HEADER_2, + transport_type = Transport.TRANSPORT, + transport_id = Transport.identity.hash, + attached_interface = attached_interface, + context_flag = packet.context_flag, + ) + + new_announce.hops = packet.hops + new_announce.send() + + if not Transport.owner.is_connected_to_shared_instance: Transport.cache(packet, force_cache=True, packet_type="announce") + path_table_entry = [now, received_from, announce_hops, expires, random_blobs, packet.receiving_interface, packet.packet_hash] + Transport.path_table[packet.destination_hash] = path_table_entry + RNS.log("Destination "+RNS.prettyhexrep(packet.destination_hash)+" is now "+str(announce_hops)+" hops away via "+RNS.prettyhexrep(received_from)+" on "+str(packet.receiving_interface), RNS.LOG_DEBUG) + + # If the receiving interface is a tunnel, we add the + # announce to the tunnels table + if hasattr(packet.receiving_interface, "tunnel_id") and packet.receiving_interface.tunnel_id != None: + tunnel_entry = Transport.tunnels[packet.receiving_interface.tunnel_id] + paths = tunnel_entry[IDX_TT_PATHS] + paths[packet.destination_hash] = [now, received_from, announce_hops, expires, random_blobs, None, packet.packet_hash] + expires = time.time() + Transport.DESTINATION_TIMEOUT + tunnel_entry[IDX_TT_EXPIRES] = expires + RNS.log("Path to "+RNS.prettyhexrep(packet.destination_hash)+" associated with tunnel "+RNS.prettyhexrep(packet.receiving_interface.tunnel_id), RNS.LOG_DEBUG) + + # Call externally registered callbacks from apps + # wanting to know when an announce arrives + for handler in Transport.announce_handlers: + try: + # Check that the announced destination matches + # the handlers aspect filter + execute_callback = False + announce_identity = RNS.Identity.recall(packet.destination_hash) + if handler.aspect_filter == None: + # If the handlers aspect filter is set to + # None, we execute the callback in all cases execute_callback = True - - # If this is a path response, check whether the - # handler wants to receive it. - if packet.context == RNS.Packet.PATH_RESPONSE: - if hasattr(handler, "receive_path_responses") and handler.receive_path_responses == True: - pass else: - execute_callback = False + handler_expected_hash = RNS.Destination.hash_from_name_and_identity(handler.aspect_filter, announce_identity) + if packet.destination_hash == handler_expected_hash: + execute_callback = True - if execute_callback: + # If this is a path response, check whether the + # handler wants to receive it. + if packet.context == RNS.Packet.PATH_RESPONSE: + if hasattr(handler, "receive_path_responses") and handler.receive_path_responses == True: + pass + else: + execute_callback = False - if len(inspect.signature(handler.received_announce).parameters) == 3: - handler.received_announce(destination_hash=packet.destination_hash, - announced_identity=announce_identity, - app_data=RNS.Identity.recall_app_data(packet.destination_hash)) - - elif len(inspect.signature(handler.received_announce).parameters) == 4: - handler.received_announce(destination_hash=packet.destination_hash, - announced_identity=announce_identity, - app_data=RNS.Identity.recall_app_data(packet.destination_hash), - announce_packet_hash = packet.packet_hash) - - elif len(inspect.signature(handler.received_announce).parameters) == 5: - handler.received_announce(destination_hash=packet.destination_hash, - announced_identity=announce_identity, - app_data=RNS.Identity.recall_app_data(packet.destination_hash), - announce_packet_hash = packet.packet_hash, - is_path_response = packet.context == RNS.Packet.PATH_RESPONSE) - else: - raise TypeError("Invalid signature for announce handler callback") + if execute_callback: + if len(inspect.signature(handler.received_announce).parameters) == 3: + def job(): + handler.received_announce(destination_hash=packet.destination_hash, + announced_identity=announce_identity, + app_data=RNS.Identity.recall_app_data(packet.destination_hash)) + threading.Thread(target=job, daemon=True).start() + + elif len(inspect.signature(handler.received_announce).parameters) == 4: + def job(): + handler.received_announce(destination_hash=packet.destination_hash, + announced_identity=announce_identity, + app_data=RNS.Identity.recall_app_data(packet.destination_hash), + announce_packet_hash = packet.packet_hash) + threading.Thread(target=job, daemon=True).start() + + elif len(inspect.signature(handler.received_announce).parameters) == 5: + def job(): + handler.received_announce(destination_hash=packet.destination_hash, + announced_identity=announce_identity, + app_data=RNS.Identity.recall_app_data(packet.destination_hash), + announce_packet_hash = packet.packet_hash, + is_path_response = packet.context == RNS.Packet.PATH_RESPONSE) + threading.Thread(target=job, daemon=True).start() - except Exception as e: - RNS.log("Error while processing external announce callback.", RNS.LOG_ERROR) - RNS.log("The contained exception was: "+str(e), RNS.LOG_ERROR) - RNS.trace_exception(e) + else: + raise TypeError("Invalid signature for announce handler callback") + + except Exception as e: + RNS.log("Error while processing external announce callback.", RNS.LOG_ERROR) + RNS.log("The contained exception was: "+str(e), RNS.LOG_ERROR) + RNS.trace_exception(e) # Handling for link requests to local destinations elif packet.packet_type == RNS.Packet.LINKREQUEST: