474 lines
12 KiB
Go
474 lines
12 KiB
Go
package k8s_exec
|
|
|
|
import (
|
|
"agent-go/g"
|
|
"agent-go/logger"
|
|
"agent-go/utils"
|
|
"context"
|
|
"k8s.io/api/apps/v1"
|
|
autoscalingv1 "k8s.io/api/autoscaling/v1"
|
|
corev1 "k8s.io/api/core/v1"
|
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
"k8s.io/client-go/kubernetes"
|
|
"k8s.io/client-go/tools/clientcmd"
|
|
"strings"
|
|
"sync"
|
|
)
|
|
|
|
var log = logger.Log
|
|
var pool = g.G.P
|
|
|
|
type CmiiK8sOperator struct {
|
|
DevClient *kubernetes.Clientset
|
|
CoreClient *kubernetes.Clientset
|
|
CurrentNamespace string
|
|
CurrentClient *kubernetes.Clientset
|
|
}
|
|
|
|
const (
|
|
dev = "uavcloud-dev"
|
|
devFlight = "uavcloud-devflight"
|
|
devOperation = "uavcloud-devoperation"
|
|
validation = "uavcloud-feature"
|
|
integration = "uavcloud-test"
|
|
uat = "uavcloud-uat"
|
|
demo = "uavcloud-demo"
|
|
workerThread = 4
|
|
)
|
|
|
|
func (op *CmiiK8sOperator) checkAndBuildCmiiK8sOperator() {
|
|
|
|
if op.DevClient == nil {
|
|
log.InfoF("[client] - build devFlight k8s operator client !")
|
|
devConfig, err := clientcmd.RESTConfigFromKubeConfig([]byte(CmiiDevK8sConfig))
|
|
if err != nil {
|
|
msg := "[client] - build devFlight k8s operator error !"
|
|
log.Error(msg)
|
|
panic(msg)
|
|
}
|
|
op.DevClient, err = kubernetes.NewForConfig(devConfig)
|
|
if err != nil {
|
|
panic(err.Error())
|
|
}
|
|
}
|
|
|
|
if op.CoreClient == nil {
|
|
log.InfoF("[client] - build core k8s operator client !")
|
|
coreConfig, err := clientcmd.RESTConfigFromKubeConfig([]byte(CmiiCoreK8sConfig))
|
|
if err != nil {
|
|
msg := "[client] - build devFlight k8s operator error !"
|
|
log.Error(msg)
|
|
panic(msg)
|
|
}
|
|
op.CoreClient, err = kubernetes.NewForConfig(coreConfig)
|
|
if err != nil {
|
|
panic(err.Error())
|
|
}
|
|
}
|
|
|
|
}
|
|
|
|
func (op *CmiiK8sOperator) changeOperatorEnv(cmiiEnv string) {
|
|
|
|
// ok
|
|
op.checkAndBuildCmiiK8sOperator()
|
|
|
|
if strings.Contains(cmiiEnv, "dev") {
|
|
op.CurrentClient = op.DevClient
|
|
} else {
|
|
op.CurrentClient = op.CoreClient
|
|
}
|
|
|
|
if strings.Contains(cmiiEnv, "dev") {
|
|
if strings.Contains(cmiiEnv, "f") {
|
|
op.CurrentNamespace = devFlight
|
|
} else if strings.Contains(cmiiEnv, "o") {
|
|
op.CurrentNamespace = devOperation
|
|
} else {
|
|
op.CurrentNamespace = dev
|
|
}
|
|
}
|
|
|
|
if strings.Contains(cmiiEnv, "int") || strings.Contains(cmiiEnv, "test") {
|
|
op.CurrentNamespace = integration
|
|
}
|
|
|
|
if strings.Contains(cmiiEnv, "fe") || strings.Contains(cmiiEnv, "val") {
|
|
op.CurrentNamespace = validation
|
|
}
|
|
|
|
if strings.Contains(cmiiEnv, "uat") {
|
|
op.CurrentNamespace = uat
|
|
}
|
|
|
|
if strings.Contains(cmiiEnv, "demo") {
|
|
op.CurrentNamespace = demo
|
|
}
|
|
|
|
if op.CurrentNamespace == "" {
|
|
op.CurrentNamespace = dev
|
|
}
|
|
|
|
log.InfoF("[k8s env] - current env is => %s", op.CurrentNamespace)
|
|
}
|
|
|
|
func (op *CmiiK8sOperator) DeploymentAll(cmiiEnv string) []v1.Deployment {
|
|
|
|
op.changeOperatorEnv(cmiiEnv)
|
|
|
|
client := op.CurrentClient
|
|
|
|
deploymentList, err := client.AppsV1().Deployments(op.CurrentNamespace).List(context.TODO(), metav1.ListOptions{})
|
|
if err != nil {
|
|
log.ErrorF("[DeploymentAll] - list deployment in [%s] [%s] error => %s", cmiiEnv, op.CurrentNamespace, err.Error())
|
|
}
|
|
deployments := deploymentList.Items
|
|
|
|
length := len(deployments)
|
|
log.InfoF("[DeploymentAll] - deployment in [%s] count is => %d", op.CurrentNamespace, length)
|
|
results := make([]v1.Deployment, length)
|
|
//var results []v1.Deployment
|
|
//ccc := make(chan v1.Deployment, length)
|
|
var wg sync.WaitGroup
|
|
//var mutex sync.Mutex
|
|
|
|
worker := workerThread
|
|
if length <= worker {
|
|
for i, deployment := range deployments {
|
|
objectMeta := deployment.ObjectMeta
|
|
objectMeta.SetAnnotations(nil)
|
|
objectMeta.SetManagedFields(nil)
|
|
deployment.ObjectMeta = objectMeta
|
|
|
|
results[i] = deployment
|
|
}
|
|
return results
|
|
}
|
|
|
|
pinch := (length + worker - 1) / worker
|
|
wg.Add(worker)
|
|
for splice := 0; splice < worker; splice++ {
|
|
|
|
// 计算每个goroutine处理的切片段
|
|
start := splice * pinch
|
|
end := start + pinch
|
|
if end > length {
|
|
end = length
|
|
}
|
|
//log.DebugF("[DeploymentAll] - deployment pinch from [%d - %d]", start, end)
|
|
|
|
go func(deploymentList []v1.Deployment, start int, results *[]v1.Deployment) {
|
|
|
|
for index, deployment := range deploymentList {
|
|
objectMeta := deployment.ObjectMeta
|
|
objectMeta.SetAnnotations(nil)
|
|
objectMeta.SetManagedFields(nil)
|
|
deployment.ObjectMeta = objectMeta
|
|
|
|
//ccc <- deployment
|
|
i := *results
|
|
i[index+start] = deployment
|
|
}
|
|
|
|
wg.Done()
|
|
}(deployments[start:end], start, &results)
|
|
}
|
|
|
|
//go func() {
|
|
// wg.Wait()
|
|
// close(ccc)
|
|
//}()
|
|
|
|
wg.Wait()
|
|
//for deployment := range ccc {
|
|
// results = append(results, deployment)
|
|
//}
|
|
|
|
return results
|
|
}
|
|
|
|
func (op *CmiiK8sOperator) DeploymentAllInterface(cmiiEnv string) []CmiiDeploymentInterface {
|
|
op.changeOperatorEnv(cmiiEnv)
|
|
|
|
client := op.CurrentClient
|
|
|
|
deploymentList, err := client.AppsV1().Deployments(op.CurrentNamespace).List(context.TODO(), metav1.ListOptions{})
|
|
if err != nil {
|
|
log.ErrorF("[DeploymentAllInterface] - list deployment in [%s] [%s] error => %s", cmiiEnv, op.CurrentNamespace, err.Error())
|
|
}
|
|
deployments := deploymentList.Items
|
|
|
|
length := len(deployments)
|
|
|
|
results := make([]CmiiDeploymentInterface, length)
|
|
ccc := make(chan CmiiDeploymentInterface, length)
|
|
var wg sync.WaitGroup
|
|
|
|
worker := workerThread
|
|
if length <= worker {
|
|
for i, deployment := range deployments {
|
|
objectMeta := deployment.ObjectMeta
|
|
objectMeta.SetAnnotations(nil)
|
|
objectMeta.SetManagedFields(nil)
|
|
deployment.ObjectMeta = objectMeta
|
|
|
|
results[i] = CmiiDeploymentInterface{}.Convert(deployment)
|
|
}
|
|
return results
|
|
}
|
|
|
|
pinch := length / worker
|
|
wg.Add(worker)
|
|
for splice := 0; splice < worker; splice++ {
|
|
go func(deploymentList []v1.Deployment) {
|
|
defer wg.Done()
|
|
for _, deployment := range deploymentList {
|
|
objectMeta := deployment.ObjectMeta
|
|
objectMeta.SetAnnotations(nil)
|
|
objectMeta.SetManagedFields(nil)
|
|
deployment.ObjectMeta = objectMeta
|
|
|
|
ccc <- CmiiDeploymentInterface{}.Convert(deployment)
|
|
}
|
|
|
|
}(deployments[splice*pinch : utils.MinInt((splice+1)*pinch, length)])
|
|
}
|
|
|
|
go func() {
|
|
wg.Wait()
|
|
close(ccc)
|
|
}()
|
|
|
|
index := 0
|
|
for deployment := range ccc {
|
|
// 过滤所有非CMII的内容
|
|
if strings.Contains(deployment.Name, "proxy") {
|
|
continue
|
|
}
|
|
if strings.Contains(deployment.Name, "minio") {
|
|
continue
|
|
}
|
|
if strings.HasPrefix(deployment.Name, "helm-live-rtsp") {
|
|
continue
|
|
}
|
|
if strings.Contains(deployment.Name, "nfs") {
|
|
continue
|
|
}
|
|
results[index] = deployment
|
|
index++
|
|
}
|
|
|
|
return results[:index]
|
|
}
|
|
|
|
func (op *CmiiK8sOperator) DeploymentFizz(cmiiEnv, appFizz string) (fizzDeployment []v1.Deployment) {
|
|
|
|
deploymentAll := op.DeploymentAll(cmiiEnv)
|
|
|
|
if deploymentAll == nil {
|
|
log.ErrorF("[DeploymentFizz] - namespace [%s] can not get deployment [%s]", cmiiEnv, appFizz)
|
|
return nil
|
|
}
|
|
|
|
for _, deployment := range deploymentAll {
|
|
if strings.Contains(deployment.Name, appFizz) {
|
|
fizzDeployment = append(fizzDeployment, deployment)
|
|
}
|
|
}
|
|
|
|
return fizzDeployment
|
|
}
|
|
|
|
func (op *CmiiK8sOperator) DeploymentExist(cmiiEnv, appName string) (exists *v1.Deployment) {
|
|
op.changeOperatorEnv(cmiiEnv)
|
|
client := op.CurrentClient
|
|
|
|
deployment, err := client.AppsV1().Deployments(op.CurrentNamespace).Get(context.TODO(), appName, metav1.GetOptions{})
|
|
if err != nil {
|
|
log.ErrorF("[DeploymentExist] - deployments [%s] [%s] not exists ! %s", cmiiEnv, appName, err.Error())
|
|
return nil
|
|
}
|
|
|
|
return deployment
|
|
}
|
|
|
|
func (op *CmiiK8sOperator) DeploymentOneInterface(cmiiEnv, appName string) (deploy *CmiiDeploymentInterface) {
|
|
op.changeOperatorEnv(cmiiEnv)
|
|
client := op.CurrentClient
|
|
deploymentInterface := CmiiDeploymentInterface{}
|
|
|
|
deployment, err := client.AppsV1().Deployments(op.CurrentNamespace).Get(context.TODO(), appName, metav1.GetOptions{})
|
|
if err != nil {
|
|
log.ErrorF("[DeploymentExist] - deployments [%s] [%s] not exists ! %s", cmiiEnv, appName, err.Error())
|
|
return nil
|
|
}
|
|
|
|
convert := deploymentInterface.Convert(*deployment)
|
|
|
|
return &convert
|
|
}
|
|
|
|
func (op *CmiiK8sOperator) DeploymentScale(cmiiEnv, appFizz string, scaleCount int32) bool {
|
|
|
|
deploymentFizz := op.DeploymentFizz(cmiiEnv, appFizz)
|
|
|
|
client := op.CurrentClient
|
|
|
|
for _, deployment := range deploymentFizz {
|
|
log.DebugF("[DeploymentScale] - start to scale [%s] [%s] to %d", deployment.Namespace, deployment.Name, scaleCount)
|
|
|
|
scale := &autoscalingv1.Scale{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: deployment.Name,
|
|
Namespace: deployment.Namespace,
|
|
},
|
|
Spec: autoscalingv1.ScaleSpec{
|
|
Replicas: int32(scaleCount),
|
|
},
|
|
}
|
|
|
|
updateScale, err := client.AppsV1().Deployments(deployment.Namespace).UpdateScale(
|
|
context.TODO(),
|
|
deployment.Name,
|
|
scale,
|
|
metav1.UpdateOptions{},
|
|
)
|
|
if err != nil {
|
|
log.ErrorF("[DeploymentScale] - scale error %s", err.Error())
|
|
return false
|
|
}
|
|
|
|
log.InfoF("[DeploymentScale] - scale of [%s] [%s] to %d success !", updateScale.Namespace, updateScale.Name, scaleCount)
|
|
}
|
|
|
|
return true
|
|
}
|
|
|
|
func (op *CmiiK8sOperator) DeploymentUpdateTag(cmiiEnv, appName, newTag string) bool {
|
|
|
|
if newTag == "" {
|
|
log.WarnF("[DeploymentUpdateTag] - can not update image tag to null!")
|
|
return false
|
|
}
|
|
|
|
deployment := op.DeploymentExist(cmiiEnv, appName)
|
|
if deployment == nil {
|
|
return false
|
|
}
|
|
|
|
containers := deployment.Spec.Template.Spec.Containers
|
|
if len(containers) == 1 {
|
|
// only update this kind
|
|
container := containers[0]
|
|
split := strings.Split(container.Image, ":")
|
|
|
|
container.Image = split[0] + ":" + newTag
|
|
log.InfoF("[DeploymentUpdateTag] - update [%s] [%s] from [%s] to [%s]", op.CurrentNamespace, appName, split[1], container.Image)
|
|
|
|
// re assign
|
|
deployment.Spec.Template.Spec.Containers[0] = container
|
|
|
|
// update
|
|
_, err := op.CurrentClient.AppsV1().Deployments(deployment.Namespace).Update(context.TODO(), deployment, metav1.UpdateOptions{})
|
|
if err != nil {
|
|
log.ErrorF("[DeploymentUpdateTag] - update [%s] [%s] from [%s] to [%s] error ! %s", op.CurrentNamespace, appName, split[1], container.Image, err.Error())
|
|
return false
|
|
}
|
|
}
|
|
|
|
return true
|
|
}
|
|
|
|
func (op *CmiiK8sOperator) ReplicaSet(cmiiEnv, replicaSetName string) *v1.ReplicaSet {
|
|
|
|
op.changeOperatorEnv(cmiiEnv)
|
|
|
|
client := op.CurrentClient
|
|
|
|
replicaSet, err := client.AppsV1().ReplicaSets(op.CurrentNamespace).Get(context.TODO(), replicaSetName, metav1.GetOptions{})
|
|
if err != nil {
|
|
return nil
|
|
}
|
|
|
|
return replicaSet
|
|
}
|
|
|
|
func (op *CmiiK8sOperator) PodAll(cmiiEnv string) []corev1.Pod {
|
|
op.changeOperatorEnv(cmiiEnv)
|
|
|
|
client := op.CurrentClient
|
|
|
|
podList, err := client.CoreV1().Pods(op.CurrentNamespace).List(context.TODO(), metav1.ListOptions{})
|
|
if err != nil {
|
|
log.ErrorF("[PodAll] - list c in [%s] [%s] error => %s", cmiiEnv, op.CurrentNamespace, err.Error())
|
|
return nil
|
|
}
|
|
pods := podList.Items
|
|
|
|
length := len(pods)
|
|
results := make([]corev1.Pod, length)
|
|
ccc := make(chan corev1.Pod, length)
|
|
var wg sync.WaitGroup
|
|
|
|
worker := workerThread
|
|
if length <= worker {
|
|
for i, pod := range pods {
|
|
objectMeta := pod.ObjectMeta
|
|
objectMeta.SetAnnotations(nil)
|
|
objectMeta.SetManagedFields(nil)
|
|
pod.ObjectMeta = objectMeta
|
|
|
|
results[i] = pod
|
|
}
|
|
return results
|
|
}
|
|
|
|
pinch := length / worker
|
|
wg.Add(worker)
|
|
for splice := 0; splice < worker; splice++ {
|
|
go func(podList []corev1.Pod) {
|
|
defer wg.Done()
|
|
for _, pod := range podList {
|
|
objectMeta := pod.ObjectMeta
|
|
objectMeta.SetAnnotations(nil)
|
|
objectMeta.SetManagedFields(nil)
|
|
pod.ObjectMeta = objectMeta
|
|
|
|
ccc <- pod
|
|
}
|
|
|
|
}(pods[splice*pinch : utils.MinInt((splice+1)*pinch, length)])
|
|
}
|
|
|
|
go func() {
|
|
wg.Wait()
|
|
close(ccc)
|
|
}()
|
|
|
|
index := 0
|
|
for c := range ccc {
|
|
results[index] = c
|
|
index++
|
|
}
|
|
|
|
return results
|
|
}
|
|
|
|
func (op *CmiiK8sOperator) PodFizz(cmiiEnv, appFizz string) (fizzPod []corev1.Pod) {
|
|
|
|
podAll := op.PodAll(cmiiEnv)
|
|
|
|
if podAll == nil {
|
|
log.ErrorF("[DeploymentFizz] - no app find in [%s] !", cmiiEnv)
|
|
return nil
|
|
}
|
|
|
|
for _, pod := range podAll {
|
|
if strings.Contains(pod.Name, appFizz) {
|
|
fizzPod = append(fizzPod, pod)
|
|
}
|
|
}
|
|
|
|
return fizzPod
|
|
}
|