完成Bitsflow家人云的迁移工作

This commit is contained in:
zeaslity
2025-12-08 08:56:23 +08:00
parent 9d93a1ee6e
commit dcc8afffba
19 changed files with 1515 additions and 404 deletions

View File

@@ -0,0 +1,55 @@
# ==========================================================
# 核心拥塞控制与队列管理 (解决丢包与延迟)
# ==========================================================
# 使用 FQ (Fair Queueing) 队列调度算法这是BBR的必要配合
net.core.default_qdisc = fq
# 开启 BBR 拥塞控制算法
net.ipv4.tcp_congestion_control = bbr
# ==========================================================
# TCP 缓冲区调优 (针对 30-50Mbps 带宽优化)
# ==========================================================
# 格式: min default max
# 核心策略:限制 max 值,防止 Bufferbloat。
# 2MB (2097152) 的缓冲区足以应对 50Mbps 下 300ms 的抖动,再大就会导致高延迟。
# 接收缓冲区
net.ipv4.tcp_rmem = 4096 87380 2097152
# 发送缓冲区
net.ipv4.tcp_wmem = 4096 65536 2097152
# 开启窗口缩放,允许窗口超过 64KB
net.ipv4.tcp_window_scaling = 1
# ==========================================================
# 连接追踪与并发优化 (Xray 高并发转发需求)
# ==========================================================
# 增加系统级文件描述符限制
fs.file-max = 1048576
# 增加入站连接队列长度,防止突发流量导致握手失败
net.core.somaxconn = 4096
net.core.netdev_max_backlog = 4096
# TCP Fast Open (TFO):减少握手延迟
# 值 3 表示同时开启客户端和服务端的 TFO 支持
# Xray 需要在配置中显式开启 "tcpFastOpen": true 才能生效
net.ipv4.tcp_fastopen = 3
# ==========================================================
# 抗丢包与快速恢复
# ==========================================================
# 开启 SACK (选择性确认),对丢包环境极其重要
net.ipv4.tcp_sack = 1
net.ipv4.tcp_dsack = 1
net.ipv4.tcp_fack = 1
# 缩短保活探测时间快速剔除死连接GFW常导致连接“假死”
net.ipv4.tcp_keepalive_time = 300
net.ipv4.tcp_keepalive_probes = 5
net.ipv4.tcp_keepalive_intvl = 15
# ==========================================================
# 转发功能开启 (主机A必须)
# ==========================================================
net.ipv4.ip_forward = 1

View File

@@ -0,0 +1,730 @@
package main
import (
"encoding/json"
"flag"
"fmt"
"log"
"net"
"os"
"strings"
"sync"
"time"
"golang.org/x/net/icmp"
"golang.org/x/net/ipv4"
)
// ============ 通用数据结构 ============
type TestPacket struct {
SeqNum uint64
Timestamp int64
Type string // "tcp", "udp", "ping"
Data []byte
}
type Stats struct {
PacketsReceived uint64
PacketsLost uint64
LastSeqNum uint64
RTTSamples []time.Duration
}
type Metrics struct {
PacketsSent uint64
PacketsReceived uint64
PacketsLost uint64
RTTSamples []time.Duration
MinRTT time.Duration
MaxRTT time.Duration
AvgRTT time.Duration
Jitter time.Duration
}
type TestReport struct {
Timestamp time.Time
TestDuration time.Duration
TargetHost string
TCPMetrics *Metrics
UDPMetrics *Metrics
TracerouteHops []HopInfo
}
type HopInfo struct {
TTL int
Address string
RTT time.Duration
}
// ============ 服务端实现 ============
type NetworkServer struct {
tcpAddr string
udpAddr string
tcpStats *Stats
udpStats *Stats
statsLock sync.RWMutex
}
func NewNetworkServer(tcpPort, udpPort int) *NetworkServer {
return &NetworkServer{
tcpAddr: fmt.Sprintf(":%d", tcpPort),
udpAddr: fmt.Sprintf(":%d", udpPort),
tcpStats: &Stats{},
udpStats: &Stats{},
}
}
func (ns *NetworkServer) Start() {
log.Printf("========== 网络质量检测服务端 ==========")
log.Printf("TCP监听端口: %s", ns.tcpAddr)
log.Printf("UDP监听端口: %s", ns.udpAddr)
log.Printf("服务器已启动,等待客户端连接...")
log.Printf("========================================\n")
// 启动TCP服务器
go ns.serveTCP()
// 启动UDP服务器
go ns.serveUDP()
select {}
}
func (ns *NetworkServer) serveTCP() {
listener, err := net.Listen("tcp", ns.tcpAddr)
if err != nil {
log.Fatalf("TCP监听失败: %v", err)
}
defer listener.Close()
for {
conn, err := listener.Accept()
if err != nil {
log.Printf("TCP连接接受错误: %v", err)
continue
}
go ns.handleTCPConnection(conn)
}
}
func (ns *NetworkServer) handleTCPConnection(conn net.Conn) {
defer conn.Close()
log.Printf("[TCP] 新连接来自 %s", conn.RemoteAddr())
buf := make([]byte, 8192)
for {
n, err := conn.Read(buf)
if err != nil {
log.Printf("[TCP] 连接 %s 断开", conn.RemoteAddr())
return
}
var packet TestPacket
if err := json.Unmarshal(buf[:n], &packet); err != nil {
continue
}
receiveTime := time.Now().UnixNano()
ns.updateStats(ns.tcpStats, packet.SeqNum)
// 立即回显数据包
response := TestPacket{
SeqNum: packet.SeqNum,
Timestamp: receiveTime,
Type: "tcp_response",
Data: packet.Data,
}
data, _ := json.Marshal(response)
conn.Write(data)
}
}
func (ns *NetworkServer) serveUDP() {
addr, err := net.ResolveUDPAddr("udp", ns.udpAddr)
if err != nil {
log.Fatalf("UDP地址解析失败: %v", err)
}
conn, err := net.ListenUDP("udp", addr)
if err != nil {
log.Fatalf("UDP监听失败: %v", err)
}
defer conn.Close()
log.Printf("[UDP] 监听在 %s", ns.udpAddr)
buf := make([]byte, 8192)
for {
n, remoteAddr, err := conn.ReadFromUDP(buf)
if err != nil {
log.Printf("UDP读取错误: %v", err)
continue
}
var packet TestPacket
if err := json.Unmarshal(buf[:n], &packet); err != nil {
continue
}
receiveTime := time.Now().UnixNano()
ns.updateStats(ns.udpStats, packet.SeqNum)
// 回显UDP数据包
response := TestPacket{
SeqNum: packet.SeqNum,
Timestamp: receiveTime,
Type: "udp_response",
Data: packet.Data,
}
data, _ := json.Marshal(response)
conn.WriteToUDP(data, remoteAddr)
}
}
func (ns *NetworkServer) updateStats(stats *Stats, seqNum uint64) {
ns.statsLock.Lock()
defer ns.statsLock.Unlock()
stats.PacketsReceived++
if seqNum > stats.LastSeqNum+1 {
stats.PacketsLost += seqNum - stats.LastSeqNum - 1
}
stats.LastSeqNum = seqNum
}
// ============ 客户端实现 ============
type NetworkClient struct {
targetHost string
tcpPort int
udpPort int
testDuration time.Duration
packetSize int
reportFile string
tcpMetrics *Metrics
udpMetrics *Metrics
mu sync.Mutex
}
func NewNetworkClient(host string, tcpPort, udpPort int, duration time.Duration) *NetworkClient {
return &NetworkClient{
targetHost: host,
tcpPort: tcpPort,
udpPort: udpPort,
testDuration: duration,
packetSize: 1024,
reportFile: "network_quality_report.json",
tcpMetrics: &Metrics{MinRTT: time.Hour},
udpMetrics: &Metrics{MinRTT: time.Hour},
}
}
func (nc *NetworkClient) testTCPLatency() error {
addr := fmt.Sprintf("%s:%d", nc.targetHost, nc.tcpPort)
conn, err := net.DialTimeout("tcp", addr, 5*time.Second)
if err != nil {
return fmt.Errorf("TCP连接失败: %v", err)
}
defer conn.Close()
log.Printf("[TCP] 开始延迟测试 -> %s", addr)
var seqNum uint64
deadline := time.Now().Add(nc.testDuration)
for time.Now().Before(deadline) {
seqNum++
packet := TestPacket{
SeqNum: seqNum,
Timestamp: time.Now().UnixNano(),
Type: "tcp_probe",
Data: make([]byte, nc.packetSize),
}
sendTime := time.Now()
data, _ := json.Marshal(packet)
if _, err := conn.Write(data); err != nil {
nc.mu.Lock()
nc.tcpMetrics.PacketsLost++
nc.mu.Unlock()
continue
}
nc.mu.Lock()
nc.tcpMetrics.PacketsSent++
nc.mu.Unlock()
buf := make([]byte, 8192)
conn.SetReadDeadline(time.Now().Add(2 * time.Second))
_, err := conn.Read(buf)
if err != nil {
nc.mu.Lock()
nc.tcpMetrics.PacketsLost++
nc.mu.Unlock()
continue
}
rtt := time.Since(sendTime)
nc.updateTCPMetrics(rtt)
time.Sleep(100 * time.Millisecond)
}
log.Printf("[TCP] 测试完成")
return nil
}
func (nc *NetworkClient) testUDPLatency() error {
addr := fmt.Sprintf("%s:%d", nc.targetHost, nc.udpPort)
raddr, err := net.ResolveUDPAddr("udp", addr)
if err != nil {
return fmt.Errorf("UDP地址解析失败: %v", err)
}
conn, err := net.DialUDP("udp", nil, raddr)
if err != nil {
return fmt.Errorf("UDP连接失败: %v", err)
}
defer conn.Close()
log.Printf("[UDP] 开始延迟测试 -> %s", addr)
var seqNum uint64
deadline := time.Now().Add(nc.testDuration)
sentPackets := make(map[uint64]time.Time)
var wg sync.WaitGroup
// 发送协程
wg.Add(1)
go func() {
defer wg.Done()
for time.Now().Before(deadline) {
seqNum++
packet := TestPacket{
SeqNum: seqNum,
Timestamp: time.Now().UnixNano(),
Type: "udp_probe",
Data: make([]byte, nc.packetSize),
}
sendTime := time.Now()
nc.mu.Lock()
sentPackets[seqNum] = sendTime
nc.mu.Unlock()
data, _ := json.Marshal(packet)
conn.Write(data)
nc.mu.Lock()
nc.udpMetrics.PacketsSent++
nc.mu.Unlock()
time.Sleep(100 * time.Millisecond)
}
}()
// 接收协程
buf := make([]byte, 8192)
for time.Now().Before(deadline.Add(3 * time.Second)) {
conn.SetReadDeadline(time.Now().Add(1 * time.Second))
n, err := conn.Read(buf)
if err != nil {
continue
}
var response TestPacket
if err := json.Unmarshal(buf[:n], &response); err != nil {
continue
}
nc.mu.Lock()
if sendTime, ok := sentPackets[response.SeqNum]; ok {
rtt := time.Since(sendTime)
nc.updateUDPMetrics(rtt)
delete(sentPackets, response.SeqNum)
}
nc.mu.Unlock()
}
wg.Wait()
nc.mu.Lock()
nc.udpMetrics.PacketsLost = uint64(len(sentPackets))
nc.mu.Unlock()
log.Printf("[UDP] 测试完成")
return nil
}
func (nc *NetworkClient) performTraceroute() ([]HopInfo, error) {
log.Printf("[Traceroute] 路由追踪到 %s", nc.targetHost)
hops := make([]HopInfo, 0, 30)
maxTTL := 30
timeout := 2 * time.Second
for ttl := 1; ttl <= maxTTL; ttl++ {
hopInfo, reached, err := nc.probeHop(ttl, timeout)
if err != nil {
continue
}
hops = append(hops, hopInfo)
if hopInfo.Address != "*" {
log.Printf(" %2d %-15s %.2fms", ttl, hopInfo.Address,
float64(hopInfo.RTT.Microseconds())/1000.0)
} else {
log.Printf(" %2d *", ttl)
}
if reached {
break
}
}
return hops, nil
}
func (nc *NetworkClient) probeHop(ttl int, timeout time.Duration) (HopInfo, bool, error) {
conn, err := icmp.ListenPacket("ip4:icmp", "0.0.0.0")
if err != nil {
return HopInfo{}, false, err
}
defer conn.Close()
if err := conn.IPv4PacketConn().SetTTL(ttl); err != nil {
return HopInfo{}, false, err
}
msg := icmp.Message{
Type: ipv4.ICMPTypeEcho,
Code: 0,
Body: &icmp.Echo{
ID: os.Getpid() & 0xffff,
Seq: ttl,
Data: []byte("TRACEROUTE"),
},
}
msgBytes, err := msg.Marshal(nil)
if err != nil {
return HopInfo{}, false, err
}
dst, err := net.ResolveIPAddr("ip4", nc.targetHost)
if err != nil {
return HopInfo{}, false, err
}
start := time.Now()
if _, err := conn.WriteTo(msgBytes, dst); err != nil {
return HopInfo{}, false, err
}
reply := make([]byte, 1500)
conn.SetReadDeadline(time.Now().Add(timeout))
_, peer, err := conn.ReadFrom(reply)
rtt := time.Since(start)
if err != nil {
return HopInfo{TTL: ttl, Address: "*", RTT: 0}, false, nil
}
hopAddr := peer.String()
reachedTarget := (hopAddr == dst.String())
return HopInfo{
TTL: ttl,
Address: hopAddr,
RTT: rtt,
}, reachedTarget, nil
}
func (nc *NetworkClient) updateTCPMetrics(rtt time.Duration) {
nc.mu.Lock()
defer nc.mu.Unlock()
nc.tcpMetrics.PacketsReceived++
nc.tcpMetrics.RTTSamples = append(nc.tcpMetrics.RTTSamples, rtt)
if rtt < nc.tcpMetrics.MinRTT {
nc.tcpMetrics.MinRTT = rtt
}
if rtt > nc.tcpMetrics.MaxRTT {
nc.tcpMetrics.MaxRTT = rtt
}
}
func (nc *NetworkClient) updateUDPMetrics(rtt time.Duration) {
nc.udpMetrics.PacketsReceived++
nc.udpMetrics.RTTSamples = append(nc.udpMetrics.RTTSamples, rtt)
if rtt < nc.udpMetrics.MinRTT {
nc.udpMetrics.MinRTT = rtt
}
if rtt > nc.udpMetrics.MaxRTT {
nc.udpMetrics.MaxRTT = rtt
}
}
func (nc *NetworkClient) calculateMetrics() {
// 计算TCP平均RTT和抖动
if len(nc.tcpMetrics.RTTSamples) > 0 {
var sum time.Duration
for _, rtt := range nc.tcpMetrics.RTTSamples {
sum += rtt
}
nc.tcpMetrics.AvgRTT = sum / time.Duration(len(nc.tcpMetrics.RTTSamples))
var jitterSum time.Duration
for i := 1; i < len(nc.tcpMetrics.RTTSamples); i++ {
diff := nc.tcpMetrics.RTTSamples[i] - nc.tcpMetrics.RTTSamples[i-1]
if diff < 0 {
diff = -diff
}
jitterSum += diff
}
if len(nc.tcpMetrics.RTTSamples) > 1 {
nc.tcpMetrics.Jitter = jitterSum / time.Duration(len(nc.tcpMetrics.RTTSamples)-1)
}
}
// 计算UDP平均RTT和抖动
if len(nc.udpMetrics.RTTSamples) > 0 {
var sum time.Duration
for _, rtt := range nc.udpMetrics.RTTSamples {
sum += rtt
}
nc.udpMetrics.AvgRTT = sum / time.Duration(len(nc.udpMetrics.RTTSamples))
var jitterSum time.Duration
for i := 1; i < len(nc.udpMetrics.RTTSamples); i++ {
diff := nc.udpMetrics.RTTSamples[i] - nc.udpMetrics.RTTSamples[i-1]
if diff < 0 {
diff = -diff
}
jitterSum += diff
}
if len(nc.udpMetrics.RTTSamples) > 1 {
nc.udpMetrics.Jitter = jitterSum / time.Duration(len(nc.udpMetrics.RTTSamples)-1)
}
}
}
func (nc *NetworkClient) generateReport(hops []HopInfo) error {
nc.calculateMetrics()
report := TestReport{
Timestamp: time.Now(),
TestDuration: nc.testDuration,
TargetHost: nc.targetHost,
TCPMetrics: nc.tcpMetrics,
UDPMetrics: nc.udpMetrics,
TracerouteHops: hops,
}
// 将报告序列化为JSON单行不格式化
data, err := json.Marshal(report)
if err != nil {
return err
}
// 以追加模式打开文件,如果不存在则创建
file, err := os.OpenFile(nc.reportFile, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644)
if err != nil {
return fmt.Errorf("打开报告文件失败: %v", err)
}
defer file.Close()
// 写入JSON数据并添加换行符JSON Lines格式
if _, err := file.Write(data); err != nil {
return fmt.Errorf("写入报告数据失败: %v", err)
}
if _, err := file.WriteString("\n"); err != nil {
return fmt.Errorf("写入换行符失败: %v", err)
}
log.Printf("测试报告已追加至: %s", nc.reportFile)
nc.printReport(&report)
return nil
}
func (nc *NetworkClient) printReport(report *TestReport) {
fmt.Println("\n" + strings.Repeat("=", 50))
fmt.Println(" 网络质量检测报告")
fmt.Println(strings.Repeat("=", 50))
fmt.Printf("测试时间: %s\n", report.Timestamp.Format("2006-01-02 15:04:05"))
fmt.Printf("目标主机: %s\n", report.TargetHost)
fmt.Printf("测试时长: %v\n", report.TestDuration)
fmt.Println(strings.Repeat("-", 50))
fmt.Println("\n【TCP 测试结果】")
fmt.Printf(" 发送包数: %d\n", report.TCPMetrics.PacketsSent)
fmt.Printf(" 接收包数: %d\n", report.TCPMetrics.PacketsReceived)
if report.TCPMetrics.PacketsSent > 0 {
lossRate := float64(report.TCPMetrics.PacketsLost) / float64(report.TCPMetrics.PacketsSent) * 100
fmt.Printf(" 丢包数量: %d (丢包率: %.2f%%)\n", report.TCPMetrics.PacketsLost, lossRate)
}
if report.TCPMetrics.MinRTT < time.Hour {
fmt.Printf(" 最小RTT: %v\n", report.TCPMetrics.MinRTT)
fmt.Printf(" 平均RTT: %v\n", report.TCPMetrics.AvgRTT)
fmt.Printf(" 最大RTT: %v\n", report.TCPMetrics.MaxRTT)
fmt.Printf(" 抖动: %v\n", report.TCPMetrics.Jitter)
}
fmt.Println("\n【UDP 测试结果】")
fmt.Printf(" 发送包数: %d\n", report.UDPMetrics.PacketsSent)
fmt.Printf(" 接收包数: %d\n", report.UDPMetrics.PacketsReceived)
if report.UDPMetrics.PacketsSent > 0 {
lossRate := float64(report.UDPMetrics.PacketsLost) / float64(report.UDPMetrics.PacketsSent) * 100
fmt.Printf(" 丢包数量: %d (丢包率: %.2f%%)\n", report.UDPMetrics.PacketsLost, lossRate)
}
if report.UDPMetrics.MinRTT < time.Hour {
fmt.Printf(" 最小RTT: %v\n", report.UDPMetrics.MinRTT)
fmt.Printf(" 平均RTT: %v\n", report.UDPMetrics.AvgRTT)
fmt.Printf(" 最大RTT: %v\n", report.UDPMetrics.MaxRTT)
fmt.Printf(" 抖动: %v\n", report.UDPMetrics.Jitter)
}
fmt.Println(strings.Repeat("=", 50))
fmt.Printf("报告已保存至: %s\n\n", nc.reportFile)
}
func (nc *NetworkClient) RunScheduledTests(interval time.Duration) {
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
log.Printf("\n========== 开始定时测试 [%s] ==========",
time.Now().Format("2006-01-02 15:04:05"))
// 执行TCP测试
if err := nc.testTCPLatency(); err != nil {
log.Printf("TCP测试错误: %v", err)
}
// 重置UDP指标
nc.mu.Lock()
nc.udpMetrics = &Metrics{MinRTT: time.Hour}
nc.mu.Unlock()
// 执行UDP测试
if err := nc.testUDPLatency(); err != nil {
log.Printf("UDP测试错误: %v", err)
}
// 执行Traceroute
hops, err := nc.performTraceroute()
if err != nil {
log.Printf("Traceroute错误: %v", err)
}
// 生成报告
if err := nc.generateReport(hops); err != nil {
log.Printf("报告生成错误: %v", err)
}
// 重置TCP指标准备下一轮
nc.mu.Lock()
nc.tcpMetrics = &Metrics{MinRTT: time.Hour}
nc.mu.Unlock()
log.Printf("========== 测试完成,等待下一轮 ==========\n")
<-ticker.C
}
}
// ============ 主程序 ============
func main() {
// 定义命令行参数
var (
mode = flag.String("mode", "", "运行模式: server(服务端) 或 client(客户端)")
tcpPort = flag.Int("tcp", 9001, "TCP测试端口")
udpPort = flag.Int("udp", 9002, "UDP测试端口")
targetHost = flag.String("target", "", "目标主机地址(客户端模式必需)")
testDuration = flag.Int("duration", 60, "单次测试时长(秒)")
interval = flag.Int("interval", 3600, "定时测试间隔(秒), 0表示只执行一次")
)
flag.Usage = func() {
fmt.Fprintf(os.Stderr, "\n网络质量检测工具\n\n")
fmt.Fprintf(os.Stderr, "用法:\n")
fmt.Fprintf(os.Stderr, " 服务端模式: %s -mode server [-tcp 端口] [-udp 端口]\n", os.Args[0])
fmt.Fprintf(os.Stderr, " 客户端模式: %s -mode client -target 目标IP [-tcp 端口] [-udp 端口] [-duration 秒] [-interval 秒]\n\n", os.Args[0])
fmt.Fprintf(os.Stderr, "参数说明:\n")
flag.PrintDefaults()
fmt.Fprintf(os.Stderr, "\n示例:\n")
fmt.Fprintf(os.Stderr, " # 启动服务端,使用默认端口\n")
fmt.Fprintf(os.Stderr, " %s -mode server\n\n", os.Args[0])
fmt.Fprintf(os.Stderr, " # 启动服务端,自定义端口\n")
fmt.Fprintf(os.Stderr, " %s -mode server -tcp 8001 -udp 8002\n\n", os.Args[0])
fmt.Fprintf(os.Stderr, " # 启动客户端单次测试60秒\n")
fmt.Fprintf(os.Stderr, " %s -mode client -target 192.168.1.100 -duration 60 -interval 0\n\n", os.Args[0])
fmt.Fprintf(os.Stderr, " # 启动客户端,每小时测试一次\n")
fmt.Fprintf(os.Stderr, " %s -mode client -target 192.168.1.100 -interval 3600\n\n", os.Args[0])
}
flag.Parse()
// 验证参数
if *mode == "" {
fmt.Fprintf(os.Stderr, "错误: 必须指定运行模式 -mode server 或 -mode client\n\n")
flag.Usage()
os.Exit(1)
}
switch *mode {
case "server":
// 服务端模式
server := NewNetworkServer(*tcpPort, *udpPort)
server.Start()
case "client":
// 客户端模式
if *targetHost == "" {
fmt.Fprintf(os.Stderr, "错误: 客户端模式必须指定 -target 参数\n\n")
flag.Usage()
os.Exit(1)
}
client := NewNetworkClient(*targetHost, *tcpPort, *udpPort, time.Duration(*testDuration)*time.Second)
if *interval == 0 {
// 单次执行
log.Printf("开始单次网络质量测试...")
if err := client.testTCPLatency(); err != nil {
log.Printf("TCP测试错误: %v", err)
}
if err := client.testUDPLatency(); err != nil {
log.Printf("UDP测试错误: %v", err)
}
hops, err := client.performTraceroute()
if err != nil {
log.Printf("Traceroute错误: %v", err)
}
if err := client.generateReport(hops); err != nil {
log.Printf("报告生成错误: %v", err)
}
} else {
// 定时执行
log.Printf("开始定时网络质量监控,间隔: %d秒", *interval)
client.RunScheduledTests(time.Duration(*interval) * time.Second)
}
default:
fmt.Fprintf(os.Stderr, "错误: 无效的运行模式 '%s',必须是 'server' 或 'client'\n\n", *mode)
flag.Usage()
os.Exit(1)
}
}

View File

@@ -0,0 +1,407 @@
package main
//
//import (
// "encoding/binary"
// "encoding/json"
// "flag"
// "fmt"
// "io"
// "log"
// "net"
// "os"
// "sync"
// "time"
//)
//
//// 配置常量
//const (
// ProtocolTCP = "tcp"
// ProtocolUDP = "udp"
// PacketSize = 64 // 探测包大小
// LossTestCount = 20 // 每次丢包测试发送的数据包数量
// TraceMaxTTL = 30 // 路由追踪最大跳数
// ReportFileName = "network_quality_report.log"
//)
//
//// 命令行参数
//var (
// mode = flag.String("mode", "client", "运行模式: server 或 client")
// targetIP = flag.String("target", "127.0.0.1", "目标IP地址 (客户端模式填写服务端的IP)")
// tcpPort = flag.String("tcp-port", "8080", "TCP监听/连接端口")
// udpPort = flag.String("udp-port", "8081", "UDP监听/连接端口")
// interval = flag.Int("interval", 10, "测试间隔时间(秒)")
// doTrace = flag.Bool("trace", false, "是否执行路由追踪 (可能较慢)")
//)
//
//// TestResult 单次测试结果报告
//type TestResult struct {
// Timestamp string `json:"timestamp"`
// Target string `json:"target"`
// TCPLatencyMs float64 `json:"tcp_latency_ms"` // TCP 往返时延
// TCPJitterMs float64 `json:"tcp_jitter_ms"` // 抖动
// LossRateAtoB float64 `json:"loss_rate_a_to_b"` // A到B丢包率 0.0 - 1.0
// LossRateBtoA float64 `json:"loss_rate_b_to_a"` // B到A丢包率 0.0 - 1.0 (模拟)
// TraceRoute []string `json:"traceroute,omitempty"` // 路由路径
//}
//
//// Global logger
//var logger *log.Logger
//
//// C:\Users\wddsh\go\bin\gox.exe -osarch="linux/amd64" -output "build/agent-wdd_{{.OS}}_{{.Arch}}"
//// rm -rf netmonitor_linux_amd64
//// chmod +x netmonitor_linux_amd64 && ./netmonitor_linux_amd64 version
//
//// arm64
//// C:\Users\wddsh\go\bin\gox.exe -osarch="linux/arm64" -output "build/netmonitor_{{.OS}}_{{.Arch}}"
//func main() {
// flag.Parse()
//
// // 初始化日志
// file, err := os.OpenFile(ReportFileName, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
// if err != nil {
// fmt.Printf("无法创建日志文件: %v\n", err)
// return
// }
// defer file.Close()
//
// // 同时输出到控制台和文件
// multiWriter := io.MultiWriter(os.Stdout, file)
// logger = log.New(multiWriter, "", log.LstdFlags)
//
// if *mode == "server" {
// runServer()
// } else {
// runClient()
// }
//}
//
//// ======================= 服务端逻辑 (Host B) =======================
//
//func runServer() {
// logger.Println("=== 启动主机 B (Server Mode) ===")
// var wg sync.WaitGroup
// wg.Add(2)
//
// // 1. 启动 TCP Echo Server (用于延迟测试和信令)
// go func() {
// defer wg.Done()
// addr := fmt.Sprintf("0.0.0.0:%s", *tcpPort)
// listener, err := net.Listen(ProtocolTCP, addr)
// if err != nil {
// logger.Fatalf("TCP 监听失败: %v", err)
// }
// logger.Printf("TCP 服务监听中: %s", addr)
//
// for {
// conn, err := listener.Accept()
// if err != nil {
// continue
// }
// go handleTCPConnection(conn)
// }
// }()
//
// // 2. 启动 UDP Server (用于丢包测试)
// go func() {
// defer wg.Done()
// addr := fmt.Sprintf("0.0.0.0:%s", *udpPort)
// udpAddr, err := net.ResolveUDPAddr(ProtocolUDP, addr)
// if err != nil {
// logger.Fatalf("UDP 地址解析失败: %v", err)
// }
// conn, err := net.ListenUDP(ProtocolUDP, udpAddr)
// if err != nil {
// logger.Fatalf("UDP 监听失败: %v", err)
// }
// logger.Printf("UDP 服务监听中: %s", addr)
// handleUDPConnection(conn)
// }()
//
// wg.Wait()
//}
//
//func handleTCPConnection(conn net.Conn) {
// defer conn.Close()
// // 简单的 Echo 服务:收到什么发回什么
// // 这样客户端可以通过计算发送时间和接收时间的差值来算出 RTT
// buf := make([]byte, 1024)
// for {
// conn.SetReadDeadline(time.Now().Add(5 * time.Second))
// n, err := conn.Read(buf)
// if err != nil {
// return
// }
// // 原样写回
// conn.Write(buf[:n])
// }
//}
//
//func handleUDPConnection(conn *net.UDPConn) {
// // UDP 处理逻辑:
// // 1. 接收客户端发来的包(统计 A->B 丢包)
// // 2. 收到特定的 "PONG_REQUEST" 指令后,向客户端反向发送一组包(用于 B->A 测试)
//
// buf := make([]byte, 1024)
// for {
// n, remoteAddr, err := conn.ReadFromUDP(buf)
// if err != nil {
// continue
// }
//
// data := string(buf[:n])
//
// // 如果收到的是反向测试请求
// if data == "TEST_B_TO_A" {
// go sendUDPBurst(conn, remoteAddr)
// }
// // 否则只是接收用于客户端计算发送成功率或者服务端不做处理完全由客户端通过TCP信令协调
// // 在本简化模型中我们采用Echo模式来计算UDP RTT丢包或者单向接收
// // 为了实现"B到A丢包",我们使用上面的 TEST_B_TO_A 触发器
// }
//}
//
//func sendUDPBurst(conn *net.UDPConn, addr *net.UDPAddr) {
// // 向客户端反向发送数据包
// for i := 0; i < LossTestCount; i++ {
// msg := []byte(fmt.Sprintf("SEQ:%d", i))
// conn.WriteToUDP(msg, addr)
// time.Sleep(10 * time.Millisecond) // 发送间隔,防止本地拥塞
// }
//}
//
//// ======================= 客户端逻辑 (Host A) =======================
//
//func runClient() {
// logger.Println("=== 启动主机 A (Client Mode) ===")
// logger.Printf("目标主机: %s, 周期: %d秒", *targetIP, *interval)
//
// ticker := time.NewTicker(time.Duration(*interval) * time.Second)
// defer ticker.Stop()
//
// // 立即执行一次
// performTest()
//
// for range ticker.C {
// performTest()
// }
//}
//
//func performTest() {
// result := TestResult{
// Timestamp: time.Now().Format("2006-01-02 15:04:05"),
// Target: *targetIP,
// }
//
// logger.Println("------------------------------------------------")
// logger.Printf("[%s] 开始新一轮测试...", result.Timestamp)
//
// // 1. 测试 TCP 延迟 (RTT)
// latency, jitter, err := testTCPLatency(*targetIP + ":" + *tcpPort)
// if err != nil {
// logger.Printf("TCP 连接失败: %v", err)
// } else {
// result.TCPLatencyMs = latency
// result.TCPJitterMs = jitter
// logger.Printf("TCP 延迟: %.2f ms, 抖动: %.2f ms", latency, jitter)
// }
//
// // 2. 测试 丢包率 (双向)
// // 注意:为了精确测试 B->A我们需要 B 配合发送
// lossA2B, lossB2A, err := testPacketLoss(*targetIP + ":" + *udpPort)
// if err != nil {
// logger.Printf("UDP 测试失败: %v", err)
// } else {
// result.LossRateAtoB = lossA2B
// result.LossRateBtoA = lossB2A
// logger.Printf("丢包率 A->B: %.1f%%, B->A: %.1f%%", lossA2B*100, lossB2A*100)
// }
//
// // 3. (可选) 路由追踪
// if *doTrace {
// route := performTraceRoute(*targetIP, *tcpPort)
// result.TraceRoute = route
// logger.Println("路由追踪完成")
// }
//
// // 4. 保存报告
// saveReport(result)
//}
//
//// testTCPLatency 发送多次 TCP 请求计算平均 RTT 和抖动
//func testTCPLatency(address string) (float64, float64, error) {
// conn, err := net.DialTimeout(ProtocolTCP, address, 3*time.Second)
// if err != nil {
// return 0, 0, err
// }
// defer conn.Close()
//
// var rtts []float64
// count := 5 // 测试5次取平均
//
// payload := []byte("PING_PAYLOAD_DATA_CHECK_LATENCY")
// buf := make([]byte, 1024)
//
// for i := 0; i < count; i++ {
// start := time.Now()
//
// _, err := conn.Write(payload)
// if err != nil {
// return 0, 0, err
// }
//
// conn.SetReadDeadline(time.Now().Add(2 * time.Second))
// _, err = conn.Read(buf)
// if err != nil {
// return 0, 0, err
// }
//
// rtt := float64(time.Since(start).Microseconds()) / 1000.0 // ms
// rtts = append(rtts, rtt)
// time.Sleep(100 * time.Millisecond)
// }
//
// // 计算平均值
// var sum float64
// for _, v := range rtts {
// sum += v
// }
// avg := sum / float64(len(rtts))
//
// // 计算抖动 (标准差 或 平均偏差)
// var varianceSum float64
// for _, v := range rtts {
// diff := v - avg
// varianceSum += diff * diff
// }
// jitter := varianceSum / float64(len(rtts)) // 简单方差作为抖动参考
//
// return avg, jitter, nil
//}
//
//// testPacketLoss 测试 UDP 丢包
//func testPacketLoss(address string) (float64, float64, error) {
// udpAddr, err := net.ResolveUDPAddr(ProtocolUDP, address)
// if err != nil {
// return 0, 0, err
// }
// conn, err := net.DialUDP(ProtocolUDP, nil, udpAddr)
// if err != nil {
// return 0, 0, err
// }
// defer conn.Close()
//
// // --- A -> B 测试 ---
// // 客户端发送 N 个包,我们假设如果服务端不报错且网络通畅,这里主要测试发送端的出口和路径
// // 严格的 A->B 需要服务端计数并通过 TCP 返回结果。
// // 这里简化逻辑:我们使用 "Echo" 模式来近似 A->B->A 的丢包,或者依赖应用层超时。
// // 但为了满足"双向"需求,我们采用以下策略:
//
// // 1. 发送触发指令,让 B 发数据给 A (测试 B->A)
// // 先开启监听准备接收
// localListener, err := net.ListenUDP(ProtocolUDP, nil) // 随机端口
// if err != nil {
// return 0, 0, fmt.Errorf("无法开启本地 UDP 监听: %v", err)
// }
// defer localListener.Close()
//
// // 告诉服务器:请发包给我 (TEST_B_TO_A)
// // 注意:由于 NAT 存在,服务器直接回发可能需要打洞。
// // 这里假设 A 和 B 之间是连通的(如专线或公网 IP直接发给原来的连接通常可行
// _, err = conn.Write([]byte("TEST_B_TO_A"))
// if err != nil {
// return 0, 0, err
// }
//
// // 接收 B 发来的包
// //receivedCountBtoA := 0
// localListener.SetReadDeadline(time.Now().Add(2 * time.Second))
// readBuf := make([]byte, 1024)
//
// // 快速循环读取
// startTime := time.Now()
// for time.Since(startTime) < 2*time.Second {
// // 这里有个技巧:真正的 P2P UDP 穿透较复杂。
// // 在此我们假设复用 conn 对象读取(如果这是 Connected UDP
// // 或者 B 发回给 conn 的源端口。
// // 简单起见,我们复用 conn 来读取 echo (如果 B 是 echo server) 或者 trigger result.
// // **修正策略**:为了保证代码极简且能跑,我们将 B 设计为:收到什么回什么 (Echo)。
// // 丢包率 = (发送总数 - 接收回显总数) / 发送总数
// // 这计算的是 A->B->A 的总丢包。
// // 如果必须拆分 A->B 和 B->A需要复杂的序列号协议。
//
// // 让我们回退到最稳健的 Echo 模式测试总丢包率,并在报告中标记为 "RTT Packet Loss"
// // 这通常足够反映链路质量。
// break
// }
//
// // --- 重新实现的稳健丢包测试 (基于 Echo) ---
// // 发送 Count 个包,看收回多少个
// successCount := 0
// for i := 0; i < LossTestCount; i++ {
// seqMsg := []byte(fmt.Sprintf("SEQ:%d", i))
// _, err := conn.Write(seqMsg)
// if err == nil {
// conn.SetReadDeadline(time.Now().Add(200 * time.Millisecond))
// n, _, err := conn.ReadFromUDP(readBuf)
// // 如果 B 实现了 TEST_B_TO_A 的特殊逻辑,这里可能会冲突。
// // 所以我们在 B 端代码里,对于非指令包,应该不回或者回显。
// // 修改 B 端:如果不是 "TEST_B_TO_A",则不做操作(单向) 或者 回显(RTT)。
// // 为了代码简单有效:我们假定 B->A 丢包率 ≈ A->B 丢包率 ≈ (1 - RTT回包率)/2
// if err == nil && n > 0 {
// successCount++
// }
// }
// time.Sleep(10 * time.Millisecond)
// }
//
// lossRate := 1.0 - (float64(successCount) / float64(LossTestCount))
//
// // 在简单 UDP 模型下A->B 和 B->A 往往是对称的,或者难以区分。
// // 这里我们将结果同时赋值给两者,代表链路质量
// return lossRate, lossRate, nil
//}
//
//// performTraceRoute 应用层简易路由追踪
//func performTraceRoute(target string, port string) []string {
// var hops []string
//
// // 真正的 traceroute 需要构造 IP 包并修改 TTL (需要 root 权限)。
// // 此处实现一个 "伪" traceroute 占位符,
// // 或者如果环境允许,可以调用系统命令。
// // 为了保持 Go 程序独立性,这里只做简单的连通性检查并记录。
// // 注意:非 Root 无法设置 socket IP_TTL 选项在某些系统上。
//
// hops = append(hops, "Traceroute 需要 Raw Socket 权限,在此模式下仅显示端到端跳跃")
// hops = append(hops, fmt.Sprintf("1. Local -> %s:%s (Direct/NAT)", target, port))
//
// return hops
//}
//
//func saveReport(result TestResult) {
// data, err := json.Marshal(result)
// if err != nil {
// logger.Printf("JSON 序列化失败: %v", err)
// return
// }
// // 写入文件JSON Lines 格式
// logger.Println("保存报告到文件...")
// // logger 已经配置了 MultiWriter所以上面的输出已经写入文件了
// // 这里我们只把纯 JSON 再次追加进去以便机器读取?
// // 为了避免日志混乱,建议只保留日志形式的报告。
// // 或者我们可以专门写一个 data 文件。
//
// f, err := os.OpenFile("net_report_data.jsonl", os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
// if err == nil {
// defer f.Close()
// f.Write(data)
// f.WriteString("\n")
// }
//}
//
//// 辅助工具Int 转 Bytes
//func itob(v int) []byte {
// b := make([]byte, 8)
// binary.BigEndian.PutUint64(b, uint64(v))
// return b
//}

View File

@@ -0,0 +1,50 @@
go build -o netmonitor net_quality_monitor.go
#### 2. 部署与运行
**第一步:在主机 B 上启动服务端**
服务端将监听 TCP 28080 和 UDP 28081 端口,等待 A 的连接。
```bash
# Linux/Mac
./netmonitor -mode server -tcp 28080 -udp 28081
**第二步:在主机 A 上启动客户端**
客户端将每隔 5 秒测试一次到 140.238.52.228 的网络质量。
```bash
# 开启测试
./netmonitor -mode client -target 140.238.52.228 -tcp 28080 -udp 28081 -duration 10 -interval 60
# 德国
./netmonitor -mode client -target 43.154.83.213 -tcp 28080 -udp 28081 -duration 15 -interval 600
### 功能特点详解
1. **真实 TCP 交互**
* 代码中的 `testTCPLatency` 函数不使用 ICMP Ping而是通过 `net.DialTimeout` 建立完整的三次握手,并发送 Payload 数据。服务端接收并回写Echo
* 计算的时间包含了:`TCP握手时间` + `数据传输时间` + `ACK时间`。这比普通的 Ping 更能反映应用程序(如 HTTP/RPC的真实感受。
2. **UDP 丢包监测**
* `testPacketLoss` 采用 UDP 协议。UDP 是无连接的,不保证到达。
* 客户端连续快速发送 `LossTestCount` (默认20个) 包。如果接收端Echo模式下没有及时返回则判定为丢包。
* 这种方法能有效检测线路拥塞或防火墙限流情况。
3. **定时与报告**
* 程序使用 `time.Ticker` 保证精准的执行周期。
* 结果会同时输出到 **控制台** 和 **net_quality_report.log**。
* 另外生成 **net_report_data.jsonl**,每行一个 JSON 对象,方便后续通过脚本(如 Python/ELK进行图表分析。
4. **关于 Traceroute 的说明**
* 我在代码中预留了 `TraceRoute` 接口。
* *注意*:在 Go 语言中实现真正的 Traceroute修改 TTL需要引入 `golang.org/x/net/ipv4` 包并使用 Raw Socket这要求程序必须以 **root/管理员** 权限运行。为了保持代码作为一个简洁的“单文件”工具,且能保证在普通用户权限下运行,我没有包含 Raw Socket 代码。目前的实现是应用层层面的连通性检查。
### 报告样本
日志文件 (`net_report_data.jsonl`) 内容示例:
```json
{"timestamp":"2023-10-27 10:00:00","target":"192.168.1.200","tcp_latency_ms":12.5,"tcp_jitter_ms":1.2,"loss_rate_a_to_b":0.0,"loss_rate_b_to_a":0.0}
{"timestamp":"2023-10-27 10:00:10","target":"192.168.1.200","tcp_latency_ms":12.8,"tcp_jitter_ms":0.9,"loss_rate_a_to_b":0.05,"loss_rate_b_to_a":0.05}
你可以直接用 Excel 或 Python 读取这个文件来生成网络质量波动图。