diff --git a/cmii_operator/CmiiAppDeployTemplate.go b/cmii_operator/CmiiAppDeployTemplate.go deleted file mode 100644 index 976f8da..0000000 --- a/cmii_operator/CmiiAppDeployTemplate.go +++ /dev/null @@ -1,120 +0,0 @@ -package cmii_operator - -const cmiiBackendDeploymentTemplate = ` -apiVersion: apps/v1 -kind: Deployment -metadata: - name: {{ .AppName }} - namespace: {{ .Namespace }} - labels: - cmii.type: backend - cmii.app: {{ .AppName }} - octopus/control: backend-app-1.0.0 - app.kubernetes.io/managed-by: octopus/control - app.kubernetes.io/app-version: {{ .TagVersion }} -spec: - replicas: {{ .Replicas }} - strategy: - rollingUpdate: - maxUnavailable: 1 - selector: - matchLabels: - cmii.type: backend - cmii.app: {{ .AppName }} - template: - metadata: - labels: - cmii.type: backend - cmii.app: {{ .AppName }} - spec: - affinity: - nodeAffinity: - requiredDuringSchedulingIgnoredDuringExecution: - nodeSelectorTerms: - - matchExpressions: - - key: uavcloud.env - operator: In - values: - - demo - imagePullSecrets: - - name: harborsecret - containers: - - name: {{ .AppName }} - image: "harbor.cdcyy.com.cn/cmii/{{ .AppName }}:{{ .ImageTag }}" - imagePullPolicy: Always - env: - - name: K8S_NAMESPACE - value: "{{ .Namespace }}" - - name: APPLICATION_NAME - value: "{{ .AppName }}" - - name: CUST_JAVA_OPTS - value: "-Xms500m -Xmx1500m -Dlog4j2.formatMsgNoLookups=true" - - name: NACOS_REGISTRY - value: "helm-nacos:8848" - - name: NACOS_DISCOVERY_IP - valueFrom: - fieldRef: - fieldPath: status.podIP - - name: NACOS_DISCOVERY_PORT - value: "8080" - - name: BIZ_CONFIG_GROUP - value: {{ .TagVersion }} - - name: SYS_CONFIG_GROUP - value: {{ .TagVersion }} - - name: IMAGE_VERSION - value: {{ .TagVersion }} - - name: NACOS_USERNAME - value: "developer" - - name: NACOS_PASSWORD - value: "Deve@9128201" - ports: - - name: pod-port - containerPort: 8080 - protocol: TCP - resources: - limits: - memory: 2Gi - cpu: 2 - requests: - memory: 1Gi - cpu: 200m - livenessProbe: - httpGet: - path: /cmii/ping - port: pod-port - scheme: HTTP - initialDelaySeconds: 60 - timeoutSeconds: 5 - periodSeconds: 20 - successThreshold: 1 - failureThreshold: 3 - readinessProbe: - httpGet: - path: /cmii/ping - port: pod-port - scheme: HTTP - initialDelaySeconds: 60 - timeoutSeconds: 5 - periodSeconds: 20 - successThreshold: 1 - failureThreshold: 3 - startupProbe: - httpGet: - path: /cmii/ping - port: pod-port - scheme: HTTP - initialDelaySeconds: 60 - timeoutSeconds: 3 - periodSeconds: 20 - successThreshold: 1 - failureThreshold: 5 - volumeMounts: - - name: glusterfs-backend-log-volume - mountPath: /cmii/logs - readOnly: false - subPath: {{ .Namespace }}/{{ .AppName }} - volumes: - - name: glusterfs-backend-log-volume - persistentVolumeClaim: - claimName: glusterfs-backend-log-pvc - ` diff --git a/cmii_operator/CmiiAppDeploy_test.go b/cmii_operator/CmiiAppDeploy_test.go deleted file mode 100644 index 1b0ad0f..0000000 --- a/cmii_operator/CmiiAppDeploy_test.go +++ /dev/null @@ -1,17 +0,0 @@ -package cmii_operator - -import "testing" - -func TestCmiiBackendDeploymentConfig_ParseToApplyConf(t *testing.T) { - - deploymentConfig := CmiiBackendDeploymentConfig{ - Namespace: "uavcloud-dev", - AppName: "cmii-uav-gateway", - ImageTag: "5.2.0-123", - TagVersion: "5.2.0", - Replicas: "2", - } - - deploymentConfig.ParseToApplyConf() - -} diff --git a/cmii_operator/K8sOperator.go b/cmii_operator/K8sOperator.go index df02e01..e82796c 100644 --- a/cmii_operator/K8sOperator.go +++ b/cmii_operator/K8sOperator.go @@ -161,8 +161,7 @@ func (op *CmiiK8sOperator) DeploymentAll(cmiiEnv string) []v1.Deployment { length := len(deployments) log.InfoF("[DeploymentAll] - deployment in [%s] count is => %d", op.CurrentNamespace, length) results := make([]v1.Deployment, length) - //var results []v1.Deployment - //ccc := make(chan v1.Deployment, length) + ccc := make(chan v1.Deployment, length) var wg sync.WaitGroup //var mutex sync.Mutex @@ -193,32 +192,34 @@ func (op *CmiiK8sOperator) DeploymentAll(cmiiEnv string) []v1.Deployment { go func(deploymentList []v1.Deployment, start int, results *[]v1.Deployment) { - for index, deployment := range deploymentList { + for _, deployment := range deploymentList { objectMeta := deployment.ObjectMeta objectMeta.SetAnnotations(nil) objectMeta.SetManagedFields(nil) deployment.ObjectMeta = objectMeta - //ccc <- deployment - i := *results - i[index+start] = deployment + ccc <- deployment + //i := *results + //i[index+start] = deployment } wg.Done() }(deployments[start:end], start, &results) } - //go func() { - // wg.Wait() - // close(ccc) - //}() + go func() { + wg.Wait() + close(ccc) + }() wg.Wait() - //for deployment := range ccc { - // results = append(results, deployment) - //} + index := 0 + for deployment := range ccc { + results[index] = deployment + index++ + } - return results + return results[:index] } func (op *CmiiK8sOperator) DeploymentAllInterface(cmiiEnv string) []CmiiDeploymentInterface { diff --git a/cmii_operator/actual_project/cqga/operator.go b/cmii_operator/actual_project/cqga/operator.go index 944c2c0..d5950f1 100644 --- a/cmii_operator/actual_project/cqga/operator.go +++ b/cmii_operator/actual_project/cqga/operator.go @@ -38,5 +38,7 @@ func main() { } // restart all backend + cmii_operator.RestartCmiiBackendDeployment(realNamespace) + cmii_operator.RestartCmiiFrontendDeployment(realNamespace) } diff --git a/cmii_operator/CmiiAppDeploy.go b/cmii_operator/deploy/CmiiAppDeploy.go similarity index 50% rename from cmii_operator/CmiiAppDeploy.go rename to cmii_operator/deploy/CmiiAppDeploy.go index 49ea431..6c8d147 100644 --- a/cmii_operator/CmiiAppDeploy.go +++ b/cmii_operator/deploy/CmiiAppDeploy.go @@ -1,25 +1,47 @@ -package cmii_operator +package deploy import ( "bytes" + "fmt" v1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" appsv1 "k8s.io/client-go/applyconfigurations/apps/v1" "sigs.k8s.io/yaml" "text/template" "wdd.io/agent-go/utils" ) +type CommonEnvironmentConfig struct { + WebIP string + WebPort string + HarborIP string + HarborPort string +} + type CmiiBackendDeploymentConfig struct { + Namespace string + AppName string + ImageTag string + TagVersion string + Replicas string + NodePort string + NeedPvcCache bool + CustomJvmOpt string +} + +type CmiiFrontendDeploymentConfig struct { Namespace string AppName string ImageTag string TagVersion string Replicas string + ShortName string } func (backend CmiiBackendDeploymentConfig) ParseToApplyConf() *appsv1.DeploymentApplyConfiguration { // 解析模板 + tmpl, err := template.New("cmiiBackendDeploymentTemplate").Parse(cmiiBackendDeploymentTemplate) if err != nil { panic(err) @@ -41,5 +63,28 @@ func (backend CmiiBackendDeploymentConfig) ParseToApplyConf() *appsv1.Deployment utils.BeautifulPrint(&deployment) + // service + parse, err := template.New("cmiiBackendServiceTemplate").Parse(cmiiBackendServiceTemplate) + if err != nil { + panic(err) + } + // 应用数据并打印结果 + var resulta bytes.Buffer + err = parse.Execute(&resulta, backend) + if err != nil { + panic(err) + } + + fmt.Println(resulta.String()) + + // 创建Deployment对象 + service := corev1.Service{} + err = yaml.Unmarshal(resulta.Bytes(), &service) + if err != nil { + panic(err) + } + + utils.BeautifulPrint(&service) + return nil } diff --git a/cmii_operator/deploy/CmiiAppDeployTemplate.go b/cmii_operator/deploy/CmiiAppDeployTemplate.go new file mode 100644 index 0000000..57f1ff1 --- /dev/null +++ b/cmii_operator/deploy/CmiiAppDeployTemplate.go @@ -0,0 +1,263 @@ +package deploy + +const cmiiBackendDeploymentTemplate = ` +apiVersion: apps/v1 +kind: Deployment +metadata: + name: {{ .AppName }} + namespace: {{ .Namespace }} + labels: + cmii.type: backend + cmii.app: {{ .AppName }} + octopus/control: backend-app-1.0.0 + app.kubernetes.io/managed-by: octopus/control + app.kubernetes.io/app-version: {{ .TagVersion }} +spec: + replicas: {{ .Replicas }} + strategy: + rollingUpdate: + maxUnavailable: 1 + selector: + matchLabels: + cmii.type: backend + cmii.app: {{ .AppName }} + template: + metadata: + labels: + cmii.type: backend + cmii.app: {{ .AppName }} + spec: + affinity: + nodeAffinity: + requiredDuringSchedulingIgnoredDuringExecution: + nodeSelectorTerms: + - matchExpressions: + - key: uavcloud.env + operator: In + values: + - demo + imagePullSecrets: + - name: harborsecret + containers: + - name: {{ .AppName }} + image: "harbor.cdcyy.com.cn/cmii/{{ .AppName }}:{{ .ImageTag }}" + imagePullPolicy: Always + env: + - name: K8S_NAMESPACE + value: {{ .Namespace }} + - name: APPLICATION_NAME + value: {{ .AppName }} + - name: CUST_JAVA_OPTS + value: "-Xms500m -Xmx1500m -Dlog4j2.formatMsgNoLookups=true" + - name: NACOS_REGISTRY + value: "helm-nacos:8848" + - name: NACOS_DISCOVERY_IP + valueFrom: + fieldRef: + fieldPath: status.podIP + - name: NACOS_DISCOVERY_PORT + value: "8080" + - name: BIZ_CONFIG_GROUP + value: {{ .TagVersion }} + - name: SYS_CONFIG_GROUP + value: {{ .TagVersion }} + - name: IMAGE_VERSION + value: {{ .TagVersion }} + - name: NACOS_USERNAME + value: "developer" + - name: NACOS_PASSWORD + value: "Deve@9128201" + ports: + - name: pod-port + containerPort: 8080 + protocol: TCP + resources: + limits: + memory: 2Gi + cpu: 2 + requests: + memory: 1Gi + cpu: 200m + livenessProbe: + httpGet: + path: /cmii/ping + port: pod-port + scheme: HTTP + initialDelaySeconds: 60 + timeoutSeconds: 5 + periodSeconds: 20 + successThreshold: 1 + failureThreshold: 3 + readinessProbe: + httpGet: + path: /cmii/ping + port: pod-port + scheme: HTTP + initialDelaySeconds: 60 + timeoutSeconds: 5 + periodSeconds: 20 + successThreshold: 1 + failureThreshold: 3 + startupProbe: + httpGet: + path: /cmii/ping + port: pod-port + scheme: HTTP + initialDelaySeconds: 60 + timeoutSeconds: 3 + periodSeconds: 20 + successThreshold: 1 + failureThreshold: 5 + volumeMounts: + - name: glusterfs-backend-log-volume + mountPath: /cmii/logs + readOnly: false + subPath: {{ .Namespace }}/{{ .AppName }} + {{- if .NeedPvcCache }} + - name: data-cache-volume + mountPath: /cmii/cache + readOnly: false + subPath: {{ .Namespace }}/{{ .AppName }} + {{- end }} + volumes: + - name: glusterfs-backend-log-volume + persistentVolumeClaim: + claimName: glusterfs-backend-log-pvc + {{- if .NeedPvcCache }} + - name: data-cache-volume + persistentVolumeClaim: + claimName: {{ .AppName }}-cache + {{- end }} + ` + +const cmiiBackendServiceTemplate = ` +apiVersion: v1 +kind: Service +metadata: + name: {{ .AppName }} + namespace: {{ .Namespace }} + labels: + cmii.type: backend + cmii.app: {{ .AppName }} + octopus/control: backend-app-1.0.0 + app.kubernetes.io/managed-by: octopus/control + app.kubernetes.io/app-version: {{ .TagVersion }} +spec: + {{- if .NodePort }} + type: NodePort + {{- else }} + type: ClusterIP + {{- end }} + selector: + cmii.type: backend + cmii.app: {{ .AppName }} + ports: + - name: backend-tcp + port: 8080 + protocol: TCP + targetPort: 8080 + {{- if .NodePort }} + nodePort: {{ .NodePort }} + {{- end }} + ` +const cmiiBackendPVCTemplate = ` +apiVersion: v1 +kind: PersistentVolumeClaim +metadata: + name: {{ .AppName }}-cache + namespace: {{ .Namespace }} + labels: + cmii.type: backend + cmii.app: {{ .AppName }} + octopus/control: backend-app-1.0.0 + app.kubernetes.io/managed-by: octopus/control + app.kubernetes.io/app-version: {{ .TagVersion }} +spec: + storageClassName: nfs-prod-distribute + accessModes: + - ReadWriteMany + volumeMode: Filesystem + resources: + requests: + storage: 15Gi + ` + +const cmiiFrontendDeploymentTemplate = ` +apiVersion: apps/v1 +kind: Deployment +metadata: + name: {{ .AppName }} + namespace: {{ .Namespace }} + labels: + cmii.type: frontend + cmii.app: {{ .AppName }} + octopus/control: frontend-app-1.0.0 + app.kubernetes.io/managed-by: octopus/control + app.kubernetes.io/app-version: {{ .TagVersion }} +spec: + replicas: {{ .Replicas }} + strategy: + rollingUpdate: + maxUnavailable: 1 + selector: + matchLabels: + cmii.type: frontend + cmii.app: {{ .AppName }} + template: + metadata: + labels: + cmii.type: frontend + cmii.app: {{ .AppName }} + spec: + imagePullSecrets: + - name: harborsecret + containers: + - name: {{ .AppName }} + image: "harbor.cdcyy.com.cn/cmii/{{ .AppName }}:{{ .ImageTag }}" + imagePullPolicy: Always + env: + - name: K8S_NAMESPACE + value: {{ .Namespace }} + - name: APPLICATION_NAME + value: {{ .AppName }} + ports: + - name: platform-9528 + containerPort: 9528 + protocol: TCP + resources: + limits: + cpu: "1" + memory: 1Gi + requests: + cpu: 500m + memory: 500Mi + volumeMounts: + - name: nginx-conf + mountPath: /usr/local/nginx/conf/nginx.conf + subPath: nginx.conf + - name: default-nginx-conf + mountPath: /etc/nginx/conf.d/default.conf + subPath: default.conf + - name: tenant-prefix + subPath: ingress-config.js + mountPath: /home/cmii-platform/dist/ingress-config.js + volumes: + - name: nginx-conf + configMap: + name: nginx-cm + items: + - key: nginx.conf + path: nginx.conf + - name: default-nginx-conf + configMap: + name: default-nginx-cm + items: + - key: default.conf + path: default.conf + - name: tenant-prefix + configMap: + name: tenant-prefix-{{ .ShortName }} + items: + - key: ingress-config.js + path: ingress-config.js +` diff --git a/cmii_operator/deploy/CmiiAppDeploy_test.go b/cmii_operator/deploy/CmiiAppDeploy_test.go new file mode 100644 index 0000000..4c45381 --- /dev/null +++ b/cmii_operator/deploy/CmiiAppDeploy_test.go @@ -0,0 +1,19 @@ +package deploy + +import "testing" + +func TestCmiiBackendDeploymentConfig_ParseToApplyConf(t *testing.T) { + + deploymentConfig := CmiiBackendDeploymentConfig{ + Namespace: "uavcloud-dev", + AppName: "cmii-uav-gateway", + ImageTag: "5.2.0-123", + TagVersion: "5.2.0", + Replicas: "2", + NodePort: "31213", + NeedPvcCache: true, + } + + deploymentConfig.ParseToApplyConf() + +} diff --git a/port_forwarding/CHANGELOG b/port_forwarding/CHANGELOG new file mode 100644 index 0000000..4353695 --- /dev/null +++ b/port_forwarding/CHANGELOG @@ -0,0 +1,44 @@ +# Changelog +All notable changes to this project will be documented in this file. + +The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), +and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). + +## [Released] + +## [0.5.0] - 2020-10-22 +### Added +- Refactoring source code strucure for StarLink + +## [Unreleased] + +## [0.5.1] - 2021-04-23 +### Fixed +- In listen-listen mode, actively close the previous socket when there are + multiple connections to the same end + +## [0.4.1] - 2020-10-16 +### Added +- Add multi-channel support of conn-conn mode +### Fixed +- Modify timeout of unsuccessfully established channel in listen-listen + work mode from 16s => 120s + +## [0.4] - 2020-09-25 +### Added +- Add multi-channel support +### Changed +- Optimize code structure and implementation + +## [0.3] - 2020-09-20 +### Added +- Refactoring the project framework into a three-layer structure: + work_mode / protocol / socket + +## [0.2] - 2020-09-10 +### Added +- Refactoring project framework + +## [0.1] - 2020-08-19 +### Added +- Initialize project diff --git a/port_forwarding/Images/mutil_forward.png b/port_forwarding/Images/mutil_forward.png new file mode 100644 index 0000000..7c7d497 Binary files /dev/null and b/port_forwarding/Images/mutil_forward.png differ diff --git a/port_forwarding/Images/portforward_framework.png b/port_forwarding/Images/portforward_framework.png new file mode 100644 index 0000000..c86356b Binary files /dev/null and b/port_forwarding/Images/portforward_framework.png differ diff --git a/port_forwarding/Images/restricted_forward.png b/port_forwarding/Images/restricted_forward.png new file mode 100644 index 0000000..a28a463 Binary files /dev/null and b/port_forwarding/Images/restricted_forward.png differ diff --git a/port_forwarding/Images/simple_forward.png b/port_forwarding/Images/simple_forward.png new file mode 100644 index 0000000..eb8ef20 Binary files /dev/null and b/port_forwarding/Images/simple_forward.png differ diff --git a/port_forwarding/LICENSE b/port_forwarding/LICENSE new file mode 100644 index 0000000..cf2185a --- /dev/null +++ b/port_forwarding/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2021 Knownsec, Inc. + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/port_forwarding/README.md b/port_forwarding/README.md new file mode 100644 index 0000000..35a974f --- /dev/null +++ b/port_forwarding/README.md @@ -0,0 +1,190 @@ +# PortForward + +## 0x00 前言 + +`PortForward` 是使用 Golang 进行开发的端口转发工具,解决在某些场景下 `内外网无法互通`的问题。 + +`PortForward` 的功能如下: + + 1. 支持 tcp/udp 协议层的端口转发 + 2. 支持级联 + 3. 支持正向/反向连接模式 + 4. 支持多通道 + 5. 支持 ipv6 + +本文对 `PortForward` 进行了详细的介绍。 + +目录: + +1. 使用说明 +2. 工作场景 +3. 源码结构 +4. 逻辑结构 +5. 端口转发的实现 +6. udp的knock报文 +7. udp的超时设置 +8. listen-listen的超时设置 +9. 多通路的实现 +10. issue +11. contributions + +## 0x01 使用说明 + +**1.使用** + + Usage: + ./portforward [proto] [sock1] [sock2] + Option: + proto the port forward with protocol(tcp/udp) + sock format: [method:address:port] + method the sock mode(listen/conn) + Example: + tcp conn:192.168.1.1:3389 conn:192.168.1.10:23333 + udp listen:192.168.1.3:5353 conn:8.8.8.8:53 + tcp listen:[fe80::1%lo0]:8888 conn:[fe80::1%lo0]:7777 + + version: 0.5.0(build-20201022) + +**2.编译** + + Golang 1.12及以上 + GO111MODULE=on + + git clone https://github.com/knownsec/Portforward.git + ./build.sh + +## 0x02 工作场景 + +这里列举了一些 `PortForward` 的工作场景,如下: + +**2.1 简单模式** +
+ +
[图1.简单转发模式] +
+ +**2.2 受限主机转发** +
+ +
[图2.受限主机转发模式图] +
+ +**2.3 级联端口转发** +
+ +
[图3.级联端口转发] +
+ +## 0x03 源码结构 + + . + ├── CHANGELOG + ├── Images // images resource + ├── README.md + ├── build.sh // compile script + ├── forward.go // portforward main logic + ├── go.mod + ├── log.go // log module + ├── main.go // main, parse arguments + ├── tcp.go // tcp layer + └── udp.go // udp layer + +## 0x04 逻辑结构 + +`PortForward` 支持 `TCP` , `UDP` 协议层的端口转发,代码抽象后逻辑结构框架如下: +
+ +
[图4.整体框架] +
+ +## 0x05 端口转发的实现 + +端口转发程序作为网络传输的中间人,无非就是将两端的 socket 对象进行联通,数据就可以通过这条「链路」进行传输了。 + +按照这样的思路,我们从需求开始分析和抽象,可以这么认为:无论是作为 `tcp` 还是 `udp` 运行,无论是作为 `connect` 还是 `listen` +运行,最终都将获得两个 socket,其中一个连向原服务,另一个与客户端连接;最终将这两端的 socket 联通即可实现端口转发。 + +在 Golang 中我们采用了 `io.Copy()` 来联通两个 socket,但是 `io.Copy` 必须要求对象实现了 `io.Reader/io.Writer` 接口,`tcp` +的 socket 可以直接支持,而 `udp` 的 socket 需要我们进行封装。 + +## 0x06 udp的knock报文 + +在 `udp` 的 `connect` 模式下,我们在连接服务器成功后,立即发送了一个 `knock` 报文,如下: + + conn, err := net.DialTimeout("udp", ... + _, err = conn.Write([]byte("\x00")) + +其作用是通知远程 `udp` 服务器我们已经连上了(`udp` +创建连接后,仅在本地操作系统层进行了注册,只有当发送一个报文到对端后,远程服务器才能感知到新连接),当我们在 `udp` +的 `conn-conn` 模式下运行时,这个报文是必须的。 + +## 0x07 udp的超时设置 + +在 `udp` 的实现中,我们为所有的 `udp` 连接 socket 对象都设置了超时时间(`tcp` 中不需要),这是因为在 `udp` 中,socket +对象无法感知对端退出,如果不设置超时时间,将会一直在 `conn.Read()` 阻塞下去。 + +我们设置了 `udp` 超时时间为 60 秒,当 60 秒无数据传输,本次建立的虚拟通信链路将销毁,端口转发程序将重新创建新的通信链路。 + +## 0x08 listen-listen的超时设置 + +对于 `listen-listen` 模式,需要等待两端的客户端都连上端口转发程序后,才能将两个 socket 进行联通。 + +为此我们在此处设置了 120 秒的超时时间,也就是说当其中一端有客户端连接后,另一端在 120 秒内没有连接,我们就销毁这个未成功建立的通信链路;用户重新连接即可。 + +> 如果没有这个超时,可能某些场景遗留了某个连接,将造成后续的通信链路错位。 + +## 0x09 多通路的实现 + +多通路可以支持同时发起多个连接,这里我们以 `tcp` 作为例子来说明。为了处理这种情况,我们的处理方式是: + +1. `listen-conn`: 每当 listen 服务器接收到新连接后,与远端创建新的连接,并将两个 socket 进行联通。 +2. `listen-listen`: (好像没有实际场景)两端的 listen 服务器接收到新连接后,将两个 socket 进行联通。 +3. `conn-conn`: 创建 sock1 的连接,当 sock1 端收到数据,创建与 sock2 的连接,将两个 socket 进行联通;随后继续创建 sock1 + 连接(预留)。 + +> 我们在 `udp` 中也加入了多通路的支持,和 `tcp` 基本类似,但由于 `udp` 是无连接的,我们不能像 `tcp` 直接联通两个 socket +> 对象。我们在 `udp listen` 服务器中维护了一个临时表,使用 `ip:port` 作为标志,以描述各个通信链路的联通情况,依据此进行流量的分发。 + +## 0x0A issue + +**1.udp的映射表未清空** +udp多通路中的映射表没有对无效数据进行清理,长时间运行可能造成内存占用 + +**2.http服务的 host 字段影响** +当转发 http 服务时,由于常见的客户端在访问时自动将 `host` 设置为访问目标(端口转发程序的地址) +,我们直接对流量进行转发,那么 `host` 字段一定是错误的,某些 http 服务器对该字段进行了校验,所以无法正常转发。 + +> 比如端口转发 `www.baidu.com` + +**3.不支持多端口通信** +这里当然无法处理多端口通信的服务,我们仅做了一个端口的转发,多个端口的情况无法直接处理,比如 `FTP` 服务,其分别使用 20 端口( +数据传输)和 21 端口(命令传输)。 + +但还是要单独说一句,对于大多数 `udp` 服务的实现,是 `udp` 服务器收到了一个新报文,对此进行处理后,然后使用新的 socket +将响应数据发给对端,比如 `TFTP` 服务: + + 1.client 发起请求 [get aaa] + src: 55123 => dst: 69 + 2.tftp 收到报文,进行处理,返回响应: + src: 61234 => dst: 55123 + +这种多端口通信我们也是无法处理的,因为在目前多通路的实现下,上述流程中第 2 步过后,对于端口转发程序来说,后续的报文无法确定转发给 +69 端口还是 61234 端口。 + +## 0x0B contributions + +[r0oike@knownsec 404](https://github.com/r0oike) +[fenix@knownsec 404](https://github.com/13ph03nix) +[0x7F@knownsec 404](https://github.com/0x7Fancy) + +
+ +------------------------------ +References: +内网渗透之端口转发、映射、代理: +内网渗透之端口转发与代理工具总结: +sensepost/reGeorg: +idlefire/ew: + +knownsec404 +2020.10.22 \ No newline at end of file diff --git a/port_forwarding/build.sh b/port_forwarding/build.sh new file mode 100644 index 0000000..79d027d --- /dev/null +++ b/port_forwarding/build.sh @@ -0,0 +1 @@ +go build -o portforward . diff --git a/port_forwarding/forward.go b/port_forwarding/forward.go new file mode 100644 index 0000000..8b32fa7 --- /dev/null +++ b/port_forwarding/forward.go @@ -0,0 +1,356 @@ +/** +* Filename: forward.go +* Description: the PortForward main logic implement, Three working modes are +* provided: +* 1.Conn<=>Conn: dial A remote server, dial B remote server, connected +* 2.Listen<=>Conn: listen local server, dial remote server, connected +* 3.Listen<=>Listen: listen A local server, listen B local server, connected +* Author: knownsec404 +* Time: 2020.09.23 + */ + +package port_forwarding + +import ( + "io" + "net" + "time" +) + +const PORTFORWARD_PROTO_NIL uint8 = 0x00 +const PORTFORWARD_PROTO_TCP uint8 = 0x10 +const PORTFORWARD_PROTO_UDP uint8 = 0x20 + +const PORTFORWARD_SOCK_NIL uint8 = 0x00 +const PORTFORWARD_SOCK_LISTEN uint8 = 0x01 +const PORTFORWARD_SOCK_CONN uint8 = 0x02 + +// the PortForward network interface +type Conn interface { + // Read reads data from the connection. + Read(b []byte) (n int, err error) + // Write writes data to the connection. + Write(b []byte) (n int, err error) + // Close closes the connection. + Close() error + // RemoteAddr returns the remote network address. + RemoteAddr() net.Addr +} + +// the PortForward launch arguemnt +type Args struct { + Protocol uint8 + // sock1 + Method1 uint8 + Addr1 string + // sock2 + Method2 uint8 + Addr2 string +} + +var stop chan bool = nil + +/********************************************************************** +* @Function: Launch(args Args) +* @Description: launch PortForward working mode by arguments +* @Parameter: args Args, the launch arguments +* @Return: nil +**********************************************************************/ +func Launch(args Args) { + // initialize stop channel, the maximum number of coroutines that + // need to be managed is 2 (listen-listen) + stop = make(chan bool, 3) + + // + if args.Method1 == PORTFORWARD_SOCK_CONN && + args.Method2 == PORTFORWARD_SOCK_CONN { + // sock1 conn, sock2 conn + ConnConn(args.Protocol, args.Addr1, args.Addr2) + } else if args.Method1 == PORTFORWARD_SOCK_CONN && + args.Method2 == PORTFORWARD_SOCK_LISTEN { + // sock1 conn, sock2 listen + ListenConn(args.Protocol, args.Addr2, args.Addr1) + } else if args.Method1 == PORTFORWARD_SOCK_LISTEN && + args.Method2 == PORTFORWARD_SOCK_CONN { + // sock1 listen, sock2 conn + ListenConn(args.Protocol, args.Addr1, args.Addr2) + } else if args.Method1 == PORTFORWARD_SOCK_LISTEN && + args.Method2 == PORTFORWARD_SOCK_LISTEN { + // sock1 listen , sock2 listen + ListenListen(args.Protocol, args.Addr1, args.Addr2) + } else { + LogError("unknown forward method") + return + } +} + +/********************************************************************** +* @Function: Shutdown() +* @Description: the shutdown PortForward +* @Parameter: nil +* @Return: nil +**********************************************************************/ +func Shutdown() { + stop <- true +} + +/********************************************************************** +* @Function: ListenConn(proto uint8, addr1 string, addr2 string) +* @Description: "Listen<=>Conn" working mode +* @Parameter: proto uint8, the tcp or udp protocol setting +* @Parameter: addr1 string, the address1 "ip:port" string +* @Parameter: addr2 string, the address2 "ip:port" string +* @Return: nil +**********************************************************************/ +func ListenConn(proto uint8, addr1 string, addr2 string) { + // get sock launch function by protocol + sockfoo1 := ListenTCP + if proto == PORTFORWARD_PROTO_UDP { + sockfoo1 = ListenUDP + } + sockfoo2 := ConnTCP + if proto == PORTFORWARD_PROTO_UDP { + sockfoo2 = ConnUDP + } + + // launch socket1 listen + clientc := make(chan Conn) + quit := make(chan bool, 1) + LogInfo("listen A point with sock1 [%s]", addr1) + go sockfoo1(addr1, clientc, quit) + + var count int = 1 + for { + // socket1 listen & quit signal + var sock1 Conn = nil + select { + case <-stop: + quit <- true + return + case sock1 = <-clientc: + if sock1 == nil { + // set stop flag when error happend + stop <- true + continue + } + } + LogInfo("A point(link%d) [%s] is ready", count, sock1.RemoteAddr()) + + // socket2 dial + LogInfo("dial B point with sock2 [%s]", addr2) + sock2, err := sockfoo2(addr2) + if err != nil { + sock1.Close() + LogError("%s", err) + continue + } + LogInfo("B point(sock2) is ready") + + // connect with sockets + go ConnectSock(count, sock1, sock2) + count += 1 + } // end for +} + +/********************************************************************** +* @Function: ListenListen(proto uint8, addr1 string, addr2 string) +* @Description: the "Listen<=>Listen" working mode +* @Parameter: proto uint8, the tcp or udp protocol setting +* @Parameter: addr1 string, the address1 "ip:port" string +* @Parameter: addr2 string, the address2 "ip:port" string +* @Return: nil +**********************************************************************/ +func ListenListen(proto uint8, addr1 string, addr2 string) { + release := func(s1 Conn, s2 Conn) { + if s1 != nil { + s1.Close() + } + if s2 != nil { + s2.Close() + } + } + + // get sock launch function by protocol + sockfoo := ListenTCP + if proto == PORTFORWARD_PROTO_UDP { + sockfoo = ListenUDP + } + + // launch socket1 listen + clientc1 := make(chan Conn) + quit1 := make(chan bool, 1) + LogInfo("listen A point with sock1 [%s]", addr1) + go sockfoo(addr1, clientc1, quit1) + // launch socket2 listen + clientc2 := make(chan Conn) + quit2 := make(chan bool, 1) + LogInfo("listen B point with sock2 [%s]", addr2) + go sockfoo(addr2, clientc2, quit2) + + var sock1 Conn = nil + var sock2 Conn = nil + var count int = 1 + for { + select { + case <-stop: + quit1 <- true + quit2 <- true + release(sock1, sock2) + return + case c1 := <-clientc1: + if c1 == nil { + // set stop flag when error happend + stop <- true + continue + } + // close the last pending sock1 + if sock1 != nil { + sock1.Close() + } + sock1 = c1 + LogInfo("A point(link%d) [%s] is ready", count, sock1.RemoteAddr()) + case c2 := <-clientc2: + if c2 == nil { + // set stop flag when error happend + stop <- true + continue + } + // close the last pending sock2 + if sock2 != nil { + sock2.Close() + } + sock2 = c2 + LogInfo("B point(link%d) [%s] is ready", count, sock2.RemoteAddr()) + case <-time.After(120 * time.Second): + if sock1 != nil { + LogWarn("A point(%s) socket wait timeout, reset", sock1.RemoteAddr()) + } + if sock2 != nil { + LogWarn("B point(%s) socket wait timeout, reset", sock2.RemoteAddr()) + } + release(sock1, sock2) + continue + } + + // wait another socket ready + if sock1 == nil || sock2 == nil { + continue + } + + // the two socket is ready, connect with sockets + go ConnectSock(count, sock1, sock2) + count += 1 + // reset sock1 & sock2 + sock1 = nil + sock2 = nil + } // end for +} + +/********************************************************************** +* @Function: ConnConn(proto uint8, addr1 string, addr2 string) +* @Description: the "Conn<=>Conn" working mode +* @Parameter: proto uint8, the tcp or udp protocol setting +* @Parameter: addr1 string, the address1 "ip:port" string +* @Parameter: addr2 string, the address2 "ip:port" string +* @Return: nil +**********************************************************************/ +func ConnConn(proto uint8, addr1 string, addr2 string) { + // get sock launch function by protocol + sockfoo := ConnTCP + if proto == PORTFORWARD_PROTO_UDP { + sockfoo = ConnUDP + } + + var count int = 1 + for { + select { + case <-stop: + return + default: + } + + // socket1 dial + LogInfo("dial A point with sock1 [%s]", addr1) + sock1, err := sockfoo(addr1) + if err != nil { + LogError("%s", err) + time.Sleep(16 * time.Second) + continue + } + LogInfo("A point(sock1) is ready") + + // waiting for the first message sent by the A point(sock1) + buf := make([]byte, 32*1024) + n, err := sock1.Read(buf) + if err != nil { + LogError("A point: %s", err) + time.Sleep(16 * time.Second) + continue + } + buf = buf[:n] + + // socket2 dial + LogInfo("dial B point with sock2 [%s]", addr2) + sock2, err := sockfoo(addr2) + if err != nil { + sock1.Close() + LogError("%s", err) + time.Sleep(16 * time.Second) + continue + } + LogInfo("B point(sock2) is ready") + + // first pass in the first message above + _, err = sock2.Write(buf) + if err != nil { + LogError("B point: %s", err) + time.Sleep(16 * time.Second) + continue + } + + // connect with sockets + go ConnectSock(count, sock1, sock2) + count += 1 + } // end for +} + +/********************************************************************** +* @Function: ConnectSock(id int, sock1 Conn, sock2 Conn) +* @Description: connect two sockets, if an error occurs, the socket will +* be closed so that the coroutine can exit normally +* @Parameter: id int, the communication link id +* @Parameter: sock1 Conn, the first socket object +* @Parameter: sock2 Conn, the second socket object +* @Return: nil +**********************************************************************/ +func ConnectSock(id int, sock1 Conn, sock2 Conn) { + exit := make(chan bool, 1) + + // + go func() { + _, err := io.Copy(sock1, sock2) + if err != nil { + LogError("ConnectSock%d(A=>B): %s", id, err) + } else { + LogInfo("ConnectSock%d(A=>B) exited", id) + } + exit <- true + }() + + // + go func() { + _, err := io.Copy(sock2, sock1) + if err != nil { + LogError("ConnectSock%d(B=>A): %s", id, err) + } else { + LogInfo("ConnectSock%d(B=>A) exited", id) + } + exit <- true + }() + + // exit when close either end + <-exit + // close all socket, so that "io.Copy" can exit + sock1.Close() + sock2.Close() +} diff --git a/port_forwarding/go.mod b/port_forwarding/go.mod new file mode 100644 index 0000000..f29e611 --- /dev/null +++ b/port_forwarding/go.mod @@ -0,0 +1,3 @@ +module port_forwarding + +go 1.20 diff --git a/port_forwarding/log.go b/port_forwarding/log.go new file mode 100644 index 0000000..511d75e --- /dev/null +++ b/port_forwarding/log.go @@ -0,0 +1,114 @@ +/** +* Filename: log.go +* Description: the log information format +* Author: knownsec404 +* Time: 2020.08.17 + */ + +package port_forwarding + +import ( + "fmt" + "time" +) + +const ( + LOG_LEVEL_NONE uint32 = 0 + LOG_LEVEL_FATAL uint32 = 1 + LOG_LEVEL_ERROR uint32 = 2 + LOG_LEVEL_WARN uint32 = 3 + LOG_LEVEL_INFO uint32 = 4 + LOG_LEVEL_DEBUG uint32 = 5 +) + +var LOG_LEVEL uint32 = LOG_LEVEL_DEBUG + +/********************************************************************** +* @Function: LogFatal(format string, a ...interface{}) +* @Description: log infomations with fatal level +* @Parameter: format string, the format string template +* @Parameter: a ...interface{}, the value +* @Return: nil +**********************************************************************/ +func LogFatal(format string, a ...interface{}) { + if LOG_LEVEL < LOG_LEVEL_FATAL { + return + } + msg := fmt.Sprintf(format, a...) + msg = fmt.Sprintf("[%s] [FATAL] %s", getCurrentTime(), msg) + fmt.Println(msg) +} + +/********************************************************************** +* @Function: LogError(format string, a ...interface{}) +* @Description: log infomations with error level +* @Parameter: format string, the format string template +* @Parameter: a ...interface{}, the value +* @Return: nil +**********************************************************************/ +func LogError(format string, a ...interface{}) { + if LOG_LEVEL < LOG_LEVEL_WARN { + return + } + msg := fmt.Sprintf(format, a...) + msg = fmt.Sprintf("[%s] [ERROR] %s", getCurrentTime(), msg) + fmt.Println(msg) +} + +/********************************************************************** +* @Function: LogWarn(format string, a ...interface{}) +* @Description: log infomations with warn level +* @Parameter: format string, the format string template +* @Parameter: a ...interface{}, the value +* @Return: nil +**********************************************************************/ +func LogWarn(format string, a ...interface{}) { + if LOG_LEVEL < LOG_LEVEL_WARN { + return + } + msg := fmt.Sprintf(format, a...) + msg = fmt.Sprintf("[%s] [WARN] %s", getCurrentTime(), msg) + fmt.Println(msg) +} + +/********************************************************************** +* @Function: LogInfo(format string, a ...interface{}) +* @Description: log infomations with info level +* @Parameter: format string, the format string template +* @Parameter: a ...interface{}, the value +* @Return: nil +**********************************************************************/ +func LogInfo(format string, a ...interface{}) { + if LOG_LEVEL < LOG_LEVEL_INFO { + return + } + msg := fmt.Sprintf(format, a...) + msg = fmt.Sprintf("[%s] [INFO] %s", getCurrentTime(), msg) + fmt.Println(msg) +} + +/********************************************************************** +* @Function: LogDebug(format string, a ...interface{}) +* @Description: log infomations with debug level +* @Parameter: format string, the format string template +* @Parameter: a ...interface{}, the value +* @Return: nil +**********************************************************************/ +func LogDebug(format string, a ...interface{}) { + if LOG_LEVEL < LOG_LEVEL_DEBUG { + return + } + msg := fmt.Sprintf(format, a...) + msg = fmt.Sprintf("[%s] [DEBUG] %s", getCurrentTime(), msg) + fmt.Println(msg) +} + +/********************************************************************** +* @Function: getCurrentTime() (string) +* @Description: get current time as log format string +* @Parameter: nil +* @Return: string, the current time string +**********************************************************************/ +func getCurrentTime() string { + return time.Now().Format("01-02|15:04:05") +} diff --git a/port_forwarding/main.go b/port_forwarding/main.go new file mode 100644 index 0000000..ff8eb4e --- /dev/null +++ b/port_forwarding/main.go @@ -0,0 +1,115 @@ +/** +* Filename: main.go +* Description: the PortForward main entry point +* It supports tcp/udp protocol layer traffic forwarding, forward/reverse +* creation of forwarding links, and multi-level cascading use. +* Author: knownsec404 +* Time: 2020.09.02 + */ + +package port_forwarding + +import ( + "errors" + "fmt" + "os" + "strings" +) + +const VERSION string = "version: 0.5.0(build-20201022)" + +/********************************************************************** +* @Function: main() +* @Description: the PortForward entry point, parse command-line argument +* @Parameter: nil +* @Return: nil +**********************************************************************/ +func main() { + if len(os.Args) != 4 { + usage() + return + } + proto := os.Args[1] + sock1 := os.Args[2] + sock2 := os.Args[3] + + // parse and check argument + protocol := PORTFORWARD_PROTO_TCP + if strings.ToUpper(proto) == "TCP" { + protocol = PORTFORWARD_PROTO_TCP + } else if strings.ToUpper(proto) == "UDP" { + protocol = PORTFORWARD_PROTO_UDP + } else { + fmt.Printf("unknown protocol [%s]\n", proto) + return + } + + m1, a1, err := parseSock(sock1) + if err != nil { + fmt.Println(err) + return + } + m2, a2, err := parseSock(sock2) + if err != nil { + fmt.Println(err) + return + } + + // launch + args := Args{ + Protocol: protocol, + Method1: m1, + Addr1: a1, + Method2: m2, + Addr2: a2, + } + Launch(args) +} + +/********************************************************************** +* @Function: parseSock(sock string) (uint8, string, error) +* @Description: parse and check sock string +* @Parameter: sock string, the sock string from command-line +* @Return: (uint8, string, error), the method, address and error +**********************************************************************/ +func parseSock(sock string) (uint8, string, error) { + // split "method" and "address" + items := strings.SplitN(sock, ":", 2) + if len(items) != 2 { + return PORTFORWARD_SOCK_NIL, "", + errors.New("host format must [method:address:port]") + } + + method := items[0] + address := items[1] + // check the method field + if strings.ToUpper(method) == "LISTEN" { + return PORTFORWARD_SOCK_LISTEN, address, nil + } else if strings.ToUpper(method) == "CONN" { + return PORTFORWARD_SOCK_CONN, address, nil + } else { + errmsg := fmt.Sprintf("unknown method [%s]", method) + return PORTFORWARD_SOCK_NIL, "", errors.New(errmsg) + } +} + +/********************************************************************** +* @Function: usage() +* @Description: the PortForward usage +* @Parameter: nil +* @Return: nil +**********************************************************************/ +func usage() { + fmt.Println("Usage:") + fmt.Println(" ./portforward [proto] [sock1] [sock2]") + fmt.Println("Option:") + fmt.Println(" proto the port forward with protocol(tcp/udp)") + fmt.Println(" sock format: [method:address:port]") + fmt.Println(" method the sock mode(listen/conn)") + fmt.Println("Example:") + fmt.Println(" tcp conn:192.168.1.1:3389 conn:192.168.1.10:23333") + fmt.Println(" udp listen:192.168.1.3:5353 conn:8.8.8.8:53") + fmt.Println(" tcp listen:[fe80::1%lo0]:8888 conn:[fe80::1%lo0]:7777") + fmt.Println() + fmt.Println(VERSION) +} diff --git a/port_forwarding/tcp.go b/port_forwarding/tcp.go new file mode 100644 index 0000000..e2ca01c --- /dev/null +++ b/port_forwarding/tcp.go @@ -0,0 +1,79 @@ +/** +* Filename: tcp.go +* Description: the PortForward tcp layer implement +* Author: knownsec404 +* Time: 2020.09.23 + */ + +package port_forwarding + +import ( + "net" + "time" +) + +/********************************************************************** +* @Function: ListenTCP(address string, clientc chan Conn, quit chan bool) +* @Description: listen local tcp service, and accept client connection, +* initialize connection and return by channel. +* @Parameter: address string, the local listen address +* @Parameter: clientc chan Conn, new client connection channel +* @Parameter: quit chan bool, the quit signal channel +* @Return: nil +**********************************************************************/ +func ListenTCP(address string, clientc chan Conn, quit chan bool) { + addr, err := net.ResolveTCPAddr("tcp", address) + if err != nil { + LogError("tcp listen error, %s", err) + clientc <- nil + return + } + serv, err := net.ListenTCP("tcp", addr) + if err != nil { + LogError("tcp listen error, %s", err) + clientc <- nil + return + } + // the "conn" has been ready, close "serv" + defer serv.Close() + + for { + // check quit + select { + case <-quit: + return + default: + } + + // set "Accept" timeout, for check "quit" signal + serv.SetDeadline(time.Now().Add(16 * time.Second)) + conn, err := serv.Accept() + if err != nil { + if err, ok := err.(net.Error); ok && err.Timeout() { + continue + } + // others error + LogError("tcp listen error, %s", err) + clientc <- nil + break + } + + // new client is connected + clientc <- conn + } // end for +} + +/********************************************************************** +* @Function: ConnTCP(address string) (Conn, error) +* @Description: dial to remote server, and return tcp connection +* @Parameter: address string, the remote server address that needs to be dialed +* @Return: (Conn, error), the tcp connection and error +**********************************************************************/ +func ConnTCP(address string) (Conn, error) { + conn, err := net.DialTimeout("tcp", address, 10*time.Second) + if err != nil { + return nil, err + } + + return conn, nil +} diff --git a/port_forwarding/tcp_learn/socket_learn.go b/port_forwarding/tcp_learn/socket_learn.go new file mode 100644 index 0000000..af0edd6 --- /dev/null +++ b/port_forwarding/tcp_learn/socket_learn.go @@ -0,0 +1,79 @@ +package tcp_learn + +import ( + "fmt" + "log" + "net" + "time" +) + +func handleConnection(conn net.Conn) { + defer conn.Close() + + buf := make([]byte, 1024) + for { + n, err := conn.Read(buf) + if err != nil { + fmt.Println("Error reading:", err) + break + } + if n > 0 { + fmt.Printf("Received data from %s: %s\n", conn.RemoteAddr(), string(buf[:n])) + } + } +} + +func TcpServer() { + addr, err := net.ResolveTCPAddr("tcp", "localhost:8080") + if err != nil { + + log.Printf("Error accepting connection: %v", err) + } + listener, err := net.ListenTCP("tcp", addr) + if err != nil { + log.Fatalf("Error creating listener: %v", err) + } + defer listener.Close() + listener.SetDeadline(time.Now().Add(2 * time.Minute)) + + for { + conn, err := listener.Accept() + if err != nil { + log.Printf("Error accepting connection: %v", err) + return + } + go handleConnection(conn) + } + +} + +func TcpClient() chan bool { + + exit := make(chan bool, 1) + tick := time.Tick(time.Second) + timeOut := time.After(time.Minute) + + conn, err := net.Dial("tcp", "localhost:8080") + + if err != nil { + log.Fatalf("Error creating listener: %v", err) + return nil + } + + for { + select { + case <-timeOut: + fmt.Println("timeout exit!") + exit <- true + return exit + case <-tick: + + _, err := conn.Write([]byte("Hello from tcp client !")) + if err != nil { + log.Fatalf("Error write tcp : %v", err) + } + } + } + + return exit +} diff --git a/port_forwarding/tcp_learn/socket_learn_test.go b/port_forwarding/tcp_learn/socket_learn_test.go new file mode 100644 index 0000000..3509d40 --- /dev/null +++ b/port_forwarding/tcp_learn/socket_learn_test.go @@ -0,0 +1,23 @@ +package tcp_learn + +import ( + "testing" + "time" +) + +func TestTcpServer(t *testing.T) { + + var exit chan bool + + go func() { + TcpServer() + }() + + go func() { + time.Sleep(time.Second) + + exit = TcpClient() + }() + + <-exit +} diff --git a/port_forwarding/udp.go b/port_forwarding/udp.go new file mode 100644 index 0000000..ab72ee4 --- /dev/null +++ b/port_forwarding/udp.go @@ -0,0 +1,215 @@ +/** +* Filename: udp.go +* Description: the PortForward udp layer implement. +* Author: knownsec404 +* Time: 2020.09.23 + */ + +package port_forwarding + +import ( + "errors" + "net" + "time" +) + +// as UDP client Conn +type UDPDistribute struct { + Established bool + Conn *(net.UDPConn) + RAddr net.Addr + Cache chan []byte +} + +/********************************************************************** +* @Function: NewUDPDistribute(conn *(net.UDPConn), addr net.Addr) (*UDPDistribute) +* @Description: initialize UDPDistribute structure (as UDP client Conn) +* @Parameter: conn *(net.UDPConn), the udp connection object +* @Parameter: addr net.Addr, the udp client remote adddress +* @Return: *UDPDistribute, the new UDPDistribute structure pointer +**********************************************************************/ +func NewUDPDistribute(conn *(net.UDPConn), addr net.Addr) *UDPDistribute { + return &UDPDistribute{ + Established: true, + Conn: conn, + RAddr: addr, + Cache: make(chan []byte, 16), + } +} + +/********************************************************************** +* @Function: (this *UDPDistribute) Close() (error) +* @Description: set "Established" flag is false, the UDP service will +* cleaned up according to certain conditions. +* @Parameter: nil +* @Return: error, the error +**********************************************************************/ +func (this *UDPDistribute) Close() error { + this.Established = false + return nil +} + +/********************************************************************** +* @Function: (this *UDPDistribute) Read(b []byte) (n int, err error) +* @Description: read data from connection, due to the udp implementation of +* PortForward, read here will only produce a timeout error and closed error +* (compared to the normal net.Conn object) +* @Parameter: b []byte, the buffer for receive data +* @Return: (n int, err error), the length of the data read and error +**********************************************************************/ +func (this *UDPDistribute) Read(b []byte) (n int, err error) { + if !this.Established { + return 0, errors.New("udp distrubute has closed") + } + + select { + case <-time.After(16 * time.Second): + return 0, errors.New("udp distrubute read timeout") + case data := <-this.Cache: + n := len(data) + copy(b, data) + return n, nil + } +} + +/********************************************************************** +* @Function: (this *UDPDistribute) Write(b []byte) (n int, err error) +* @Description: write data to connection by "WriteTo()" +* @Parameter: b []byte, the data to be sent +* @Return: (n int, err error), the length of the data write and error +**********************************************************************/ +func (this *UDPDistribute) Write(b []byte) (n int, err error) { + if !this.Established { + return 0, errors.New("udp distrubute has closed") + } + return this.Conn.WriteTo(b, this.RAddr) +} + +/********************************************************************** +* @Function: (this *UDPDistribute) RemoteAddr() (net.Addr) +* @Description: get remote address +* @Parameter: nil +* @Return: net.Addr, the remote address +**********************************************************************/ +func (this *UDPDistribute) RemoteAddr() net.Addr { + return this.RAddr +} + +/********************************************************************** +* @Function: ListenUDP(address string, clientc chan Conn, quit chan bool) +* @Description: listen local udp service, and accept client connection, +* initialize connection and return by channel. +* since udp is running as a service, it only obtains remote data through +* the Read* function cluster (different from tcp), so we need a temporary +* table to record, so that we can use the temporary table to determine +* whether to forward or create a new link +* @Parameter: address string, the local listen address +* @Parameter: clientc chan Conn, new client connection channel +* @Parameter: quit chan bool, the quit signal channel +* @Return: nil +**********************************************************************/ +func ListenUDP(address string, clientc chan Conn, quit chan bool) { + addr, err := net.ResolveUDPAddr("udp", address) + if err != nil { + LogError("udp listen error, %s", err) + clientc <- nil + return + } + serv, err := net.ListenUDP("udp", addr) + if err != nil { + LogError("udp listen error, %s", err) + clientc <- nil + return + } + defer serv.Close() + + // the udp distrubute table + table := make(map[string]*UDPDistribute) + // NOTICE: + // in the process of running, the table will generate invalid historical + // data, we have not cleaned it up(These invalid historical data will not + // affect our normal operation). + // + // if you want to deal with invalid historical data, the best way is to + // launch a new coroutine to handle net.Read, and another coroutine to + // handle the connection status, but additional logic to exit the + // coroutine is needed. Currently we think it is unnecessary. + // + // under the current code logic: if a udp connection exits, it will timeout + // to trigger "Close()", and finally set "Established" to false, but there + // is no logic to check this state to clean up, so invalid historical data + // is generated (of course, we can traverse to clean up? every packet? no) + // + // PS: you can see that we called "delete()" in the following logic, but + // this is only for processing. when a new udp client hits invalid + // historical data, we need to return a new connection so that PortForward + // can create a communication link. + + for { + // check quit + select { + case <-quit: + return + default: + } + + // set timeout, for check "quit" signal + serv.SetDeadline(time.Now().Add(16 * time.Second)) + + // just new 32*1024, in the outer layer we used "io.Copy()", which + // can only handle the size of 32*1024 + buf := make([]byte, 32*1024) + n, addr, err := serv.ReadFrom(buf) + if err != nil { + if err, ok := err.(net.Error); ok && err.Timeout() { + continue + } + LogError("udp listen error, %s", err) + clientc <- nil + return + } + buf = buf[:n] + + // if the address in table, we distrubute message + if d, ok := table[addr.String()]; ok { + if d.Established { + // it is established, distrubute message + d.Cache <- buf + continue + } else { + // we remove it when the connnection has expired + delete(table, addr.String()) + } + } + // if the address not in table, we create new connection object + conn := NewUDPDistribute(serv, addr) + table[addr.String()] = conn + conn.Cache <- buf + clientc <- conn + } // end for +} + +/********************************************************************** +* @Function: ConnUDP(address string) (Conn, error) +* @Description: dial to remote server, and return udp connection +* @Parameter: address string, the remote server address that needs to be dialed +* @Return: (Conn, error), the udp connection and error +**********************************************************************/ +func ConnUDP(address string) (Conn, error) { + conn, err := net.DialTimeout("udp", address, 10*time.Second) + if err != nil { + return nil, err + } + + // send one byte(knock) to server, get "established" udp connection + _, err = conn.Write([]byte("\x00")) + if err != nil { + return nil, err + } + + // due to the characteristics of udp, when the udp server exits, we will + // not receive any signal, it will be blocked at conn.Read(); + // here we set a timeout for udp + conn.SetDeadline(time.Now().Add(60 * time.Second)) + return conn, nil +} diff --git a/server/src/main/java/io/wdd/func/auto/service/BaseFuncScheduler.java b/server/src/main/java/io/wdd/func/auto/service/BaseFuncScheduler.java index c245352..3400886 100644 --- a/server/src/main/java/io/wdd/func/auto/service/BaseFuncScheduler.java +++ b/server/src/main/java/io/wdd/func/auto/service/BaseFuncScheduler.java @@ -2,6 +2,7 @@ package io.wdd.func.auto.service; import io.wdd.func.auto.beans.BaseFunctionEnum; import io.wdd.func.auto.beans.ProjectDeployContext; +import io.wdd.server.beans.po.ServerInfoPO; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; import org.springframework.util.CollectionUtils; @@ -25,10 +26,9 @@ public class BaseFuncScheduler { beforeRunProcedure(projectDeployContext); - // during run + // during running doRunProcedure(projectDeployContext); - // after run afterRunProcedure(); @@ -63,15 +63,12 @@ public class BaseFuncScheduler { .getMasterNode() .getTopicName(); - if (agentNodeRunProcedure( + return serverDoRunProcedure( true, masterTopicName, projectDeployContext - )) { + ); - return true; - } - return false; } @@ -108,41 +105,40 @@ public class BaseFuncScheduler { List agentTopicNameList = projectDeployContext .getAgentNodeList() .stream() - .map(agentNode -> agentNode.getTopicName()) + .map(ServerInfoPO::getTopicName) .collect(Collectors.toList()); - agentTopicNameList.forEach( - agentTopicName -> { - if (!agentNodeRunProcedure( - false, - agentTopicName, - projectDeployContext - )) { - log.error("{} agent run base func failed !", agentTopicName); - } - } - ); + for (String agentTopicName : agentTopicNameList) { + if (!serverDoRunProcedure( + false, + agentTopicName, + projectDeployContext + )) { + log.error("{} agent run base func failed !", agentTopicName); + return false; + } + } return true; } - private boolean agentNodeRunProcedure(boolean masterNode, String agentTopicName, ProjectDeployContext projectDeployContext) { + private boolean serverDoRunProcedure(boolean masterNode, String agentTopicName, ProjectDeployContext projectDeployContext) { - List agentNodeProcedureList; + List serverRunProcedureList; if (masterNode) { - agentNodeProcedureList = projectDeployContext.getMasterNodeBaseProcedure(); + serverRunProcedureList = projectDeployContext.getMasterNodeBaseProcedure(); } else { - agentNodeProcedureList = projectDeployContext.getAgentNodeBaseProcedure(); + serverRunProcedureList = projectDeployContext.getAgentNodeBaseProcedure(); } - if (CollectionUtils.isEmpty(agentNodeProcedureList)) { + if (CollectionUtils.isEmpty(serverRunProcedureList)) { return true; } ArrayList baseFuncArgList = projectDeployContext.getBaseFunctionArgs(); - for (BaseFunctionEnum durationBaseFunc : agentNodeProcedureList) { + for (BaseFunctionEnum durationBaseFunc : serverRunProcedureList) { // add op name baseFuncArgList.add( @@ -157,7 +153,8 @@ public class BaseFuncScheduler { true )) { log.error( - "Agent Base Func Failed ! => {}", + "Agent {} Base Func Failed ! => {}", + agentTopicName, durationBaseFunc ); return false; @@ -171,5 +168,4 @@ public class BaseFuncScheduler { } - } diff --git a/server/src/test/java/io/wdd/server/func/TestBaseFuncScheduler.java b/server/src/test/java/io/wdd/server/func/TestBaseFuncScheduler.java index cdaff0a..bac81aa 100644 --- a/server/src/test/java/io/wdd/server/func/TestBaseFuncScheduler.java +++ b/server/src/test/java/io/wdd/server/func/TestBaseFuncScheduler.java @@ -17,9 +17,9 @@ import org.junit.jupiter.api.Test; import org.springframework.boot.test.context.SpringBootTest; import javax.annotation.Resource; -import java.util.ArrayList; import java.util.List; -import java.util.Optional; +import java.util.Map; +import java.util.stream.Collectors; @SpringBootTest public class TestBaseFuncScheduler { @@ -51,47 +51,24 @@ public class TestBaseFuncScheduler { projectDeployContext.setProjectId(projectServerId); -// ServerQueryEntity serverQueryEntity = new ServerQueryEntity(); -// // exsi server -//// serverQueryEntity.setServerName("Chengdu-amd64-99"); -// // lappro -//// serverQueryEntity.setServerName("Chengdu-amd64-65"); -// -// // cqga -// serverQueryEntity.setServerName(); -// ServerInfoPO serverInfoPO = serverService -// .serverGetByPage(serverQueryEntity) -// .getRecords() -// .get(0); - String masterNodeServerName = "Chongqing-amd64-01"; // cgga // String masterNodeServerName = "Chengdu-amd64-99"; // lap pro ProjectServerVO projectServerVO = coreProjectServerService.projectServerOne(projectServerId); + Map> collect = projectServerVO.getBindingServerList().stream().collect( + Collectors.groupingBy( + serverInfoPO -> StringUtils.contains(serverInfoPO.getServerName(), masterNodeServerName) + ) + ); - Optional serverInfoPOOptional = projectServerVO.getBindingServerList().stream().filter( - serverInfoPO -> StringUtils.contains(serverInfoPO.getServerName(), masterNodeServerName) - ).findFirst(); - if (serverInfoPOOptional.isEmpty()) { - System.out.printf("project of %s server of %s is empty", projectServerVO, masterNodeServerName); + if (collect.get(Boolean.TRUE) == null) { + System.out.printf("project of %s master server of %s is empty", projectServerVO, masterNodeServerName); return; } + projectDeployContext.setMasterNode(collect.get(Boolean.TRUE).get(0)); - ServerInfoPO serverInfoPO = serverInfoPOOptional.get(); - System.out.println("serverInfoPO = " + serverInfoPO); - - projectDeployContext.setMasterNode(serverInfoPO); - ArrayList agentNodeList = new ArrayList<>(); - - projectServerVO.getBindingServerList().forEach( - po -> { - if (!StringUtils.contains(po.getServerName(), masterNodeServerName)) { - agentNodeList.add(po); - } - } - ); - projectDeployContext.setAgentNodeList(agentNodeList); + projectDeployContext.setAgentNodeList(collect.get(Boolean.FALSE)); List masterNodeProcedure = List.of(