[ Cmii ] [ Octopus ] - add tcp portforwarding part

This commit is contained in:
zeaslity
2024-02-28 14:05:16 +08:00
parent 9447801212
commit 9d5ca4b50f
25 changed files with 1619 additions and 213 deletions

View File

@@ -1,120 +0,0 @@
package cmii_operator
const cmiiBackendDeploymentTemplate = `
apiVersion: apps/v1
kind: Deployment
metadata:
name: {{ .AppName }}
namespace: {{ .Namespace }}
labels:
cmii.type: backend
cmii.app: {{ .AppName }}
octopus/control: backend-app-1.0.0
app.kubernetes.io/managed-by: octopus/control
app.kubernetes.io/app-version: {{ .TagVersion }}
spec:
replicas: {{ .Replicas }}
strategy:
rollingUpdate:
maxUnavailable: 1
selector:
matchLabels:
cmii.type: backend
cmii.app: {{ .AppName }}
template:
metadata:
labels:
cmii.type: backend
cmii.app: {{ .AppName }}
spec:
affinity:
nodeAffinity:
requiredDuringSchedulingIgnoredDuringExecution:
nodeSelectorTerms:
- matchExpressions:
- key: uavcloud.env
operator: In
values:
- demo
imagePullSecrets:
- name: harborsecret
containers:
- name: {{ .AppName }}
image: "harbor.cdcyy.com.cn/cmii/{{ .AppName }}:{{ .ImageTag }}"
imagePullPolicy: Always
env:
- name: K8S_NAMESPACE
value: "{{ .Namespace }}"
- name: APPLICATION_NAME
value: "{{ .AppName }}"
- name: CUST_JAVA_OPTS
value: "-Xms500m -Xmx1500m -Dlog4j2.formatMsgNoLookups=true"
- name: NACOS_REGISTRY
value: "helm-nacos:8848"
- name: NACOS_DISCOVERY_IP
valueFrom:
fieldRef:
fieldPath: status.podIP
- name: NACOS_DISCOVERY_PORT
value: "8080"
- name: BIZ_CONFIG_GROUP
value: {{ .TagVersion }}
- name: SYS_CONFIG_GROUP
value: {{ .TagVersion }}
- name: IMAGE_VERSION
value: {{ .TagVersion }}
- name: NACOS_USERNAME
value: "developer"
- name: NACOS_PASSWORD
value: "Deve@9128201"
ports:
- name: pod-port
containerPort: 8080
protocol: TCP
resources:
limits:
memory: 2Gi
cpu: 2
requests:
memory: 1Gi
cpu: 200m
livenessProbe:
httpGet:
path: /cmii/ping
port: pod-port
scheme: HTTP
initialDelaySeconds: 60
timeoutSeconds: 5
periodSeconds: 20
successThreshold: 1
failureThreshold: 3
readinessProbe:
httpGet:
path: /cmii/ping
port: pod-port
scheme: HTTP
initialDelaySeconds: 60
timeoutSeconds: 5
periodSeconds: 20
successThreshold: 1
failureThreshold: 3
startupProbe:
httpGet:
path: /cmii/ping
port: pod-port
scheme: HTTP
initialDelaySeconds: 60
timeoutSeconds: 3
periodSeconds: 20
successThreshold: 1
failureThreshold: 5
volumeMounts:
- name: glusterfs-backend-log-volume
mountPath: /cmii/logs
readOnly: false
subPath: {{ .Namespace }}/{{ .AppName }}
volumes:
- name: glusterfs-backend-log-volume
persistentVolumeClaim:
claimName: glusterfs-backend-log-pvc
`

View File

@@ -1,17 +0,0 @@
package cmii_operator
import "testing"
func TestCmiiBackendDeploymentConfig_ParseToApplyConf(t *testing.T) {
deploymentConfig := CmiiBackendDeploymentConfig{
Namespace: "uavcloud-dev",
AppName: "cmii-uav-gateway",
ImageTag: "5.2.0-123",
TagVersion: "5.2.0",
Replicas: "2",
}
deploymentConfig.ParseToApplyConf()
}

View File

@@ -161,8 +161,7 @@ func (op *CmiiK8sOperator) DeploymentAll(cmiiEnv string) []v1.Deployment {
length := len(deployments) length := len(deployments)
log.InfoF("[DeploymentAll] - deployment in [%s] count is => %d", op.CurrentNamespace, length) log.InfoF("[DeploymentAll] - deployment in [%s] count is => %d", op.CurrentNamespace, length)
results := make([]v1.Deployment, length) results := make([]v1.Deployment, length)
//var results []v1.Deployment ccc := make(chan v1.Deployment, length)
//ccc := make(chan v1.Deployment, length)
var wg sync.WaitGroup var wg sync.WaitGroup
//var mutex sync.Mutex //var mutex sync.Mutex
@@ -193,32 +192,34 @@ func (op *CmiiK8sOperator) DeploymentAll(cmiiEnv string) []v1.Deployment {
go func(deploymentList []v1.Deployment, start int, results *[]v1.Deployment) { go func(deploymentList []v1.Deployment, start int, results *[]v1.Deployment) {
for index, deployment := range deploymentList { for _, deployment := range deploymentList {
objectMeta := deployment.ObjectMeta objectMeta := deployment.ObjectMeta
objectMeta.SetAnnotations(nil) objectMeta.SetAnnotations(nil)
objectMeta.SetManagedFields(nil) objectMeta.SetManagedFields(nil)
deployment.ObjectMeta = objectMeta deployment.ObjectMeta = objectMeta
//ccc <- deployment ccc <- deployment
i := *results //i := *results
i[index+start] = deployment //i[index+start] = deployment
} }
wg.Done() wg.Done()
}(deployments[start:end], start, &results) }(deployments[start:end], start, &results)
} }
//go func() { go func() {
// wg.Wait() wg.Wait()
// close(ccc) close(ccc)
//}() }()
wg.Wait() wg.Wait()
//for deployment := range ccc { index := 0
// results = append(results, deployment) for deployment := range ccc {
//} results[index] = deployment
index++
}
return results return results[:index]
} }
func (op *CmiiK8sOperator) DeploymentAllInterface(cmiiEnv string) []CmiiDeploymentInterface { func (op *CmiiK8sOperator) DeploymentAllInterface(cmiiEnv string) []CmiiDeploymentInterface {

View File

@@ -38,5 +38,7 @@ func main() {
} }
// restart all backend // restart all backend
cmii_operator.RestartCmiiBackendDeployment(realNamespace)
cmii_operator.RestartCmiiFrontendDeployment(realNamespace)
} }

View File

@@ -1,25 +1,47 @@
package cmii_operator package deploy
import ( import (
"bytes" "bytes"
"fmt"
v1 "k8s.io/api/apps/v1" v1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
appsv1 "k8s.io/client-go/applyconfigurations/apps/v1" appsv1 "k8s.io/client-go/applyconfigurations/apps/v1"
"sigs.k8s.io/yaml" "sigs.k8s.io/yaml"
"text/template" "text/template"
"wdd.io/agent-go/utils" "wdd.io/agent-go/utils"
) )
type CommonEnvironmentConfig struct {
WebIP string
WebPort string
HarborIP string
HarborPort string
}
type CmiiBackendDeploymentConfig struct { type CmiiBackendDeploymentConfig struct {
Namespace string
AppName string
ImageTag string
TagVersion string
Replicas string
NodePort string
NeedPvcCache bool
CustomJvmOpt string
}
type CmiiFrontendDeploymentConfig struct {
Namespace string Namespace string
AppName string AppName string
ImageTag string ImageTag string
TagVersion string TagVersion string
Replicas string Replicas string
ShortName string
} }
func (backend CmiiBackendDeploymentConfig) ParseToApplyConf() *appsv1.DeploymentApplyConfiguration { func (backend CmiiBackendDeploymentConfig) ParseToApplyConf() *appsv1.DeploymentApplyConfiguration {
// 解析模板 // 解析模板
tmpl, err := template.New("cmiiBackendDeploymentTemplate").Parse(cmiiBackendDeploymentTemplate) tmpl, err := template.New("cmiiBackendDeploymentTemplate").Parse(cmiiBackendDeploymentTemplate)
if err != nil { if err != nil {
panic(err) panic(err)
@@ -41,5 +63,28 @@ func (backend CmiiBackendDeploymentConfig) ParseToApplyConf() *appsv1.Deployment
utils.BeautifulPrint(&deployment) utils.BeautifulPrint(&deployment)
// service
parse, err := template.New("cmiiBackendServiceTemplate").Parse(cmiiBackendServiceTemplate)
if err != nil {
panic(err)
}
// 应用数据并打印结果
var resulta bytes.Buffer
err = parse.Execute(&resulta, backend)
if err != nil {
panic(err)
}
fmt.Println(resulta.String())
// 创建Deployment对象
service := corev1.Service{}
err = yaml.Unmarshal(resulta.Bytes(), &service)
if err != nil {
panic(err)
}
utils.BeautifulPrint(&service)
return nil return nil
} }

View File

@@ -0,0 +1,263 @@
package deploy
const cmiiBackendDeploymentTemplate = `
apiVersion: apps/v1
kind: Deployment
metadata:
name: {{ .AppName }}
namespace: {{ .Namespace }}
labels:
cmii.type: backend
cmii.app: {{ .AppName }}
octopus/control: backend-app-1.0.0
app.kubernetes.io/managed-by: octopus/control
app.kubernetes.io/app-version: {{ .TagVersion }}
spec:
replicas: {{ .Replicas }}
strategy:
rollingUpdate:
maxUnavailable: 1
selector:
matchLabels:
cmii.type: backend
cmii.app: {{ .AppName }}
template:
metadata:
labels:
cmii.type: backend
cmii.app: {{ .AppName }}
spec:
affinity:
nodeAffinity:
requiredDuringSchedulingIgnoredDuringExecution:
nodeSelectorTerms:
- matchExpressions:
- key: uavcloud.env
operator: In
values:
- demo
imagePullSecrets:
- name: harborsecret
containers:
- name: {{ .AppName }}
image: "harbor.cdcyy.com.cn/cmii/{{ .AppName }}:{{ .ImageTag }}"
imagePullPolicy: Always
env:
- name: K8S_NAMESPACE
value: {{ .Namespace }}
- name: APPLICATION_NAME
value: {{ .AppName }}
- name: CUST_JAVA_OPTS
value: "-Xms500m -Xmx1500m -Dlog4j2.formatMsgNoLookups=true"
- name: NACOS_REGISTRY
value: "helm-nacos:8848"
- name: NACOS_DISCOVERY_IP
valueFrom:
fieldRef:
fieldPath: status.podIP
- name: NACOS_DISCOVERY_PORT
value: "8080"
- name: BIZ_CONFIG_GROUP
value: {{ .TagVersion }}
- name: SYS_CONFIG_GROUP
value: {{ .TagVersion }}
- name: IMAGE_VERSION
value: {{ .TagVersion }}
- name: NACOS_USERNAME
value: "developer"
- name: NACOS_PASSWORD
value: "Deve@9128201"
ports:
- name: pod-port
containerPort: 8080
protocol: TCP
resources:
limits:
memory: 2Gi
cpu: 2
requests:
memory: 1Gi
cpu: 200m
livenessProbe:
httpGet:
path: /cmii/ping
port: pod-port
scheme: HTTP
initialDelaySeconds: 60
timeoutSeconds: 5
periodSeconds: 20
successThreshold: 1
failureThreshold: 3
readinessProbe:
httpGet:
path: /cmii/ping
port: pod-port
scheme: HTTP
initialDelaySeconds: 60
timeoutSeconds: 5
periodSeconds: 20
successThreshold: 1
failureThreshold: 3
startupProbe:
httpGet:
path: /cmii/ping
port: pod-port
scheme: HTTP
initialDelaySeconds: 60
timeoutSeconds: 3
periodSeconds: 20
successThreshold: 1
failureThreshold: 5
volumeMounts:
- name: glusterfs-backend-log-volume
mountPath: /cmii/logs
readOnly: false
subPath: {{ .Namespace }}/{{ .AppName }}
{{- if .NeedPvcCache }}
- name: data-cache-volume
mountPath: /cmii/cache
readOnly: false
subPath: {{ .Namespace }}/{{ .AppName }}
{{- end }}
volumes:
- name: glusterfs-backend-log-volume
persistentVolumeClaim:
claimName: glusterfs-backend-log-pvc
{{- if .NeedPvcCache }}
- name: data-cache-volume
persistentVolumeClaim:
claimName: {{ .AppName }}-cache
{{- end }}
`
const cmiiBackendServiceTemplate = `
apiVersion: v1
kind: Service
metadata:
name: {{ .AppName }}
namespace: {{ .Namespace }}
labels:
cmii.type: backend
cmii.app: {{ .AppName }}
octopus/control: backend-app-1.0.0
app.kubernetes.io/managed-by: octopus/control
app.kubernetes.io/app-version: {{ .TagVersion }}
spec:
{{- if .NodePort }}
type: NodePort
{{- else }}
type: ClusterIP
{{- end }}
selector:
cmii.type: backend
cmii.app: {{ .AppName }}
ports:
- name: backend-tcp
port: 8080
protocol: TCP
targetPort: 8080
{{- if .NodePort }}
nodePort: {{ .NodePort }}
{{- end }}
`
const cmiiBackendPVCTemplate = `
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: {{ .AppName }}-cache
namespace: {{ .Namespace }}
labels:
cmii.type: backend
cmii.app: {{ .AppName }}
octopus/control: backend-app-1.0.0
app.kubernetes.io/managed-by: octopus/control
app.kubernetes.io/app-version: {{ .TagVersion }}
spec:
storageClassName: nfs-prod-distribute
accessModes:
- ReadWriteMany
volumeMode: Filesystem
resources:
requests:
storage: 15Gi
`
const cmiiFrontendDeploymentTemplate = `
apiVersion: apps/v1
kind: Deployment
metadata:
name: {{ .AppName }}
namespace: {{ .Namespace }}
labels:
cmii.type: frontend
cmii.app: {{ .AppName }}
octopus/control: frontend-app-1.0.0
app.kubernetes.io/managed-by: octopus/control
app.kubernetes.io/app-version: {{ .TagVersion }}
spec:
replicas: {{ .Replicas }}
strategy:
rollingUpdate:
maxUnavailable: 1
selector:
matchLabels:
cmii.type: frontend
cmii.app: {{ .AppName }}
template:
metadata:
labels:
cmii.type: frontend
cmii.app: {{ .AppName }}
spec:
imagePullSecrets:
- name: harborsecret
containers:
- name: {{ .AppName }}
image: "harbor.cdcyy.com.cn/cmii/{{ .AppName }}:{{ .ImageTag }}"
imagePullPolicy: Always
env:
- name: K8S_NAMESPACE
value: {{ .Namespace }}
- name: APPLICATION_NAME
value: {{ .AppName }}
ports:
- name: platform-9528
containerPort: 9528
protocol: TCP
resources:
limits:
cpu: "1"
memory: 1Gi
requests:
cpu: 500m
memory: 500Mi
volumeMounts:
- name: nginx-conf
mountPath: /usr/local/nginx/conf/nginx.conf
subPath: nginx.conf
- name: default-nginx-conf
mountPath: /etc/nginx/conf.d/default.conf
subPath: default.conf
- name: tenant-prefix
subPath: ingress-config.js
mountPath: /home/cmii-platform/dist/ingress-config.js
volumes:
- name: nginx-conf
configMap:
name: nginx-cm
items:
- key: nginx.conf
path: nginx.conf
- name: default-nginx-conf
configMap:
name: default-nginx-cm
items:
- key: default.conf
path: default.conf
- name: tenant-prefix
configMap:
name: tenant-prefix-{{ .ShortName }}
items:
- key: ingress-config.js
path: ingress-config.js
`

View File

@@ -0,0 +1,19 @@
package deploy
import "testing"
func TestCmiiBackendDeploymentConfig_ParseToApplyConf(t *testing.T) {
deploymentConfig := CmiiBackendDeploymentConfig{
Namespace: "uavcloud-dev",
AppName: "cmii-uav-gateway",
ImageTag: "5.2.0-123",
TagVersion: "5.2.0",
Replicas: "2",
NodePort: "31213",
NeedPvcCache: true,
}
deploymentConfig.ParseToApplyConf()
}

44
port_forwarding/CHANGELOG Normal file
View File

@@ -0,0 +1,44 @@
# Changelog
All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [Released]
## [0.5.0] - 2020-10-22
### Added
- Refactoring source code strucure for StarLink
## [Unreleased]
## [0.5.1] - 2021-04-23
### Fixed
- In listen-listen mode, actively close the previous socket when there are
multiple connections to the same end
## [0.4.1] - 2020-10-16
### Added
- Add multi-channel support of conn-conn mode
### Fixed
- Modify timeout of unsuccessfully established channel in listen-listen
work mode from 16s => 120s
## [0.4] - 2020-09-25
### Added
- Add multi-channel support
### Changed
- Optimize code structure and implementation
## [0.3] - 2020-09-20
### Added
- Refactoring the project framework into a three-layer structure:
work_mode / protocol / socket
## [0.2] - 2020-09-10
### Added
- Refactoring project framework
## [0.1] - 2020-08-19
### Added
- Initialize project

Binary file not shown.

After

Width:  |  Height:  |  Size: 206 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 75 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 214 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 158 KiB

21
port_forwarding/LICENSE Normal file
View File

@@ -0,0 +1,21 @@
MIT License
Copyright (c) 2021 Knownsec, Inc.
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

190
port_forwarding/README.md Normal file
View File

@@ -0,0 +1,190 @@
# PortForward
## 0x00 前言
`PortForward` 是使用 Golang 进行开发的端口转发工具,解决在某些场景下 `内外网无法互通`的问题。
`PortForward` 的功能如下:
1. 支持 tcp/udp 协议层的端口转发
2. 支持级联
3. 支持正向/反向连接模式
4. 支持多通道
5. 支持 ipv6
本文对 `PortForward` 进行了详细的介绍。
目录:
1. 使用说明
2. 工作场景
3. 源码结构
4. 逻辑结构
5. 端口转发的实现
6. udp的knock报文
7. udp的超时设置
8. listen-listen的超时设置
9. 多通路的实现
10. issue
11. contributions
## 0x01 使用说明
**1.使用**
Usage:
./portforward [proto] [sock1] [sock2]
Option:
proto the port forward with protocol(tcp/udp)
sock format: [method:address:port]
method the sock mode(listen/conn)
Example:
tcp conn:192.168.1.1:3389 conn:192.168.1.10:23333
udp listen:192.168.1.3:5353 conn:8.8.8.8:53
tcp listen:[fe80::1%lo0]:8888 conn:[fe80::1%lo0]:7777
version: 0.5.0(build-20201022)
**2.编译**
Golang 1.12及以上
GO111MODULE=on
git clone https://github.com/knownsec/Portforward.git
./build.sh
## 0x02 工作场景
这里列举了一些 `PortForward` 的工作场景,如下:
**2.1 简单模式**
<div align="center">
<img src="./Images/simple_forward.png" width="500">
</br>[图1.简单转发模式]
</div>
**2.2 受限主机转发**
<div align="center">
<img src="./Images/restricted_forward.png" width="500">
</br>[图2.受限主机转发模式图]
</div>
**2.3 级联端口转发**
<div align="center">
<img src="./Images/mutil_forward.png" width="500">
</br>[图3.级联端口转发]
</div>
## 0x03 源码结构
.
├── CHANGELOG
├── Images // images resource
├── README.md
├── build.sh // compile script
├── forward.go // portforward main logic
├── go.mod
├── log.go // log module
├── main.go // main, parse arguments
├── tcp.go // tcp layer
└── udp.go // udp layer
## 0x04 逻辑结构
`PortForward` 支持 `TCP` , `UDP` 协议层的端口转发,代码抽象后逻辑结构框架如下:
<div align="center">
<img src="./Images/portforward_framework.png" width="500">
</br>[图4.整体框架]
</div>
## 0x05 端口转发的实现
端口转发程序作为网络传输的中间人,无非就是将两端的 socket 对象进行联通,数据就可以通过这条「链路」进行传输了。
按照这样的思路,我们从需求开始分析和抽象,可以这么认为:无论是作为 `tcp` 还是 `udp` 运行,无论是作为 `connect` 还是 `listen`
运行,最终都将获得两个 socket其中一个连向原服务另一个与客户端连接最终将这两端的 socket 联通即可实现端口转发。
在 Golang 中我们采用了 `io.Copy()` 来联通两个 socket但是 `io.Copy` 必须要求对象实现了 `io.Reader/io.Writer` 接口,`tcp`
的 socket 可以直接支持,而 `udp` 的 socket 需要我们进行封装。
## 0x06 udp的knock报文
`udp``connect` 模式下,我们在连接服务器成功后,立即发送了一个 `knock` 报文,如下:
conn, err := net.DialTimeout("udp", ...
_, err = conn.Write([]byte("\x00"))
其作用是通知远程 `udp` 服务器我们已经连上了(`udp`
创建连接后,仅在本地操作系统层进行了注册,只有当发送一个报文到对端后,远程服务器才能感知到新连接),当我们在 `udp`
`conn-conn` 模式下运行时,这个报文是必须的。
## 0x07 udp的超时设置
`udp` 的实现中,我们为所有的 `udp` 连接 socket 对象都设置了超时时间(`tcp` 中不需要),这是因为在 `udp`socket
对象无法感知对端退出,如果不设置超时时间,将会一直在 `conn.Read()` 阻塞下去。
我们设置了 `udp` 超时时间为 60 秒,当 60 秒无数据传输,本次建立的虚拟通信链路将销毁,端口转发程序将重新创建新的通信链路。
## 0x08 listen-listen的超时设置
对于 `listen-listen` 模式,需要等待两端的客户端都连上端口转发程序后,才能将两个 socket 进行联通。
为此我们在此处设置了 120 秒的超时时间,也就是说当其中一端有客户端连接后,另一端在 120 秒内没有连接,我们就销毁这个未成功建立的通信链路;用户重新连接即可。
> 如果没有这个超时,可能某些场景遗留了某个连接,将造成后续的通信链路错位。
## 0x09 多通路的实现
多通路可以支持同时发起多个连接,这里我们以 `tcp` 作为例子来说明。为了处理这种情况,我们的处理方式是:
1. `listen-conn`: 每当 listen 服务器接收到新连接后,与远端创建新的连接,并将两个 socket 进行联通。
2. `listen-listen`: (好像没有实际场景)两端的 listen 服务器接收到新连接后,将两个 socket 进行联通。
3. `conn-conn`: 创建 sock1 的连接,当 sock1 端收到数据,创建与 sock2 的连接,将两个 socket 进行联通;随后继续创建 sock1
连接(预留)。
> 我们在 `udp` 中也加入了多通路的支持,和 `tcp` 基本类似,但由于 `udp` 是无连接的,我们不能像 `tcp` 直接联通两个 socket
> 对象。我们在 `udp listen` 服务器中维护了一个临时表,使用 `ip:port` 作为标志,以描述各个通信链路的联通情况,依据此进行流量的分发。
## 0x0A issue
**1.udp的映射表未清空**
udp多通路中的映射表没有对无效数据进行清理长时间运行可能造成内存占用
**2.http服务的 host 字段影响**
当转发 http 服务时,由于常见的客户端在访问时自动将 `host` 设置为访问目标(端口转发程序的地址)
,我们直接对流量进行转发,那么 `host` 字段一定是错误的,某些 http 服务器对该字段进行了校验,所以无法正常转发。
> 比如端口转发 `www.baidu.com`
**3.不支持多端口通信**
这里当然无法处理多端口通信的服务,我们仅做了一个端口的转发,多个端口的情况无法直接处理,比如 `FTP` 服务,其分别使用 20 端口(
数据传输)和 21 端口(命令传输)。
但还是要单独说一句,对于大多数 `udp` 服务的实现,是 `udp` 服务器收到了一个新报文,对此进行处理后,然后使用新的 socket
将响应数据发给对端,比如 `TFTP` 服务:
1.client 发起请求 [get aaa]
src: 55123 => dst: 69
2.tftp 收到报文,进行处理,返回响应:
src: 61234 => dst: 55123
这种多端口通信我们也是无法处理的,因为在目前多通路的实现下,上述流程中第 2 步过后,对于端口转发程序来说,后续的报文无法确定转发给
69 端口还是 61234 端口。
## 0x0B contributions
[r0oike@knownsec 404](https://github.com/r0oike)
[fenix@knownsec 404](https://github.com/13ph03nix)
[0x7F@knownsec 404](https://github.com/0x7Fancy)
</br>
------------------------------
References:
内网渗透之端口转发、映射、代理: <https://xz.aliyun.com/t/6349>
内网渗透之端口转发与代理工具总结: <https://www.freebuf.com/articles/web/170970.html>
sensepost/reGeorg: <https://github.com/sensepost/reGeorg>
idlefire/ew: <https://github.com/idlefire/ew>
knownsec404
2020.10.22

1
port_forwarding/build.sh Normal file
View File

@@ -0,0 +1 @@
go build -o portforward .

356
port_forwarding/forward.go Normal file
View File

@@ -0,0 +1,356 @@
/**
* Filename: forward.go
* Description: the PortForward main logic implement, Three working modes are
* provided:
* 1.Conn<=>Conn: dial A remote server, dial B remote server, connected
* 2.Listen<=>Conn: listen local server, dial remote server, connected
* 3.Listen<=>Listen: listen A local server, listen B local server, connected
* Author: knownsec404
* Time: 2020.09.23
*/
package port_forwarding
import (
"io"
"net"
"time"
)
const PORTFORWARD_PROTO_NIL uint8 = 0x00
const PORTFORWARD_PROTO_TCP uint8 = 0x10
const PORTFORWARD_PROTO_UDP uint8 = 0x20
const PORTFORWARD_SOCK_NIL uint8 = 0x00
const PORTFORWARD_SOCK_LISTEN uint8 = 0x01
const PORTFORWARD_SOCK_CONN uint8 = 0x02
// the PortForward network interface
type Conn interface {
// Read reads data from the connection.
Read(b []byte) (n int, err error)
// Write writes data to the connection.
Write(b []byte) (n int, err error)
// Close closes the connection.
Close() error
// RemoteAddr returns the remote network address.
RemoteAddr() net.Addr
}
// the PortForward launch arguemnt
type Args struct {
Protocol uint8
// sock1
Method1 uint8
Addr1 string
// sock2
Method2 uint8
Addr2 string
}
var stop chan bool = nil
/**********************************************************************
* @Function: Launch(args Args)
* @Description: launch PortForward working mode by arguments
* @Parameter: args Args, the launch arguments
* @Return: nil
**********************************************************************/
func Launch(args Args) {
// initialize stop channel, the maximum number of coroutines that
// need to be managed is 2 (listen-listen)
stop = make(chan bool, 3)
//
if args.Method1 == PORTFORWARD_SOCK_CONN &&
args.Method2 == PORTFORWARD_SOCK_CONN {
// sock1 conn, sock2 conn
ConnConn(args.Protocol, args.Addr1, args.Addr2)
} else if args.Method1 == PORTFORWARD_SOCK_CONN &&
args.Method2 == PORTFORWARD_SOCK_LISTEN {
// sock1 conn, sock2 listen
ListenConn(args.Protocol, args.Addr2, args.Addr1)
} else if args.Method1 == PORTFORWARD_SOCK_LISTEN &&
args.Method2 == PORTFORWARD_SOCK_CONN {
// sock1 listen, sock2 conn
ListenConn(args.Protocol, args.Addr1, args.Addr2)
} else if args.Method1 == PORTFORWARD_SOCK_LISTEN &&
args.Method2 == PORTFORWARD_SOCK_LISTEN {
// sock1 listen , sock2 listen
ListenListen(args.Protocol, args.Addr1, args.Addr2)
} else {
LogError("unknown forward method")
return
}
}
/**********************************************************************
* @Function: Shutdown()
* @Description: the shutdown PortForward
* @Parameter: nil
* @Return: nil
**********************************************************************/
func Shutdown() {
stop <- true
}
/**********************************************************************
* @Function: ListenConn(proto uint8, addr1 string, addr2 string)
* @Description: "Listen<=>Conn" working mode
* @Parameter: proto uint8, the tcp or udp protocol setting
* @Parameter: addr1 string, the address1 "ip:port" string
* @Parameter: addr2 string, the address2 "ip:port" string
* @Return: nil
**********************************************************************/
func ListenConn(proto uint8, addr1 string, addr2 string) {
// get sock launch function by protocol
sockfoo1 := ListenTCP
if proto == PORTFORWARD_PROTO_UDP {
sockfoo1 = ListenUDP
}
sockfoo2 := ConnTCP
if proto == PORTFORWARD_PROTO_UDP {
sockfoo2 = ConnUDP
}
// launch socket1 listen
clientc := make(chan Conn)
quit := make(chan bool, 1)
LogInfo("listen A point with sock1 [%s]", addr1)
go sockfoo1(addr1, clientc, quit)
var count int = 1
for {
// socket1 listen & quit signal
var sock1 Conn = nil
select {
case <-stop:
quit <- true
return
case sock1 = <-clientc:
if sock1 == nil {
// set stop flag when error happend
stop <- true
continue
}
}
LogInfo("A point(link%d) [%s] is ready", count, sock1.RemoteAddr())
// socket2 dial
LogInfo("dial B point with sock2 [%s]", addr2)
sock2, err := sockfoo2(addr2)
if err != nil {
sock1.Close()
LogError("%s", err)
continue
}
LogInfo("B point(sock2) is ready")
// connect with sockets
go ConnectSock(count, sock1, sock2)
count += 1
} // end for
}
/**********************************************************************
* @Function: ListenListen(proto uint8, addr1 string, addr2 string)
* @Description: the "Listen<=>Listen" working mode
* @Parameter: proto uint8, the tcp or udp protocol setting
* @Parameter: addr1 string, the address1 "ip:port" string
* @Parameter: addr2 string, the address2 "ip:port" string
* @Return: nil
**********************************************************************/
func ListenListen(proto uint8, addr1 string, addr2 string) {
release := func(s1 Conn, s2 Conn) {
if s1 != nil {
s1.Close()
}
if s2 != nil {
s2.Close()
}
}
// get sock launch function by protocol
sockfoo := ListenTCP
if proto == PORTFORWARD_PROTO_UDP {
sockfoo = ListenUDP
}
// launch socket1 listen
clientc1 := make(chan Conn)
quit1 := make(chan bool, 1)
LogInfo("listen A point with sock1 [%s]", addr1)
go sockfoo(addr1, clientc1, quit1)
// launch socket2 listen
clientc2 := make(chan Conn)
quit2 := make(chan bool, 1)
LogInfo("listen B point with sock2 [%s]", addr2)
go sockfoo(addr2, clientc2, quit2)
var sock1 Conn = nil
var sock2 Conn = nil
var count int = 1
for {
select {
case <-stop:
quit1 <- true
quit2 <- true
release(sock1, sock2)
return
case c1 := <-clientc1:
if c1 == nil {
// set stop flag when error happend
stop <- true
continue
}
// close the last pending sock1
if sock1 != nil {
sock1.Close()
}
sock1 = c1
LogInfo("A point(link%d) [%s] is ready", count, sock1.RemoteAddr())
case c2 := <-clientc2:
if c2 == nil {
// set stop flag when error happend
stop <- true
continue
}
// close the last pending sock2
if sock2 != nil {
sock2.Close()
}
sock2 = c2
LogInfo("B point(link%d) [%s] is ready", count, sock2.RemoteAddr())
case <-time.After(120 * time.Second):
if sock1 != nil {
LogWarn("A point(%s) socket wait timeout, reset", sock1.RemoteAddr())
}
if sock2 != nil {
LogWarn("B point(%s) socket wait timeout, reset", sock2.RemoteAddr())
}
release(sock1, sock2)
continue
}
// wait another socket ready
if sock1 == nil || sock2 == nil {
continue
}
// the two socket is ready, connect with sockets
go ConnectSock(count, sock1, sock2)
count += 1
// reset sock1 & sock2
sock1 = nil
sock2 = nil
} // end for
}
/**********************************************************************
* @Function: ConnConn(proto uint8, addr1 string, addr2 string)
* @Description: the "Conn<=>Conn" working mode
* @Parameter: proto uint8, the tcp or udp protocol setting
* @Parameter: addr1 string, the address1 "ip:port" string
* @Parameter: addr2 string, the address2 "ip:port" string
* @Return: nil
**********************************************************************/
func ConnConn(proto uint8, addr1 string, addr2 string) {
// get sock launch function by protocol
sockfoo := ConnTCP
if proto == PORTFORWARD_PROTO_UDP {
sockfoo = ConnUDP
}
var count int = 1
for {
select {
case <-stop:
return
default:
}
// socket1 dial
LogInfo("dial A point with sock1 [%s]", addr1)
sock1, err := sockfoo(addr1)
if err != nil {
LogError("%s", err)
time.Sleep(16 * time.Second)
continue
}
LogInfo("A point(sock1) is ready")
// waiting for the first message sent by the A point(sock1)
buf := make([]byte, 32*1024)
n, err := sock1.Read(buf)
if err != nil {
LogError("A point: %s", err)
time.Sleep(16 * time.Second)
continue
}
buf = buf[:n]
// socket2 dial
LogInfo("dial B point with sock2 [%s]", addr2)
sock2, err := sockfoo(addr2)
if err != nil {
sock1.Close()
LogError("%s", err)
time.Sleep(16 * time.Second)
continue
}
LogInfo("B point(sock2) is ready")
// first pass in the first message above
_, err = sock2.Write(buf)
if err != nil {
LogError("B point: %s", err)
time.Sleep(16 * time.Second)
continue
}
// connect with sockets
go ConnectSock(count, sock1, sock2)
count += 1
} // end for
}
/**********************************************************************
* @Function: ConnectSock(id int, sock1 Conn, sock2 Conn)
* @Description: connect two sockets, if an error occurs, the socket will
* be closed so that the coroutine can exit normally
* @Parameter: id int, the communication link id
* @Parameter: sock1 Conn, the first socket object
* @Parameter: sock2 Conn, the second socket object
* @Return: nil
**********************************************************************/
func ConnectSock(id int, sock1 Conn, sock2 Conn) {
exit := make(chan bool, 1)
//
go func() {
_, err := io.Copy(sock1, sock2)
if err != nil {
LogError("ConnectSock%d(A=>B): %s", id, err)
} else {
LogInfo("ConnectSock%d(A=>B) exited", id)
}
exit <- true
}()
//
go func() {
_, err := io.Copy(sock2, sock1)
if err != nil {
LogError("ConnectSock%d(B=>A): %s", id, err)
} else {
LogInfo("ConnectSock%d(B=>A) exited", id)
}
exit <- true
}()
// exit when close either end
<-exit
// close all socket, so that "io.Copy" can exit
sock1.Close()
sock2.Close()
}

3
port_forwarding/go.mod Normal file
View File

@@ -0,0 +1,3 @@
module port_forwarding
go 1.20

114
port_forwarding/log.go Normal file
View File

@@ -0,0 +1,114 @@
/**
* Filename: log.go
* Description: the log information format
* Author: knownsec404
* Time: 2020.08.17
*/
package port_forwarding
import (
"fmt"
"time"
)
const (
LOG_LEVEL_NONE uint32 = 0
LOG_LEVEL_FATAL uint32 = 1
LOG_LEVEL_ERROR uint32 = 2
LOG_LEVEL_WARN uint32 = 3
LOG_LEVEL_INFO uint32 = 4
LOG_LEVEL_DEBUG uint32 = 5
)
var LOG_LEVEL uint32 = LOG_LEVEL_DEBUG
/**********************************************************************
* @Function: LogFatal(format string, a ...interface{})
* @Description: log infomations with fatal level
* @Parameter: format string, the format string template
* @Parameter: a ...interface{}, the value
* @Return: nil
**********************************************************************/
func LogFatal(format string, a ...interface{}) {
if LOG_LEVEL < LOG_LEVEL_FATAL {
return
}
msg := fmt.Sprintf(format, a...)
msg = fmt.Sprintf("[%s] [FATAL] %s", getCurrentTime(), msg)
fmt.Println(msg)
}
/**********************************************************************
* @Function: LogError(format string, a ...interface{})
* @Description: log infomations with error level
* @Parameter: format string, the format string template
* @Parameter: a ...interface{}, the value
* @Return: nil
**********************************************************************/
func LogError(format string, a ...interface{}) {
if LOG_LEVEL < LOG_LEVEL_WARN {
return
}
msg := fmt.Sprintf(format, a...)
msg = fmt.Sprintf("[%s] [ERROR] %s", getCurrentTime(), msg)
fmt.Println(msg)
}
/**********************************************************************
* @Function: LogWarn(format string, a ...interface{})
* @Description: log infomations with warn level
* @Parameter: format string, the format string template
* @Parameter: a ...interface{}, the value
* @Return: nil
**********************************************************************/
func LogWarn(format string, a ...interface{}) {
if LOG_LEVEL < LOG_LEVEL_WARN {
return
}
msg := fmt.Sprintf(format, a...)
msg = fmt.Sprintf("[%s] [WARN] %s", getCurrentTime(), msg)
fmt.Println(msg)
}
/**********************************************************************
* @Function: LogInfo(format string, a ...interface{})
* @Description: log infomations with info level
* @Parameter: format string, the format string template
* @Parameter: a ...interface{}, the value
* @Return: nil
**********************************************************************/
func LogInfo(format string, a ...interface{}) {
if LOG_LEVEL < LOG_LEVEL_INFO {
return
}
msg := fmt.Sprintf(format, a...)
msg = fmt.Sprintf("[%s] [INFO] %s", getCurrentTime(), msg)
fmt.Println(msg)
}
/**********************************************************************
* @Function: LogDebug(format string, a ...interface{})
* @Description: log infomations with debug level
* @Parameter: format string, the format string template
* @Parameter: a ...interface{}, the value
* @Return: nil
**********************************************************************/
func LogDebug(format string, a ...interface{}) {
if LOG_LEVEL < LOG_LEVEL_DEBUG {
return
}
msg := fmt.Sprintf(format, a...)
msg = fmt.Sprintf("[%s] [DEBUG] %s", getCurrentTime(), msg)
fmt.Println(msg)
}
/**********************************************************************
* @Function: getCurrentTime() (string)
* @Description: get current time as log format string
* @Parameter: nil
* @Return: string, the current time string
**********************************************************************/
func getCurrentTime() string {
return time.Now().Format("01-02|15:04:05")
}

115
port_forwarding/main.go Normal file
View File

@@ -0,0 +1,115 @@
/**
* Filename: main.go
* Description: the PortForward main entry point
* It supports tcp/udp protocol layer traffic forwarding, forward/reverse
* creation of forwarding links, and multi-level cascading use.
* Author: knownsec404
* Time: 2020.09.02
*/
package port_forwarding
import (
"errors"
"fmt"
"os"
"strings"
)
const VERSION string = "version: 0.5.0(build-20201022)"
/**********************************************************************
* @Function: main()
* @Description: the PortForward entry point, parse command-line argument
* @Parameter: nil
* @Return: nil
**********************************************************************/
func main() {
if len(os.Args) != 4 {
usage()
return
}
proto := os.Args[1]
sock1 := os.Args[2]
sock2 := os.Args[3]
// parse and check argument
protocol := PORTFORWARD_PROTO_TCP
if strings.ToUpper(proto) == "TCP" {
protocol = PORTFORWARD_PROTO_TCP
} else if strings.ToUpper(proto) == "UDP" {
protocol = PORTFORWARD_PROTO_UDP
} else {
fmt.Printf("unknown protocol [%s]\n", proto)
return
}
m1, a1, err := parseSock(sock1)
if err != nil {
fmt.Println(err)
return
}
m2, a2, err := parseSock(sock2)
if err != nil {
fmt.Println(err)
return
}
// launch
args := Args{
Protocol: protocol,
Method1: m1,
Addr1: a1,
Method2: m2,
Addr2: a2,
}
Launch(args)
}
/**********************************************************************
* @Function: parseSock(sock string) (uint8, string, error)
* @Description: parse and check sock string
* @Parameter: sock string, the sock string from command-line
* @Return: (uint8, string, error), the method, address and error
**********************************************************************/
func parseSock(sock string) (uint8, string, error) {
// split "method" and "address"
items := strings.SplitN(sock, ":", 2)
if len(items) != 2 {
return PORTFORWARD_SOCK_NIL, "",
errors.New("host format must [method:address:port]")
}
method := items[0]
address := items[1]
// check the method field
if strings.ToUpper(method) == "LISTEN" {
return PORTFORWARD_SOCK_LISTEN, address, nil
} else if strings.ToUpper(method) == "CONN" {
return PORTFORWARD_SOCK_CONN, address, nil
} else {
errmsg := fmt.Sprintf("unknown method [%s]", method)
return PORTFORWARD_SOCK_NIL, "", errors.New(errmsg)
}
}
/**********************************************************************
* @Function: usage()
* @Description: the PortForward usage
* @Parameter: nil
* @Return: nil
**********************************************************************/
func usage() {
fmt.Println("Usage:")
fmt.Println(" ./portforward [proto] [sock1] [sock2]")
fmt.Println("Option:")
fmt.Println(" proto the port forward with protocol(tcp/udp)")
fmt.Println(" sock format: [method:address:port]")
fmt.Println(" method the sock mode(listen/conn)")
fmt.Println("Example:")
fmt.Println(" tcp conn:192.168.1.1:3389 conn:192.168.1.10:23333")
fmt.Println(" udp listen:192.168.1.3:5353 conn:8.8.8.8:53")
fmt.Println(" tcp listen:[fe80::1%lo0]:8888 conn:[fe80::1%lo0]:7777")
fmt.Println()
fmt.Println(VERSION)
}

79
port_forwarding/tcp.go Normal file
View File

@@ -0,0 +1,79 @@
/**
* Filename: tcp.go
* Description: the PortForward tcp layer implement
* Author: knownsec404
* Time: 2020.09.23
*/
package port_forwarding
import (
"net"
"time"
)
/**********************************************************************
* @Function: ListenTCP(address string, clientc chan Conn, quit chan bool)
* @Description: listen local tcp service, and accept client connection,
* initialize connection and return by channel.
* @Parameter: address string, the local listen address
* @Parameter: clientc chan Conn, new client connection channel
* @Parameter: quit chan bool, the quit signal channel
* @Return: nil
**********************************************************************/
func ListenTCP(address string, clientc chan Conn, quit chan bool) {
addr, err := net.ResolveTCPAddr("tcp", address)
if err != nil {
LogError("tcp listen error, %s", err)
clientc <- nil
return
}
serv, err := net.ListenTCP("tcp", addr)
if err != nil {
LogError("tcp listen error, %s", err)
clientc <- nil
return
}
// the "conn" has been ready, close "serv"
defer serv.Close()
for {
// check quit
select {
case <-quit:
return
default:
}
// set "Accept" timeout, for check "quit" signal
serv.SetDeadline(time.Now().Add(16 * time.Second))
conn, err := serv.Accept()
if err != nil {
if err, ok := err.(net.Error); ok && err.Timeout() {
continue
}
// others error
LogError("tcp listen error, %s", err)
clientc <- nil
break
}
// new client is connected
clientc <- conn
} // end for
}
/**********************************************************************
* @Function: ConnTCP(address string) (Conn, error)
* @Description: dial to remote server, and return tcp connection
* @Parameter: address string, the remote server address that needs to be dialed
* @Return: (Conn, error), the tcp connection and error
**********************************************************************/
func ConnTCP(address string) (Conn, error) {
conn, err := net.DialTimeout("tcp", address, 10*time.Second)
if err != nil {
return nil, err
}
return conn, nil
}

View File

@@ -0,0 +1,79 @@
package tcp_learn
import (
"fmt"
"log"
"net"
"time"
)
func handleConnection(conn net.Conn) {
defer conn.Close()
buf := make([]byte, 1024)
for {
n, err := conn.Read(buf)
if err != nil {
fmt.Println("Error reading:", err)
break
}
if n > 0 {
fmt.Printf("Received data from %s: %s\n", conn.RemoteAddr(), string(buf[:n]))
}
}
}
func TcpServer() {
addr, err := net.ResolveTCPAddr("tcp", "localhost:8080")
if err != nil {
log.Printf("Error accepting connection: %v", err)
}
listener, err := net.ListenTCP("tcp", addr)
if err != nil {
log.Fatalf("Error creating listener: %v", err)
}
defer listener.Close()
listener.SetDeadline(time.Now().Add(2 * time.Minute))
for {
conn, err := listener.Accept()
if err != nil {
log.Printf("Error accepting connection: %v", err)
return
}
go handleConnection(conn)
}
}
func TcpClient() chan bool {
exit := make(chan bool, 1)
tick := time.Tick(time.Second)
timeOut := time.After(time.Minute)
conn, err := net.Dial("tcp", "localhost:8080")
if err != nil {
log.Fatalf("Error creating listener: %v", err)
return nil
}
for {
select {
case <-timeOut:
fmt.Println("timeout exit!")
exit <- true
return exit
case <-tick:
_, err := conn.Write([]byte("Hello from tcp client !"))
if err != nil {
log.Fatalf("Error write tcp : %v", err)
}
}
}
return exit
}

View File

@@ -0,0 +1,23 @@
package tcp_learn
import (
"testing"
"time"
)
func TestTcpServer(t *testing.T) {
var exit chan bool
go func() {
TcpServer()
}()
go func() {
time.Sleep(time.Second)
exit = TcpClient()
}()
<-exit
}

215
port_forwarding/udp.go Normal file
View File

@@ -0,0 +1,215 @@
/**
* Filename: udp.go
* Description: the PortForward udp layer implement.
* Author: knownsec404
* Time: 2020.09.23
*/
package port_forwarding
import (
"errors"
"net"
"time"
)
// as UDP client Conn
type UDPDistribute struct {
Established bool
Conn *(net.UDPConn)
RAddr net.Addr
Cache chan []byte
}
/**********************************************************************
* @Function: NewUDPDistribute(conn *(net.UDPConn), addr net.Addr) (*UDPDistribute)
* @Description: initialize UDPDistribute structure (as UDP client Conn)
* @Parameter: conn *(net.UDPConn), the udp connection object
* @Parameter: addr net.Addr, the udp client remote adddress
* @Return: *UDPDistribute, the new UDPDistribute structure pointer
**********************************************************************/
func NewUDPDistribute(conn *(net.UDPConn), addr net.Addr) *UDPDistribute {
return &UDPDistribute{
Established: true,
Conn: conn,
RAddr: addr,
Cache: make(chan []byte, 16),
}
}
/**********************************************************************
* @Function: (this *UDPDistribute) Close() (error)
* @Description: set "Established" flag is false, the UDP service will
* cleaned up according to certain conditions.
* @Parameter: nil
* @Return: error, the error
**********************************************************************/
func (this *UDPDistribute) Close() error {
this.Established = false
return nil
}
/**********************************************************************
* @Function: (this *UDPDistribute) Read(b []byte) (n int, err error)
* @Description: read data from connection, due to the udp implementation of
* PortForward, read here will only produce a timeout error and closed error
* (compared to the normal net.Conn object)
* @Parameter: b []byte, the buffer for receive data
* @Return: (n int, err error), the length of the data read and error
**********************************************************************/
func (this *UDPDistribute) Read(b []byte) (n int, err error) {
if !this.Established {
return 0, errors.New("udp distrubute has closed")
}
select {
case <-time.After(16 * time.Second):
return 0, errors.New("udp distrubute read timeout")
case data := <-this.Cache:
n := len(data)
copy(b, data)
return n, nil
}
}
/**********************************************************************
* @Function: (this *UDPDistribute) Write(b []byte) (n int, err error)
* @Description: write data to connection by "WriteTo()"
* @Parameter: b []byte, the data to be sent
* @Return: (n int, err error), the length of the data write and error
**********************************************************************/
func (this *UDPDistribute) Write(b []byte) (n int, err error) {
if !this.Established {
return 0, errors.New("udp distrubute has closed")
}
return this.Conn.WriteTo(b, this.RAddr)
}
/**********************************************************************
* @Function: (this *UDPDistribute) RemoteAddr() (net.Addr)
* @Description: get remote address
* @Parameter: nil
* @Return: net.Addr, the remote address
**********************************************************************/
func (this *UDPDistribute) RemoteAddr() net.Addr {
return this.RAddr
}
/**********************************************************************
* @Function: ListenUDP(address string, clientc chan Conn, quit chan bool)
* @Description: listen local udp service, and accept client connection,
* initialize connection and return by channel.
* since udp is running as a service, it only obtains remote data through
* the Read* function cluster (different from tcp), so we need a temporary
* table to record, so that we can use the temporary table to determine
* whether to forward or create a new link
* @Parameter: address string, the local listen address
* @Parameter: clientc chan Conn, new client connection channel
* @Parameter: quit chan bool, the quit signal channel
* @Return: nil
**********************************************************************/
func ListenUDP(address string, clientc chan Conn, quit chan bool) {
addr, err := net.ResolveUDPAddr("udp", address)
if err != nil {
LogError("udp listen error, %s", err)
clientc <- nil
return
}
serv, err := net.ListenUDP("udp", addr)
if err != nil {
LogError("udp listen error, %s", err)
clientc <- nil
return
}
defer serv.Close()
// the udp distrubute table
table := make(map[string]*UDPDistribute)
// NOTICE:
// in the process of running, the table will generate invalid historical
// data, we have not cleaned it up(These invalid historical data will not
// affect our normal operation).
//
// if you want to deal with invalid historical data, the best way is to
// launch a new coroutine to handle net.Read, and another coroutine to
// handle the connection status, but additional logic to exit the
// coroutine is needed. Currently we think it is unnecessary.
//
// under the current code logic: if a udp connection exits, it will timeout
// to trigger "Close()", and finally set "Established" to false, but there
// is no logic to check this state to clean up, so invalid historical data
// is generated (of course, we can traverse to clean up? every packet? no)
//
// PS: you can see that we called "delete()" in the following logic, but
// this is only for processing. when a new udp client hits invalid
// historical data, we need to return a new connection so that PortForward
// can create a communication link.
for {
// check quit
select {
case <-quit:
return
default:
}
// set timeout, for check "quit" signal
serv.SetDeadline(time.Now().Add(16 * time.Second))
// just new 32*1024, in the outer layer we used "io.Copy()", which
// can only handle the size of 32*1024
buf := make([]byte, 32*1024)
n, addr, err := serv.ReadFrom(buf)
if err != nil {
if err, ok := err.(net.Error); ok && err.Timeout() {
continue
}
LogError("udp listen error, %s", err)
clientc <- nil
return
}
buf = buf[:n]
// if the address in table, we distrubute message
if d, ok := table[addr.String()]; ok {
if d.Established {
// it is established, distrubute message
d.Cache <- buf
continue
} else {
// we remove it when the connnection has expired
delete(table, addr.String())
}
}
// if the address not in table, we create new connection object
conn := NewUDPDistribute(serv, addr)
table[addr.String()] = conn
conn.Cache <- buf
clientc <- conn
} // end for
}
/**********************************************************************
* @Function: ConnUDP(address string) (Conn, error)
* @Description: dial to remote server, and return udp connection
* @Parameter: address string, the remote server address that needs to be dialed
* @Return: (Conn, error), the udp connection and error
**********************************************************************/
func ConnUDP(address string) (Conn, error) {
conn, err := net.DialTimeout("udp", address, 10*time.Second)
if err != nil {
return nil, err
}
// send one byte(knock) to server, get "established" udp connection
_, err = conn.Write([]byte("\x00"))
if err != nil {
return nil, err
}
// due to the characteristics of udp, when the udp server exits, we will
// not receive any signal, it will be blocked at conn.Read();
// here we set a timeout for udp
conn.SetDeadline(time.Now().Add(60 * time.Second))
return conn, nil
}

View File

@@ -2,6 +2,7 @@ package io.wdd.func.auto.service;
import io.wdd.func.auto.beans.BaseFunctionEnum; import io.wdd.func.auto.beans.BaseFunctionEnum;
import io.wdd.func.auto.beans.ProjectDeployContext; import io.wdd.func.auto.beans.ProjectDeployContext;
import io.wdd.server.beans.po.ServerInfoPO;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils; import org.springframework.util.CollectionUtils;
@@ -25,10 +26,9 @@ public class BaseFuncScheduler {
beforeRunProcedure(projectDeployContext); beforeRunProcedure(projectDeployContext);
// during run // during running
doRunProcedure(projectDeployContext); doRunProcedure(projectDeployContext);
// after run // after run
afterRunProcedure(); afterRunProcedure();
@@ -63,15 +63,12 @@ public class BaseFuncScheduler {
.getMasterNode() .getMasterNode()
.getTopicName(); .getTopicName();
if (agentNodeRunProcedure( return serverDoRunProcedure(
true, true,
masterTopicName, masterTopicName,
projectDeployContext projectDeployContext
)) { );
return true;
}
return false;
} }
@@ -108,41 +105,40 @@ public class BaseFuncScheduler {
List<String> agentTopicNameList = projectDeployContext List<String> agentTopicNameList = projectDeployContext
.getAgentNodeList() .getAgentNodeList()
.stream() .stream()
.map(agentNode -> agentNode.getTopicName()) .map(ServerInfoPO::getTopicName)
.collect(Collectors.toList()); .collect(Collectors.toList());
agentTopicNameList.forEach( for (String agentTopicName : agentTopicNameList) {
agentTopicName -> { if (!serverDoRunProcedure(
if (!agentNodeRunProcedure( false,
false, agentTopicName,
agentTopicName, projectDeployContext
projectDeployContext )) {
)) { log.error("{} agent run base func failed !", agentTopicName);
log.error("{} agent run base func failed !", agentTopicName); return false;
} }
} }
);
return true; return true;
} }
private boolean agentNodeRunProcedure(boolean masterNode, String agentTopicName, ProjectDeployContext projectDeployContext) { private boolean serverDoRunProcedure(boolean masterNode, String agentTopicName, ProjectDeployContext projectDeployContext) {
List<BaseFunctionEnum> agentNodeProcedureList; List<BaseFunctionEnum> serverRunProcedureList;
if (masterNode) { if (masterNode) {
agentNodeProcedureList = projectDeployContext.getMasterNodeBaseProcedure(); serverRunProcedureList = projectDeployContext.getMasterNodeBaseProcedure();
} else { } else {
agentNodeProcedureList = projectDeployContext.getAgentNodeBaseProcedure(); serverRunProcedureList = projectDeployContext.getAgentNodeBaseProcedure();
} }
if (CollectionUtils.isEmpty(agentNodeProcedureList)) { if (CollectionUtils.isEmpty(serverRunProcedureList)) {
return true; return true;
} }
ArrayList<String> baseFuncArgList = projectDeployContext.getBaseFunctionArgs(); ArrayList<String> baseFuncArgList = projectDeployContext.getBaseFunctionArgs();
for (BaseFunctionEnum durationBaseFunc : agentNodeProcedureList) { for (BaseFunctionEnum durationBaseFunc : serverRunProcedureList) {
// add op name // add op name
baseFuncArgList.add( baseFuncArgList.add(
@@ -157,7 +153,8 @@ public class BaseFuncScheduler {
true true
)) { )) {
log.error( log.error(
"Agent Base Func Failed ! => {}", "Agent {} Base Func Failed ! => {}",
agentTopicName,
durationBaseFunc durationBaseFunc
); );
return false; return false;
@@ -171,5 +168,4 @@ public class BaseFuncScheduler {
} }
} }

View File

@@ -17,9 +17,9 @@ import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest; import org.springframework.boot.test.context.SpringBootTest;
import javax.annotation.Resource; import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Optional; import java.util.Map;
import java.util.stream.Collectors;
@SpringBootTest @SpringBootTest
public class TestBaseFuncScheduler { public class TestBaseFuncScheduler {
@@ -51,47 +51,24 @@ public class TestBaseFuncScheduler {
projectDeployContext.setProjectId(projectServerId); projectDeployContext.setProjectId(projectServerId);
// ServerQueryEntity serverQueryEntity = new ServerQueryEntity();
// // exsi server
//// serverQueryEntity.setServerName("Chengdu-amd64-99");
// // lappro
//// serverQueryEntity.setServerName("Chengdu-amd64-65");
//
// // cqga
// serverQueryEntity.setServerName();
// ServerInfoPO serverInfoPO = serverService
// .serverGetByPage(serverQueryEntity)
// .getRecords()
// .get(0);
String masterNodeServerName = "Chongqing-amd64-01"; // cgga String masterNodeServerName = "Chongqing-amd64-01"; // cgga
// String masterNodeServerName = "Chengdu-amd64-99"; // lap pro // String masterNodeServerName = "Chengdu-amd64-99"; // lap pro
ProjectServerVO projectServerVO = coreProjectServerService.projectServerOne(projectServerId); ProjectServerVO projectServerVO = coreProjectServerService.projectServerOne(projectServerId);
Map<Boolean, List<ServerInfoPO>> collect = projectServerVO.getBindingServerList().stream().collect(
Collectors.groupingBy(
serverInfoPO -> StringUtils.contains(serverInfoPO.getServerName(), masterNodeServerName)
)
);
Optional<ServerInfoPO> serverInfoPOOptional = projectServerVO.getBindingServerList().stream().filter(
serverInfoPO -> StringUtils.contains(serverInfoPO.getServerName(), masterNodeServerName)
).findFirst();
if (serverInfoPOOptional.isEmpty()) { if (collect.get(Boolean.TRUE) == null) {
System.out.printf("project of %s server of %s is empty", projectServerVO, masterNodeServerName); System.out.printf("project of %s master server of %s is empty", projectServerVO, masterNodeServerName);
return; return;
} }
projectDeployContext.setMasterNode(collect.get(Boolean.TRUE).get(0));
ServerInfoPO serverInfoPO = serverInfoPOOptional.get(); projectDeployContext.setAgentNodeList(collect.get(Boolean.FALSE));
System.out.println("serverInfoPO = " + serverInfoPO);
projectDeployContext.setMasterNode(serverInfoPO);
ArrayList<ServerInfoPO> agentNodeList = new ArrayList<>();
projectServerVO.getBindingServerList().forEach(
po -> {
if (!StringUtils.contains(po.getServerName(), masterNodeServerName)) {
agentNodeList.add(po);
}
}
);
projectDeployContext.setAgentNodeList(agentNodeList);
List<BaseFunctionEnum> masterNodeProcedure = List.of( List<BaseFunctionEnum> masterNodeProcedure = List.of(