[ Agent ] [ CMII ] - 新增大量功能 - 1

This commit is contained in:
zeaslity
2024-01-10 16:45:41 +08:00
parent 7ce838289b
commit bc231b0866
6 changed files with 251 additions and 345 deletions

View File

@@ -10,45 +10,46 @@ import (
var CmiiOperator = CmiiK8sOperator{}
var updateLogPath = "C:\\Users\\wddsh\\Documents\\IdeaProjects\\ProjectOctopus\\agent-go\\k8s_exec\\log\\cmii-update-log.txt"
// FindDeploymentRestartCountGreaterThanN 重启次数大于N的所有Deployment
func FindDeploymentRestartCountGreaterThanN(cmiiEnv string, restartCount int32) []CmiiDeploymentInterface {
// FindAppNotHealthyOrRestartCountGreaterThanN 重启次数大于N的所有Deployment
func FindAppNotHealthyOrRestartCountGreaterThanN(cmiiEnv string, restartCount int32) []CmiiDeploymentInterface {
//podInterface := CmiiPodInterface{}
// get all pods
podAll := CmiiOperator.PodAll(cmiiEnv)
podAll := CmiiOperator.PodAllInterface(cmiiEnv)
// restart map
restartMap := make(map[string]int32, len(podAll))
// restart count
for _, pod := range podAll {
for _, containerStatus := range pod.Status.ContainerStatuses {
if containerStatus.RestartCount > restartCount {
restart, exists := restartMap[containerStatus.Name]
if exists {
restartMap[containerStatus.Name] = utils.MaxInt32(containerStatus.RestartCount, restart)
} else {
restartMap[containerStatus.Name] = containerStatus.RestartCount
}
if pod.RestartCount > restartCount {
restart, exists := restartMap[pod.ContainerName]
if exists {
restartMap[pod.ContainerName] = utils.MaxInt32(pod.RestartCount, restart)
} else {
restartMap[pod.ContainerName] = pod.RestartCount
}
}
// unhealthy
if !pod.PodStatus {
restartMap[pod.ContainerName] = pod.RestartCount
}
}
result := make([]CmiiDeploymentInterface, len(restartMap))
cmiiDeploymentInterface := CmiiDeploymentInterface{}
index := 0
log.DebugF("[FindDeploymentRestartCountGreaterThanN] - restart map is => %v", restartMap)
log.DebugF("[FindAppNotHealthyOrRestartCountGreaterThanN] - restart map is => %v", restartMap)
// find deployment convert to interface
for key, value := range restartMap {
deployment := CmiiOperator.DeploymentExist(cmiiEnv, key)
// container Name must equals deployment name
deployment := CmiiOperator.DeploymentOneInterface(cmiiEnv, key)
if deployment != nil {
// deployment exists
log.DebugF("[FindDeploymentRestartCountGreaterThanN] - restart [%s] [%s] is [%d]", cmiiEnv, key, value)
log.DebugF("[FindAppNotHealthyOrRestartCountGreaterThanN] - restart [%s] [%s] is [%d]", cmiiEnv, key, value)
convert := cmiiDeploymentInterface.Convert(*deployment)
result[index] = convert
result[index] = *deployment
index++
}
}
@@ -60,6 +61,7 @@ func FindDeploymentReplicasSmallerThanN(cmiiEnv string, replicasMin int32) (depl
// get all deployments
cmiiDeploymentInterfaces := CmiiOperator.DeploymentAllInterface(cmiiEnv)
cmiiDeploymentInterfaces = FilterAllCmiiAppSoft(cmiiDeploymentInterfaces)
// filter
for _, deploymentInterface := range cmiiDeploymentInterfaces {
@@ -72,6 +74,25 @@ func FindDeploymentReplicasSmallerThanN(cmiiEnv string, replicasMin int32) (depl
return deploymentList
}
func FindDeploymentNotHealthy(cmiiEnv string) (deploymentList []CmiiDeploymentInterface) {
// all unhealthy pods
allInterface := CmiiOperator.PodAllInterface(cmiiEnv)
// find the deployments
for _, podInterface := range allInterface {
if !podInterface.PodStatus {
// unhealthy pod
deploymentInterface := CmiiOperator.DeploymentOneInterface(cmiiEnv, podInterface.ContainerName)
if deploymentInterface != nil {
deploymentList = append(deploymentList, *deploymentInterface)
}
}
}
return deploymentList
}
func RestartDeploymentFromList(deploymentList []CmiiDeploymentInterface) bool {
result := true
@@ -96,11 +117,35 @@ func RestartCmiiBackendDeployment(cmiiEnv string) {
cmiiDeploymentInterfaces := CmiiOperator.DeploymentAllInterface(cmiiEnv)
for _, deploymentInterface := range cmiiDeploymentInterfaces {
if strings.Contains(deploymentInterface.Name, "platform") {
_, ok := CmiiBackendAppMap[deploymentInterface.Name]
if ok {
if !CmiiOperator.DeploymentRestart(deploymentInterface.Namespace, deploymentInterface.Name) {
log.ErrorF("[RestartCmiiBackendDeployment] - restart of [%s] [%s] failed !", deploymentInterface.Namespace, deploymentInterface.Name)
} else {
log.DebugF("[RestartCmiiBackendDeployment] - restart of [%s] [%s] success !", deploymentInterface.Namespace, deploymentInterface.Name)
}
}
}
log.InfoF("[RestartCmiiBackendDeployment] - restart of all backend app in [%s] success !", CmiiOperator.CurrentNamespace)
}
func RestartCmiiFrontendDeployment(cmiiEnv string) {
cmiiDeploymentInterfaces := CmiiOperator.DeploymentAllInterface(cmiiEnv)
for _, deploymentInterface := range cmiiDeploymentInterfaces {
_, ok := CmiiFrontendAppMap[deploymentInterface.Name]
if ok {
if !CmiiOperator.DeploymentRestart(deploymentInterface.Namespace, deploymentInterface.Name) {
log.ErrorF("[RestartCmiiFrontendDeployment] - restart of [%s] [%s] failed !", deploymentInterface.Namespace, deploymentInterface.Name)
} else {
log.DebugF("[RestartCmiiFrontendDeployment] - restart of [%s] [%s] success !", deploymentInterface.Namespace, deploymentInterface.Name)
}
}
}
log.InfoF("[RestartCmiiFrontendDeployment] - restart of all backend app in [%s] success !", CmiiOperator.CurrentNamespace)
}
func UpdateCmiiDeploymentImageTag(cmiiEnv, appName, newTag string) bool {
@@ -233,6 +278,7 @@ func BackupAllDeploymentFromEnv(cmiiEnv string) bool {
func BackupAllCmiiDeploymentToMap(cmiiEnv string) (backendMap, frontendMap map[string]int32) {
allInterface := CmiiOperator.DeploymentAllInterface(cmiiEnv)
allInterface = FilterAllCmiiAppSoft(allInterface)
backendMap = make(map[string]int32, len(allInterface))
frontendMap = make(map[string]int32, len(allInterface))
@@ -247,3 +293,58 @@ func BackupAllCmiiDeploymentToMap(cmiiEnv string) (backendMap, frontendMap map[s
return backendMap, frontendMap
}
func FilterAllCmiiAppStrict(source []CmiiDeploymentInterface) (result []CmiiDeploymentInterface) {
for _, c := range source {
_, ok := CmiiBackendAppMap[c.ContainerName]
if !ok {
_, ok = CmiiFrontendAppMap[c.ContainerName]
if !ok {
log.WarnF("[FilterAllCmiiAppStrict] - [%s] not cmii pod ", c.ContainerName)
continue
}
}
result = append(result, c)
}
return result
}
func FilterAllCmiiAppSoft(source []CmiiDeploymentInterface) (result []CmiiDeploymentInterface) {
for _, c := range source {
if strings.Contains(c.ContainerName, "redis") {
continue
}
if strings.Contains(c.ContainerName, "emqxs") {
continue
}
if strings.Contains(c.ContainerName, "rabbitmq") {
continue
}
if strings.Contains(c.ContainerName, "nacos") {
continue
}
if strings.Contains(c.ContainerName, "oss") {
continue
}
if strings.Contains(c.ContainerName, "minio") {
continue
}
if strings.HasPrefix(c.ContainerName, "nfs") {
continue
}
if strings.HasPrefix(c.ContainerName, "operator") {
continue
}
if strings.HasPrefix(c.ContainerName, "proxy") {
continue
}
result = append(result, c)
}
return result
}

View File

@@ -6,9 +6,10 @@ import (
"testing"
)
func TestFindDeploymentRestartCountGreaterThanN(t *testing.T) {
func TestFindAppNotHealthyOrRestartCountGreaterThanN(t *testing.T) {
deploymentRestartCountGreaterThanN := FindDeploymentRestartCountGreaterThanN("devflight", 10)
deploymentRestartCountGreaterThanN := FindAppNotHealthyOrRestartCountGreaterThanN("devflight", 10)
deploymentRestartCountGreaterThanN = FilterAllCmiiAppSoft(deploymentRestartCountGreaterThanN)
for _, deploymentInterface := range deploymentRestartCountGreaterThanN {
println()
@@ -57,3 +58,17 @@ func TestRollBackCmiiDeploymentFromUpdateLog(t *testing.T) {
assert.Equal(t, updateLog, true, "roll back from update log failed !")
}
func TestRestartCmiiBackendDeployment(t *testing.T) {
RestartCmiiBackendDeployment("devflight")
}
func TestFindDeploymentNotHealthy(t *testing.T) {
notHealthy := FindDeploymentNotHealthy("devflight")
notHealthy = FilterAllCmiiAppSoft(notHealthy)
for _, deploymentInterface := range notHealthy {
utils.BeautifulPrint(deploymentInterface)
}
}

View File

@@ -1,6 +1,7 @@
package k8s_exec
import (
"agent-go/utils"
v1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"strings"
@@ -18,11 +19,14 @@ type CmiiPodInterface struct {
HostIP string
NodeName string
RestartCountMap map[string]int32
RestartCount int32
ContainerName string
Image string
ImageTag string
GitBranch string
GitCommit string
PodStatus bool
PodPhase corev1.PodPhase
}
type CmiiDeploymentInterface struct {
@@ -104,6 +108,7 @@ func (pod CmiiPodInterface) Convert(podDetail corev1.Pod) CmiiPodInterface {
for _, containerStatus := range containerStatuses {
containerStatusMap[containerStatus.Name] = containerStatus.RestartCount
pod.RestartCount = utils.MaxInt32(pod.RestartCount, containerStatus.RestartCount)
}
pod.Name = podDetail.Name
@@ -113,9 +118,22 @@ func (pod CmiiPodInterface) Convert(podDetail corev1.Pod) CmiiPodInterface {
pod.PodIP = podDetail.Status.PodIP
pod.HostIP = podDetail.Status.HostIP
pod.NodeName = podDetail.Spec.NodeName
//if len(podDetail.ObjectMeta.OwnerReferences) > 0 {
// pod.ReplicaSetName = podDetail.ObjectMeta.OwnerReferences[0].Name
//}
pod.PodPhase = podDetail.Status.Phase
switch podDetail.Status.Phase {
case corev1.PodFailed:
pod.PodStatus = false
break
case corev1.PodPending:
pod.PodStatus = false
break
case corev1.PodReasonUnschedulable:
pod.PodStatus = false
break
default:
pod.PodStatus = true
break
}
return pod
}

View File

@@ -13,6 +13,7 @@ import (
"k8s.io/client-go/tools/clientcmd"
"strings"
"sync"
"time"
)
var log = logger.Log
@@ -241,19 +242,6 @@ func (op *CmiiK8sOperator) DeploymentAllInterface(cmiiEnv string) []CmiiDeployme
index := 0
for deployment := range ccc {
// 过滤所有非CMII的内容
if strings.Contains(deployment.Name, "proxy") {
continue
}
if strings.Contains(deployment.Name, "minio") {
continue
}
if strings.HasPrefix(deployment.Name, "helm-live-rtsp") {
continue
}
if strings.Contains(deployment.Name, "nfs") {
continue
}
results[index] = deployment
index++
}
@@ -323,7 +311,7 @@ func (op *CmiiK8sOperator) DeploymentScale(cmiiEnv, appFizz string, scaleCount i
Namespace: deployment.Namespace,
},
Spec: autoscalingv1.ScaleSpec{
Replicas: int32(scaleCount),
Replicas: scaleCount,
},
}
@@ -379,6 +367,30 @@ func (op *CmiiK8sOperator) DeploymentUpdateTag(cmiiEnv, appName, newTag string)
return true
}
func (op *CmiiK8sOperator) DeploymentRestart(cmiiEnv, appName string) bool {
op.changeOperatorEnv(cmiiEnv)
result := true
deployment := op.DeploymentOneInterface(cmiiEnv, appName)
if deployment == nil {
log.ErrorF("[DeploymentRestart] - [%s] [%s] not exists !", cmiiEnv, appName)
return false
}
result = op.DeploymentScale(deployment.Namespace, deployment.Name, 0)
if !result {
return result
}
time.Sleep(time.Millisecond * 200)
result = op.DeploymentScale(deployment.Namespace, deployment.Name, deployment.Replicas)
if !result {
return result
}
return result
}
func (op *CmiiK8sOperator) ReplicaSet(cmiiEnv, replicaSetName string) *v1.ReplicaSet {
op.changeOperatorEnv(cmiiEnv)
@@ -451,7 +463,69 @@ func (op *CmiiK8sOperator) PodAll(cmiiEnv string) []corev1.Pod {
index++
}
return results
return results[:index]
}
func (op *CmiiK8sOperator) PodAllInterface(cmiiEnv string) []CmiiPodInterface {
op.changeOperatorEnv(cmiiEnv)
client := op.CurrentClient
podList, err := client.CoreV1().Pods(op.CurrentNamespace).List(context.TODO(), metav1.ListOptions{})
if err != nil {
log.ErrorF("[PodAll] - list c in [%s] [%s] error => %s", cmiiEnv, op.CurrentNamespace, err.Error())
return nil
}
pods := podList.Items
length := len(pods)
results := make([]CmiiPodInterface, length)
ccc := make(chan CmiiPodInterface, length)
var wg sync.WaitGroup
podInterface := CmiiPodInterface{}
worker := workerThread
if length <= worker {
for i, pod := range pods {
objectMeta := pod.ObjectMeta
objectMeta.SetAnnotations(nil)
objectMeta.SetManagedFields(nil)
pod.ObjectMeta = objectMeta
results[i] = podInterface.Convert(pod)
}
return results
}
pinch := length / worker
wg.Add(worker)
for splice := 0; splice < worker; splice++ {
go func(podList []corev1.Pod) {
defer wg.Done()
for _, pod := range podList {
objectMeta := pod.ObjectMeta
objectMeta.SetAnnotations(nil)
objectMeta.SetManagedFields(nil)
pod.ObjectMeta = objectMeta
ccc <- podInterface.Convert(pod)
}
}(pods[splice*pinch : utils.MinInt((splice+1)*pinch, length)])
}
go func() {
wg.Wait()
close(ccc)
}()
index := 0
for c := range ccc {
results[index] = c
index++
}
return results[:index]
}
func (op *CmiiK8sOperator) PodFizz(cmiiEnv, appFizz string) (fizzPod []corev1.Pod) {

View File

@@ -52,7 +52,7 @@ func TestCmiiK8sOperator_DeploymentAllInterface(t *testing.T) {
func TestCmiiK8sOperator_DeploymentFizz(t *testing.T) {
start := time.Now()
deploymentFizz := CmiiOperator.DeploymentFizz("demo", "cmii-uav-platform")
deploymentFizz := CmiiOperator.DeploymentFizz("demo", "helm-proxysql")
elapsed := time.Since(start).Milliseconds()
fmt.Printf("执行耗时: %d ms\n", elapsed)
@@ -73,7 +73,7 @@ func TestCmiiK8sOperator_DeploymentFizz(t *testing.T) {
func TestCmiiK8sOperator_DeploymentScale(t *testing.T) {
start := time.Now()
CmiiOperator.DeploymentScale("devflight", "cmii-uav-gis-server", 1)
CmiiOperator.DeploymentScale("devflight", "cmii-uav-depotautoreturn", 0)
elapsed := time.Since(start).Milliseconds()
fmt.Printf("执行耗时: %d ms\n", elapsed)
@@ -112,7 +112,7 @@ func TestCmiiK8sOperator_PodAll(t *testing.T) {
func TestCmiiK8sOperator_PodFizz(t *testing.T) {
start := time.Now()
podList := CmiiOperator.PodFizz("devflight", "cmii-uav-depotautoreturn")
podList := CmiiOperator.PodFizz("devflight", "cmii-uav-data-post-process")
elapsed := time.Since(start).Milliseconds()
fmt.Printf("执行耗时: %d ms\n", elapsed)
t.Logf("pod list lenght is => %d", len(podList))