Files
WddSuperAgent/agent-operator/CmiiK8sOperator.go

1142 lines
31 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package main
import (
"bytes"
"context"
"fmt"
v1 "k8s.io/api/apps/v1"
autoscalingv1 "k8s.io/api/autoscaling/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/api/policy/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/tools/remotecommand"
"os"
"os/exec"
"runtime"
"strings"
"sync"
"time"
image2 "wdd.io/agent-common/image"
"wdd.io/agent-common/utils"
"wdd.io/agent-operator/config"
)
// CmiiK8sOperator 操作CMII的k8s集群的实体类
type CmiiK8sOperator struct {
DevClient *kubernetes.Clientset
CoreClient *kubernetes.Clientset
CurrentNamespace string
CurrentClient *kubernetes.Clientset
DevConfig *restclient.Config
CoreConfig *restclient.Config
CurrentConfig *restclient.Config
}
var CmiiDevClusterK8sConfig string // used for kubectl
var CmiiCoreClusterK8sConfig string // used for kubectl
const (
workerThread = 4
)
var podDeleteGratefulTime = int64(120)
func init() {
switch runtime.GOOS {
case "windows":
CmiiDevClusterK8sConfig = "C:\\Users\\wddsh\\Documents\\IdeaProjects\\ProjectOctopus\\agent-operator\\config\\cmii-Dev-cluster.yaml"
CmiiCoreClusterK8sConfig = "C:\\Users\\wddsh\\Documents\\IdeaProjects\\ProjectOctopus\\agent-operator\\config\\cmii-core-cluster.yaml"
case "linux":
CmiiDevClusterK8sConfig = "/home/wdd/IdeaProjects/ProjectOctopus/agent-operator/config/cmii-Dev-cluster.yaml"
CmiiCoreClusterK8sConfig = "/home/wdd/IdeaProjects/ProjectOctopus/agent-operator/config/cmii-core-cluster.yaml"
}
}
func GetK8sConfigByEnv(cmiiEnv string) string {
switch cmiiEnv {
case config.Dev:
return CmiiDevClusterK8sConfig
case config.DevFlight:
return CmiiDevClusterK8sConfig
case config.DevOperation:
return CmiiDevClusterK8sConfig
case config.Integration:
return CmiiCoreClusterK8sConfig
case config.Uat:
return CmiiCoreClusterK8sConfig
case config.Validation:
return CmiiCoreClusterK8sConfig
case config.Uavms:
return CmiiCoreClusterK8sConfig
case config.Demo:
return CmiiCoreClusterK8sConfig
default:
return CmiiDevClusterK8sConfig
}
}
func DeleteByKubectl(applyFilePath string, cmiiEnv string) bool {
kubeconfig := GetK8sConfigByEnv(cmiiEnv)
cmd := exec.Command("kubectl", "delete", "-f", applyFilePath)
cmd.Env = append(os.Environ(), fmt.Sprintf("KUBECONFIG=%s", kubeconfig))
output, err := cmd.CombinedOutput()
if err != nil {
log.ErrorF("failed to delete resources in file %s: %v\n%s", applyFilePath, err, output)
return false
}
log.InfoF("successfully deleted resources in file %s", applyFilePath)
fmt.Println(string(output))
return true
}
func ApplyByKubectl(applyFilePath string, cmiiEnv string) bool {
kubeconfig := GetK8sConfigByEnv(cmiiEnv)
cmd := exec.Command("kubectl", "apply", "-f", applyFilePath)
cmd.Env = append(os.Environ(), fmt.Sprintf("KUBECONFIG=%s", kubeconfig))
output, err := cmd.CombinedOutput()
if err != nil {
fmt.Println("failed apply file => " + applyFilePath)
fmt.Println(string(output))
fmt.Println()
return false
}
fmt.Println("success apply file => " + applyFilePath)
fmt.Println(string(output))
fmt.Println()
return true
}
func GetNodeWideByKubectl(cmiiEnv string) {
kubeconfig := GetK8sConfigByEnv(cmiiEnv)
cmd := exec.Command("kubectl", "get", "nodes", "-o", "wide")
cmd.Env = append(os.Environ(), fmt.Sprintf("KUBECONFIG=%s", kubeconfig))
output, err := cmd.CombinedOutput()
if err != nil {
log.ErrorF("failed to get nodes: %v\n%s", err, output)
return
}
fmt.Println(string(output))
}
func (op *CmiiK8sOperator) checkAndBuildCmiiK8sOperator() {
if op.DevClient == nil {
log.InfoF("[client] - build DevFlight k8s operator client !")
devConfig, err := clientcmd.RESTConfigFromKubeConfig([]byte(config.CmiiDevK8sConfig))
if err != nil {
msg := "[client] - build DevFlight k8s operator error !"
log.Error(msg)
panic(msg)
}
op.DevConfig = devConfig
op.DevClient, err = kubernetes.NewForConfig(devConfig)
if err != nil {
panic(err.Error())
}
}
if op.CoreClient == nil {
log.InfoF("[client] - build core k8s operator client !")
coreConfig, err := clientcmd.RESTConfigFromKubeConfig([]byte(config.CmiiCoreK8sConfig))
if err != nil {
msg := "[client] - build DevFlight k8s operator error !"
log.Error(msg)
panic(msg)
}
op.CoreConfig = coreConfig
op.CoreClient, err = kubernetes.NewForConfig(coreConfig)
if err != nil {
panic(err.Error())
}
}
}
func (op *CmiiK8sOperator) changeOperatorEnv(cmiiEnv string) {
// ok
op.checkAndBuildCmiiK8sOperator()
// speed up
if op.CurrentNamespace == cmiiEnv {
return
}
// first key
op.CurrentNamespace = config.DevOperation
if strings.Contains(cmiiEnv, "dev") {
if strings.Contains(cmiiEnv, "devf") {
op.CurrentNamespace = config.DevFlight
} else if strings.Contains(cmiiEnv, "devo") {
op.CurrentNamespace = config.DevOperation
} else {
op.CurrentNamespace = config.Dev
}
}
if strings.Contains(cmiiEnv, "int") || strings.Contains(cmiiEnv, "test") {
op.CurrentNamespace = config.Integration
}
if strings.Contains(cmiiEnv, "fe") || strings.Contains(cmiiEnv, "val") {
op.CurrentNamespace = config.Validation
}
if strings.Contains(cmiiEnv, "uat") {
op.CurrentNamespace = config.Uat
}
if strings.Contains(cmiiEnv, "demo") {
op.CurrentNamespace = config.Demo
// 2025年4月7日 特殊情况
log.Info("2025年4月7日 特殊情况 demo现在在DEV环境")
op.CurrentClient = op.DevClient
return
}
if strings.Contains(cmiiEnv, "uavms") {
op.CurrentNamespace = config.Uavms
}
// key feature
if op.CurrentNamespace == "" {
op.CurrentNamespace = cmiiEnv
} else {
if strings.Contains(cmiiEnv, "dev") {
op.CurrentClient = op.DevClient
op.CurrentConfig = op.DevConfig
} else {
op.CurrentClient = op.CoreClient
op.CurrentConfig = op.CoreConfig
}
}
log.InfoF("[k8s env] - current env is => %s", op.CurrentNamespace)
}
func (op *CmiiK8sOperator) BuildCurrentClientFromConfig(realClientConfig []byte) {
log.InfoF("[BuildCurrentClientFromConfig] - build real k8s operator client !")
realEnvConfig, err := clientcmd.RESTConfigFromKubeConfig(realClientConfig)
if err != nil {
msg := "[BuildCurrentClientFromConfig] - build real k8s operator error !"
log.Error(msg)
panic(msg)
}
op.CurrentConfig = realEnvConfig
op.CurrentClient, err = kubernetes.NewForConfig(realEnvConfig)
if err != nil {
panic(err.Error())
}
}
func (op *CmiiK8sOperator) DeploymentAll(cmiiEnv string) []v1.Deployment {
op.changeOperatorEnv(cmiiEnv)
client := op.CurrentClient
deploymentList, err := client.AppsV1().Deployments(op.CurrentNamespace).List(context.TODO(), metav1.ListOptions{})
if err != nil {
log.ErrorF("[DeploymentAll] - list deployment in [%s] [%s] error => %s", cmiiEnv, op.CurrentNamespace, err.Error())
}
deployments := deploymentList.Items
length := len(deployments)
log.InfoF("[DeploymentAll] - deployment in [%s] count is => %d", op.CurrentNamespace, length)
results := make([]v1.Deployment, length)
ccc := make(chan v1.Deployment, length)
var wg sync.WaitGroup
//var mutex sync.Mutex
worker := workerThread
if length <= worker {
for i, deployment := range deployments {
objectMeta := deployment.ObjectMeta
objectMeta.SetAnnotations(nil)
objectMeta.SetManagedFields(nil)
deployment.ObjectMeta = objectMeta
results[i] = deployment
}
return results
}
pinch := (length + worker - 1) / worker
wg.Add(worker)
for splice := 0; splice < worker; splice++ {
// 计算每个goroutine处理的切片段
start := splice * pinch
end := start + pinch
if end > length {
end = length
}
//log.DebugF("[DeploymentAll] - deployment pinch from [%d - %d]", start, end)
go func(deploymentList []v1.Deployment, start int, results *[]v1.Deployment) {
for _, deployment := range deploymentList {
objectMeta := deployment.ObjectMeta
objectMeta.SetAnnotations(nil)
objectMeta.SetManagedFields(nil)
deployment.ObjectMeta = objectMeta
ccc <- deployment
//i := *results
//i[index+start] = deployment
}
wg.Done()
}(deployments[start:end], start, &results)
}
go func() {
wg.Wait()
close(ccc)
}()
wg.Wait()
index := 0
for deployment := range ccc {
results[index] = deployment
index++
}
return results[:index]
}
func (op *CmiiK8sOperator) DeploymentAllInterface(cmiiEnv string) []config.CmiiDeploymentInterface {
op.changeOperatorEnv(cmiiEnv)
client := op.CurrentClient
deploymentList, err := client.AppsV1().Deployments(op.CurrentNamespace).List(context.TODO(), metav1.ListOptions{})
if err != nil {
log.ErrorF("[DeploymentAllInterface] - list deployment in [%s] [%s] error => %s", cmiiEnv, op.CurrentNamespace, err.Error())
}
deployments := deploymentList.Items
length := len(deployments)
results := make([]config.CmiiDeploymentInterface, length)
ccc := make(chan config.CmiiDeploymentInterface, length)
var wg sync.WaitGroup
worker := workerThread
if length <= worker {
for i, deployment := range deployments {
objectMeta := deployment.ObjectMeta
objectMeta.SetAnnotations(nil)
objectMeta.SetManagedFields(nil)
deployment.ObjectMeta = objectMeta
results[i] = config.CmiiDeploymentInterface{}.Convert(deployment)
}
return results
}
pinch := length / worker
wg.Add(worker)
for splice := 0; splice < worker; splice++ {
go func(deploymentList []v1.Deployment) {
defer wg.Done()
for _, deployment := range deploymentList {
objectMeta := deployment.ObjectMeta
objectMeta.SetAnnotations(nil)
objectMeta.SetManagedFields(nil)
deployment.ObjectMeta = objectMeta
ccc <- config.CmiiDeploymentInterface{}.Convert(deployment)
}
}(deployments[splice*pinch : utils.MinInt((splice+1)*pinch, length)])
}
go func() {
wg.Wait()
close(ccc)
}()
index := 0
for deployment := range ccc {
results[index] = deployment
index++
}
return results[:index]
}
func (op *CmiiK8sOperator) DeploymentFizz(cmiiEnv, appFizz string) (fizzDeployment []v1.Deployment) {
deploymentAll := op.DeploymentAll(cmiiEnv)
if deploymentAll == nil {
log.ErrorF("[DeploymentFizz] - namespace [%s] can not get deployment [%s]", cmiiEnv, appFizz)
return nil
}
for _, deployment := range deploymentAll {
if strings.Contains(deployment.Name, appFizz) {
fizzDeployment = append(fizzDeployment, deployment)
}
}
return fizzDeployment
}
func (op *CmiiK8sOperator) DeploymentExist(cmiiEnv, appName string) (exists *v1.Deployment) {
op.changeOperatorEnv(cmiiEnv)
client := op.CurrentClient
deployment, err := client.AppsV1().Deployments(op.CurrentNamespace).Get(context.TODO(), appName, metav1.GetOptions{})
if err != nil {
log.ErrorF("[DeploymentExist] - deployments [%s] [%s] not exists ! %s", cmiiEnv, appName, err.Error())
return nil
}
return deployment
}
func (op *CmiiK8sOperator) DeploymentOneInterface(cmiiEnv, appName string) (deploy *config.CmiiDeploymentInterface) {
op.changeOperatorEnv(cmiiEnv)
client := op.CurrentClient
deploymentInterface := config.CmiiDeploymentInterface{}
deployment, err := client.AppsV1().Deployments(op.CurrentNamespace).Get(context.TODO(), appName, metav1.GetOptions{})
if err != nil {
log.ErrorF("[DeploymentExist] - deployments [%s] [%s] not exists ! %s", cmiiEnv, appName, err.Error())
return nil
}
convert := deploymentInterface.Convert(*deployment)
return &convert
}
func (op *CmiiK8sOperator) StatefulSetOneInterface(cmiiEnv, appName string) (deploy *config.CmiiDeploymentInterface) {
op.changeOperatorEnv(cmiiEnv)
client := op.CurrentClient
deploymentInterface := config.CmiiDeploymentInterface{}
statefulSet, err := client.AppsV1().StatefulSets(op.CurrentNamespace).Get(context.TODO(), appName, metav1.GetOptions{})
if err != nil {
log.ErrorF("[StatefulSetOneInterface] - stateful set [%s] [%s] not exists ! %s", cmiiEnv, appName, err.Error())
return nil
}
convert := deploymentInterface.ConvertFromStatefulSet(*statefulSet)
return &convert
}
func (op *CmiiK8sOperator) DeploymentScale(cmiiEnv, appName string, scaleCount int32) bool {
deployment := op.DeploymentOneInterface(cmiiEnv, appName)
client := op.CurrentClient
log.DebugF("[DeploymentScale] - start to scale [%s] [%s] to %d", deployment.Namespace, deployment.Name, scaleCount)
scale := &autoscalingv1.Scale{
ObjectMeta: metav1.ObjectMeta{
Name: deployment.Name,
Namespace: deployment.Namespace,
},
Spec: autoscalingv1.ScaleSpec{
Replicas: scaleCount,
},
}
updateScale, err := client.AppsV1().Deployments(deployment.Namespace).UpdateScale(
context.TODO(),
deployment.Name,
scale,
metav1.UpdateOptions{},
)
if err != nil {
log.ErrorF("[DeploymentScale] - scale error %s", err.Error())
return false
}
log.InfoF("[DeploymentScale] - scale of [%s] [%s] to %d success !", updateScale.Namespace, updateScale.Name, scaleCount)
return true
}
func (op *CmiiK8sOperator) DeploymentUpdateTagByImageFullName(cmiiEnv, imageFullName string) bool {
// todo
split := strings.Split(imageFullName, ":")
// harbor
// 192.168.6.6:8033/rancher/k8s-dns-sidecar:v1.0.2
newTag := split[1]
appName := strings.Split(split[0], "/")[len(strings.Split(split[0], "/"))-1]
if strings.Contains(imageFullName, "8033") {
newTag = split[2]
appName = strings.Split(split[1], "/")[len(strings.Split(split[1], "/"))-1]
}
// 拿到AppName
log.DebugF("DeploymentUpdateTagByImageFullName - appName => %s, newTag => %s", appName, newTag)
return op.DeploymentUpdateTag(cmiiEnv, appName, newTag)
}
// DeploymentUpdateTag 更新一个Deployment的Tag返回true或者false。 同时更新IMAGE_VERSION BIZ_GROUP
func (op *CmiiK8sOperator) DeploymentUpdateTag(cmiiEnv, appName, newTag string) bool {
if newTag == "" {
log.WarnF("[DeploymentUpdateTag] - can not update image tag to null!")
return false
}
deployment := op.DeploymentExist(cmiiEnv, appName)
if deployment == nil {
return false
}
containers := deployment.Spec.Template.Spec.Containers
if len(containers) >= 2 {
log.ErrorF("[DeploymentUpdateTag] - cant update app with 2 containers !")
return false
}
// 只支持container的数量为1的形式
container := containers[0]
oldName := container.Image
split := strings.Split(container.Image, ":")
if strings.HasPrefix(container.Image, image2.CmiiHarborPrefix) {
// harbor
container.Image = split[0] + ":" + newTag
} else if strings.Contains(container.Image, "8033") {
// 192.168.6.6:8033/rancher/k8s-dns-sidecar:v1.0.2
// 重新拼接
container.Image = split[0] + ":" + split[1] + ":" + newTag
}
log.DebugF("[DeploymentUpdateTag] - update [%s] [%s] from [%s] to [%s]", op.CurrentNamespace, appName, oldName, container.Image)
// 更新Cmii BIZ_GROUP
tagVersion := newTag
if strings.Contains(newTag, "-") {
// 5.0.0-1243
// 5.0.0-1243-1234
tagVersion = strings.Split(newTag, "-")[0]
}
envList := container.Env
for index, envVar := range envList {
if envVar.Name == "IMAGE_VERSION" {
envList[index].Value = tagVersion
}
if envVar.Name == "BIZ_CONFIG_GROUP" {
envList[index].Value = tagVersion
}
if envVar.Name == "SYS_CONFIG_GROUP" {
envList[index].Value = tagVersion
}
}
log.DebugF("[DeploymentUpdateTag] - update env IMAGE_VERSION to [%s]", tagVersion)
// 赋值回去 很关键
deployment.Spec.Template.Spec.Containers[0] = container
// update
_, err := op.CurrentClient.AppsV1().Deployments(deployment.Namespace).Update(context.TODO(), deployment, metav1.UpdateOptions{})
if err != nil {
log.ErrorF("[DeploymentUpdateTag] - update [%s] [%s] from [%s] to [%s] error ! %s", op.CurrentNamespace, appName, split[1], container.Image, err.Error())
return false
}
return true
}
func (op *CmiiK8sOperator) DeploymentRestart(cmiiEnv, appName string) bool {
op.changeOperatorEnv(cmiiEnv)
result := true
deployment := op.DeploymentOneInterface(cmiiEnv, appName)
if deployment == nil {
log.ErrorF("[DeploymentRestart] - [%s] [%s] not exists !", cmiiEnv, appName)
return false
}
result = op.DeploymentScale(deployment.Namespace, deployment.Name, 0)
if !result {
return result
}
time.Sleep(time.Millisecond * 200)
result = op.DeploymentScale(deployment.Namespace, deployment.Name, deployment.Replicas)
if !result {
return result
}
return result
}
func (op *CmiiK8sOperator) DeploymentRestartByKill(cmiiEnv, appName string) bool {
deployment := op.DeploymentOneInterface(cmiiEnv, appName)
if deployment == nil {
log.ErrorF("[DeploymentRestart] - [%s] [%s] not exists !", cmiiEnv, appName)
return false
}
podList := op.PodByAppName(deployment.Namespace, deployment.Name)
if podList == nil {
log.ErrorF("[DeploymentRestart] - [%s] [%s] no pod success !", deployment.Namespace, deployment.Name)
return true
}
for _, podInterface := range podList {
if !op.PodDelete(cmiiEnv, podInterface.Name) {
log.ErrorF("[DeploymentRestart] - [%s] [%s] delete pod failed !", podInterface.Namespace, podInterface.Name)
} else {
log.DebugF("[DeploymentRestart] - [%s] [%s] delete pod success !", podInterface.Namespace, podInterface.Name)
}
// wait for deployment to be ready
check := op.DeploymentStatusCheck(cmiiEnv, appName, 300)
if !check {
log.ErrorF("[DeploymentRestart] - [%s] [%s] 重启pod启动失败!", podInterface.Namespace, podInterface.Name)
return false
}
}
return true
}
func (op *CmiiK8sOperator) DeploymentNew(cmiiEnv, appName string, waitTimeOut int) bool {
op.changeOperatorEnv(cmiiEnv)
//op.CurrentClient.AppsV1().Deployments(op.CurrentNamespace).Apply()
return true
}
func (op *CmiiK8sOperator) DeploymentStatusCheck(cmiiEnv, appName string, waitTimeOut int) bool {
op.changeOperatorEnv(cmiiEnv)
// 设置超时时间和时间间隔
timeout := time.After(time.Duration(waitTimeOut) * time.Second)
tick := time.Tick(4 * time.Second)
// 监控Pod状态
for {
select {
case <-timeout:
log.ErrorF("[DeploymentStatusCheck] - [%s] [%s] 状态: 失败!", cmiiEnv, appName)
return false
case <-tick:
// check deployment exists
deployment := op.DeploymentOneInterface(cmiiEnv, appName)
if deployment == nil {
log.ErrorF("[DeploymentStatusCheck] - [%s] [%s] not exists !", cmiiEnv, appName)
return false
}
if deployment.AvailableReplicas == deployment.Replicas {
log.InfoF("[DeploymentStatusCheck] - [%s] [%s] Available: %d, Total: %d success !", deployment.Namespace, deployment.Name, deployment.AvailableReplicas, deployment.Replicas)
return true
}
log.DebugF("[DeploymentStatusCheck] - [%s] [%s] Available: %d, Total: %d waiting !", deployment.Namespace, deployment.Name, deployment.AvailableReplicas, deployment.Replicas)
}
}
}
func (op *CmiiK8sOperator) ReplicaSetExists(cmiiEnv, replicaSetName string) *v1.ReplicaSet {
op.changeOperatorEnv(cmiiEnv)
client := op.CurrentClient
replicaSet, err := client.AppsV1().ReplicaSets(op.CurrentNamespace).Get(context.TODO(), replicaSetName, metav1.GetOptions{})
if err != nil {
log.ErrorF("[ReplicaSetExists] - [%s] [%s] not exists !", cmiiEnv, replicaSetName)
return nil
}
return replicaSet
}
func (op *CmiiK8sOperator) ReplicaSetByAppName(cmiiEnv, appName string) (replicaList []v1.ReplicaSet) {
deploy := op.DeploymentExist(cmiiEnv, appName)
if deploy == nil {
log.ErrorF("[ReplicaSetByAppName] - [%s] [%s] app not exists !", cmiiEnv, appName)
return nil
}
labelSelector := metav1.FormatLabelSelector(deploy.Spec.Selector)
//Get the replica sets that belong to the deployment.
replicaSets, err := op.CurrentClient.AppsV1().ReplicaSets(op.CurrentNamespace).List(context.TODO(), metav1.ListOptions{
LabelSelector: labelSelector,
})
if err != nil {
log.ErrorF("[ReplicaSetByAppName] - [%s] [%s] list replicaset error %s", cmiiEnv, appName, err.Error())
return nil
}
for _, replicaSet := range replicaSets.Items {
replicaSet.SetManagedFields(nil)
replicaList = append(replicaList, replicaSet)
}
return replicaList
}
func (op *CmiiK8sOperator) PodAll(cmiiEnv string) []corev1.Pod {
op.changeOperatorEnv(cmiiEnv)
client := op.CurrentClient
podList, err := client.CoreV1().Pods(op.CurrentNamespace).List(context.TODO(), metav1.ListOptions{})
if err != nil {
log.ErrorF("[PodAll] - list c in [%s] [%s] error => %s", cmiiEnv, op.CurrentNamespace, err.Error())
return nil
}
pods := podList.Items
length := len(pods)
results := make([]corev1.Pod, length)
ccc := make(chan corev1.Pod, length)
var wg sync.WaitGroup
worker := workerThread
if length <= worker {
for i, pod := range pods {
objectMeta := pod.ObjectMeta
objectMeta.SetAnnotations(nil)
objectMeta.SetManagedFields(nil)
pod.ObjectMeta = objectMeta
results[i] = pod
}
return results
}
pinch := length / worker
wg.Add(worker)
for splice := 0; splice < worker; splice++ {
go func(podList []corev1.Pod) {
defer wg.Done()
for _, pod := range podList {
objectMeta := pod.ObjectMeta
objectMeta.SetAnnotations(nil)
objectMeta.SetManagedFields(nil)
pod.ObjectMeta = objectMeta
ccc <- pod
}
}(pods[splice*pinch : utils.MinInt((splice+1)*pinch, length)])
}
go func() {
wg.Wait()
close(ccc)
}()
index := 0
for c := range ccc {
results[index] = c
index++
}
return results[:index]
}
// PodAllInterface 获取全部的Pod转换为PodInterface
func (op *CmiiK8sOperator) PodAllInterface(cmiiEnv string) []config.CmiiPodInterface {
op.changeOperatorEnv(cmiiEnv)
client := op.CurrentClient
podList, err := client.CoreV1().Pods(op.CurrentNamespace).List(context.TODO(), metav1.ListOptions{})
if err != nil {
log.ErrorF("[PodAll] - list c in [%s] [%s] error => %s", cmiiEnv, op.CurrentNamespace, err.Error())
return nil
}
pods := podList.Items
length := len(pods)
results := make([]config.CmiiPodInterface, length)
ccc := make(chan config.CmiiPodInterface, length)
var wg sync.WaitGroup
podInterface := config.CmiiPodInterface{}
worker := workerThread
if length <= worker {
for i, pod := range pods {
objectMeta := pod.ObjectMeta
objectMeta.SetAnnotations(nil)
objectMeta.SetManagedFields(nil)
pod.ObjectMeta = objectMeta
results[i] = podInterface.Convert(pod)
}
return results
}
pinch := length / worker
wg.Add(worker)
for splice := 0; splice < worker; splice++ {
go func(podList []corev1.Pod) {
defer wg.Done()
for _, pod := range podList {
// 去除垃圾信息
objectMeta := pod.ObjectMeta
objectMeta.SetAnnotations(nil)
objectMeta.SetManagedFields(nil)
pod.ObjectMeta = objectMeta
// 转化Pod为PodInterface
ccc <- podInterface.Convert(pod)
}
}(pods[splice*pinch : utils.MinInt((splice+1)*pinch, length)])
}
go func() {
wg.Wait()
close(ccc)
}()
index := 0
for c := range ccc {
results[index] = c
index++
}
return results[:index]
}
func (op *CmiiK8sOperator) PodByAppName(cmiiEnv, appName string) (podList []config.CmiiPodInterface) {
deploy := op.DeploymentExist(cmiiEnv, appName)
if deploy == nil {
log.ErrorF("[PodByAppName] - [%s] [%s] app not exists !", cmiiEnv, appName)
return nil
}
labelSelector := metav1.FormatLabelSelector(deploy.Spec.Selector)
//Get the replica sets that belong to the deployment.
pods, err := op.CurrentClient.CoreV1().Pods(op.CurrentNamespace).List(context.TODO(), metav1.ListOptions{
LabelSelector: labelSelector,
})
if err != nil {
log.ErrorF("[PodByAppName] - [%s] [%s] list pods error %s", cmiiEnv, appName, err.Error())
return nil
}
cmiiPodInterface := config.CmiiPodInterface{}
for _, pod := range pods.Items {
pod.SetManagedFields(nil)
podInterface := cmiiPodInterface.Convert(pod)
podList = append(podList, podInterface)
}
return podList
}
func (op *CmiiK8sOperator) PodByNodeName(cmiiEnv, nodeName string) (podList []config.CmiiPodInterface) {
node := op.NodeExists(cmiiEnv, nodeName)
if node == nil {
return nil
}
list, err := op.CurrentClient.CoreV1().Pods("").List(context.TODO(), metav1.ListOptions{
FieldSelector: "spec.nodeName=" + nodeName,
})
if err != nil {
log.ErrorF("[PodByNodeName] - [%s] [%s] list pod error %s !", cmiiEnv, nodeName, err.Error())
return nil
}
podInterface := config.CmiiPodInterface{}
for _, pod := range list.Items {
cmiiPodInterface := podInterface.Convert(pod)
podList = append(podList, cmiiPodInterface)
}
return podList
}
func (op *CmiiK8sOperator) PodFizz(cmiiEnv, appFizz string) (fizzPod []corev1.Pod) {
podAll := op.PodAll(cmiiEnv)
if podAll == nil {
log.ErrorF("[DeploymentFizz] - no app find in [%s] !", cmiiEnv)
return nil
}
for _, pod := range podAll {
if strings.Contains(pod.Name, appFizz) {
fizzPod = append(fizzPod, pod)
}
}
return fizzPod
}
func (op *CmiiK8sOperator) PodDelete(cmiiEnv, podName string) bool {
op.changeOperatorEnv(cmiiEnv)
client := op.CurrentClient
pod, err := client.CoreV1().Pods(op.CurrentNamespace).Get(context.TODO(), podName, metav1.GetOptions{})
if err != nil {
log.ErrorF("[PodDelete] - [%s] [%s] not exists", cmiiEnv, podName)
return false
}
// pod exists
err = client.CoreV1().Pods(op.CurrentNamespace).Delete(context.TODO(), pod.Name, metav1.DeleteOptions{
GracePeriodSeconds: &podDeleteGratefulTime,
})
if err != nil {
log.ErrorF("[PodDelete] - [%s] [%s] delete error ! %s", cmiiEnv, podName, err.Error())
return false
}
return true
}
func (op *CmiiK8sOperator) PodExec(cmiiEnv string, podInterface config.CmiiPodInterface, commandList []string) (stdout, stderr *bytes.Buffer) {
op.changeOperatorEnv(cmiiEnv)
client := op.CurrentClient
execRequest := client.CoreV1().RESTClient().
Post().
Resource("pods").
Name(podInterface.Name).
Namespace(op.CurrentNamespace).
SubResource("exec").
VersionedParams(&corev1.PodExecOptions{
Stdin: false,
Stdout: true,
Stderr: true,
TTY: false,
Container: podInterface.ContainerName,
Command: commandList,
}, scheme.ParameterCodec)
stdout = &bytes.Buffer{}
stderr = &bytes.Buffer{}
//log.DebugF("PodExec] - [%s] [%s] exec %s, url %s", cmiiEnv, podInterface.Name, commandList, execRequest.URL())
exec, err := remotecommand.NewSPDYExecutor(op.CurrentConfig, "POST", execRequest.URL())
if err != nil {
log.ErrorF("[PodExec] - NewSPDYExecutor error => %s", err.Error())
return stdout, stderr
}
err = exec.Stream(remotecommand.StreamOptions{
Stdin: nil,
Stdout: stdout,
Stderr: stderr,
Tty: false,
TerminalSizeQueue: nil,
})
if err != nil {
log.ErrorF("[PodExec] - exec.Stream error => %s", err.Error())
}
return stdout, stderr
}
func (op *CmiiK8sOperator) PodStatusCheckTimeout(specificPod string, cmiiEnv string, waitTimeOut int) bool {
op.changeOperatorEnv(cmiiEnv)
client := op.CurrentClient
// 设置超时时间和时间间隔
timeout := time.After(time.Duration(waitTimeOut) * time.Second)
tick := time.Tick(5 * time.Second)
// 监控Pod状态
for {
select {
case <-timeout:
log.ErrorF("[PodStatusCheck] - 命名空间: [%s], Pod名称: [%s], 状态: 失败!", cmiiEnv, specificPod)
return false
case <-tick:
pod, err := client.CoreV1().Pods(cmiiEnv).Get(context.TODO(), specificPod, metav1.GetOptions{})
if err != nil {
log.ErrorF("[PodStatusCheck] - 命名空间: [%s], Pod名称: [%s], 获取Pod信息失败 ", cmiiEnv, err.Error())
} else {
log.DebugF("[PodStatusCheck] - 命名空间: [%s], Pod名称: [%s], 状态: [%s]", cmiiEnv, pod.Name, pod.Status.Phase)
if pod.Status.Phase == corev1.PodRunning || pod.Status.Phase == corev1.PodSucceeded {
return true
}
}
}
}
}
func (op *CmiiK8sOperator) NodeAll(cmiiEnv string) (nodeListR []corev1.Node) {
op.changeOperatorEnv(cmiiEnv)
client := op.CurrentClient
nodeList, err := client.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{})
if err != nil {
log.ErrorF("[NodeAll] - [%s] list all node failed %s", cmiiEnv, err.Error())
return nil
}
for _, node := range nodeList.Items {
node.SetManagedFields(nil)
nodeListR = append(nodeListR, node)
}
return nodeListR
}
func (op *CmiiK8sOperator) NodeAllInterface(cmiiEnv string) (nodeList []config.CmiiNodeInterface) {
nodeListR := op.NodeAll(cmiiEnv)
nodeInterface := config.CmiiNodeInterface{}
for _, node := range nodeListR {
nodeList = append(nodeList, nodeInterface.Convert(node))
}
return nodeList
}
func (op *CmiiK8sOperator) NodeExists(cmiiEnv, nodeName string) (node *config.CmiiNodeInterface) {
op.changeOperatorEnv(cmiiEnv)
client := op.CurrentClient
nodeInterface := config.CmiiNodeInterface{}
nodeList, err := client.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{})
if err != nil {
log.ErrorF("[NodeExists] - [%s] [%s] not exists !", cmiiEnv, nodeName)
return nil
}
convert := nodeInterface.Convert(*nodeList)
return &convert
}
func (op *CmiiK8sOperator) NodeCordon(cmiiEnv, nodeName string) (ok bool) {
op.changeOperatorEnv(cmiiEnv)
client := op.CurrentClient
nodeInfo, err := client.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{})
if err != nil {
log.ErrorF("[NodeExists] - [%s] [%s] not exists !", cmiiEnv, nodeName)
return false
}
if nodeInfo.Spec.Unschedulable {
return true
}
// 设置 Unschedulable 字段为 true
nodeInfo.Spec.Unschedulable = true
// 更新节点对象
_, err = client.CoreV1().Nodes().Update(context.TODO(), nodeInfo, metav1.UpdateOptions{})
if err != nil {
log.ErrorF("NodeCordon] - [%s] [%s] cordon error ! %s", cmiiEnv, nodeName, err.Error())
return false
}
// 删除节点上的全部Pod
return true
}
func (op *CmiiK8sOperator) NodeUnCordon(cmiiEnv, nodeName string) (ok bool) {
op.changeOperatorEnv(cmiiEnv)
client := op.CurrentClient
nodeInfo, err := client.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{})
if err != nil {
log.ErrorF("[NodeExists] - [%s] [%s] not exists !", cmiiEnv, nodeName)
return false
}
if !nodeInfo.Spec.Unschedulable {
return true
}
// 设置 Unschedulable 字段为 true
nodeInfo.Spec.Unschedulable = false
// 更新节点对象
_, err = client.CoreV1().Nodes().Update(context.TODO(), nodeInfo, metav1.UpdateOptions{})
if err != nil {
log.ErrorF("NodeCordon] - [%s] [%s] cordon error ! %s", cmiiEnv, nodeName, err.Error())
return false
}
// 删除节点上的全部Pod
return true
}
func (op *CmiiK8sOperator) PodEvictFromNode(cmiiEnv, nodeName string) (podList []*config.CmiiPodInterface) {
op.changeOperatorEnv(cmiiEnv)
client := op.CurrentClient
// 获取节点上的所有 Pod
pods, err := client.CoreV1().Pods("").List(context.TODO(), metav1.ListOptions{
FieldSelector: "spec.nodeName=" + nodeName,
})
if err != nil {
return nil
}
gracePeriodSeconds := int64(0)
// 驱逐每个 Pod
for _, pod := range pods.Items {
err = client.CoreV1().Pods(pod.Namespace).Evict(context.TODO(), &v1beta1.Eviction{
TypeMeta: metav1.TypeMeta{},
ObjectMeta: metav1.ObjectMeta{
Name: pod.Name,
Namespace: pod.Namespace,
},
DeleteOptions: &metav1.DeleteOptions{
GracePeriodSeconds: &gracePeriodSeconds,
},
})
if err != nil {
log.WarnF("PodEvictFromNode] - 驱逐 Pod [%s] 失败: %s", pod.Name, err.Error())
continue
}
podInterface := config.CmiiPodInterface{}
convert := podInterface.Convert(pod)
podList = append(podList, &convert)
}
return podList
}