[ server ] [ agent ]- 收集Agent的版本信息

This commit is contained in:
zeaslity
2023-02-07 16:26:22 +08:00
parent f4e636a368
commit 602e3c3034
20 changed files with 334 additions and 84 deletions

View File

@@ -0,0 +1,43 @@
package io.wdd.agent.agent;
import io.wdd.agent.config.beans.init.AgentServerInfo;
import io.wdd.agent.message.OMessageToServerSender;
import io.wdd.common.beans.rabbitmq.OctopusMessage;
import io.wdd.common.utils.TimeUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
@Service
@Slf4j
public class AgentOperationInfoService {
@Resource
AgentServerInfo agentServerInfo;
@Resource
OMessageToServerSender oMessageToServerSender;
public void AgentOpInfo(OctopusMessage order){
}
public void AgentOpVersion(OctopusMessage octopusMessage){
// 收集版本信息
String agentVersion = agentServerInfo.getAgentVersion();
// 构造结果OM
octopusMessage.setAc_time(TimeUtils.currentTime());
octopusMessage.setResult(agentVersion);
// 发送相应的OM至 OctopusToServer 中
oMessageToServerSender.send(octopusMessage);
}
}

View File

@@ -110,6 +110,9 @@ public class AgentServerInfo {
@Value("${machineId}")
private String machineId;
@Value("${agentVersion}")
private String agentVersion;
/*
* get from octopus server at the end of initialization
*

View File

@@ -1,7 +1,12 @@
package io.wdd.agent.config.message.handler;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.wdd.agent.agent.AgentOperationInfoService;
import io.wdd.agent.agent.AgentRebootUpdateService;
import io.wdd.common.beans.agent.AgentOperationMessage;
import io.wdd.common.beans.agent.AgentOperationType;
import io.wdd.common.beans.rabbitmq.OctopusMessage;
import io.wdd.common.beans.rabbitmq.OctopusMessageType;
import lombok.extern.slf4j.Slf4j;
@@ -17,28 +22,48 @@ public class OMHandlerAgent extends AbstractOctopusMessageHandler {
@Resource
AgentRebootUpdateService agentRebootUpdateService;
@Resource
AgentOperationInfoService agentOperationInfoService;
@Resource
ObjectMapper objectMapper;
@Override
public boolean handle(OctopusMessage octopusMessage) {
if (!octopusMessage.getType().equals(OctopusMessageType.AGENT)) {
if (!octopusMessage
.getType()
.equals(OctopusMessageType.AGENT)) {
return next.handle(octopusMessage);
}
AgentOperationMessage operationMessage = (AgentOperationMessage) octopusMessage.getContent();
String operationName = operationMessage.getOperationName();
if (operationName.startsWith("reb")) {
// reboot
agentRebootUpdateService.exAgentReboot(operationMessage);
} else if (operationName.startsWith("upd")) {
// update
agentRebootUpdateService.exAgentUpdate(operationMessage);
} else {
// operation unknown
log.error("Command Agent Operation Unknown! " );
AgentOperationMessage operationMessage;
try {
operationMessage = objectMapper.readValue(
(String) octopusMessage.getContent(),
new TypeReference<AgentOperationMessage>() {
}
);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
AgentOperationType opType = operationMessage.getOpType();
if (opType.equals(AgentOperationType.REBOOT)) {
// reboot
agentRebootUpdateService.exAgentReboot(operationMessage);
} else if (opType.equals(AgentOperationType.UPDATE)) {
// update
agentRebootUpdateService.exAgentUpdate(operationMessage);
} else if (opType.equals(AgentOperationType.VERSION)) {
agentOperationInfoService.AgentOpInfo(octopusMessage);
} else if (opType.equals(AgentOperationType.INFO)) {
agentOperationInfoService.AgentOpVersion(octopusMessage);
} else {
// operation unknown
log.error("Command Agent Operation Unknown! ");
}
return true;
}

View File

@@ -2,7 +2,7 @@ package io.wdd.agent.config.message.handler;
import io.wdd.agent.config.beans.init.AgentServerInfo;
import io.wdd.agent.initialization.message.GenOctopusRabbitMQConnection;
import io.wdd.agent.message.ToServerMessage;
import io.wdd.agent.message.OMessageToServerSender;
import io.wdd.common.beans.rabbitmq.OctopusMessage;
import io.wdd.common.beans.rabbitmq.OctopusMessageType;
import lombok.extern.slf4j.Slf4j;
@@ -26,7 +26,7 @@ public class OMHandlerInit extends AbstractOctopusMessageHandler {
GenOctopusRabbitMQConnection genOctopusRabbitMQConnection;
@Resource
ToServerMessage toServerMessage;
OMessageToServerSender oMessageToServerSender;
@Resource
AgentServerInfo agentServerInfo;
@@ -52,7 +52,7 @@ public class OMHandlerInit extends AbstractOctopusMessageHandler {
octopusMessage.setResult(success);
// log.info(success);
toServerMessage.send(octopusMessage);
oMessageToServerSender.send(octopusMessage);
return true;
}

View File

@@ -52,58 +52,6 @@ public class CommandExecutor {
}
/*public int executeScript(String streamKey, List<String> commandList) {
ProcessBuilder processBuilder = new ProcessBuilder(commandList);
processBuilder.redirectErrorStream(true);
// processBuilder.inheritIO();
processBuilder.directory(new File(System.getProperty("user.home")));
int processResult = 233;
try {
Process process = processBuilder.start();
// start a backend thread to daemon the process
// wait for processMaxWaitSeconds and kill the process if it's still alived
DaemonCommandProcess.submit(
StopStuckCommandProcess(
process,
processMaxWaitSeconds
));
// todo this will stuck the process and rabbitmq message will reentry the queue
// get the command result must also be a timeout smaller than the process
boolean waitFor = process.waitFor(
50,
TimeUnit.SECONDS
);
// get the process result
if (ObjectUtils.isNotEmpty(waitFor) && ObjectUtils.isNotEmpty(process)) {
processResult = process.exitValue();
}
log.debug(
"current shell command {} result is {}",
processBuilder.command(),
processResult
);
} catch (IOException | InterruptedException e) {
log.error(
"Shell command error ! {} + {}",
e.getCause(),
e.getMessage()
);
}
return processResult;
}*/
public int execute(String streamKey, List<String> command) {
ProcessBuilder processBuilder = new ProcessBuilder(command);

View File

@@ -3,7 +3,7 @@ package io.wdd.agent.initialization.bootup;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.rabbitmq.client.Channel;
import io.wdd.agent.config.beans.init.AgentServerInfo;
import io.wdd.agent.message.ToServerMessage;
import io.wdd.agent.message.OMessageToServerSender;
import io.wdd.agent.message.handler.OctopusMessageHandler;
import io.wdd.common.beans.rabbitmq.OctopusMessage;
import io.wdd.common.handler.MyRuntimeException;
@@ -27,7 +27,7 @@ import java.util.concurrent.TimeUnit;
public class OctopusAgentInitService {
@Resource
ToServerMessage toServerMessage;
OMessageToServerSender oMessageToServerSender;
@Autowired
OctopusMessageHandler octopusMessageHandler;
@@ -43,7 +43,7 @@ public class OctopusAgentInitService {
public void SendInfoToServer(AgentServerInfo agentServerInfo) {
toServerMessage.sendInitInfo(agentServerInfo, defaultInitRegisterTimeOut);
oMessageToServerSender.sendInitInfo(agentServerInfo, defaultInitRegisterTimeOut);
}

View File

@@ -20,7 +20,7 @@ import java.time.LocalDateTime;
@Service
@Slf4j(topic = "To Octopus Server Message")
public class ToServerMessage {
public class OMessageToServerSender {
@Resource
InitRabbitMQConnector initRabbitMqConnector;

View File

@@ -14,11 +14,22 @@ import java.time.LocalDateTime;
@SuperBuilder(toBuilder = true)
public class AgentOperationMessage {
private String operationName;
/**
* 执行Agent Operation操作的类型
*/
private AgentOperationType opType;
/**
* 需要执行的目标时间,
*/
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
@Deprecated
private LocalDateTime operationTime;
/**
* 期望升级到的目标版本
*/
private String updateVersion;
}

View File

@@ -0,0 +1,18 @@
package io.wdd.common.beans.agent;
public enum AgentOperationType {
// 上报版本信息
VERSION,
// 上报核心信息
INFO,
REBOOT,
UPDATE,
//关键操作,关闭Agent 只能通过此种方式完成
SHUTDOWN
}

View File

@@ -20,6 +20,9 @@ public class OctopusMessage {
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
LocalDateTime init_time;
/**
* 执行操作的类型
*/
OctopusMessageType type;
// server send message content
@@ -28,6 +31,9 @@ public class OctopusMessage {
// agent reply message content
Object result;
/**
* Agent 完成操作的时间
*/
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
LocalDateTime ac_time;

View File

@@ -0,0 +1,16 @@
package io.wdd.rpc.agent;
import java.util.Map;
public interface OctopusAgentService {
/**
* 获取所有Agent的版本信息,附带最新的版本信息
* 超时时间为 5s
* @return key - AgentTopicName value - version Info
*/
Map<String, String> getAllAgentVersion();
}

View File

@@ -0,0 +1,102 @@
package io.wdd.rpc.agent;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.wdd.common.beans.agent.AgentOperationMessage;
import io.wdd.common.beans.agent.AgentOperationType;
import io.wdd.common.beans.rabbitmq.OctopusMessage;
import io.wdd.common.beans.rabbitmq.OctopusMessageType;
import io.wdd.common.utils.TimeUtils;
import io.wdd.rpc.message.sender.OMessageToAgentSender;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import static io.wdd.rpc.init.ServerBootUpEnvironment.ALL_AGENT_TOPIC_NAME_LIST;
@Service
@Slf4j
public class OctopusAgentServiceImpl implements OctopusAgentService{
@Resource
OMessageToAgentSender oMessageToAgentSender;
@Resource
ObjectMapper objectMapper;
@Override
public Map<String, String> getAllAgentVersion() {
HashMap<String, String> result = new HashMap<>();
// 查询获取到最新的Agent版本信息
result.put("LATEST_VERSION","2023-02-06-09-23-00");
// 获取Agent的版本信息 -- 超时时间
// 发送OctopusMessage-Agent
// 从OctopusToServer中收集到所有Agent的版本信息
// 组装信息至集合中
buildAndSendToAllAgent(AgentOperationType.VERSION);
waitCollectAllAgentVersionInfo(result);
return result;
}
private void waitCollectAllAgentVersionInfo(HashMap<String, String> result) {
}
private void buildAndSendToAllAgent(AgentOperationType operationType) {
List<OctopusMessage> octopusMessageList = ALL_AGENT_TOPIC_NAME_LIST
.stream()
.map(
agentTopicName ->
ConstructAgentOperationMessage(
agentTopicName,
operationType
)
)
.collect(Collectors.toList());
// 发送相应的消息
oMessageToAgentSender.send(octopusMessageList);
}
/**
* 专门构造 Agent 类型的 OctopusMessage
* 通常只能在此类中使用
*
* @param agentTopicName
* @param operationType
* @return
*/
private OctopusMessage ConstructAgentOperationMessage(String agentTopicName, AgentOperationType operationType) {
AgentOperationMessage operationMessage = AgentOperationMessage
.builder()
.opType(operationType)
.build();
String ops;
try {
ops = objectMapper.writeValueAsString(operationMessage);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
return OctopusMessage
.builder()
.type(OctopusMessageType.AGENT)
.uuid(agentTopicName)
.init_time(TimeUtils.currentTime())
.content(ops)
.build();
}
}

View File

@@ -0,0 +1,29 @@
package io.wdd.rpc.controller;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.wdd.common.beans.response.R;
import io.wdd.rpc.agent.OctopusAgentService;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import java.util.Map;
@RestController
@RequestMapping("/octopus/server/agent")
@Api("处理Agent核心内容的Controller")
public class AgentController {
@Resource
OctopusAgentService octopusAgentService;
@GetMapping("/version")
@ApiOperation("获取所有OctopusAgent的版本")
public R<Map<String, String>> getAllAgentVersion(){
return R.ok(octopusAgentService.getAllAgentVersion());
}
}

View File

@@ -8,7 +8,7 @@ import io.wdd.common.beans.rabbitmq.OctopusMessageType;
import io.wdd.common.handler.MyRuntimeException;
import io.wdd.rpc.execute.config.ExecutionLog;
import io.wdd.rpc.execute.result.BuildStreamReader;
import io.wdd.rpc.message.sender.ToAgentMessageSender;
import io.wdd.rpc.message.sender.OMessageToAgentSender;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
@@ -27,7 +27,7 @@ import static io.wdd.rpc.init.ServerBootUpEnvironment.ALL_AGENT_TOPIC_NAME_SET;
public class CoreExecutionServiceImpl implements CoreExecutionService {
@Resource
ToAgentMessageSender toAgentMessageSender;
OMessageToAgentSender oMessageToAgentSender;
@Resource
ObjectMapper objectMapper;
@@ -110,7 +110,7 @@ public class CoreExecutionServiceImpl implements CoreExecutionService {
);
// send the message
toAgentMessageSender.send(octopusMessage);
oMessageToAgentSender.send(octopusMessage);
// set up the stream read group
String group = redisTemplate

View File

@@ -7,7 +7,7 @@ import io.wdd.common.beans.rabbitmq.OctopusMessage;
import io.wdd.common.beans.rabbitmq.OctopusMessageType;
import io.wdd.common.beans.status.AgentStatus;
import io.wdd.common.handler.MyRuntimeException;
import io.wdd.rpc.message.sender.ToAgentMessageSender;
import io.wdd.rpc.message.sender.OMessageToAgentSender;
import io.wdd.server.beans.vo.ServerInfoVO;
import io.wdd.server.utils.DaemonDatabaseOperator;
import lombok.SneakyThrows;
@@ -27,7 +27,6 @@ import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
* The type Accept boot up info message.
@@ -63,7 +62,7 @@ public class AcceptAgentInitInfo {
* The To agent order.
*/
@Resource
ToAgentMessageSender toAgentMessageSender;
OMessageToAgentSender oMessageToAgentSender;
/**
@@ -208,7 +207,7 @@ public class AcceptAgentInitInfo {
.uuid(serverInfoVO.getTopicName())
.build();
toAgentMessageSender.sendINIT(octopusMessage);
oMessageToAgentSender.sendINIT(octopusMessage);
return true;
}

View File

@@ -20,7 +20,7 @@ import java.util.List;
*/
@Component
@Slf4j(topic = "Send Message To Octopus Agent ")
public class ToAgentMessageSender {
public class OMessageToAgentSender {
@Resource
RabbitTemplate rabbitTemplate;

View File

@@ -6,7 +6,7 @@ import io.wdd.common.beans.rabbitmq.OctopusMessage;
import io.wdd.common.beans.rabbitmq.OctopusMessageType;
import io.wdd.common.beans.status.OctopusStatusMessage;
import io.wdd.common.utils.TimeUtils;
import io.wdd.rpc.message.sender.ToAgentMessageSender;
import io.wdd.rpc.message.sender.OMessageToAgentSender;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
@@ -22,7 +22,7 @@ import java.util.stream.Collectors;
public class CollectAgentStatus {
@Resource
ToAgentMessageSender toAgentMessageSender;
OMessageToAgentSender oMessageToAgentSender;
@Resource
ObjectMapper objectMapper;
@@ -45,7 +45,7 @@ public class CollectAgentStatus {
).collect(Collectors.toList());
// batch send all messages to RabbitMQ
toAgentMessageSender.send(octopusMessageList);
oMessageToAgentSender.send(octopusMessageList);
// todo how to get result ?
}

View File

@@ -0,0 +1,36 @@
package io.wdd.server.config;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.concurrent.*;
public class ServerCommonPool {
public static ExecutorService pool;
static {
ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setPriority(5)
.setNameFormat("server-pool-%d")
.setDaemon(true)
.build();
pool = new ThreadPoolExecutor(
10,
30,
500L,
TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(10,true),
threadFactory,
new ThreadPoolExecutor.AbortPolicy()
);
}
}

View File

@@ -0,0 +1,5 @@
#!/bin/bash
# 完整的更新Octopus Agent
# 删除旧的Agent

View File

@@ -29,6 +29,7 @@ virtualization=""
ioSpeed=""
machineId=""
archInfo=""
agentVersion=""
### tmp usage
ioavg=""
@@ -139,6 +140,13 @@ StartIOTest() {
}
GetAgentLatestVersion(){
ls /octopus-agent | grep "octopus-agent" | cut -d "-" -f3- | cut -d"." -f1
}
#######################################
# description
# Globals:
@@ -332,6 +340,7 @@ tcpControl=$tcpctrl
virtualization=$virt
ioSpeed="$ioavg MB/s"
machineId=$(cat /etc/machine-id)
agentVersion=$(ls /octopus-agent | grep "octopus-agent" | cut -d "-" -f3- | cut -d"." -f1)
EOF
log "env collect complete!"