Files
ProjectOctopus/port_forwarding/udp.go
2024-06-14 10:37:40 +08:00

216 lines
7.3 KiB
Go
Executable File

/**
* Filename: udp.go
* Description: the PortForward udp layer implement.
* Author: knownsec404
* Time: 2020.09.23
*/
package main
import (
"errors"
"net"
"time"
)
// as UDP client Conn
type UDPDistribute struct {
Established bool
Conn *(net.UDPConn)
RAddr net.Addr
Cache chan []byte
}
/**********************************************************************
* @Function: NewUDPDistribute(conn *(net.UDPConn), addr net.Addr) (*UDPDistribute)
* @Description: initialize UDPDistribute structure (as UDP client Conn)
* @Parameter: conn *(net.UDPConn), the udp connection object
* @Parameter: addr net.Addr, the udp client remote adddress
* @Return: *UDPDistribute, the new UDPDistribute structure pointer
**********************************************************************/
func NewUDPDistribute(conn *(net.UDPConn), addr net.Addr) *UDPDistribute {
return &UDPDistribute{
Established: true,
Conn: conn,
RAddr: addr,
Cache: make(chan []byte, 16),
}
}
/**********************************************************************
* @Function: (this *UDPDistribute) Close() (error)
* @Description: set "Established" flag is false, the UDP service will
* cleaned up according to certain conditions.
* @Parameter: nil
* @Return: error, the error
**********************************************************************/
func (this *UDPDistribute) Close() error {
this.Established = false
return nil
}
/**********************************************************************
* @Function: (this *UDPDistribute) Read(b []byte) (n int, err error)
* @Description: read data from connection, due to the udp implementation of
* PortForward, read here will only produce a timeout error and closed error
* (compared to the normal net.Conn object)
* @Parameter: b []byte, the buffer for receive data
* @Return: (n int, err error), the length of the data read and error
**********************************************************************/
func (this *UDPDistribute) Read(b []byte) (n int, err error) {
if !this.Established {
return 0, errors.New("udp distrubute has closed")
}
select {
case <-time.After(16 * time.Second):
return 0, errors.New("udp distrubute read timeout")
case data := <-this.Cache:
n := len(data)
copy(b, data)
return n, nil
}
}
/**********************************************************************
* @Function: (this *UDPDistribute) Write(b []byte) (n int, err error)
* @Description: write data to connection by "WriteTo()"
* @Parameter: b []byte, the data to be sent
* @Return: (n int, err error), the length of the data write and error
**********************************************************************/
func (this *UDPDistribute) Write(b []byte) (n int, err error) {
if !this.Established {
return 0, errors.New("udp distrubute has closed")
}
return this.Conn.WriteTo(b, this.RAddr)
}
/**********************************************************************
* @Function: (this *UDPDistribute) RemoteAddr() (net.Addr)
* @Description: get remote address
* @Parameter: nil
* @Return: net.Addr, the remote address
**********************************************************************/
func (this *UDPDistribute) RemoteAddr() net.Addr {
return this.RAddr
}
/**********************************************************************
* @Function: ListenUDP(address string, clientc chan Conn, quit chan bool)
* @Description: listen local udp service, and accept client connection,
* initialize connection and return by channel.
* since udp is running as a service, it only obtains remote data through
* the Read* function cluster (different from tcp), so we need a temporary
* table to record, so that we can use the temporary table to determine
* whether to forward or create a new link
* @Parameter: address string, the local listen address
* @Parameter: clientc chan Conn, new client connection channel
* @Parameter: quit chan bool, the quit signal channel
* @Return: nil
**********************************************************************/
func ListenUDP(address string, clientc chan Conn, quit chan bool) {
addr, err := net.ResolveUDPAddr("udp", address)
if err != nil {
LogError("udp listen error, %s", err)
clientc <- nil
return
}
serv, err := net.ListenUDP("udp", addr)
if err != nil {
LogError("udp listen error, %s", err)
clientc <- nil
return
}
defer serv.Close()
// the udp distrubute table
table := make(map[string]*UDPDistribute)
// NOTICE:
// in the process of running, the table will generate invalid historical
// data, we have not cleaned it up(These invalid historical data will not
// affect our normal operation).
//
// if you want to deal with invalid historical data, the best way is to
// launch a new coroutine to handle net.Read, and another coroutine to
// handle the connection status, but additional logic to exit the
// coroutine is needed. Currently we think it is unnecessary.
//
// under the current code logic: if a udp connection exits, it will timeout
// to trigger "Close()", and finally set "Established" to false, but there
// is no logic to check this state to clean up, so invalid historical data
// is generated (of course, we can traverse to clean up? every packet? no)
//
// PS: you can see that we called "delete()" in the following logic, but
// this is only for processing. when a new udp client hits invalid
// historical data, we need to return a new connection so that PortForward
// can create a communication link.
for {
// check quit
select {
case <-quit:
return
default:
}
// set timeout, for check "quit" signal
serv.SetDeadline(time.Now().Add(16 * time.Second))
// just new 32*1024, in the outer layer we used "io.Copy()", which
// can only handle the size of 32*1024
buf := make([]byte, 32*1024)
n, addr, err := serv.ReadFrom(buf)
if err != nil {
if err, ok := err.(net.Error); ok && err.Timeout() {
continue
}
LogError("udp listen error, %s", err)
clientc <- nil
return
}
buf = buf[:n]
// if the address in table, we distrubute message
if d, ok := table[addr.String()]; ok {
if d.Established {
// it is established, distrubute message
d.Cache <- buf
continue
} else {
// we remove it when the connnection has expired
delete(table, addr.String())
}
}
// if the address not in table, we create new connection object
conn := NewUDPDistribute(serv, addr)
table[addr.String()] = conn
conn.Cache <- buf
clientc <- conn
} // end for
}
/**********************************************************************
* @Function: ConnUDP(address string) (Conn, error)
* @Description: dial to remote server, and return udp connection
* @Parameter: address string, the remote server address that needs to be dialed
* @Return: (Conn, error), the udp connection and error
**********************************************************************/
func ConnUDP(address string) (Conn, error) {
conn, err := net.DialTimeout("udp", address, 10*time.Second)
if err != nil {
return nil, err
}
// send one byte(knock) to server, get "established" udp connection
_, err = conn.Write([]byte("\x00"))
if err != nil {
return nil, err
}
// due to the characteristics of udp, when the udp server exits, we will
// not receive any signal, it will be blocked at conn.Read();
// here we set a timeout for udp
conn.SetDeadline(time.Now().Add(60 * time.Second))
return conn, nil
}