Files
ProjectOctopus/agent-go/k8s_exec/K8sOperator.go

764 lines
20 KiB
Go

package k8s_exec
import (
"agent-go/g"
"agent-go/logger"
"agent-go/utils"
"context"
v1 "k8s.io/api/apps/v1"
autoscalingv1 "k8s.io/api/autoscaling/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
"strings"
"sync"
"time"
)
// log is the package-wide logger, an alias of logger.Log.
var log = logger.Log
// pool is taken from g.G.P. It is not referenced by any code visible in this
// part of the file; presumably a shared worker pool — TODO confirm against g.
var pool = g.G.P
// CmiiK8sOperator holds the Kubernetes clients used by this package together
// with the client and namespace selected by the most recent call to
// changeOperatorEnv.
type CmiiK8sOperator struct {
// DevClient is built lazily from CmiiDevK8sConfig.
DevClient *kubernetes.Clientset
// CoreClient is built lazily from CmiiCoreK8sConfig.
CoreClient *kubernetes.Clientset
// CurrentNamespace is the namespace that subsequent API calls operate on.
CurrentNamespace string
// CurrentClient points at either DevClient or CoreClient.
CurrentClient *kubernetes.Clientset
}
// Namespace names selected by changeOperatorEnv, plus the number of worker
// goroutines used by the list helpers in this file.
const (
dev = "uavcloud-dev"
devFlight = "uavcloud-devflight"
devOperation = "uavcloud-devoperation"
validation = "uavcloud-feature"
integration = "uavcloud-test"
uat = "uavcloud-uat"
demo = "uavcloud-demo"
uavms = "uavcloud-uavms"
// workerThread is the number of goroutines a list result is split across.
workerThread = 4
)
// checkAndBuildCmiiK8sOperator lazily creates DevClient and CoreClient from the
// embedded kubeconfigs (CmiiDevK8sConfig and CmiiCoreK8sConfig). A client that
// is already set is left untouched.
//
// It panics when a kubeconfig cannot be parsed or a clientset cannot be
// created, because nothing in this package can work without the clients.
func (op *CmiiK8sOperator) checkAndBuildCmiiK8sOperator() {
	if op.DevClient == nil {
		log.InfoF("[client] - build devFlight k8s operator client !")
		devConfig, err := clientcmd.RESTConfigFromKubeConfig([]byte(CmiiDevK8sConfig))
		if err != nil {
			// Keep the underlying cause in the message so a bad kubeconfig can be diagnosed.
			msg := "[client] - build devFlight k8s operator error ! " + err.Error()
			log.Error(msg)
			panic(msg)
		}
		op.DevClient, err = kubernetes.NewForConfig(devConfig)
		if err != nil {
			panic(err.Error())
		}
	}

	if op.CoreClient == nil {
		log.InfoF("[client] - build core k8s operator client !")
		coreConfig, err := clientcmd.RESTConfigFromKubeConfig([]byte(CmiiCoreK8sConfig))
		if err != nil {
			// This branch concerns the core cluster, so the message names it.
			msg := "[client] - build core k8s operator error ! " + err.Error()
			log.Error(msg)
			panic(msg)
		}
		op.CoreClient, err = kubernetes.NewForConfig(coreConfig)
		if err != nil {
			panic(err.Error())
		}
	}
}
// changeOperatorEnv selects the client and namespace that match cmiiEnv and
// stores them in CurrentClient and CurrentNamespace.
//
// Any cmiiEnv containing "dev" uses DevClient; everything else uses CoreClient.
// The namespace is chosen by substring match. When several keywords match, the
// one listed first in the switch below wins, which is the same precedence the
// previous chain of independent if-statements produced (later checks overrode
// earlier ones).
//
// The namespace is always recomputed from cmiiEnv. An input that matches no
// keyword falls back to the dev namespace instead of silently keeping the
// namespace chosen by an earlier call.
func (op *CmiiK8sOperator) changeOperatorEnv(cmiiEnv string) {
	op.checkAndBuildCmiiK8sOperator()

	if strings.Contains(cmiiEnv, "dev") {
		op.CurrentClient = op.DevClient
	} else {
		op.CurrentClient = op.CoreClient
	}

	namespace := dev
	switch {
	case strings.Contains(cmiiEnv, "uavms"):
		namespace = uavms
	case strings.Contains(cmiiEnv, "demo"):
		namespace = demo
	case strings.Contains(cmiiEnv, "uat"):
		namespace = uat
	case strings.Contains(cmiiEnv, "fe"), strings.Contains(cmiiEnv, "val"):
		namespace = validation
	case strings.Contains(cmiiEnv, "int"), strings.Contains(cmiiEnv, "test"):
		namespace = integration
	case strings.Contains(cmiiEnv, "devf"):
		namespace = devFlight
	case strings.Contains(cmiiEnv, "devo"):
		namespace = devOperation
	}
	op.CurrentNamespace = namespace

	log.InfoF("[k8s env] - current env is => %s", op.CurrentNamespace)
}
// DeploymentAll lists every deployment in the namespace selected by cmiiEnv and
// returns copies with annotations and managed fields cleared.
//
// The result keeps the order returned by the API server. It returns nil when
// the list call fails, and an empty non-nil slice when the namespace has no
// deployments.
func (op *CmiiK8sOperator) DeploymentAll(cmiiEnv string) []v1.Deployment {
	op.changeOperatorEnv(cmiiEnv)

	deploymentList, err := op.CurrentClient.AppsV1().Deployments(op.CurrentNamespace).List(context.TODO(), metav1.ListOptions{})
	if err != nil {
		log.ErrorF("[DeploymentAll] - list deployment in [%s] [%s] error => %s", cmiiEnv, op.CurrentNamespace, err.Error())
		// Report the failure to callers; DeploymentFizz relies on a nil result.
		return nil
	}

	deployments := deploymentList.Items
	length := len(deployments)
	log.InfoF("[DeploymentAll] - deployment in [%s] count is => %d", op.CurrentNamespace, length)

	results := make([]v1.Deployment, length)

	// strip copies deployments[start:end] into results with the bulky metadata
	// removed. Each call writes a disjoint index range, so calls may run
	// concurrently without locking.
	strip := func(start, end int) {
		for i := start; i < end; i++ {
			deployment := deployments[i]
			deployment.SetAnnotations(nil)
			deployment.SetManagedFields(nil)
			results[i] = deployment
		}
	}

	if length <= workerThread {
		strip(0, length)
		return results
	}

	// Ceiling division so that every element is covered. Iterating by start
	// offset (instead of a fixed worker count) guarantees start < length, so a
	// chunk can never be sliced past the end of the list.
	pinch := (length + workerThread - 1) / workerThread
	var wg sync.WaitGroup
	for start := 0; start < length; start += pinch {
		end := start + pinch
		if end > length {
			end = length
		}
		wg.Add(1)
		go func(start, end int) {
			defer wg.Done()
			strip(start, end)
		}(start, end)
	}
	wg.Wait()

	return results
}
// DeploymentAllInterface lists every deployment in the namespace selected by
// cmiiEnv and converts each one to a CmiiDeploymentInterface after clearing its
// annotations and managed fields.
//
// The result keeps the order returned by the API server and contains one entry
// per deployment. It returns nil when the list call fails.
func (op *CmiiK8sOperator) DeploymentAllInterface(cmiiEnv string) []CmiiDeploymentInterface {
	op.changeOperatorEnv(cmiiEnv)

	deploymentList, err := op.CurrentClient.AppsV1().Deployments(op.CurrentNamespace).List(context.TODO(), metav1.ListOptions{})
	if err != nil {
		log.ErrorF("[DeploymentAllInterface] - list deployment in [%s] [%s] error => %s", cmiiEnv, op.CurrentNamespace, err.Error())
		return nil
	}

	deployments := deploymentList.Items
	length := len(deployments)
	results := make([]CmiiDeploymentInterface, length)

	// convert fills results[start:end]. Each call writes a disjoint index
	// range, so calls may run concurrently without locking.
	convert := func(start, end int) {
		for i := start; i < end; i++ {
			deployment := deployments[i]
			deployment.SetAnnotations(nil)
			deployment.SetManagedFields(nil)
			results[i] = CmiiDeploymentInterface{}.Convert(deployment)
		}
	}

	if length <= workerThread {
		convert(0, length)
		return results
	}

	// Ceiling division: a floored chunk size would leave the last
	// length%workerThread deployments unprocessed.
	pinch := (length + workerThread - 1) / workerThread
	var wg sync.WaitGroup
	for start := 0; start < length; start += pinch {
		wg.Add(1)
		go func(start, end int) {
			defer wg.Done()
			convert(start, end)
		}(start, utils.MinInt(start+pinch, length))
	}
	wg.Wait()

	return results
}
// DeploymentFizz returns the deployments of cmiiEnv whose name contains the
// substring appFizz. It returns nil when DeploymentAll yields nil or when
// nothing matches.
func (op *CmiiK8sOperator) DeploymentFizz(cmiiEnv, appFizz string) (fizzDeployment []v1.Deployment) {
	candidates := op.DeploymentAll(cmiiEnv)
	if candidates == nil {
		log.ErrorF("[DeploymentFizz] - namespace [%s] can not get deployment [%s]", cmiiEnv, appFizz)
		return nil
	}

	for i := range candidates {
		if !strings.Contains(candidates[i].Name, appFizz) {
			continue
		}
		fizzDeployment = append(fizzDeployment, candidates[i])
	}
	return fizzDeployment
}
// DeploymentExist fetches the deployment named appName from the namespace
// selected by cmiiEnv. It returns nil (after logging) when the Get call fails
// for any reason.
func (op *CmiiK8sOperator) DeploymentExist(cmiiEnv, appName string) (exists *v1.Deployment) {
	op.changeOperatorEnv(cmiiEnv)

	found, err := op.CurrentClient.AppsV1().Deployments(op.CurrentNamespace).Get(context.TODO(), appName, metav1.GetOptions{})
	if err == nil {
		return found
	}

	log.ErrorF("[DeploymentExist] - deployments [%s] [%s] not exists ! %s", cmiiEnv, appName, err.Error())
	return nil
}
// DeploymentOneInterface fetches the deployment named appName from the
// namespace selected by cmiiEnv and returns it converted to a
// CmiiDeploymentInterface. It returns nil (after logging) when the Get call
// fails.
func (op *CmiiK8sOperator) DeploymentOneInterface(cmiiEnv, appName string) (deploy *CmiiDeploymentInterface) {
	op.changeOperatorEnv(cmiiEnv)

	raw, err := op.CurrentClient.AppsV1().Deployments(op.CurrentNamespace).Get(context.TODO(), appName, metav1.GetOptions{})
	if err != nil {
		log.ErrorF("[DeploymentExist] - deployments [%s] [%s] not exists ! %s", cmiiEnv, appName, err.Error())
		return nil
	}

	converted := CmiiDeploymentInterface{}.Convert(*raw)
	return &converted
}
// DeploymentScale sets the replica count of deployment appName in cmiiEnv to
// scaleCount through the scale subresource.
//
// It returns false when the deployment cannot be found or the scale update is
// rejected, and true once the API server has accepted the new replica count.
func (op *CmiiK8sOperator) DeploymentScale(cmiiEnv, appName string, scaleCount int32) bool {
	deployment := op.DeploymentOneInterface(cmiiEnv, appName)
	if deployment == nil {
		// DeploymentOneInterface returns nil on lookup failure; without this
		// guard the field accesses below would dereference a nil pointer.
		log.ErrorF("[DeploymentScale] - [%s] [%s] not exists !", cmiiEnv, appName)
		return false
	}
	client := op.CurrentClient

	log.DebugF("[DeploymentScale] - start to scale [%s] [%s] to %d", deployment.Namespace, deployment.Name, scaleCount)

	scale := &autoscalingv1.Scale{
		ObjectMeta: metav1.ObjectMeta{
			Name:      deployment.Name,
			Namespace: deployment.Namespace,
		},
		Spec: autoscalingv1.ScaleSpec{
			Replicas: scaleCount,
		},
	}

	updateScale, err := client.AppsV1().Deployments(deployment.Namespace).UpdateScale(
		context.TODO(),
		deployment.Name,
		scale,
		metav1.UpdateOptions{},
	)
	if err != nil {
		log.ErrorF("[DeploymentScale] - scale error %s", err.Error())
		return false
	}

	log.InfoF("[DeploymentScale] - scale of [%s] [%s] to %d success !", updateScale.Namespace, updateScale.Name, scaleCount)
	return true
}
// DeploymentUpdateTag replaces the image tag of deployment appName in cmiiEnv
// with newTag.
//
// Only deployments with exactly one container are modified; for any other
// container count the deployment is left untouched and true is returned, as
// before. It returns false when newTag is empty, the deployment cannot be
// found, or the update call fails.
func (op *CmiiK8sOperator) DeploymentUpdateTag(cmiiEnv, appName, newTag string) bool {
	if newTag == "" {
		log.WarnF("[DeploymentUpdateTag] - can not update image tag to null!")
		return false
	}

	deployment := op.DeploymentExist(cmiiEnv, appName)
	if deployment == nil {
		return false
	}

	containers := deployment.Spec.Template.Spec.Containers
	if len(containers) != 1 {
		// Multi-container (or empty) pod templates are deliberately skipped.
		return true
	}

	// Split the image reference into repository and tag. A digest suffix
	// ("@sha256:...") is dropped, and a ":" only counts as the tag separator
	// when it comes after the last "/", so a registry port such as
	// "host:5000/app" is not mistaken for a tag. An untagged image yields an
	// empty old tag instead of an out-of-range index.
	repository := containers[0].Image
	if at := strings.Index(repository, "@"); at >= 0 {
		repository = repository[:at]
	}
	oldTag := ""
	if colon := strings.LastIndex(repository, ":"); colon > strings.LastIndex(repository, "/") {
		repository, oldTag = repository[:colon], repository[colon+1:]
	}
	newImage := repository + ":" + newTag

	log.InfoF("[DeploymentUpdateTag] - update [%s] [%s] from [%s] to [%s]", op.CurrentNamespace, appName, oldTag, newImage)

	deployment.Spec.Template.Spec.Containers[0].Image = newImage

	_, err := op.CurrentClient.AppsV1().Deployments(deployment.Namespace).Update(context.TODO(), deployment, metav1.UpdateOptions{})
	if err != nil {
		log.ErrorF("[DeploymentUpdateTag] - update [%s] [%s] from [%s] to [%s] error ! %s", op.CurrentNamespace, appName, oldTag, newImage, err.Error())
		return false
	}

	return true
}
// DeploymentRestart restarts deployment appName in cmiiEnv by scaling it to
// zero replicas, waiting 200ms, and scaling it back to the replica count it had
// when this call started. It returns false as soon as the lookup or either
// scale step fails.
func (op *CmiiK8sOperator) DeploymentRestart(cmiiEnv, appName string) bool {
	op.changeOperatorEnv(cmiiEnv)

	target := op.DeploymentOneInterface(cmiiEnv, appName)
	if target == nil {
		log.ErrorF("[DeploymentRestart] - [%s] [%s] not exists !", cmiiEnv, appName)
		return false
	}

	// Scale down first; stop here if that did not work.
	if !op.DeploymentScale(target.Namespace, target.Name, 0) {
		return false
	}

	time.Sleep(200 * time.Millisecond)

	// Restore the replica count captured before the scale-down.
	return op.DeploymentScale(target.Namespace, target.Name, target.Replicas)
}
// DeploymentRestartByKill restarts deployment appName in cmiiEnv by deleting
// each of its pods. It returns false only when the deployment cannot be found.
// A failed pod deletion is logged but does not change the result, and an empty
// pod list is reported as success.
func (op *CmiiK8sOperator) DeploymentRestartByKill(cmiiEnv, appName string) bool {
	target := op.DeploymentOneInterface(cmiiEnv, appName)
	if target == nil {
		log.ErrorF("[DeploymentRestart] - [%s] [%s] not exists !", cmiiEnv, appName)
		return false
	}

	victims := op.PodByAppName(target.Namespace, target.Name)
	if victims == nil {
		log.ErrorF("[DeploymentRestart] - [%s] [%s] no pod success !", target.Namespace, target.Name)
		return true
	}

	for i := range victims {
		victim := victims[i]
		if op.PodDelete(cmiiEnv, victim.Name) {
			log.DebugF("[DeploymentRestart] - [%s] [%s] delete pod success !", victim.Namespace, victim.Name)
			continue
		}
		log.ErrorF("[DeploymentRestart] - [%s] [%s] delete pod failed !", victim.Namespace, victim.Name)
	}

	return true
}
// DeploymentNew is a placeholder: it only switches the operator to cmiiEnv and
// then returns true. appName and waitTimeOut are currently unused, and the
// intended Apply call below is still commented out.
func (op *CmiiK8sOperator) DeploymentNew(cmiiEnv, appName string, waitTimeOut int) bool {
op.changeOperatorEnv(cmiiEnv)
//op.CurrentClient.AppsV1().Deployments(op.CurrentNamespace).Apply()
return true
}
// DeploymentStatusCheck polls deployment appName in cmiiEnv once per second
// until its available replica count equals its desired replica count.
//
// It returns true as soon as the counts match, and false when the deployment
// cannot be found or waitTimeOut seconds elapse first.
func (op *CmiiK8sOperator) DeploymentStatusCheck(cmiiEnv, appName string, waitTimeOut int) bool {
	op.changeOperatorEnv(cmiiEnv)

	// Overall deadline and polling interval. Both are stopped on return so no
	// timer outlives the call (a ticker from time.Tick can never be stopped).
	deadline := time.NewTimer(time.Duration(waitTimeOut) * time.Second)
	defer deadline.Stop()
	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()

	// Watch the deployment until it is ready or the deadline passes.
	for {
		select {
		case <-deadline.C:
			log.ErrorF("[DeploymentStatusCheck] - [%s] [%s] 状态: 失败!", cmiiEnv, appName)
			return false
		case <-ticker.C:
			// The deployment must still exist on every poll.
			deployment := op.DeploymentOneInterface(cmiiEnv, appName)
			if deployment == nil {
				log.ErrorF("[DeploymentStatusCheck] - [%s] [%s] not exists !", cmiiEnv, appName)
				return false
			}
			if deployment.AvailableReplicas == deployment.Replicas {
				log.InfoF("[DeploymentStatusCheck] - [%s] [%s] Available: %d, Total: %d success !", deployment.Namespace, deployment.Name, deployment.AvailableReplicas, deployment.Replicas)
				return true
			}
			log.DebugF("[DeploymentStatusCheck] - [%s] [%s] Available: %d, Total: %d waiting !", deployment.Namespace, deployment.Name, deployment.AvailableReplicas, deployment.Replicas)
		}
	}
}
// ReplicaSetExists fetches the replica set named replicaSetName from the
// namespace selected by cmiiEnv. It returns nil (after logging) when the Get
// call fails.
func (op *CmiiK8sOperator) ReplicaSetExists(cmiiEnv, replicaSetName string) *v1.ReplicaSet {
	op.changeOperatorEnv(cmiiEnv)

	found, err := op.CurrentClient.AppsV1().ReplicaSets(op.CurrentNamespace).Get(context.TODO(), replicaSetName, metav1.GetOptions{})
	if err == nil {
		return found
	}

	log.ErrorF("[ReplicaSetExists] - [%s] [%s] not exists !", cmiiEnv, replicaSetName)
	return nil
}
// ReplicaSetByAppName returns the replica sets matched by the label selector of
// deployment appName in cmiiEnv, with managed fields cleared. It returns nil
// when the deployment is missing, the list call fails, or nothing matches.
func (op *CmiiK8sOperator) ReplicaSetByAppName(cmiiEnv, appName string) (replicaList []v1.ReplicaSet) {
	owner := op.DeploymentExist(cmiiEnv, appName)
	if owner == nil {
		log.ErrorF("[ReplicaSetByAppName] - [%s] [%s] app not exists !", cmiiEnv, appName)
		return nil
	}

	// Select replica sets with the deployment's own selector.
	options := metav1.ListOptions{
		LabelSelector: metav1.FormatLabelSelector(owner.Spec.Selector),
	}
	found, err := op.CurrentClient.AppsV1().ReplicaSets(op.CurrentNamespace).List(context.TODO(), options)
	if err != nil {
		log.ErrorF("[ReplicaSetByAppName] - [%s] [%s] list replicaset error %s", cmiiEnv, appName, err.Error())
		return nil
	}

	for i := range found.Items {
		item := found.Items[i]
		item.SetManagedFields(nil)
		replicaList = append(replicaList, item)
	}
	return replicaList
}
// PodAll lists every pod in the namespace selected by cmiiEnv and returns
// copies with annotations and managed fields cleared.
//
// The result keeps the order returned by the API server and contains one entry
// per pod. It returns nil when the list call fails.
func (op *CmiiK8sOperator) PodAll(cmiiEnv string) []corev1.Pod {
	op.changeOperatorEnv(cmiiEnv)

	podList, err := op.CurrentClient.CoreV1().Pods(op.CurrentNamespace).List(context.TODO(), metav1.ListOptions{})
	if err != nil {
		log.ErrorF("[PodAll] - list pod in [%s] [%s] error => %s", cmiiEnv, op.CurrentNamespace, err.Error())
		return nil
	}

	pods := podList.Items
	length := len(pods)
	results := make([]corev1.Pod, length)

	// strip copies pods[start:end] into results with the bulky metadata
	// removed. Each call writes a disjoint index range, so calls may run
	// concurrently without locking.
	strip := func(start, end int) {
		for i := start; i < end; i++ {
			pod := pods[i]
			pod.SetAnnotations(nil)
			pod.SetManagedFields(nil)
			results[i] = pod
		}
	}

	if length <= workerThread {
		strip(0, length)
		return results
	}

	// Ceiling division: a floored chunk size would leave the last
	// length%workerThread pods unprocessed.
	pinch := (length + workerThread - 1) / workerThread
	var wg sync.WaitGroup
	for start := 0; start < length; start += pinch {
		wg.Add(1)
		go func(start, end int) {
			defer wg.Done()
			strip(start, end)
		}(start, utils.MinInt(start+pinch, length))
	}
	wg.Wait()

	return results
}
// PodAllInterface lists every pod in the namespace selected by cmiiEnv and
// converts each one to a CmiiPodInterface after clearing its annotations and
// managed fields.
//
// The result keeps the order returned by the API server and contains one entry
// per pod. It returns nil when the list call fails.
func (op *CmiiK8sOperator) PodAllInterface(cmiiEnv string) []CmiiPodInterface {
	op.changeOperatorEnv(cmiiEnv)

	podList, err := op.CurrentClient.CoreV1().Pods(op.CurrentNamespace).List(context.TODO(), metav1.ListOptions{})
	if err != nil {
		log.ErrorF("[PodAllInterface] - list pod in [%s] [%s] error => %s", cmiiEnv, op.CurrentNamespace, err.Error())
		return nil
	}

	pods := podList.Items
	length := len(pods)
	results := make([]CmiiPodInterface, length)
	podInterface := CmiiPodInterface{}

	// convert fills results[start:end]. Each call writes a disjoint index
	// range, so calls may run concurrently without locking.
	convert := func(start, end int) {
		for i := start; i < end; i++ {
			pod := pods[i]
			pod.SetAnnotations(nil)
			pod.SetManagedFields(nil)
			results[i] = podInterface.Convert(pod)
		}
	}

	if length <= workerThread {
		convert(0, length)
		return results
	}

	// Ceiling division: a floored chunk size would leave the last
	// length%workerThread pods unprocessed.
	pinch := (length + workerThread - 1) / workerThread
	var wg sync.WaitGroup
	for start := 0; start < length; start += pinch {
		wg.Add(1)
		go func(start, end int) {
			defer wg.Done()
			convert(start, end)
		}(start, utils.MinInt(start+pinch, length))
	}
	wg.Wait()

	return results
}
// PodByAppName returns the pods matched by the label selector of deployment
// appName in cmiiEnv, converted to CmiiPodInterface. It returns nil when the
// deployment is missing, the list call fails, or no pod matches.
func (op *CmiiK8sOperator) PodByAppName(cmiiEnv, appName string) (podList []CmiiPodInterface) {
	owner := op.DeploymentExist(cmiiEnv, appName)
	if owner == nil {
		log.ErrorF("[PodByAppName] - [%s] [%s] app not exists !", cmiiEnv, appName)
		return nil
	}

	// Select pods with the deployment's own selector.
	options := metav1.ListOptions{
		LabelSelector: metav1.FormatLabelSelector(owner.Spec.Selector),
	}
	found, err := op.CurrentClient.CoreV1().Pods(op.CurrentNamespace).List(context.TODO(), options)
	if err != nil {
		log.ErrorF("[PodByAppName] - [%s] [%s] list pods error %s", cmiiEnv, appName, err.Error())
		return nil
	}

	converter := CmiiPodInterface{}
	for i := range found.Items {
		item := found.Items[i]
		item.SetManagedFields(nil)
		podList = append(podList, converter.Convert(item))
	}
	return podList
}
// PodByNodeName returns the pods scheduled on node nodeName, across all
// namespaces of the cluster selected by cmiiEnv, converted to CmiiPodInterface.
// It returns nil when the node cannot be found, the list call fails, or the
// node runs no pods.
func (op *CmiiK8sOperator) PodByNodeName(cmiiEnv, nodeName string) (podList []CmiiPodInterface) {
	if op.NodeExists(cmiiEnv, nodeName) == nil {
		return nil
	}

	// The empty namespace lists pods from every namespace.
	options := metav1.ListOptions{FieldSelector: "spec.nodeName=" + nodeName}
	found, err := op.CurrentClient.CoreV1().Pods("").List(context.TODO(), options)
	if err != nil {
		log.ErrorF("[PodByNodeName] - [%s] [%s] list pod error %s !", cmiiEnv, nodeName, err.Error())
		return nil
	}

	converter := CmiiPodInterface{}
	for i := range found.Items {
		podList = append(podList, converter.Convert(found.Items[i]))
	}
	return podList
}
// PodFizz returns the pods of cmiiEnv whose name contains the substring
// appFizz. It returns nil when PodAll yields nil or when nothing matches.
func (op *CmiiK8sOperator) PodFizz(cmiiEnv, appFizz string) (fizzPod []corev1.Pod) {
	candidates := op.PodAll(cmiiEnv)
	if candidates == nil {
		log.ErrorF("[DeploymentFizz] - no app find in [%s] !", cmiiEnv)
		return nil
	}

	for i := range candidates {
		if !strings.Contains(candidates[i].Name, appFizz) {
			continue
		}
		fizzPod = append(fizzPod, candidates[i])
	}
	return fizzPod
}
// PodDelete deletes pod podName from the namespace selected by cmiiEnv. The pod
// is looked up first; it returns false when that lookup or the delete call
// fails, and true once the delete request has been accepted.
func (op *CmiiK8sOperator) PodDelete(cmiiEnv, podName string) bool {
	op.changeOperatorEnv(cmiiEnv)
	pods := op.CurrentClient.CoreV1().Pods(op.CurrentNamespace)

	existing, err := pods.Get(context.TODO(), podName, metav1.GetOptions{})
	if err != nil {
		log.ErrorF("[PodDelete] - [%s] [%s] not exists", cmiiEnv, podName)
		return false
	}

	// The pod is present, so remove it.
	if err = pods.Delete(context.TODO(), existing.Name, metav1.DeleteOptions{}); err != nil {
		log.ErrorF("[PodDelete] - [%s] [%s] delete error ! %s", cmiiEnv, podName, err.Error())
		return false
	}
	return true
}
// NodeAll lists every node of the cluster selected by cmiiEnv and returns
// copies with managed fields cleared. It returns nil when the list call fails
// or the cluster reports no nodes.
func (op *CmiiK8sOperator) NodeAll(cmiiEnv string) (nodeListR []corev1.Node) {
	op.changeOperatorEnv(cmiiEnv)

	found, err := op.CurrentClient.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{})
	if err != nil {
		log.ErrorF("[NodeAll] - [%s] list all node failed %s", cmiiEnv, err.Error())
		return nil
	}

	for i := range found.Items {
		item := found.Items[i]
		item.SetManagedFields(nil)
		nodeListR = append(nodeListR, item)
	}
	return nodeListR
}
// NodeAllInterface returns every node reported by NodeAll for cmiiEnv,
// converted to CmiiNodeInterface. The result is nil when NodeAll returns no
// nodes.
func (op *CmiiK8sOperator) NodeAllInterface(cmiiEnv string) (nodeList []CmiiNodeInterface) {
	rawNodes := op.NodeAll(cmiiEnv)
	converter := CmiiNodeInterface{}
	for i := range rawNodes {
		nodeList = append(nodeList, converter.Convert(rawNodes[i]))
	}
	return nodeList
}
// NodeExists fetches node nodeName from the cluster selected by cmiiEnv and
// returns it converted to a CmiiNodeInterface. It returns nil (after logging)
// when the Get call fails.
func (op *CmiiK8sOperator) NodeExists(cmiiEnv, nodeName string) (node *CmiiNodeInterface) {
	op.changeOperatorEnv(cmiiEnv)
	converter := CmiiNodeInterface{}

	raw, err := op.CurrentClient.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{})
	if err != nil {
		log.ErrorF("[NodeExists] - [%s] [%s] not exists !", cmiiEnv, nodeName)
		return nil
	}

	converted := converter.Convert(*raw)
	return &converted
}