测试路由转发最大连接数脚本

背景

在某个场景下,路由器配置了端口转发后,出现了一些端口响应问题,我们需要确定这个环境下单个端口的最大并发连接数

方式是启动一个服务端,然后建立指定数目的长连接,检查最多可以建立多少个连接

测试脚本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import asyncio
import sys

async def handle_client(reader, writer):
    """Serve one accepted TCP connection, answering "ping" with "pong".

    Reads chunks of up to 1024 bytes until the peer closes the connection
    (an empty read). Each chunk is decoded and stripped; when it equals
    "ping" the reply ``b"pong\\n"`` is written back. Any other payload is
    ignored.

    Args:
        reader: Stream reader for the connection (provides ``read``).
        writer: Stream writer for the connection (provides ``write``,
            ``drain``, ``close``, ``wait_closed`` and ``get_extra_info``).

    Errors raised while serving are printed, never propagated, so one bad
    client cannot take the handler task down with a traceback.
    """
    addr = writer.get_extra_info('peername')
    print(f"New connection from {addr}")

    try:
        while True:
            data = await reader.read(1024)
            if not data:
                # Empty read means the peer closed its side.
                break
            message = data.decode().strip()
            if message == "ping":
                writer.write(b"pong\n")
                await writer.drain()
    except Exception as e:
        print(f"Error with {addr}: {e}")
    finally:
        writer.close()
        # Wait for the transport to actually close so the socket is released
        # before reporting it; a connection that was reset may raise here.
        try:
            await writer.wait_closed()
        except Exception as e:
            print(f"Error closing {addr}: {e}")
        print(f"Connection closed: {addr}")

def main():
    """Run the ping/pong server on 0.0.0.0:4444 until interrupted.

    Starts an asyncio TCP server that dispatches every connection to
    ``handle_client`` and serves until Ctrl-C. ``asyncio.run`` creates and
    closes the event loop, and ``async with server`` closes the listening
    socket on the way out.
    """
    async def _serve():
        # No ``loop=`` argument: it was removed from asyncio.start_server in
        # Python 3.10 and passing it raises TypeError there.
        server = await asyncio.start_server(handle_client, '0.0.0.0', 4444)
        print(f"Serving on {server.sockets[0].getsockname()}")

        async with server:
            await server.serve_forever()

    try:
        asyncio.run(_serve())
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to stop the server; exit quietly.
        pass


if __name__ == "__main__":
    main()

上面的是版本一的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
import asyncio

# Writers of all connections currently being served; used for live counting.
active_connections = set()


async def handle_client(reader, writer):
    """Serve one accepted TCP connection and track it while it is open.

    The writer is added to ``active_connections`` on entry and removed on
    exit. Chunks of up to 1024 bytes are read until the peer closes the
    connection (an empty read); a chunk that decodes and strips to "ping"
    is answered with ``b"pong\\n"``, anything else is ignored.

    Args:
        reader: Stream reader for the connection (provides ``read``).
        writer: Stream writer for the connection (provides ``write``,
            ``drain``, ``close``, ``wait_closed`` and ``get_extra_info``).

    Errors raised while serving or closing are printed, never propagated.
    """
    addr = writer.get_extra_info('peername')
    active_connections.add(writer)
    print(f"New connection from {addr}")

    try:
        while True:
            data = await reader.read(1024)
            if not data:
                # Empty read means the peer closed its side.
                break
            message = data.decode().strip()
            if message == "ping":
                writer.write(b"pong\n")
                await writer.drain()
    except Exception as e:
        print(f"Error with {addr}: {e}")
    finally:
        active_connections.discard(writer)
        writer.close()
        # wait_closed() may raise when the connection was reset by the peer;
        # report it instead of letting it escape the handler and skip the
        # final log line.
        try:
            await writer.wait_closed()
        except Exception as e:
            print(f"Error closing {addr}: {e}")
        print(f"Connection closed: {addr}")

async def monitor_connections():
    """Report the size of ``active_connections`` every three seconds, forever."""
    report_interval = 3
    while True:
        # Print first, then pause, so a report is emitted as soon as it starts.
        print(f"Active connections: {len(active_connections)}")
        await asyncio.sleep(report_interval)

def main():
    """Run the ping/pong server on 0.0.0.0:4444 with live connection counts.

    Starts an asyncio TCP server that dispatches every connection to
    ``handle_client``, runs ``monitor_connections`` alongside it, and serves
    until Ctrl-C. ``asyncio.run`` creates and closes the event loop, and
    ``async with server`` closes the listening socket on the way out.
    """
    async def _serve():
        # No ``loop=`` argument: it was removed from asyncio.start_server in
        # Python 3.10 and passing it raises TypeError there.
        server = await asyncio.start_server(handle_client, '0.0.0.0', 4444)
        print(f"Serving on {server.sockets[0].getsockname()}")

        # Start the monitoring task
        monitor = asyncio.create_task(monitor_connections())
        try:
            async with server:
                await server.serve_forever()
        finally:
            # The monitor loops forever; stop it explicitly on shutdown.
            monitor.cancel()

    try:
        asyncio.run(_serve())
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to stop the server; exit quietly.
        pass


if __name__ == "__main__":
    main()

这个是版本二的,这个支持实时打印当前的连接

长请求版本

client_long.py脚本

1
python3 client_long.py 20000
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
import asyncio
import sys
import time

async def client_connection(conn_id, host='192.168.1.19', port=4444, interval=5):
    """Hold one long-lived connection open, pinging the server periodically.

    Connects to ``host:port``, then repeatedly sends ``b"ping\\n"`` and reads
    up to 1024 bytes of reply. The loop ends when the reply is empty (peer
    closed) or does not decode and strip to "pong". Between successful
    round trips it sleeps ``interval`` seconds.

    Args:
        conn_id: Identifier used only in the printed log lines.
        host: Server address to connect to.
        port: Server TCP port.
        interval: Seconds to wait between pings.

    Errors (including a failed connect) are printed, never propagated, so
    callers can gather many of these coroutines without one failure
    aborting the rest.
    """
    # Stays None when the connect itself fails, so cleanup knows whether
    # there is anything to close.
    writer = None
    try:
        reader, writer = await asyncio.open_connection(host, port)
        print(f"Connection {conn_id} established")

        while True:
            writer.write(b"ping\n")
            await writer.drain()

            response = await reader.read(1024)
            if not response or response.decode().strip() != "pong":
                print(f"Connection {conn_id} bad response: {response}")
                break

            await asyncio.sleep(interval)
    except Exception as e:
        print(f"Connection {conn_id} error: {str(e)}")
    finally:
        if writer is not None:
            writer.close()
        print(f"Connection {conn_id} closed")

def main(target_connections, batch_size=1000):
    """Open ``target_connections`` long-lived client connections in batches.

    Connections are started ``batch_size`` at a time with a one-second pause
    after each batch so the event loop can establish them before more are
    added. The call returns once every connection coroutine has finished.

    Args:
        target_connections: Total number of connections to open.
        batch_size: Maximum number of connections started per batch.

    Raises:
        ValueError: If ``batch_size`` is not positive (the batching loop
            could never make progress).
    """
    if batch_size <= 0:
        raise ValueError("batch_size must be a positive integer")

    async def _run():
        connections = []
        current_count = 0

        while current_count < target_connections:
            new_connections = min(batch_size, target_connections - current_count)
            print(f"Adding {new_connections} connections...")

            # Tasks are created inside the running loop, so no implicit
            # event-loop lookup is needed.
            connections.extend(
                asyncio.ensure_future(client_connection(current_count + i))
                for i in range(new_connections)
            )
            current_count += new_connections

            # Let the event loop process the new connections
            await asyncio.sleep(1)

        # Wait for all connections to finish
        await asyncio.gather(*connections)

    # asyncio.run creates the loop and always closes it, even on error.
    asyncio.run(_run())

if __name__ == "__main__":
    # Exactly one argument is expected: the total number of connections.
    if len(sys.argv) != 2:
        print("Usage: python3 client_long.py <total_connections>")
        sys.exit(1)

    try:
        total = int(sys.argv[1])
        if total <= 0:
            # Reuse the ValueError path so non-numeric and non-positive
            # input produce the same message.
            raise ValueError
    except ValueError:
        print("Invalid connection count. Must be positive integer.")
        sys.exit(1)

    # Monotonic clock: the elapsed time is unaffected by wall-clock changes.
    start_time = time.monotonic()
    print(f"Starting long connection test with {total} connections")
    main(total)
    print(f"Test completed in {time.monotonic()-start_time:.2f} seconds")

短请求版本

client_shot.py

1
2
3
4
5
6
7
8
9
10
11
12
import socket

def send_ping(host='192.168.1.19', port=4444, timeout=None):
    """Open a short-lived connection, send one ping and print the reply.

    Args:
        host: Server address to connect to.
        port: Server TCP port.
        timeout: Seconds to wait for connect/send/receive before
            ``socket.timeout`` is raised; ``None`` blocks indefinitely.

    Raises:
        OSError: If the connection cannot be established or fails mid-way.
    """
    # The context manager closes the socket even if send/recv raises.
    with socket.create_connection((host, port), timeout=timeout) as s:
        s.sendall(b"ping\n")
        response = s.recv(1024)
        print(f"Received: {response.decode().strip()}")


if __name__ == "__main__":
    send_ping()

使用上面的长连接脚本建立指定数目的长连接,再使用短连接脚本验证连通性


测试路由转发最大连接数脚本
https://zphj1987.com/2025/06/09/测试路由转发最大连接数脚本/
作者
zphj1987
发布于
2025年6月9日
许可协议