udp服务器和客户端

需求背景

线上的项目有使用lvs作为端口转发,使用的是udp的端口转发服务,想在测试环境下面模拟一个高并发的udp的服务器和客户端的环境,现成的没有相关的软件,那么就用两个脚本进行实现

要实现这个服务,需要准备一个服务端的脚本,和客户端的脚本,服务端使用用本地文件对外提供服务,客户端请求数据流并保存到本地

需求分析

考虑高并发,所以需要对软件进行一下限速,然后尽量高的并发的,一个端口响应一个请求,并发的取读取udp的请求

需求实现

python实现

这个实现是最开始实现的一个版本,服务端和客户端都采用的python的,一个python进程启动20个端口,然后并发启动进程,这个发现一个问题,因为python的进程占用cpu很容易100%,这个对系统的资源有点大,造成无法处理很大的并发,客户端也是,这个如果小批量的测试还是可以的,也记录下

服务端实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
[root@lab101 tftpboot]# cat server2-1.py
#! /usr/bin/env python3
# -*- coding:utf-8 -*-
import socket
import os
import time
import threading

def start_udp_file_server(host, port, buffer_size=1024, file_path='video.mp4', target_speed_mbps=2):
# 创建UDP套接字
server_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

# 绑定到指定的地址和端口
server_socket.bind((host, port))
print(f"UDP 文件服务器启动,监听 {host}:{port}...")

# 计算每MB需要的时间(秒)
target_speed_bps = target_speed_mbps * 1024 * 1024
time_per_chunk = buffer_size / target_speed_bps

while True:
try:
# 接收客户端的请求
data, addr = server_socket.recvfrom(buffer_size)
request = data.decode()

print(f"接收到来自 {addr} 的请求: {request}")

# 判断请求是否为“GET”并且文件存在
if request == 'GET' and os.path.exists(file_path):
start_time = time.time()
total_sent = 0

with open(file_path, 'rb') as file:
while True:
chunk = file.read(buffer_size)
if not chunk:
break
# 发送数据块到客户端
server_socket.sendto(chunk, addr)
total_sent += len(chunk)

# 计算并显示发送速度
elapsed_time = time.time() - start_time
speed = total_sent / (1024 * 1024) / elapsed_time # MB/s
# print(f"已发送 {total_sent / (1024 * 1024):.2f} MB, 速度: {speed:.2f} MB/s")

# 控制发送速度
time.sleep(time_per_chunk)

# 发送文件结束标志
server_socket.sendto(b'END', addr)
print("文件已发送完毕")
else:
server_socket.sendto(b'FILE_NOT_FOUND', addr)
print("文件未找到或请求无效")
except Exception as e:
print(f"发生错误: {e}")

def run_servers_on_multiple_ports(host='192.167.19.101', ports=[30000,30001,30002,30003,30004,30005,30006,30007,30008,30009,30010,30011,30012,30013,30014,30015,30016,30017,30018,30019,30020], buffer_size=1024, file_path='video.mp4', target_speed_mbps=2):
threads = []
for port in ports:
thread = threading.Thread(target=start_udp_file_server, args=(host, port, buffer_size, file_path, target_speed_mbps))
thread.start()
threads.append(thread)
for thread in threads:
thread.join()

if __name__ == '__main__':
run_servers_on_multiple_ports()

使用方法

1
python server.py 

客户端实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
[root@lab104 udp]# cat test.py
#! /usr/bin/env python3
# -*- coding:utf-8 -*-

import socket
import sys
def udp_file_client(server_ip, server_port, request_message='GET', output_file='received_video.mp4', buffer_size=1024):
# 创建UDP套接字
client_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

# 发送请求到服务器
client_socket.sendto(request_message.encode(), (server_ip, server_port))
print("请求已发送")

with open(output_file, 'wb') as file:
while True:
# 接收数据
data, _ = client_socket.recvfrom(buffer_size)
if data == b'END':
break
elif data == b'FILE_NOT_FOUND':
print("文件未找到")
break
else:
file.write(data)

print("文件接收完毕")

if __name__ == '__main__':
ip = sys.argv[1]
port = int(sys.argv[2])
udp_file_client(ip, port)

使用方法

1
2
3
4
5
6
7
[root@lab104 udp]# cat test1.sh
for a in `seq 30001 30200`
do

python test.py 192.168.3.235 $a &
sleep 0.5
done

这样就可以并发请求了

go的实现

go的好处是比python的占用小,并且处处可运行,无需基础环境

服务端的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
[root@lab103 udp]# cat server1.go
package main

import (
"fmt"
"net"
"os"
"strconv"
"sync"
"time"
)

func startUDPFileServer(wg *sync.WaitGroup, host string, port int, bufferSize int, filePath string, targetSpeedMbps int) {
defer wg.Done()

addr := fmt.Sprintf("%s:%d", host, port)
udpAddr, err := net.ResolveUDPAddr("udp", addr)
if err != nil {
fmt.Printf("Failed to resolve address: %v\n", err)
return
}

conn, err := net.ListenUDP("udp", udpAddr)
if err != nil {
fmt.Printf("Failed to listen on %s: %v\n", addr, err)
return
}
defer conn.Close()

fmt.Printf("UDP File server started, listening on %s...\n", addr)

targetSpeedBps := targetSpeedMbps * 1024 * 1024

buffer := make([]byte, bufferSize)

for {
n, clientAddr, err := conn.ReadFromUDP(buffer)
if err != nil {
fmt.Printf("Error receiving data: %v\n", err)
continue
}
request := string(buffer[:n])
fmt.Printf("Received request from %v: %s\n", clientAddr, request)

if request == "GET" {
if _, err := os.Stat(filePath); os.IsNotExist(err) {
conn.WriteToUDP([]byte("FILE_NOT_FOUND"), clientAddr)
fmt.Println("File not found")
continue
}

file, err := os.Open(filePath)
if err != nil {
fmt.Printf("Failed to open file: %v\n", err)
continue
}
defer file.Close()

totalSent := 0
startTime := time.Now()

for {
n, err := file.Read(buffer)
if err != nil {
break
}
_, err = conn.WriteToUDP(buffer[:n], clientAddr)
if err != nil {
fmt.Printf("Error sending data: %v\n", err)
break
}
totalSent += n

// 控制发送速度
timeElapsed := time.Since(startTime).Seconds()
expectedTime := float64(totalSent) / float64(targetSpeedBps)
if timeElapsed < expectedTime {
time.Sleep(time.Duration(expectedTime-timeElapsed) * time.Second)
}
}

conn.WriteToUDP([]byte("END"), clientAddr)
fmt.Println("File sent successfully")
} else {
conn.WriteToUDP([]byte("INVALID_REQUEST"), clientAddr)
fmt.Println("Invalid request")
}
}
}

func runServersOnMultiplePorts(host string, startPort, endPort int, bufferSize int, filePath string, targetSpeedMbps int) {
var wg sync.WaitGroup

for port := startPort; port <= endPort; port++ {
wg.Add(1)
go startUDPFileServer(&wg, host, port, bufferSize, filePath, targetSpeedMbps)
}

wg.Wait()
}

func main() {
if len(os.Args) != 4 {
fmt.Println("Usage: go run server.go <host> <start_port> <end_port>")
return
}

host := os.Args[1]
startPort, err1 := strconv.Atoi(os.Args[2])
endPort, err2 := strconv.Atoi(os.Args[3])

if err1 != nil || err2 != nil {
fmt.Println("Invalid port number")
return
}

runServersOnMultiplePorts(host, startPort, endPort, 1024, "video.mp4", 2)
}

使用方法

1
[root@lab103 udp]# ./server1 192.168.19.103 30001  30050

客户端的实现

版本一:存储文件的版本
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
[root@lab104 udp]# cat client1.go
package main

import (
"fmt"
"net"
"os"
"strconv"
"sync"
"time"
)

func udpFileClient(serverIP string, port int, requestMessage string, outputFile string, bufferSize int, wg *sync.WaitGroup) {
defer wg.Done()

// 创建UDP地址
serverAddr := &net.UDPAddr{
IP: net.ParseIP(serverIP),
Port: port,
}

// 创建UDP套接字
conn, err := net.DialUDP("udp", nil, serverAddr)
if err != nil {
fmt.Printf("端口 %d: 无法创建UDP套接字: %v\n", port, err)
return
}
defer conn.Close()

// 发送请求到服务器
_, err = conn.Write([]byte(requestMessage))
if err != nil {
fmt.Printf("端口 %d: 发送请求失败: %v\n", port, err)
return
}
fmt.Printf("端口 %d: 请求已发送\n", port)

// 打开文件用于写入接收的数据
file, err := os.Create(fmt.Sprintf("%s_%d.mp4", outputFile, port))
if err != nil {
fmt.Printf("端口 %d: 无法创建文件: %v\n", port, err)
return
}
defer file.Close()

buffer := make([]byte, bufferSize)
for {
conn.SetReadDeadline(time.Now().Add(5 * time.Second)) // 设置超时时间
n, _, err := conn.ReadFromUDP(buffer)
if err != nil {
if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
fmt.Printf("端口 %d: 接收超时\n", port)
break
}
fmt.Printf("端口 %d: 接收数据失败: %v\n", port, err)
return
}

data := buffer[:n]
if string(data) == "END" {
break
} else if string(data) == "FILE_NOT_FOUND" {
fmt.Printf("端口 %d: 文件未找到\n", port)
return
}

_, err = file.Write(data)
if err != nil {
fmt.Printf("端口 %d: 写入文件失败: %v\n", port, err)
return
}
}

fmt.Printf("端口 %d: 文件接收完毕\n", port)
}

func main() {
if len(os.Args) != 4 {
fmt.Println("用法: go run client.go <IP> <起始端口> <结束端口>")
return
}

ip := os.Args[1]
startPort, err := strconv.Atoi(os.Args[2])
if err != nil {
fmt.Println("起始端口无效")
return
}
endPort, err := strconv.Atoi(os.Args[3])
if err != nil {
fmt.Println("结束端口无效")
return
}

if startPort > endPort {
fmt.Println("起始端口不能大于结束端口")
return
}

var wg sync.WaitGroup
for port := startPort; port <= endPort; port++ {
wg.Add(1)
go udpFileClient(ip, port, "GET", "received_video", 1024, &wg)
}

wg.Wait()
}

使用方法

1
./client1 192.168.19.103 30001 30050
版本二:存储内存的版本

上面的客户端的版本是本地要存储实际文件的,这个可能本地磁盘性能也会有影响,我们可以给一个丢内存的版本

客户端只请求,数据丢内存的版本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
[root@lab104 udp]# cat client2.go
package main

import (
"bytes"
"fmt"
"net"
"os"
"strconv"
"sync"
"time"
)

func udpFileClient(serverIP string, port int, requestMessage string, bufferSize int, wg *sync.WaitGroup) {
defer wg.Done()

// 创建UDP地址
serverAddr := &net.UDPAddr{
IP: net.ParseIP(serverIP),
Port: port,
}

// 创建UDP套接字
conn, err := net.DialUDP("udp", nil, serverAddr)
if err != nil {
fmt.Printf("端口 %d: 无法创建UDP套接字: %v\n", port, err)
return
}
defer conn.Close()

// 发送请求到服务器
_, err = conn.Write([]byte(requestMessage))
if err != nil {
fmt.Printf("端口 %d: 发送请求失败: %v\n", port, err)
return
}
fmt.Printf("端口 %d: 请求已发送\n", port)

// 使用 bytes.Buffer 存储接收到的数据
var buffer bytes.Buffer
dataBuffer := make([]byte, bufferSize)

for {
conn.SetReadDeadline(time.Now().Add(5 * time.Second)) // 设置超时时间
n, _, err := conn.ReadFromUDP(dataBuffer)
if err != nil {
if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
fmt.Printf("端口 %d: 接收超时\n", port)
break
}
fmt.Printf("端口 %d: 接收数据失败: %v\n", port, err)
return
}

data := dataBuffer[:n]
if string(data) == "END" {
break
} else if string(data) == "FILE_NOT_FOUND" {
fmt.Printf("端口 %d: 文件未找到\n", port)
return
}

_, err = buffer.Write(data)
if err != nil {
fmt.Printf("端口 %d: 写入内存失败: %v\n", port, err)
return
}
}

fmt.Printf("端口 %d: 文件接收完毕,总数据大小: %d bytes\n", port, buffer.Len())
// 如果需要,你可以在这里进一步处理 buffer 中的数据
}

func main() {
if len(os.Args) != 4 {
fmt.Println("用法: go run client.go <IP> <起始端口> <结束端口>")
return
}

ip := os.Args[1]
startPort, err := strconv.Atoi(os.Args[2])
if err != nil {
fmt.Println("起始端口无效")
return
}
endPort, err := strconv.Atoi(os.Args[3])
if err != nil {
fmt.Println("结束端口无效")
return
}

if startPort > endPort {
fmt.Println("起始端口不能大于结束端口")
return
}

var wg sync.WaitGroup
for port := startPort; port <= endPort; port++ {
wg.Add(1)
go udpFileClient(ip, port, "GET", 1024, &wg)
}

wg.Wait()
}

使用方法

1
[root@lab104 udp]# ./client2 192.168.19.103 30001 30050
版本三:存储内存并控制大小的版本

上面的版本有个问题是文件大小多大会占用多大内存,没有处理内存的问题,我们需要控制内存,设置一个缓冲区,缓冲区满了就释放内存,下面的版本就是设置的20MB的单个文件的缓冲区,如果是2MB/s,差不多可以缓冲10s左右的数据,这个可以自己修改具体的

20个并发占用的内存大概在1G左右,这个内存大小还行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
[root@lab104 udp]# cat client3.go
package main

import (
"fmt"
"net"
"os"
"strconv"
"sync"
"time"
)

const maxBufferSize = 20 * 1024 * 1024 // 20 MB

func udpFileClient(serverIP string, port int, requestMessage string, bufferSize int, wg *sync.WaitGroup) {
defer wg.Done()

// 创建UDP地址
serverAddr := &net.UDPAddr{
IP: net.ParseIP(serverIP),
Port: port,
}

// 创建UDP套接字
conn, err := net.DialUDP("udp", nil, serverAddr)
if err != nil {
fmt.Printf("端口 %d: 无法创建UDP套接字: %v\n", port, err)
return
}
defer conn.Close()

// 发送请求到服务器
_, err = conn.Write([]byte(requestMessage))
if err != nil {
fmt.Printf("端口 %d: 发送请求失败: %v\n", port, err)
return
}
fmt.Printf("端口 %d: 请求已发送\n", port)

// 使用固定大小的缓冲区存储接收到的数据
dataBuffer := make([]byte, bufferSize)
totalData := make([]byte, 0, maxBufferSize)

for {
conn.SetReadDeadline(time.Now().Add(5 * time.Second)) // 设置超时时间
n, _, err := conn.ReadFromUDP(dataBuffer)
if err != nil {
if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
fmt.Printf("端口 %d: 接收超时\n", port)
break
}
fmt.Printf("端口 %d: 接收数据失败: %v\n", port, err)
return
}

data := dataBuffer[:n]
if string(data) == "END" {
break
} else if string(data) == "FILE_NOT_FOUND" {
fmt.Printf("端口 %d: 文件未找到\n", port)
return
}

// 如果总数据量超过了最大缓冲区大小,则丢弃之前的数据
if len(totalData)+len(data) > maxBufferSize {
totalData = totalData[len(data):]
}

totalData = append(totalData, data...)
}

fmt.Printf("端口 %d: 数据接收完毕,总数据大小: %d bytes\n", port, len(totalData))
// 在这里处理 totalData,如进行统计或分析
}

func main() {
if len(os.Args) != 4 {
fmt.Println("用法: go run client.go <IP> <起始端口> <结束端口>")
return
}

ip := os.Args[1]
startPort, err := strconv.Atoi(os.Args[2])
if err != nil {
fmt.Println("起始端口无效")
return
}
endPort, err := strconv.Atoi(os.Args[3])
if err != nil {
fmt.Println("结束端口无效")
return
}

if startPort > endPort {
fmt.Println("起始端口不能大于结束端口")
return
}

var wg sync.WaitGroup
for port := startPort; port <= endPort; port++ {
wg.Add(1)
go udpFileClient(ip, port, "GET", 1024, &wg)
}

wg.Wait()
}

总结

使用上面的就实现了一个udp的服务端和客户端的模拟程序