package k8s_exec import ( "agent-go/g" "agent-go/logger" "agent-go/utils" "context" "k8s.io/api/apps/v1" autoscalingv1 "k8s.io/api/autoscaling/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/clientcmd" "strings" "sync" ) var log = logger.Log var pool = g.G.P type CmiiK8sOperator struct { DevClient *kubernetes.Clientset CoreClient *kubernetes.Clientset CurrentNamespace string CurrentClient *kubernetes.Clientset } const ( dev = "uavcloud-dev" devFlight = "uavcloud-devflight" devOperation = "uavcloud-devoperation" validation = "uavcloud-feature" integration = "uavcloud-test" demo = "uavcloud-demo" workerThread = 4 ) func (op *CmiiK8sOperator) checkAndBuildCmiiK8sOperator() { if op.DevClient == nil { log.InfoF("[client] - build devFlight k8s operator client !") devConfig, err := clientcmd.RESTConfigFromKubeConfig([]byte(CmiiDevK8sConfig)) if err != nil { msg := "[client] - build devFlight k8s operator error !" log.Error(msg) panic(msg) } op.DevClient, err = kubernetes.NewForConfig(devConfig) if err != nil { panic(err.Error()) } } if op.CoreClient == nil { log.InfoF("[client] - build core k8s operator client !") coreConfig, err := clientcmd.RESTConfigFromKubeConfig([]byte(CmiiCoreK8sConfig)) if err != nil { msg := "[client] - build devFlight k8s operator error !" log.Error(msg) panic(msg) } op.CoreClient, err = kubernetes.NewForConfig(coreConfig) if err != nil { panic(err.Error()) } } } func (op *CmiiK8sOperator) changeOperatorEnv(cmiiEnv string) { // ok op.checkAndBuildCmiiK8sOperator() if strings.Contains(cmiiEnv, "dev") { op.CurrentClient = op.DevClient } else { op.CurrentClient = op.CoreClient } if strings.Contains(cmiiEnv, "dev") { if strings.Contains(cmiiEnv, "f") { op.CurrentNamespace = devFlight } else if strings.Contains(cmiiEnv, "o") { op.CurrentNamespace = devOperation } else { op.CurrentNamespace = dev } } if strings.Contains(cmiiEnv, "int") || strings.Contains(cmiiEnv, "test") { op.CurrentNamespace = integration } if strings.Contains(cmiiEnv, "fe") || strings.Contains(cmiiEnv, "val") { op.CurrentNamespace = validation } if strings.Contains(cmiiEnv, "demo") { op.CurrentNamespace = demo } if op.CurrentNamespace == "" { op.CurrentNamespace = dev } log.InfoF("[k8s env] - current env is => %s", op.CurrentNamespace) } func (op *CmiiK8sOperator) DeploymentAll(cmiiEnv string) []v1.Deployment { op.changeOperatorEnv(cmiiEnv) client := op.CurrentClient deploymentList, err := client.AppsV1().Deployments(op.CurrentNamespace).List(context.TODO(), metav1.ListOptions{}) if err != nil { log.ErrorF("[DeploymentAll] - list deployment in [%s] [%s] error => %s", cmiiEnv, op.CurrentNamespace, err.Error()) } deployments := deploymentList.Items length := len(deployments) log.InfoF("[DeploymentAll] - deployment in [%s] count is => %d", op.CurrentNamespace, length) results := make([]v1.Deployment, length) //var results []v1.Deployment //ccc := make(chan v1.Deployment, length) var wg sync.WaitGroup //var mutex sync.Mutex worker := workerThread if length <= worker { for i, deployment := range deployments { objectMeta := deployment.ObjectMeta objectMeta.SetAnnotations(nil) objectMeta.SetManagedFields(nil) deployment.ObjectMeta = objectMeta results[i] = deployment } return results } pinch := (length + worker - 1) / worker wg.Add(worker) for splice := 0; splice < worker; splice++ { // 计算每个goroutine处理的切片段 start := splice * pinch end := start + pinch if end > length { end = length } //log.DebugF("[DeploymentAll] - deployment pinch from [%d - %d]", start, end) go func(deploymentList []v1.Deployment, start int, results *[]v1.Deployment) { for index, deployment := range deploymentList { objectMeta := deployment.ObjectMeta objectMeta.SetAnnotations(nil) objectMeta.SetManagedFields(nil) deployment.ObjectMeta = objectMeta //ccc <- deployment i := *results i[index+start] = deployment } wg.Done() }(deployments[start:end], start, &results) } //go func() { // wg.Wait() // close(ccc) //}() wg.Wait() //for deployment := range ccc { // results = append(results, deployment) //} return results } func (op *CmiiK8sOperator) DeploymentAllInterface(cmiiEnv string) []CmiiDeploymentInterface { op.changeOperatorEnv(cmiiEnv) client := op.CurrentClient deploymentList, err := client.AppsV1().Deployments(op.CurrentNamespace).List(context.TODO(), metav1.ListOptions{}) if err != nil { log.ErrorF("[DeploymentAllInterface] - list deployment in [%s] [%s] error => %s", cmiiEnv, op.CurrentNamespace, err.Error()) } deployments := deploymentList.Items length := len(deployments) results := make([]CmiiDeploymentInterface, length) ccc := make(chan CmiiDeploymentInterface, length) var wg sync.WaitGroup worker := workerThread if length <= worker { for i, deployment := range deployments { objectMeta := deployment.ObjectMeta objectMeta.SetAnnotations(nil) objectMeta.SetManagedFields(nil) deployment.ObjectMeta = objectMeta results[i] = CmiiDeploymentInterface{}.Convert(deployment) } return results } pinch := length / worker wg.Add(worker) for splice := 0; splice < worker; splice++ { go func(deploymentList []v1.Deployment) { defer wg.Done() for _, deployment := range deploymentList { objectMeta := deployment.ObjectMeta objectMeta.SetAnnotations(nil) objectMeta.SetManagedFields(nil) deployment.ObjectMeta = objectMeta ccc <- CmiiDeploymentInterface{}.Convert(deployment) } }(deployments[splice*pinch : utils.MinInt((splice+1)*pinch, length)]) } go func() { wg.Wait() close(ccc) }() index := 0 for deployment := range ccc { results[index] = deployment index++ } return results } func (op *CmiiK8sOperator) DeploymentFizz(cmiiEnv, appFizz string) (fizzDeployment []v1.Deployment) { deploymentAll := op.DeploymentAll(cmiiEnv) if deploymentAll == nil { log.ErrorF("[DeploymentFizz] - namespace [%s] can not get deployment [%s]", cmiiEnv, appFizz) return nil } for _, deployment := range deploymentAll { if strings.Contains(deployment.Name, appFizz) { fizzDeployment = append(fizzDeployment, deployment) } } return fizzDeployment } func (op *CmiiK8sOperator) DeploymentExist(cmiiEnv, appName string) (exists *v1.Deployment) { client := op.CurrentClient deployment, err := client.AppsV1().Deployments(op.CurrentNamespace).Get(context.TODO(), appName, metav1.GetOptions{}) if err != nil { log.ErrorF("[DeploymentExist] - deployments [%s] [%s] not exists ! %s", cmiiEnv, appName, err.Error()) return nil } return deployment } func (op *CmiiK8sOperator) DeploymentScale(cmiiEnv, appFizz string, scaleCount int32) bool { deploymentFizz := op.DeploymentFizz(cmiiEnv, appFizz) client := op.CurrentClient for _, deployment := range deploymentFizz { log.DebugF("[DeploymentScale] - start to scale [%s] [%s] to %d", deployment.Namespace, deployment.Name, scaleCount) scale := &autoscalingv1.Scale{ ObjectMeta: metav1.ObjectMeta{ Name: deployment.Name, Namespace: deployment.Namespace, }, Spec: autoscalingv1.ScaleSpec{ Replicas: int32(scaleCount), }, } updateScale, err := client.AppsV1().Deployments(deployment.Namespace).UpdateScale( context.TODO(), deployment.Name, scale, metav1.UpdateOptions{}, ) if err != nil { log.ErrorF("[DeploymentScale] - scale error %s", err.Error()) return false } log.InfoF("[DeploymentScale] - scale of [%s] [%s] to %d success !", updateScale.Namespace, updateScale.Name, scaleCount) } return true } func (op *CmiiK8sOperator) ReplicaSet(cmiiEnv, replicaSetName string) *v1.ReplicaSet { op.changeOperatorEnv(cmiiEnv) client := op.CurrentClient replicaSet, err := client.AppsV1().ReplicaSets(op.CurrentNamespace).Get(context.TODO(), replicaSetName, metav1.GetOptions{}) if err != nil { return nil } return replicaSet } func (op *CmiiK8sOperator) PodAll(cmiiEnv string) []corev1.Pod { op.changeOperatorEnv(cmiiEnv) client := op.CurrentClient podList, err := client.CoreV1().Pods(op.CurrentNamespace).List(context.TODO(), metav1.ListOptions{}) if err != nil { log.ErrorF("[PodAll] - list c in [%s] [%s] error => %s", cmiiEnv, op.CurrentNamespace, err.Error()) return nil } pods := podList.Items length := len(pods) results := make([]corev1.Pod, length) ccc := make(chan corev1.Pod, length) var wg sync.WaitGroup worker := workerThread if length <= worker { for i, pod := range pods { objectMeta := pod.ObjectMeta objectMeta.SetAnnotations(nil) objectMeta.SetManagedFields(nil) pod.ObjectMeta = objectMeta results[i] = pod } return results } pinch := length / worker wg.Add(worker) for splice := 0; splice < worker; splice++ { go func(podList []corev1.Pod) { defer wg.Done() for _, pod := range podList { objectMeta := pod.ObjectMeta objectMeta.SetAnnotations(nil) objectMeta.SetManagedFields(nil) pod.ObjectMeta = objectMeta ccc <- pod } }(pods[splice*pinch : utils.MinInt((splice+1)*pinch, length)]) } go func() { wg.Wait() close(ccc) }() index := 0 for c := range ccc { results[index] = c index++ } return results } func (op *CmiiK8sOperator) PodFizz(cmiiEnv, appFizz string) (fizzPod []corev1.Pod) { podAll := op.PodAll(cmiiEnv) if podAll == nil { log.ErrorF("[DeploymentFizz] - no app find in [%s] !", cmiiEnv) return nil } for _, pod := range podAll { if strings.Contains(pod.Name, appFizz) { fizzPod = append(fizzPod, pod) } } return fizzPod }