Files
ProjectOctopus/agent-go/executor/K8sFunction.go
2023-11-22 11:31:56 +08:00

328 lines
9.2 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package executor
import (
"fmt"
"k8s.io/apimachinery/pkg/api/errors"
"os"
"strings"
"sync"
"time"
"context"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
)
// Package-level state shared by the K8s* helpers in this file.
var (
	// k8sConfigFilePath is the kubeconfig file the shared client is built from.
	k8sConfigFilePath = "/root/wdd/kube_config_cluster.yml"

	// k8sClient is the shared clientset, built once during package
	// initialization. It may be nil (for example when the kubeconfig file is
	// rejected by newK8sClientInstance), so users must nil-check it.
	k8sClient = newK8sClientInstance()
)
// newK8sClientInstance builds a Kubernetes clientset from the kubeconfig at
// k8sConfigFilePath.
//
// It returns nil when BasicFileExistAndNotNull rejects the kubeconfig path,
// when the kubeconfig cannot be loaded, or when the clientset cannot be
// created. Callers must therefore nil-check the result before using it.
func newK8sClientInstance() *kubernetes.Clientset {
	if !BasicFileExistAndNotNull(k8sConfigFilePath) {
		log.WarnF("[newK8sClientInstance] - k8s config %s does not exist ! ", k8sConfigFilePath)
		return nil
	}

	var client *kubernetes.Clientset

	// NOTE(review): this Once is local to a single call, so it does not
	// deduplicate work across calls; the actual singleton is the package-level
	// k8sClient variable that stores this function's result.
	var once sync.Once
	once.Do(func() {
		// Initialize the client configuration from the kubeconfig file.
		config, err := clientcmd.BuildConfigFromFlags("", k8sConfigFilePath)
		if err != nil {
			log.ErrorF("[newK8sClientInstance] - load from %s error ! %s", k8sConfigFilePath, err.Error())
			// Stop here: handing a failed (nil) config to NewForConfig must be avoided.
			return
		}

		created, err := kubernetes.NewForConfig(config)
		if err != nil {
			log.ErrorF("[newK8sClientInstance] - create k8s client error ! %s", err.Error())
			return
		}
		client = created
	})

	return client
}
func K8sCheckPodStatusTimeout(specificPod string, supreme string, waitTimeOut int) bool {
// 设置超时时间和时间间隔
timeout := time.After(time.Duration(waitTimeOut) * time.Second)
tick := time.Tick(5 * time.Second)
// 监控Pod状态
for {
select {
case <-timeout:
log.ErrorF("[K8sCheckPodStatusTimeout] - 命名空间: [%s], Pod名称: [%s], 状态: 失败!", supreme, specificPod)
return false
case <-tick:
pod, err := k8sClient.CoreV1().Pods(supreme).Get(context.TODO(), specificPod, metav1.GetOptions{})
if err != nil {
log.ErrorF("[K8sCheckPodStatusTimeout] - 命名空间: [%s], Pod名称: [%s], 获取Pod信息失败 ", supreme, err.Error())
} else {
log.DebugF("[K8sCheckPodStatusTimeout] - 命名空间: [%s], Pod名称: [%s], 状态: [%s]", supreme, pod.Name, pod.Status.Phase)
if pod.Status.Phase == corev1.PodRunning || pod.Status.Phase == corev1.PodSucceeded {
return true
}
}
}
}
}
func K8sCheckDeploymentStatusTimeout(specificDeployment string, supreme string, waitTimeOut int) bool {
// 设置超时时间和时间间隔
timeout := time.After(time.Duration(waitTimeOut) * time.Second)
tick := time.Tick(5 * time.Second)
// 监控Pod状态
for {
select {
case <-timeout:
log.ErrorF("[K8sCheckDeploymentStatusTimeout] - 命名空间: %s, Deployment名称: %s, 状态: 失败!\n", supreme, specificDeployment)
return false
case <-tick:
deployment, err := k8sClient.AppsV1().Deployments(supreme).Get(context.TODO(), specificDeployment, metav1.GetOptions{})
if err != nil {
log.ErrorF("[K8sCheckDeploymentStatusTimeout] - 命名空间: [%s], Deployment 名称: [%s], 获取Deployment信息失败 ", supreme, err.Error())
} else {
log.DebugF("[K8sCheckDeploymentStatusTimeout] - 命名空间: [ %s ], Deployment: [ %s ] 还有Pods未处于Running状态 (Ready: %d, Total: %d)\n", supreme, deployment.Name, deployment.Status.ReadyReplicas, deployment.Status.Replicas)
if deployment.Status.ReadyReplicas == deployment.Status.Replicas {
return true
}
}
}
}
}
// K8sListPVCInNamespace lists the names of all PersistentVolumeClaims in the
// namespace supreme.
//
// It returns (true, names) on success; names is nil when the namespace holds
// no PVCs. It returns (false, nil) when the shared k8s client is nil or the
// list call fails.
func K8sListPVCInNamespace(supreme string) (bool, []string) {
	if k8sClient == nil {
		log.ErrorF("k8s client is nil, run k8s function error !")
		return false, nil
	}

	claimClient := k8sClient.CoreV1().PersistentVolumeClaims(supreme)
	claims, listErr := claimClient.List(context.TODO(), metav1.ListOptions{})
	if listErr != nil {
		log.ErrorF("[K8sListPVCInNamespace] - error list pvc list in namespace %s", supreme)
		return false, nil
	}

	// Collect only the claim names, in the order the API returned them.
	var names []string
	for i := range claims.Items {
		names = append(names, claims.Items[i].Name)
	}

	log.DebugF("K8sListPVCInNamespace - all pvc in namespace of [ %s ] are %v", supreme, names)
	return true, names
}
// K8sCheckPVCStatusTimeOut polls a PersistentVolumeClaim every 5 seconds
// until it is Bound, or until waitTimeOut seconds have elapsed.
//
// specificPvcName is the PVC name, supreme is the namespace, and waitTimeOut
// is the maximum wait in seconds. It returns true once the claim phase is
// Bound. It returns false when the shared k8s client is nil, when the claim
// phase is Lost, or on timeout. Errors while fetching the claim and
// not-yet-bound phases are logged and polling continues, matching the
// behavior of the pod and deployment status checks.
func K8sCheckPVCStatusTimeOut(specificPvcName string, supreme string, waitTimeOut int) bool {
	if k8sClient == nil {
		log.ErrorF("k8s client is nil, run k8s function error !")
		return false
	}

	// Overall deadline and polling interval.
	timeout := time.After(time.Duration(waitTimeOut) * time.Second)
	ticker := time.NewTicker(5 * time.Second)
	// Release the ticker when we return; time.Tick offers no way to stop it.
	defer ticker.Stop()

	// Watch the PVC status.
	for {
		select {
		case <-timeout:
			log.ErrorF("[K8sCheckPVCStatusTimeOut] - 命名空间: %s, PVC 名称: %s, 状态: 失败! ", supreme, specificPvcName)
			return false
		case <-ticker.C:
			pvc, err := k8sClient.CoreV1().PersistentVolumeClaims(supreme).Get(context.TODO(), specificPvcName, metav1.GetOptions{})
			if err != nil {
				// Do not inspect pvc after a failed Get; retry on the next tick.
				log.ErrorF("[K8sCheckPVCStatusTimeOut] - 命名空间: [ %s ], 获取 PVC [%s] 信息失败: %s ", supreme, specificPvcName, err.Error())
				continue
			}

			switch pvc.Status.Phase {
			case corev1.ClaimBound:
				log.DebugF("[K8sCheckPVCStatusTimeOut] - PVC %s in namespace %s is running", specificPvcName, supreme)
				return true
			case corev1.ClaimLost:
				// A lost claim will not become bound by waiting longer.
				log.WarnF("[K8sCheckPVCStatusTimeOut] - PVC %s in namespace %s run failed !", specificPvcName, supreme)
				return false
			default:
				// Still pending: keep waiting until the deadline.
				log.DebugF("[K8sCheckPVCStatusTimeOut] - PVC %s in namespace %s is not bound yet, phase: [%s]", specificPvcName, supreme, pvc.Status.Phase)
			}
		}
	}
}
// KubectlCheckPodStatus reports whether the pod named specific in namespace
// supreme is in the Running phase, as printed by
// `kubectl get pod -o jsonpath`.
//
// It returns false when kubectl is not available, when the command fails, or
// when no output line starts with "Running".
func KubectlCheckPodStatus(specific string, supreme string) bool {
	if !BasicCommandExists("kubectl") {
		log.Error("kubectl命令不存在无法查看Pod状态请查看")
		// Without kubectl the command below cannot succeed; stop here.
		return false
	}

	ok, resultLog := AllCommandExecutor([]string{
		"kubectl", "-n", supreme, "get", "pod", specific, "-o", "jsonpath='{.status.phase}'",
	})
	if !ok {
		return false
	}

	for _, resultLine := range resultLog {
		// NOTE(review): the jsonpath argument carries literal single quotes. If
		// AllCommandExecutor does not run the command through a shell, those
		// quotes are presumably echoed back around the phase — strip
		// surrounding whitespace and quotes so both forms are accepted.
		phase := strings.Trim(strings.TrimSpace(resultLine), "'\"")
		if strings.HasPrefix(phase, "Running") {
			return true
		}
	}

	return false
}
// KubectlApplyExec runs `kubectl apply -f resourcesYamlFile`.
//
// It returns false with a single explanatory message when kubectl is not
// found or when resourcesYamlFile fails the BasicFileExistAndNotNull check.
// Otherwise it returns the command's success flag together with its output
// lines; on success a confirmation line is appended to the output.
func KubectlApplyExec(resourcesYamlFile string) (bool, []string) {
	// Precondition: kubectl must be available.
	if !BasicCommandExistByPath("kubectl") {
		return false, []string{"[KubectlApplyExec] - kubectl command not exist !"}
	}

	// Precondition: the manifest file must pass the existence check.
	if !BasicFileExistAndNotNull(resourcesYamlFile) {
		notFound := fmt.Sprintf("[KubectlApplyExec] - wrong resourcesYamlFile %s not exist !", resourcesYamlFile)
		return false, []string{notFound}
	}

	// Run the apply.
	applyCommand := []string{"/usr/local/bin/kubectl", "apply", "-f", resourcesYamlFile}
	succeeded, output := AllCommandExecutor(applyCommand)
	if succeeded {
		applied := fmt.Sprintf("[KubectlApplyExec] - %s apply success!", resourcesYamlFile)
		return true, append(output, applied)
	}

	return false, output
}
// KubectlDeleteExec runs `kubectl delete -f resourcesYamlFile`.
//
// It returns false with a single explanatory message when kubectl is not
// found or when resourcesYamlFile fails the BasicFileExistAndNotNull check.
// Otherwise it returns the command's success flag together with its output
// lines; on success a confirmation line is appended to the output.
func KubectlDeleteExec(resourcesYamlFile string) (bool, []string) {
	// Precondition: kubectl must be available.
	if !BasicCommandExistByPath("kubectl") {
		return false, []string{"[KubectlDeleteExec] - kubectl command not exist !"}
	}

	// Precondition: the manifest file must pass the existence check.
	if !BasicFileExistAndNotNull(resourcesYamlFile) {
		notFound := fmt.Sprintf("[KubectlDeleteExec] - wrong resourcesYamlFile %s not exist !", resourcesYamlFile)
		return false, []string{notFound}
	}

	// Run the delete.
	deleteCommand := []string{"/usr/local/bin/kubectl", "delete", "-f", resourcesYamlFile}
	succeeded, output := AllCommandExecutor(deleteCommand)
	if succeeded {
		deleted := fmt.Sprintf("[KubectlDeleteExec] - %s delete success!", resourcesYamlFile)
		return true, append(output, deleted)
	}

	return false, output
}
// K8sCreateNamespace makes sure the namespace namespaceName exists, creating
// it when it is missing.
//
// It returns true when the namespace already exists or was created and could
// be read back afterwards. It returns false when the shared k8s client is
// nil, or when looking up, creating, or re-reading the namespace fails.
func K8sCreateNamespace(namespaceName string) bool {
	if k8sClient == nil {
		log.ErrorF("k8s client is nil, run k8s function error !")
		return false
	}

	namespaces := k8sClient.CoreV1().Namespaces()

	// Look the namespace up first. Only a NotFound error means "go create it";
	// any other error is a real failure.
	_, err := namespaces.Get(context.TODO(), namespaceName, metav1.GetOptions{})
	if err == nil {
		log.InfoF("[K8sCreateNamespace] - namespace of [%s] already exists!", namespaceName)
		return true
	}
	if !errors.IsNotFound(err) {
		log.ErrorF("Error getting namespace: %s ", err.Error())
		return false
	}

	// Build the namespace object and create it through the client.
	namespace := &corev1.Namespace{
		ObjectMeta: metav1.ObjectMeta{
			Name: namespaceName,
		},
	}
	if _, err = namespaces.Create(context.TODO(), namespace, metav1.CreateOptions{}); err != nil {
		// Someone else may have created it between the lookup and the create.
		if errors.IsAlreadyExists(err) {
			log.InfoF("[K8sCreateNamespace] - namespace of [%s] already exists!", namespaceName)
			return true
		}
		log.ErrorF("Error creating namespace: %s ", err.Error())
		return false
	}

	// Read the namespace back to confirm the creation took effect,
	// distinguishing "still missing" from other retrieval errors in the log.
	if _, err = namespaces.Get(context.TODO(), namespaceName, metav1.GetOptions{}); err != nil {
		if errors.IsNotFound(err) {
			log.ErrorF("Namespace %s cant be got !", namespaceName)
		} else {
			log.ErrorF("Error retrieving namespace: %s\n", err.Error())
		}
		return false
	}

	log.DebugF("Namespace %s create successful !", namespaceName)
	return true
}
// K8sGetDashBoardAuthKey prints the first secret in the kube-system namespace
// whose name contains "admin-user": its name, namespace, and every data entry.
//
// When the shared k8s client is nil it logs an error and returns, like the
// other K8s* helpers, instead of panicking on a nil dereference.
//
// NOTE(review): on any lookup failure this function prints a message and
// calls os.Exit(1), which terminates the whole process — confirm that is
// intended for callers running inside a long-lived agent. It also writes the
// secret values to standard output in clear text.
func K8sGetDashBoardAuthKey() {
	if k8sClient == nil {
		log.ErrorF("k8s client is nil, run k8s function error !")
		return
	}

	// List the secrets in the kube-system namespace.
	secrets, err := k8sClient.CoreV1().Secrets("kube-system").List(context.TODO(), metav1.ListOptions{})
	if err != nil {
		fmt.Printf("Error retrieving secrets from kube-system namespace: %s\n", err.Error())
		os.Exit(1)
	}

	// Pick the first secret whose name contains "admin-user".
	var adminUserSecretName string
	for _, secret := range secrets.Items {
		if strings.Contains(secret.Name, "admin-user") {
			adminUserSecretName = secret.Name
			break
		}
	}
	if adminUserSecretName == "" {
		fmt.Println("No admin-user secret found")
		os.Exit(1)
	}

	// Fetch that specific secret.
	secret, err := k8sClient.CoreV1().Secrets("kube-system").Get(context.TODO(), adminUserSecretName, metav1.GetOptions{})
	if err != nil {
		fmt.Printf("Error retrieving secret %s: %s\n", adminUserSecretName, err.Error())
		os.Exit(1)
	}

	// Print the secret's details; each data value is printed as a string.
	fmt.Printf("Name: %s\nNamespace: %s\nData:\n", secret.Name, secret.Namespace)
	for key, value := range secret.Data {
		fmt.Printf("%s: %s\n", key, value)
	}
}