2025-05-10 15:31:35 +02:00
# import tracemalloc
# tracemalloc.start()
2022-06-09 10:33:13 +02:00
import unittest
2022-06-09 19:54:20 +02:00
import subprocess
import shlex
2022-06-09 10:33:13 +02:00
import threading
import time
2023-05-18 23:32:29 +02:00
import random
2023-03-04 23:37:58 -06:00
from unittest import skipIf
2022-06-09 10:33:13 +02:00
import RNS
import os
2023-02-25 18:23:25 -06:00
from tests . channel import MessageTest
from RNS . Channel import MessageBase
2023-03-04 23:37:58 -06:00
from RNS . Buffer import StreamDataMessage
from RNS . Interfaces . LocalInterface import LocalClientInterface
from math import ceil
2023-02-25 18:23:25 -06:00
2022-06-09 10:33:13 +02:00
APP_NAME = " rns_unit_tests "
fixed_keys = [
2022-06-30 20:37:51 +02:00
( " f8953ffaf607627e615603ff1530c82c434cf87c07179dd7689ea776f30b964cfb7ba6164af00c5111a45e69e57d885e1285f8dbfe3a21e95ae17cf676b0f8b7 " , " 650b5d76b6bec0390d1f8cfca5bd33f9 " ) ,
( " d85d036245436a3c33d3228affae06721f8203bc364ee0ee7556368ac62add650ebf8f926abf628da9d92baaa12db89bd6516ee92ec29765f3afafcb8622d697 " , " 1469e89450c361b253aefb0c606b6111 " ) ,
( " 8893e2bfd30fc08455997caf7abb7a6341716768dbbf9a91cc1455bd7eeaf74cdc10ec72a4d4179696040bac620ee97ebc861e2443e5270537ae766d91b58181 " , " e5fe93ee4acba095b3b9b6541515ed3e " ) ,
( " b82c7a4f047561d974de7e38538281d7f005d3663615f30d9663bad35a716063c931672cd452175d55bcdd70bb7aa35a9706872a97963dc52029938ea7341b39 " , " 1333b911fa8ebb16726996adbe3c6262 " ) ,
( " 08bb35f92b06a0832991165a0d9b4fd91af7b7765ce4572aa6222070b11b767092b61b0fd18b3a59cae6deb9db6d4bfb1c7fcfe076cfd66eea7ddd5f877543b9 " , " d13712efc45ef87674fb5ac26c37c912 " ) ,
2022-06-09 10:33:13 +02:00
]
2023-05-18 23:32:29 +02:00
BUFFER_TEST_TARGET = 32000
2025-05-06 14:23:42 +02:00
LINK_UP_WAIT = 0.050
2023-05-18 23:32:29 +02:00
2022-06-09 19:54:20 +02:00
def targets_job ( caller ) :
cmd = " python -c \" from tests.link import targets; targets() \" "
2022-06-10 11:27:52 +02:00
print ( " Opening subprocess for " + str ( cmd ) + " ... " , RNS . LOG_VERBOSE )
2022-06-09 19:54:20 +02:00
ppath = os . getcwd ( )
try :
2022-06-10 11:27:52 +02:00
caller . process = subprocess . Popen ( shlex . split ( cmd ) , cwd = ppath , stdout = subprocess . PIPE )
2022-06-09 19:54:20 +02:00
except Exception as e :
raise e
caller . pipe_is_open = False
2022-06-09 10:33:13 +02:00
c_rns = None
2022-06-09 19:54:20 +02:00
def init_rns ( caller = None ) :
2022-06-09 10:33:13 +02:00
global c_rns
if c_rns == None :
2022-06-14 13:44:12 +02:00
if caller != None :
targets_job ( caller )
time . sleep ( 2 )
2022-06-09 10:33:13 +02:00
print ( " Starting local RNS instance... " )
c_rns = RNS . Reticulum ( " ./tests/rnsconfig " )
2022-06-14 13:44:12 +02:00
if caller != None :
c_rns . m_proc = caller . process
2022-06-09 10:33:13 +02:00
print ( " Done starting local RNS instance... " )
2022-06-10 11:27:52 +02:00
def close_rns ( ) :
global c_rns
if c_rns != None :
c_rns . m_proc . kill ( )
2022-06-09 13:32:32 +02:00
class TestLink ( unittest . TestCase ) :
2022-06-09 10:33:13 +02:00
def setUp ( self ) :
pass
2022-06-10 11:27:52 +02:00
@classmethod
def tearDownClass ( cls ) :
close_rns ( )
2023-05-10 18:43:17 +02:00
@skipIf ( os . getenv ( ' SKIP_NORMAL_TESTS ' ) != None , " Skipping " )
2025-05-06 14:23:42 +02:00
def test_00_valid_announce ( self ) :
2022-10-04 06:55:50 +02:00
init_rns ( self )
print ( " " )
2025-05-06 14:23:42 +02:00
print ( " Testing valid announce... " )
2022-10-04 06:55:50 +02:00
fid = RNS . Identity . from_bytes ( bytes . fromhex ( fixed_keys [ 3 ] [ 0 ] ) )
dst = RNS . Destination ( fid , RNS . Destination . IN , RNS . Destination . SINGLE , " test " , " announce " )
ap = dst . announce ( send = False )
ap . pack ( )
self . assertEqual ( RNS . Identity . validate_announce ( ap ) , True )
2023-05-10 18:43:17 +02:00
@skipIf ( os . getenv ( ' SKIP_NORMAL_TESTS ' ) != None , " Skipping " )
2025-05-06 14:23:42 +02:00
def test_01_invalid_announce ( self ) :
2022-10-04 06:55:50 +02:00
init_rns ( self )
print ( " " )
2025-05-06 14:23:42 +02:00
print ( " Testing invalid announce... " )
2022-10-04 06:55:50 +02:00
fid = RNS . Identity . from_bytes ( bytes . fromhex ( fixed_keys [ 4 ] [ 0 ] ) )
dst = RNS . Destination ( fid , RNS . Destination . IN , RNS . Destination . SINGLE , " test " , " announce " )
ap = dst . announce ( send = False )
fake_dst = bytes . fromhex ( " 1333b911fa8ebb16726996adbe3c6262 " )
pre_len = len ( ap . data )
ap . data = fake_dst + ap . data [ 16 : ]
self . assertEqual ( pre_len , len ( ap . data ) )
ap . pack ( )
ap . send ( )
self . assertEqual ( RNS . Identity . validate_announce ( ap ) , False )
2023-05-10 18:43:17 +02:00
@skipIf ( os . getenv ( ' SKIP_NORMAL_TESTS ' ) != None , " Skipping " )
2025-05-06 14:23:42 +02:00
def test_02_establish ( self ) :
2022-06-09 19:54:20 +02:00
init_rns ( self )
2022-06-09 10:33:13 +02:00
print ( " " )
id1 = RNS . Identity . from_bytes ( bytes . fromhex ( fixed_keys [ 0 ] [ 0 ] ) )
self . assertEqual ( id1 . hash , bytes . fromhex ( fixed_keys [ 0 ] [ 1 ] ) )
2024-05-04 20:27:27 +02:00
RNS . Transport . request_path ( bytes . fromhex ( " fb48da0e82e6e01ba0c014513f74540d " ) )
time . sleep ( 0.2 )
2022-06-09 10:33:13 +02:00
dest = RNS . Destination ( id1 , RNS . Destination . OUT , RNS . Destination . SINGLE , APP_NAME , " link " , " establish " )
2022-06-30 20:37:51 +02:00
2022-10-06 23:14:32 +02:00
self . assertEqual ( dest . hash , bytes . fromhex ( " fb48da0e82e6e01ba0c014513f74540d " ) )
2022-06-09 10:33:13 +02:00
2025-05-06 14:23:42 +02:00
print ( " Testing default mode link establishment... " )
2022-06-09 10:33:13 +02:00
l1 = RNS . Link ( dest )
2025-05-06 14:23:42 +02:00
time . sleep ( LINK_UP_WAIT )
2022-06-09 10:33:13 +02:00
self . assertEqual ( l1 . status , RNS . Link . ACTIVE )
l1 . teardown ( )
2025-05-06 14:23:42 +02:00
time . sleep ( LINK_UP_WAIT )
2022-06-09 10:33:13 +02:00
self . assertEqual ( l1 . status , RNS . Link . CLOSED )
2025-05-06 14:23:42 +02:00
2025-05-26 19:04:30 +02:00
exc_triggered = False
2025-05-06 14:23:42 +02:00
print ( " Testing AES_128_CBC mode link establishment... " )
2025-05-26 19:04:30 +02:00
try : l2 = RNS . Link ( dest , mode = RNS . Link . MODE_AES128_CBC )
except TypeError as e : exc_triggered = True
self . assertEqual ( exc_triggered , True )
2025-05-06 14:23:42 +02:00
print ( " Testing AES_256_CBC mode link establishment... " )
l3 = RNS . Link ( dest , mode = RNS . Link . MODE_AES256_CBC )
time . sleep ( LINK_UP_WAIT )
self . assertEqual ( l3 . status , RNS . Link . ACTIVE )
self . assertEqual ( l3 . mode , RNS . Link . MODE_AES256_CBC )
self . assertEqual ( len ( l3 . derived_key ) , 64 )
l3 . teardown ( )
time . sleep ( LINK_UP_WAIT )
self . assertEqual ( l3 . status , RNS . Link . CLOSED )
2022-06-09 10:33:13 +02:00
2023-05-10 18:43:17 +02:00
@skipIf ( os . getenv ( ' SKIP_NORMAL_TESTS ' ) != None , " Skipping " )
2025-05-06 14:23:42 +02:00
def test_03a_packets ( self ) :
2022-06-09 19:54:20 +02:00
init_rns ( self )
2022-06-09 10:33:13 +02:00
print ( " " )
2025-05-06 14:23:42 +02:00
print ( " Testing default mode link packets... " )
2022-06-09 10:33:13 +02:00
# TODO: Load this from public bytes only
id1 = RNS . Identity . from_bytes ( bytes . fromhex ( fixed_keys [ 0 ] [ 0 ] ) )
self . assertEqual ( id1 . hash , bytes . fromhex ( fixed_keys [ 0 ] [ 1 ] ) )
2024-05-04 20:27:27 +02:00
RNS . Transport . request_path ( bytes . fromhex ( " fb48da0e82e6e01ba0c014513f74540d " ) )
time . sleep ( 0.2 )
2022-06-09 10:33:13 +02:00
dest = RNS . Destination ( id1 , RNS . Destination . OUT , RNS . Destination . SINGLE , APP_NAME , " link " , " establish " )
2022-06-30 20:37:51 +02:00
2022-10-06 23:14:32 +02:00
self . assertEqual ( dest . hash , bytes . fromhex ( " fb48da0e82e6e01ba0c014513f74540d " ) )
2022-06-09 10:33:13 +02:00
l1 = RNS . Link ( dest )
2025-05-06 14:23:42 +02:00
time . sleep ( LINK_UP_WAIT )
2022-06-09 10:33:13 +02:00
self . assertEqual ( l1 . status , RNS . Link . ACTIVE )
b = 0
pr_t = 0
receipts = [ ]
2022-06-14 13:44:12 +02:00
if RNS . Cryptography . backend ( ) == " internal " or RNS . Reticulum . MTU > 500 :
2022-06-10 16:36:30 +02:00
num_packets = 50
else :
num_packets = 500
2022-06-09 10:33:13 +02:00
packet_size = RNS . Link . MDU
2022-06-09 13:32:32 +02:00
pstart = time . time ( )
2022-06-09 10:33:13 +02:00
print ( " Sending " + str ( num_packets ) + " link packets of " + str ( packet_size ) + " bytes... " )
for i in range ( 0 , num_packets ) :
2023-11-06 11:10:38 +01:00
time . sleep ( 0.003 )
2022-06-09 10:33:13 +02:00
b + = packet_size
data = os . urandom ( packet_size )
start = time . time ( )
p = RNS . Packet ( l1 , data )
receipts . append ( p . send ( ) )
pr_t + = time . time ( ) - start
2022-06-09 13:32:32 +02:00
print ( " Sent " + self . size_str ( b ) + " , " + self . size_str ( b / pr_t , " b " ) + " ps " )
print ( " Checking receipts... " , end = " " )
2022-06-09 10:33:13 +02:00
all_ok = False
2022-06-10 16:36:30 +02:00
receipt_timeout = time . time ( ) + 35
2022-06-09 10:33:13 +02:00
while not all_ok and time . time ( ) < receipt_timeout :
for r in receipts :
all_ok = True
if not r . status == RNS . PacketReceipt . DELIVERED :
all_ok = False
2022-06-09 13:32:32 +02:00
break
2025-05-06 14:23:42 +02:00
time . sleep ( 0.01 )
2022-06-09 13:32:32 +02:00
pduration = time . time ( ) - pstart
2022-06-12 11:58:54 +02:00
n_failed = 0
for r in receipts :
if not r . status == RNS . PacketReceipt . DELIVERED :
n_failed + = 1
if n_failed > 0 :
ns = " s " if n_failed != 1 else " "
print ( " Failed to receive proof for " + str ( n_failed ) + " packet " + ns )
2022-06-09 10:33:13 +02:00
self . assertEqual ( all_ok , True )
2022-06-09 13:32:32 +02:00
print ( " OK! " )
print ( " Single packet and proof round-trip throughput is " + self . size_str ( b / pduration , " b " ) + " ps " )
2022-06-09 10:33:13 +02:00
l1 . teardown ( )
2025-05-06 14:23:42 +02:00
time . sleep ( LINK_UP_WAIT )
self . assertEqual ( l1 . status , RNS . Link . CLOSED )
@skipIf ( os . getenv ( ' SKIP_NORMAL_TESTS ' ) != None , " Skipping " )
def test_03b_packets ( self ) :
init_rns ( self )
print ( " " )
print ( " Testing AES_256_CBC mode link packets... " )
# TODO: Load this from public bytes only
id1 = RNS . Identity . from_bytes ( bytes . fromhex ( fixed_keys [ 0 ] [ 0 ] ) )
self . assertEqual ( id1 . hash , bytes . fromhex ( fixed_keys [ 0 ] [ 1 ] ) )
RNS . Transport . request_path ( bytes . fromhex ( " fb48da0e82e6e01ba0c014513f74540d " ) )
time . sleep ( 0.2 )
dest = RNS . Destination ( id1 , RNS . Destination . OUT , RNS . Destination . SINGLE , APP_NAME , " link " , " establish " )
self . assertEqual ( dest . hash , bytes . fromhex ( " fb48da0e82e6e01ba0c014513f74540d " ) )
l1 = RNS . Link ( dest , mode = RNS . Link . MODE_AES256_CBC )
time . sleep ( LINK_UP_WAIT )
self . assertEqual ( l1 . status , RNS . Link . ACTIVE )
b = 0
pr_t = 0
receipts = [ ]
if RNS . Cryptography . backend ( ) == " internal " or RNS . Reticulum . MTU > 500 :
num_packets = 50
else :
num_packets = 500
packet_size = RNS . Link . MDU
pstart = time . time ( )
print ( " Sending " + str ( num_packets ) + " link packets of " + str ( packet_size ) + " bytes... " )
for i in range ( 0 , num_packets ) :
time . sleep ( 0.003 )
b + = packet_size
data = os . urandom ( packet_size )
start = time . time ( )
p = RNS . Packet ( l1 , data )
receipts . append ( p . send ( ) )
pr_t + = time . time ( ) - start
print ( " Sent " + self . size_str ( b ) + " , " + self . size_str ( b / pr_t , " b " ) + " ps " )
print ( " Checking receipts... " , end = " " )
all_ok = False
receipt_timeout = time . time ( ) + 35
while not all_ok and time . time ( ) < receipt_timeout :
for r in receipts :
all_ok = True
if not r . status == RNS . PacketReceipt . DELIVERED :
all_ok = False
break
time . sleep ( 0.01 )
pduration = time . time ( ) - pstart
n_failed = 0
for r in receipts :
if not r . status == RNS . PacketReceipt . DELIVERED :
n_failed + = 1
if n_failed > 0 :
ns = " s " if n_failed != 1 else " "
print ( " Failed to receive proof for " + str ( n_failed ) + " packet " + ns )
self . assertEqual ( all_ok , True )
print ( " OK! " )
print ( " Single packet and proof round-trip throughput is " + self . size_str ( b / pduration , " b " ) + " ps " )
l1 . teardown ( )
time . sleep ( LINK_UP_WAIT )
2022-06-09 10:33:13 +02:00
self . assertEqual ( l1 . status , RNS . Link . CLOSED )
2023-05-10 18:43:17 +02:00
@skipIf ( os . getenv ( ' SKIP_NORMAL_TESTS ' ) != None , " Skipping " )
2025-05-06 14:23:42 +02:00
def test_04_micro_resource ( self ) :
2022-06-09 19:54:20 +02:00
init_rns ( self )
2022-06-09 19:27:11 +02:00
print ( " " )
print ( " Micro resource test " )
# TODO: Load this from public bytes only
id1 = RNS . Identity . from_bytes ( bytes . fromhex ( fixed_keys [ 0 ] [ 0 ] ) )
self . assertEqual ( id1 . hash , bytes . fromhex ( fixed_keys [ 0 ] [ 1 ] ) )
2024-01-14 00:12:30 +01:00
RNS . Transport . request_path ( bytes . fromhex ( " fb48da0e82e6e01ba0c014513f74540d " ) )
time . sleep ( 0.2 )
2022-06-09 19:27:11 +02:00
dest = RNS . Destination ( id1 , RNS . Destination . OUT , RNS . Destination . SINGLE , APP_NAME , " link " , " establish " )
2022-06-30 20:37:51 +02:00
2022-10-06 23:14:32 +02:00
self . assertEqual ( dest . hash , bytes . fromhex ( " fb48da0e82e6e01ba0c014513f74540d " ) )
2022-06-09 19:27:11 +02:00
l1 = RNS . Link ( dest )
2025-05-06 14:23:42 +02:00
time . sleep ( LINK_UP_WAIT )
2022-06-09 19:27:11 +02:00
self . assertEqual ( l1 . status , RNS . Link . ACTIVE )
resource_timeout = 120
resource_size = 128
data = os . urandom ( resource_size )
print ( " Sending " + self . size_str ( resource_size ) + " resource... " )
resource = RNS . Resource ( data , l1 , timeout = resource_timeout )
start = time . time ( )
2024-01-14 00:12:30 +01:00
# This is a hack, don't do it. Use the callbacks instead.
2022-06-09 19:27:11 +02:00
while resource . status < RNS . Resource . COMPLETE :
time . sleep ( 0.01 )
t = time . time ( ) - start
self . assertEqual ( resource . status , RNS . Resource . COMPLETE )
2025-05-10 15:31:35 +02:00
print ( " Resource completed at " + self . size_str ( resource . total_size / t , " b " ) + f " ps ( { RNS . prettysize ( resource . total_size ) } , { RNS . prettyshorttime ( t ) } ) " )
# Test sending resource with metadata
data = os . urandom ( resource_size )
metadata = { " text " : " Some text " , " numbers " : [ 1 , 2 , 3 , 4 ] , " blob " : os . urandom ( 32 ) }
print ( " Sending " + self . size_str ( resource_size ) + " resource with metadata... " )
resource = RNS . Resource ( data , l1 , metadata = metadata , timeout = resource_timeout )
start = time . time ( )
# This is a hack, don't do it. Use the callbacks instead.
while resource . status < RNS . Resource . COMPLETE :
time . sleep ( 0.01 )
t = time . time ( ) - start
self . assertEqual ( resource . status , RNS . Resource . COMPLETE )
print ( " Resource completed at " + self . size_str ( resource . total_size / t , " b " ) + f " ps ( { RNS . prettysize ( resource . total_size ) } , { RNS . prettyshorttime ( t ) } ) " )
# Test sending resource with invalid size metadata
data = os . urandom ( resource_size )
metadata = os . urandom ( RNS . Resource . METADATA_MAX_SIZE + 1 )
print ( " Sending " + self . size_str ( resource_size ) + " resource with invalid size metadata... " )
exception_raised = False
try : resource = RNS . Resource ( data , l1 , metadata = metadata , timeout = resource_timeout )
except SystemError : exception_raised = True
self . assertEqual ( exception_raised , True )
2022-06-09 19:27:11 +02:00
l1 . teardown ( )
time . sleep ( 0.5 )
self . assertEqual ( l1 . status , RNS . Link . CLOSED )
2023-05-10 18:43:17 +02:00
@skipIf ( os . getenv ( ' SKIP_NORMAL_TESTS ' ) != None , " Skipping " )
2025-05-06 14:23:42 +02:00
def test_05_mini_resource ( self ) :
2022-07-09 15:50:18 +02:00
init_rns ( self )
print ( " " )
print ( " Mini resource test " )
# TODO: Load this from public bytes only
id1 = RNS . Identity . from_bytes ( bytes . fromhex ( fixed_keys [ 0 ] [ 0 ] ) )
self . assertEqual ( id1 . hash , bytes . fromhex ( fixed_keys [ 0 ] [ 1 ] ) )
2024-01-14 00:12:30 +01:00
RNS . Transport . request_path ( bytes . fromhex ( " fb48da0e82e6e01ba0c014513f74540d " ) )
time . sleep ( 0.2 )
2022-07-09 15:50:18 +02:00
dest = RNS . Destination ( id1 , RNS . Destination . OUT , RNS . Destination . SINGLE , APP_NAME , " link " , " establish " )
2022-10-06 23:14:32 +02:00
self . assertEqual ( dest . hash , bytes . fromhex ( " fb48da0e82e6e01ba0c014513f74540d " ) )
2022-07-09 15:50:18 +02:00
l1 = RNS . Link ( dest )
2025-05-06 14:23:42 +02:00
time . sleep ( LINK_UP_WAIT )
2022-07-09 15:50:18 +02:00
self . assertEqual ( l1 . status , RNS . Link . ACTIVE )
resource_timeout = 120
resource_size = 256 * 1000
2025-05-10 15:31:35 +02:00
# Test sending resource without metadata
2022-07-09 15:50:18 +02:00
data = os . urandom ( resource_size )
print ( " Sending " + self . size_str ( resource_size ) + " resource... " )
resource = RNS . Resource ( data , l1 , timeout = resource_timeout )
start = time . time ( )
2024-01-14 00:12:30 +01:00
# This is a hack, don't do it. Use the callbacks instead.
2022-07-09 15:50:18 +02:00
while resource . status < RNS . Resource . COMPLETE :
time . sleep ( 0.01 )
t = time . time ( ) - start
self . assertEqual ( resource . status , RNS . Resource . COMPLETE )
2025-05-10 15:31:35 +02:00
print ( " Resource completed at " + self . size_str ( resource . total_size / t , " b " ) + f " ps ( { RNS . prettysize ( resource . total_size ) } , { RNS . prettyshorttime ( t ) } ) " )
# Test sending resource with metadata
data = os . urandom ( resource_size )
metadata = { " text " : " Some text " , " numbers " : [ 1 , 2 , 3 , 4 ] , " blob " : os . urandom ( 8192 ) }
print ( " Sending " + self . size_str ( resource_size ) + " resource with metadata... " )
resource = RNS . Resource ( data , l1 , metadata = metadata , timeout = resource_timeout )
start = time . time ( )
# This is a hack, don't do it. Use the callbacks instead.
while resource . status < RNS . Resource . COMPLETE :
time . sleep ( 0.01 )
t = time . time ( ) - start
self . assertEqual ( resource . status , RNS . Resource . COMPLETE )
print ( " Resource completed at " + self . size_str ( resource . total_size / t , " b " ) + f " ps ( { RNS . prettysize ( resource . total_size ) } , { RNS . prettyshorttime ( t ) } ) " )
2022-07-09 15:50:18 +02:00
l1 . teardown ( )
time . sleep ( 0.5 )
self . assertEqual ( l1 . status , RNS . Link . CLOSED )
2023-05-10 18:43:17 +02:00
@skipIf ( os . getenv ( ' SKIP_NORMAL_TESTS ' ) != None , " Skipping " )
2025-05-06 14:23:42 +02:00
def test_06_small_resource ( self ) :
2022-06-09 19:54:20 +02:00
init_rns ( self )
2022-06-09 19:27:11 +02:00
print ( " " )
print ( " Small resource test " )
# TODO: Load this from public bytes only
id1 = RNS . Identity . from_bytes ( bytes . fromhex ( fixed_keys [ 0 ] [ 0 ] ) )
self . assertEqual ( id1 . hash , bytes . fromhex ( fixed_keys [ 0 ] [ 1 ] ) )
2024-01-14 00:12:30 +01:00
RNS . Transport . request_path ( bytes . fromhex ( " fb48da0e82e6e01ba0c014513f74540d " ) )
time . sleep ( 0.2 )
2022-06-09 19:27:11 +02:00
dest = RNS . Destination ( id1 , RNS . Destination . OUT , RNS . Destination . SINGLE , APP_NAME , " link " , " establish " )
2022-10-06 23:14:32 +02:00
self . assertEqual ( dest . hash , bytes . fromhex ( " fb48da0e82e6e01ba0c014513f74540d " ) )
2022-06-09 19:27:11 +02:00
l1 = RNS . Link ( dest )
2025-05-06 14:23:42 +02:00
time . sleep ( LINK_UP_WAIT )
2022-06-09 19:27:11 +02:00
self . assertEqual ( l1 . status , RNS . Link . ACTIVE )
resource_timeout = 120
resource_size = 1000 * 1000
data = os . urandom ( resource_size )
print ( " Sending " + self . size_str ( resource_size ) + " resource... " )
resource = RNS . Resource ( data , l1 , timeout = resource_timeout )
start = time . time ( )
2024-01-14 00:12:30 +01:00
# This is a hack, don't do it. Use the callbacks instead.
2022-06-09 19:27:11 +02:00
while resource . status < RNS . Resource . COMPLETE :
time . sleep ( 0.01 )
t = time . time ( ) - start
self . assertEqual ( resource . status , RNS . Resource . COMPLETE )
2025-05-10 15:31:35 +02:00
print ( " Resource completed at " + self . size_str ( resource . total_size / t , " b " ) + f " ps ( { RNS . prettysize ( resource . total_size ) } , { RNS . prettyshorttime ( t ) } ) " )
# Test sending resource with metadata
data = os . urandom ( resource_size )
metadata = { " text " : " Some text " , " numbers " : [ 1 , 2 , 3 , 4 ] , " blob " : os . urandom ( 8192 ) }
print ( " Sending " + self . size_str ( resource_size ) + " resource with metadata... " )
resource = RNS . Resource ( data , l1 , metadata = metadata , timeout = resource_timeout )
start = time . time ( )
# This is a hack, don't do it. Use the callbacks instead.
while resource . status < RNS . Resource . COMPLETE :
time . sleep ( 0.01 )
t = time . time ( ) - start
self . assertEqual ( resource . status , RNS . Resource . COMPLETE )
print ( " Resource completed at " + self . size_str ( resource . total_size / t , " b " ) + f " ps ( { RNS . prettysize ( resource . total_size ) } , { RNS . prettyshorttime ( t ) } ) " )
2022-06-09 10:33:13 +02:00
2022-06-09 19:27:11 +02:00
l1 . teardown ( )
2025-05-06 14:23:42 +02:00
time . sleep ( LINK_UP_WAIT )
2022-06-09 19:27:11 +02:00
self . assertEqual ( l1 . status , RNS . Link . CLOSED )
2023-05-10 18:43:17 +02:00
@skipIf ( os . getenv ( ' SKIP_NORMAL_TESTS ' ) != None , " Skipping " )
2025-05-06 14:23:42 +02:00
def test_07_medium_resource ( self ) :
2022-06-10 16:36:30 +02:00
if RNS . Cryptography . backend ( ) == " internal " :
print ( " Skipping medium resource test... " )
return
2022-06-09 19:54:20 +02:00
init_rns ( self )
2022-06-09 10:33:13 +02:00
print ( " " )
2022-06-09 19:27:11 +02:00
print ( " Medium resource test " )
2022-06-09 10:33:13 +02:00
# TODO: Load this from public bytes only
id1 = RNS . Identity . from_bytes ( bytes . fromhex ( fixed_keys [ 0 ] [ 0 ] ) )
self . assertEqual ( id1 . hash , bytes . fromhex ( fixed_keys [ 0 ] [ 1 ] ) )
2024-01-14 00:12:30 +01:00
RNS . Transport . request_path ( bytes . fromhex ( " fb48da0e82e6e01ba0c014513f74540d " ) )
time . sleep ( 0.2 )
2022-06-09 10:33:13 +02:00
dest = RNS . Destination ( id1 , RNS . Destination . OUT , RNS . Destination . SINGLE , APP_NAME , " link " , " establish " )
2022-10-06 23:14:32 +02:00
self . assertEqual ( dest . hash , bytes . fromhex ( " fb48da0e82e6e01ba0c014513f74540d " ) )
2022-06-09 10:33:13 +02:00
l1 = RNS . Link ( dest )
2025-05-06 14:23:42 +02:00
time . sleep ( LINK_UP_WAIT )
2022-06-09 10:33:13 +02:00
self . assertEqual ( l1 . status , RNS . Link . ACTIVE )
resource_timeout = 120
2022-06-09 13:32:32 +02:00
resource_size = 5 * 1000 * 1000
2022-06-09 10:33:13 +02:00
data = os . urandom ( resource_size )
print ( " Sending " + self . size_str ( resource_size ) + " resource... " )
2025-05-10 20:59:23 +02:00
resource = RNS . Resource ( data , l1 , timeout = resource_timeout , callback = self . lr_callback , auto_compress = False )
2022-06-09 10:33:13 +02:00
start = time . time ( )
2025-05-10 20:59:23 +02:00
TestLink . large_resource_status = resource . status
while TestLink . large_resource_status < RNS . Resource . COMPLETE :
time . sleep ( 0.001 )
2022-06-09 10:33:13 +02:00
t = time . time ( ) - start
self . assertEqual ( resource . status , RNS . Resource . COMPLETE )
2025-05-10 15:31:35 +02:00
print ( " Resource completed at " + self . size_str ( resource . total_size / t , " b " ) + f " ps ( { RNS . prettysize ( resource . total_size ) } , { RNS . prettyshorttime ( t ) } ) " )
# Test sending resource with metadata
data = os . urandom ( resource_size )
metadata = { " text " : " Some text " , " numbers " : [ 1 , 2 , 3 , 4 ] , " blob " : os . urandom ( 8192 ) }
print ( " Sending " + self . size_str ( resource_size ) + " resource with metadata... " )
2025-05-10 20:59:23 +02:00
resource = RNS . Resource ( data , l1 , timeout = resource_timeout , callback = self . lr_callback , auto_compress = False )
2025-05-10 15:31:35 +02:00
start = time . time ( )
2025-05-10 20:59:23 +02:00
TestLink . large_resource_status = resource . status
while TestLink . large_resource_status < RNS . Resource . COMPLETE :
time . sleep ( 0.001 )
2025-05-10 15:31:35 +02:00
t = time . time ( ) - start
self . assertEqual ( resource . status , RNS . Resource . COMPLETE )
print ( " Resource completed at " + self . size_str ( resource . total_size / t , " b " ) + f " ps ( { RNS . prettysize ( resource . total_size ) } , { RNS . prettyshorttime ( t ) } ) " )
2022-06-09 10:33:13 +02:00
l1 . teardown ( )
2025-05-06 14:23:42 +02:00
time . sleep ( LINK_UP_WAIT )
2022-06-09 10:33:13 +02:00
self . assertEqual ( l1 . status , RNS . Link . CLOSED )
2024-01-14 00:12:30 +01:00
large_resource_status = None
def lr_callback ( self , resource ) :
TestLink . large_resource_status = resource . status
2023-05-10 18:43:17 +02:00
@skipIf ( os . getenv ( ' SKIP_NORMAL_TESTS ' ) != None , " Skipping " )
2025-05-06 14:23:42 +02:00
def test_09_large_resource ( self ) :
2022-06-10 16:36:30 +02:00
if RNS . Cryptography . backend ( ) == " internal " :
print ( " Skipping large resource test... " )
return
2022-06-09 19:54:20 +02:00
init_rns ( self )
2022-06-09 19:27:11 +02:00
print ( " " )
print ( " Large resource test " )
# TODO: Load this from public bytes only
id1 = RNS . Identity . from_bytes ( bytes . fromhex ( fixed_keys [ 0 ] [ 0 ] ) )
self . assertEqual ( id1 . hash , bytes . fromhex ( fixed_keys [ 0 ] [ 1 ] ) )
2024-01-14 00:12:30 +01:00
RNS . Transport . request_path ( bytes . fromhex ( " fb48da0e82e6e01ba0c014513f74540d " ) )
time . sleep ( 0.2 )
2022-06-09 19:27:11 +02:00
dest = RNS . Destination ( id1 , RNS . Destination . OUT , RNS . Destination . SINGLE , APP_NAME , " link " , " establish " )
2022-10-06 23:14:32 +02:00
self . assertEqual ( dest . hash , bytes . fromhex ( " fb48da0e82e6e01ba0c014513f74540d " ) )
2022-06-09 19:27:11 +02:00
l1 = RNS . Link ( dest )
2025-05-06 14:23:42 +02:00
time . sleep ( LINK_UP_WAIT )
2022-06-09 19:27:11 +02:00
self . assertEqual ( l1 . status , RNS . Link . ACTIVE )
resource_timeout = 120
2024-01-14 00:12:30 +01:00
resource_size = 50 * 1000 * 1000
2022-06-09 19:27:11 +02:00
data = os . urandom ( resource_size )
print ( " Sending " + self . size_str ( resource_size ) + " resource... " )
2025-01-13 15:42:32 +01:00
resource = RNS . Resource ( data , l1 , timeout = resource_timeout , callback = self . lr_callback , auto_compress = False )
2022-06-09 19:27:11 +02:00
start = time . time ( )
2024-01-14 00:12:30 +01:00
TestLink . large_resource_status = resource . status
while TestLink . large_resource_status < RNS . Resource . COMPLETE :
2025-05-10 20:59:23 +02:00
time . sleep ( 0.001 )
2022-06-09 19:27:11 +02:00
t = time . time ( ) - start
2024-01-14 00:12:30 +01:00
self . assertEqual ( TestLink . large_resource_status , RNS . Resource . COMPLETE )
2025-05-10 15:31:35 +02:00
print ( " Resource completed at " + self . size_str ( resource . total_size / t , " b " ) + f " ps ( { RNS . prettysize ( resource . total_size ) } , { RNS . prettyshorttime ( t ) } ) " )
# Test sending resource with metadata
metadata = { " text " : " Some text " , " numbers " : [ 1 , 2 , 3 , 4 ] , " blob " : os . urandom ( 8192 ) }
print ( " Sending " + self . size_str ( resource_size ) + " resource with metadata... " )
resource = RNS . Resource ( data , l1 , timeout = resource_timeout , callback = self . lr_callback , auto_compress = False )
start = time . time ( )
TestLink . large_resource_status = resource . status
while TestLink . large_resource_status < RNS . Resource . COMPLETE :
time . sleep ( 0.01 )
t = time . time ( ) - start
self . assertEqual ( resource . status , RNS . Resource . COMPLETE )
print ( " Resource with metadata completed at " + self . size_str ( resource . total_size / t , " b " ) + f " ps ( { RNS . prettysize ( resource . total_size ) } , { RNS . prettyshorttime ( t ) } ) " )
2022-06-09 19:27:11 +02:00
l1 . teardown ( )
2025-05-06 14:23:42 +02:00
time . sleep ( LINK_UP_WAIT )
2022-06-09 19:27:11 +02:00
self . assertEqual ( l1 . status , RNS . Link . CLOSED )
2022-06-09 10:33:13 +02:00
2025-05-10 15:31:35 +02:00
@skipIf ( os . getenv ( ' SKIP_NORMAL_TESTS ' ) != None , " Skipping " )
2023-02-25 18:23:25 -06:00
def test_10_channel_round_trip ( self ) :
global c_rns
init_rns ( self )
print ( " " )
print ( " Channel round trip test " )
# TODO: Load this from public bytes only
id1 = RNS . Identity . from_bytes ( bytes . fromhex ( fixed_keys [ 0 ] [ 0 ] ) )
self . assertEqual ( id1 . hash , bytes . fromhex ( fixed_keys [ 0 ] [ 1 ] ) )
2024-05-04 20:27:27 +02:00
RNS . Transport . request_path ( bytes . fromhex ( " fb48da0e82e6e01ba0c014513f74540d " ) )
time . sleep ( 0.2 )
2023-02-25 18:23:25 -06:00
dest = RNS . Destination ( id1 , RNS . Destination . OUT , RNS . Destination . SINGLE , APP_NAME , " link " , " establish " )
self . assertEqual ( dest . hash , bytes . fromhex ( " fb48da0e82e6e01ba0c014513f74540d " ) )
l1 = RNS . Link ( dest )
time . sleep ( 1 )
self . assertEqual ( l1 . status , RNS . Link . ACTIVE )
received = [ ]
def handle_message ( message : MessageBase ) :
received . append ( message )
test_message = MessageTest ( )
test_message . data = " Hello "
2023-02-26 07:25:49 -06:00
channel = l1 . get_channel ( )
channel . register_message_type ( MessageTest )
2023-02-26 11:23:38 -06:00
channel . add_message_handler ( handle_message )
2023-02-26 07:25:49 -06:00
channel . send ( test_message )
2023-02-25 18:23:25 -06:00
2025-05-06 14:23:42 +02:00
time . sleep ( LINK_UP_WAIT )
2023-02-25 18:23:25 -06:00
self . assertEqual ( 1 , len ( received ) )
rx_message = received [ 0 ]
self . assertIsInstance ( rx_message , MessageTest )
self . assertEqual ( " Hello back " , rx_message . data )
self . assertEqual ( test_message . id , rx_message . id )
self . assertNotEqual ( test_message . not_serialized , rx_message . not_serialized )
2023-05-10 18:43:17 +02:00
self . assertEqual ( 0 , len ( l1 . _channel . _rx_ring ) )
2023-02-25 18:23:25 -06:00
l1 . teardown ( )
2025-05-06 14:23:42 +02:00
time . sleep ( LINK_UP_WAIT )
2023-02-25 18:23:25 -06:00
self . assertEqual ( l1 . status , RNS . Link . CLOSED )
self . assertEqual ( 0 , len ( l1 . _channel . _rx_ring ) )
2025-05-10 15:31:35 +02:00
@skipIf ( os . getenv ( ' SKIP_NORMAL_TESTS ' ) != None , " Skipping " )
2023-03-02 17:13:55 -06:00
def test_11_buffer_round_trip ( self ) :
global c_rns
init_rns ( self )
print ( " " )
print ( " Buffer round trip test " )
# TODO: Load this from public bytes only
id1 = RNS . Identity . from_bytes ( bytes . fromhex ( fixed_keys [ 0 ] [ 0 ] ) )
self . assertEqual ( id1 . hash , bytes . fromhex ( fixed_keys [ 0 ] [ 1 ] ) )
2024-05-04 20:27:27 +02:00
RNS . Transport . request_path ( bytes . fromhex ( " fb48da0e82e6e01ba0c014513f74540d " ) )
time . sleep ( 0.2 )
2023-03-02 17:13:55 -06:00
dest = RNS . Destination ( id1 , RNS . Destination . OUT , RNS . Destination . SINGLE , APP_NAME , " link " , " establish " )
self . assertEqual ( dest . hash , bytes . fromhex ( " fb48da0e82e6e01ba0c014513f74540d " ) )
l1 = RNS . Link ( dest )
2025-05-06 14:23:42 +02:00
time . sleep ( LINK_UP_WAIT )
2023-03-02 17:13:55 -06:00
self . assertEqual ( l1 . status , RNS . Link . ACTIVE )
buffer = None
received = [ ]
def handle_data ( ready_bytes : int ) :
data = buffer . read ( ready_bytes )
received . append ( data )
channel = l1 . get_channel ( )
buffer = RNS . Buffer . create_bidirectional_buffer ( 0 , 0 , channel , handle_data )
buffer . write ( " Hi there " . encode ( " utf-8 " ) )
buffer . flush ( )
2025-05-06 14:23:42 +02:00
time . sleep ( LINK_UP_WAIT )
2023-03-02 17:13:55 -06:00
self . assertEqual ( 1 , len ( received ) )
rx_message = received [ 0 ] . decode ( " utf-8 " )
self . assertEqual ( " Hi there back at you " , rx_message )
l1 . teardown ( )
2025-05-06 14:23:42 +02:00
time . sleep ( LINK_UP_WAIT )
2023-03-02 17:13:55 -06:00
self . assertEqual ( l1 . status , RNS . Link . CLOSED )
2025-05-10 15:31:35 +02:00
@skipIf ( os . getenv ( ' SKIP_NORMAL_TESTS ' ) != None and os . getenv ( ' RUN_SLOW_TESTS ' ) == None , " Skipping " )
2023-03-04 23:37:58 -06:00
def test_12_buffer_round_trip_big ( self , local_bitrate = None ) :
2023-05-18 23:32:29 +02:00
global c_rns , buffer_read_target
2023-03-04 23:37:58 -06:00
init_rns ( self )
print ( " " )
print ( " Buffer round trip test " )
local_interface = next ( filter ( lambda iface : isinstance ( iface , LocalClientInterface ) , RNS . Transport . interfaces ) , None )
self . assertIsNotNone ( local_interface )
original_bitrate = local_interface . bitrate
try :
if local_bitrate is not None :
local_interface . bitrate = local_bitrate
local_interface . _force_bitrate = True
print ( " Forcing local bitrate of " + str ( local_bitrate ) + " bps ( " + str ( round ( local_bitrate / 8 , 0 ) ) + " B/s) " )
# TODO: Load this from public bytes only
id1 = RNS . Identity . from_bytes ( bytes . fromhex ( fixed_keys [ 0 ] [ 0 ] ) )
self . assertEqual ( id1 . hash , bytes . fromhex ( fixed_keys [ 0 ] [ 1 ] ) )
2024-05-04 20:27:27 +02:00
RNS . Transport . request_path ( bytes . fromhex ( " fb48da0e82e6e01ba0c014513f74540d " ) )
time . sleep ( 0.2 )
2023-03-04 23:37:58 -06:00
dest = RNS . Destination ( id1 , RNS . Destination . OUT , RNS . Destination . SINGLE , APP_NAME , " link " , " establish " )
self . assertEqual ( dest . hash , bytes . fromhex ( " fb48da0e82e6e01ba0c014513f74540d " ) )
l1 = RNS . Link ( dest )
# delay a reasonable time for link to come up at current bitrate
link_sleep = max ( RNS . Link . MDU * 3 / local_interface . bitrate * 8 , 2 )
timeout_at = time . time ( ) + link_sleep
print ( " Waiting " + str ( round ( link_sleep , 1 ) ) + " sec for link to come up " )
while l1 . status != RNS . Link . ACTIVE and time . time ( ) < timeout_at :
time . sleep ( 0.01 )
self . assertEqual ( l1 . status , RNS . Link . ACTIVE )
buffer = None
received = [ ]
2023-05-18 23:32:29 +02:00
2023-03-04 23:37:58 -06:00
def handle_data ( ready_bytes : int ) :
2023-05-18 23:32:29 +02:00
global received_bytes
2023-03-04 23:37:58 -06:00
data = buffer . read ( ready_bytes )
received . append ( data )
channel = l1 . get_channel ( )
buffer = RNS . Buffer . create_bidirectional_buffer ( 0 , 0 , channel , handle_data )
# try to make the message big enough to split across packets, but
# small enough to make the test complete in a reasonable amount of time
2023-05-10 18:43:17 +02:00
# seed_text = "0123456789"
# message = seed_text*ceil(min(max(local_interface.bitrate / 8,
# StreamDataMessage.MAX_DATA_LEN * 2 / len(seed_text)),
# 1000))
if local_interface . bitrate < 1000 :
target_bytes = 3000
else :
2023-05-18 23:32:29 +02:00
target_bytes = BUFFER_TEST_TARGET
2023-05-10 18:43:17 +02:00
2023-05-18 23:32:29 +02:00
random . seed ( 154889 )
message = random . randbytes ( target_bytes )
buffer_read_target = len ( message )
2023-05-10 18:43:17 +02:00
2023-03-04 23:37:58 -06:00
# the return message will have an appendage string " back at you"
# for every StreamDataMessage that arrives. To verify, we need
# to insert that string every MAX_DATA_LEN and also at the end.
2023-05-10 18:43:17 +02:00
expected_rx_message = b " "
2023-03-04 23:37:58 -06:00
for i in range ( 0 , len ( message ) ) :
if i > 0 and ( i % StreamDataMessage . MAX_DATA_LEN ) == 0 :
2023-05-10 18:43:17 +02:00
expected_rx_message + = " back at you " . encode ( " utf-8 " )
expected_rx_message + = bytes ( [ message [ i ] ] )
expected_rx_message + = " back at you " . encode ( " utf-8 " )
2023-03-04 23:37:58 -06:00
# since the segments will be received at max length for a
# StreamDataMessage, the appended text will end up in a
# separate packet.
2023-05-18 23:32:29 +02:00
print ( " Sending " + str ( len ( message ) ) + " bytes, receiving " + str ( len ( expected_rx_message ) ) + " bytes, " )
2023-05-10 18:43:17 +02:00
buffer . write ( message )
2023-03-04 23:37:58 -06:00
buffer . flush ( )
2023-05-10 18:43:17 +02:00
2023-05-18 23:32:29 +02:00
timeout = time . time ( ) + 4
while not time . time ( ) > timeout :
time . sleep ( 1 )
print ( f " Received { len ( received ) } chunks so far " )
time . sleep ( 1 )
2023-03-04 23:37:58 -06:00
data = bytearray ( )
for rx in received :
2026-04-12 14:55:42 +02:00
if rx : data . extend ( rx )
2023-05-10 18:43:17 +02:00
rx_message = data
2023-03-04 23:37:58 -06:00
2023-05-18 23:32:29 +02:00
print ( f " Received { len ( received ) } chunks, totalling { len ( rx_message ) } bytes " )
2023-03-04 23:37:58 -06:00
self . assertEqual ( len ( expected_rx_message ) , len ( rx_message ) )
for i in range ( 0 , len ( expected_rx_message ) ) :
self . assertEqual ( expected_rx_message [ i ] , rx_message [ i ] )
self . assertEqual ( expected_rx_message , rx_message )
l1 . teardown ( )
2025-05-06 14:23:42 +02:00
time . sleep ( LINK_UP_WAIT )
2023-03-04 23:37:58 -06:00
self . assertEqual ( l1 . status , RNS . Link . CLOSED )
finally :
local_interface . bitrate = original_bitrate
local_interface . _force_bitrate = False
# Run with
# RUN_SLOW_TESTS=1 python tests/link.py TestLink.test_13_buffer_round_trip_big_slow
# Or
# make RUN_SLOW_TESTS=1 test
2023-05-10 18:43:17 +02:00
@skipIf ( os . getenv ( ' RUN_SLOW_TESTS ' ) == None , " Not running slow tests " )
2023-03-04 23:37:58 -06:00
def test_13_buffer_round_trip_big_slow ( self ) :
self . test_12_buffer_round_trip_big ( local_bitrate = 410 )
2022-06-09 10:33:13 +02:00
def size_str ( self , num , suffix = ' B ' ) :
units = [ ' ' , ' K ' , ' M ' , ' G ' , ' T ' , ' P ' , ' E ' , ' Z ' ]
last_unit = ' Y '
if suffix == ' b ' :
num * = 8
units = [ ' ' , ' K ' , ' M ' , ' G ' , ' T ' , ' P ' , ' E ' , ' Z ' ]
last_unit = ' Y '
for unit in units :
if abs ( num ) < 1000.0 :
if unit == " " :
return " %.0f %s %s " % ( num , unit , suffix )
else :
return " %.2f %s %s " % ( num , unit , suffix )
num / = 1000.0
return " %.2f %s %s " % ( num , last_unit , suffix )
if __name__ == ' __main__ ' :
unittest . main ( verbosity = 1 )
2023-05-18 23:32:29 +02:00
buffer_read_len = 0
2022-06-14 13:44:12 +02:00
def targets ( yp = False ) :
if yp :
import yappi
2022-06-09 10:33:13 +02:00
def resource_started ( resource ) :
print ( " Resource started " )
2022-06-14 13:44:12 +02:00
if yp :
yappi . start ( )
2022-06-09 10:33:13 +02:00
2022-06-09 13:32:32 +02:00
def resource_concluded ( resource ) :
print ( " Resource concluded " )
2022-06-14 13:44:12 +02:00
if yp :
try :
yappi . stop ( )
yappi . get_func_stats ( ) . save ( " receiver_main_calls.data " , type = " pstat " )
threads = yappi . get_thread_stats ( )
for thread in threads :
print (
" Function stats for ( %s ) ( %d ) " % ( thread . name , thread . id )
) # it is the Thread.__class__.__name__
yappi . get_func_stats ( ctx_id = thread . id ) . save ( " receiver_thread_ " + str ( thread . id ) + " .data " , type = " pstat " )
except Exception as e :
print ( " Error: " + str ( e ) )
2022-06-09 13:32:32 +02:00
2022-06-09 17:14:43 +02:00
if hasattr ( resource . link . attached_interface , " rxptime " ) :
rx_pr = ( resource . link . attached_interface . rxb * 8 ) / resource . link . attached_interface . rxptime
print ( " Average RX proccessing rate: " + size_str ( rx_pr , " b " ) + " ps " )
2022-06-09 13:32:32 +02:00
2022-06-09 10:33:13 +02:00
def link_established ( link ) :
print ( " Link established " )
link . set_resource_strategy ( RNS . Link . ACCEPT_ALL )
link . set_resource_started_callback ( resource_started )
2022-06-09 13:32:32 +02:00
link . set_resource_concluded_callback ( resource_concluded )
2023-02-26 07:25:49 -06:00
channel = link . get_channel ( )
2022-06-09 10:33:13 +02:00
2023-02-25 18:23:25 -06:00
def handle_message ( message ) :
2023-05-10 18:43:17 +02:00
if isinstance ( message , MessageTest ) :
message . data = message . data + " back "
channel . send ( message )
2023-02-26 07:25:49 -06:00
channel . register_message_type ( MessageTest )
2023-02-26 11:23:38 -06:00
channel . add_message_handler ( handle_message )
2023-02-25 18:23:25 -06:00
2023-03-02 17:13:55 -06:00
buffer = None
2023-05-18 23:32:29 +02:00
response_data = [ ]
2023-03-02 17:13:55 -06:00
def handle_buffer ( ready_bytes : int ) :
2023-05-18 23:32:29 +02:00
global buffer_read_len , BUFFER_TEST_TARGET
2023-03-02 17:13:55 -06:00
data = buffer . read ( ready_bytes )
2023-05-18 23:32:29 +02:00
buffer_read_len + = len ( data )
response_data . append ( data )
if data == " Hi there " . encode ( " utf-8 " ) :
RNS . log ( " Sending response " )
for data in response_data :
buffer . write ( data + " back at you " . encode ( " utf-8 " ) )
buffer . flush ( )
buffer_read_len = 0
if buffer_read_len == BUFFER_TEST_TARGET :
RNS . log ( " Sending response " )
for data in response_data :
buffer . write ( data + " back at you " . encode ( " utf-8 " ) )
buffer . flush ( )
buffer_read_len = 0
2023-03-02 17:13:55 -06:00
buffer = RNS . Buffer . create_bidirectional_buffer ( 0 , 0 , channel , handle_buffer )
2023-05-10 18:43:17 +02:00
m_rns = RNS . Reticulum ( " ./tests/rnsconfig " , logdest = RNS . LOG_FILE , loglevel = RNS . LOG_EXTREME )
2022-06-09 10:33:13 +02:00
id1 = RNS . Identity . from_bytes ( bytes . fromhex ( fixed_keys [ 0 ] [ 0 ] ) )
d1 = RNS . Destination ( id1 , RNS . Destination . IN , RNS . Destination . SINGLE , APP_NAME , " link " , " establish " )
d1 . set_proof_strategy ( RNS . Destination . PROVE_ALL )
d1 . set_link_established_callback ( link_established )
2022-06-09 19:54:20 +02:00
while True :
time . sleep ( 1 )
2022-06-09 13:32:32 +02:00
2022-06-14 13:44:12 +02:00
def targets_profiling ( yp = False ) :
targets ( yp )
2022-06-09 13:32:32 +02:00
def profile_resource ( ) :
2022-06-14 13:44:12 +02:00
# import cProfile
# import pstats
# from pstats import SortKey
# cProfile.runctx("entry()", {"entry": resource_profiling, "size_str": size_str}, {}, "profile-resource.data")
# p = pstats.Stats("profile-resource.data")
resource_profiling ( )
2022-06-09 10:33:13 +02:00
2022-06-09 13:32:32 +02:00
def profile_targets ( ) :
2022-06-14 13:44:12 +02:00
targets_profiling ( yp = True )
# cProfile.runctx("entry()", {"entry": targets_profiling, "size_str": size_str}, {}, "profile-targets.data")
# p = pstats.Stats("profile-targets.data")
# p.strip_dirs().sort_stats(SortKey.TIME, SortKey.CUMULATIVE).print_stats()
2022-06-09 10:33:13 +02:00
2022-06-09 13:32:32 +02:00
def resource_profiling ( ) :
2022-06-09 10:33:13 +02:00
init_rns ( )
print ( " " )
# TODO: Load this from public bytes only
id1 = RNS . Identity . from_bytes ( bytes . fromhex ( fixed_keys [ 0 ] [ 0 ] ) )
dest = RNS . Destination ( id1 , RNS . Destination . OUT , RNS . Destination . SINGLE , APP_NAME , " link " , " establish " )
l1 = RNS . Link ( dest )
time . sleep ( 0.5 )
resource_timeout = 120
2022-06-09 19:27:11 +02:00
resource_size = 5 * 1000 * 1000
2022-06-09 10:33:13 +02:00
data = os . urandom ( resource_size )
print ( " Sending " + size_str ( resource_size ) + " resource... " )
2022-06-14 13:44:12 +02:00
import yappi
yappi . start ( )
2022-06-09 10:33:13 +02:00
resource = RNS . Resource ( data , l1 , timeout = resource_timeout )
start = time . time ( )
2022-06-09 13:32:32 +02:00
time . sleep ( 1 )
2022-06-09 10:33:13 +02:00
while resource . status < RNS . Resource . COMPLETE :
time . sleep ( 0.01 )
t = time . time ( ) - start
2022-06-09 13:32:32 +02:00
print ( " Resource completed at " + size_str ( resource_size / t , " b " ) + " ps " )
2022-06-14 13:44:12 +02:00
yappi . get_func_stats ( ) . save ( " sender_main_calls.data " , type = " pstat " )
threads = yappi . get_thread_stats ( )
for thread in threads :
print (
" Function stats for ( %s ) ( %d ) " % ( thread . name , thread . id )
) # it is the Thread.__class__.__name__
yappi . get_func_stats ( ctx_id = thread . id ) . save ( " sender_thread_ " + str ( thread . id ) + " .data " , type = " pstat " )
# t_pstats = yappi.convert2pstats(tstats)
# t_pstats.save("resource_tstat.data", type="pstat")
2022-06-09 17:07:44 +02:00
if hasattr ( resource . link . attached_interface , " rxptime " ) :
rx_pr = ( resource . link . attached_interface . rxb * 8 ) / resource . link . attached_interface . rxptime
print ( " Average RX proccessing rate: " + size_str ( rx_pr , " b " ) + " ps " )
2022-06-09 10:33:13 +02:00
l1 . teardown ( )
time . sleep ( 0.5 )
def size_str ( num , suffix = ' B ' ) :
2022-06-09 13:32:32 +02:00
units = [ ' ' , ' K ' , ' M ' , ' G ' , ' T ' , ' P ' , ' E ' , ' Z ' ]
last_unit = ' Y '
if suffix == ' b ' :
num * = 8
2022-06-09 10:33:13 +02:00
units = [ ' ' , ' K ' , ' M ' , ' G ' , ' T ' , ' P ' , ' E ' , ' Z ' ]
last_unit = ' Y '
2022-06-09 13:32:32 +02:00
for unit in units :
if abs ( num ) < 1000.0 :
if unit == " " :
return " %.0f %s %s " % ( num , unit , suffix )
else :
return " %.2f %s %s " % ( num , unit , suffix )
num / = 1000.0
2022-06-09 10:33:13 +02:00
2022-06-09 13:32:32 +02:00
return " %.2f %s %s " % ( num , last_unit , suffix )