双向压力测试 #!/usr/bin/env python3 # -*- coding: utf-8 -*- import osimport timeimport socketimport structimport selectimport threadingimport subprocessimport argparse# ===== CAN 常量 ===== SOL_CAN_RAW= getattr ( socket, "SOL_CAN_RAW" , 101 ) CAN_RAW_FILTER= getattr ( socket, "CAN_RAW_FILTER" , 1 ) CAN_RAW_LOOPBACK= getattr ( socket, "CAN_RAW_LOOPBACK" , 3 ) CAN_RAW_RECV_OWN_MSGS= getattr ( socket, "CAN_RAW_RECV_OWN_MSGS" , 4 ) CAN_ECHO_FLAG= 0x20000000 STD_MASK= 0x7FF CAN_ID= 0x123 PAYLOAD= bytes ( [ 0x11 , 0x22 , 0x33 , 0x44 , 0x55 , 0x66 , 0x77 , 0x88 ] ) CAN_FRAME_FMT= "=IB3x8s" # can_id, dlc, pad, data # ===== 工具函数 ===== def run ( cmd) : subprocess. run( cmd, shell= True , stdout= subprocess. DEVNULL, stderr= subprocess. DEVNULL) def setup_can ( ) : run( "sudo ifconfig can0 down" ) run( "sudo ifconfig can1 down" ) run( "sudo ip link set can0 up type can bitrate 1000000" ) run( "sudo ip link set can1 up type can bitrate 1000000 dbitrate 1000000 restart-ms 1000 berr-reporting on fd on" ) run( "sudo ifconfig can0 txqueuelen 65536" ) run( "sudo ifconfig can1 txqueuelen 65536" ) def open_can ( iface, is_tx) : s= socket. socket( socket. AF_CAN, socket. SOCK_RAW, socket. CAN_RAW) s. bind( ( iface, ) ) flt= struct. pack( "=II" , CAN_ID, STD_MASK) s. setsockopt( SOL_CAN_RAW, CAN_RAW_FILTER, flt) if is_tx: s. setsockopt( SOL_CAN_RAW, CAN_RAW_LOOPBACK, 0 ) else : s. setsockopt( SOL_CAN_RAW, CAN_RAW_RECV_OWN_MSGS, 0 ) return sdef pack_frame ( ) : return struct. pack( CAN_FRAME_FMT, CAN_ID, 8 , PAYLOAD) def fmt_frame ( ) : return "ID=0x123 DATA=11 22 33 44 55 66 77 88" # ===== 发送线程 ===== def send_loop ( sock, rate, end_t, stat, key, tag) : interval= 1.0 / rate next_t= time. perf_counter( ) frame= pack_frame( ) printed= False while time. perf_counter( ) < end_t: now= time. perf_counter( ) if now< next_t: time. sleep( min ( 0.0005 , next_t- now) ) continue try : sock. send( frame) stat[ key] += 1 if not printed: print ( f"[ { tag} TX OK] { fmt_frame( ) } " ) printed= True except OSError: pass next_t+= interval# ===== 接收线程 ===== def recv_loop ( sock, end_t, stat, key, tag) : printed= False while time. perf_counter( ) < end_t: r, _, _= select. select( [ sock] , [ ] , [ ] , 0.05 ) if not r: continue frame= sock. recv( 16 ) can_id, dlc, data= struct. unpack( CAN_FRAME_FMT, frame) if can_id& CAN_ECHO_FLAG: continue if ( can_id& STD_MASK) == CAN_IDand data[ : 8 ] == PAYLOAD: stat[ key] += 1 if not printed: print ( f"[ { tag} RX OK] { fmt_frame( ) } " ) printed= True # ===== 主程序 ===== def main ( ) : parser= argparse. ArgumentParser( ) parser. add_argument( "--no-setup" , action= "store_true" , help = "跳过 can 配置" ) parser. add_argument( "--duration" , type = float , default= 3.0 , help = "每档测试秒数" ) parser. add_argument( "--rates" , default= "10,50,100,200,500,1000,2000,3000,4000,5000" ) args= parser. parse_args( ) if not args. no_setup: setup_can( ) tx0= open_can( "can0" , True ) tx1= open_can( "can1" , True ) rx0= open_can( "can0" , False ) rx1= open_can( "can1" , False ) rates= [ int ( x) for xin args. rates. split( "," ) ] print ( "\n=== CAN0 <-> CAN1 双向收发测试 ===" ) print ( "ID=0x123 DATA=11 22 33 44 55 66 77 88\n" ) print ( "rate(Hz) | can0->can1 sent/recv drop% | can1->can0 sent/recv drop%" ) for ratein rates: stat= { "s0" : 0 , "r1" : 0 , "s1" : 0 , "r0" : 0 } start= time. perf_counter( ) send_end= start+ args. duration recv_end= send_end+ 0.5 ts0= threading. Thread( target= send_loop, args= ( tx0, rate, send_end, stat, "s0" , "can0" ) ) ts1= threading. Thread( target= send_loop, args= ( tx1, rate, send_end, stat, "s1" , "can1" ) ) tr0= threading. Thread( target= recv_loop, args= ( rx0, recv_end, stat, "r0" , "can0" ) ) tr1= threading. Thread( target= recv_loop, args= ( rx1, recv_end, stat, "r1" , "can1" ) ) tr0. start( ) ; tr1. start( ) ts0. start( ) ; ts1. start( ) ts0. join( ) ; ts1. join( ) tr0. join( ) ; tr1. join( ) d01= 0 if stat[ "s0" ] == 0 else ( stat[ "s0" ] - stat[ "r1" ] ) / stat[ "s0" ] * 100 d10= 0 if stat[ "s1" ] == 0 else ( stat[ "s1" ] - stat[ "r0" ] ) / stat[ "s1" ] * 100 print ( f" { rate: 7d } | { stat[ 's0' ] : 5d } / { stat[ 'r1' ] : 5d } { d01: 6.2f } %" f" | { stat[ 's1' ] : 5d } / { stat[ 'r0' ] : 5d } { d10: 6.2f } %" ) if d01> 1 or d10> 1 : print ( ">> 已明显开始丢包,再提速意义不大了" ) break print ( "\n=== 测试结束 ===" ) if __name__== "__main__" : if os. geteuid( ) != 0 : print ( "请 sudo 运行" ) exit( 1 ) main( )