[ server] [ agent ] - 完成核心信息,关机的功能
This commit is contained in:
@@ -21,6 +21,13 @@ public class AgentOperationInfoService {
|
|||||||
|
|
||||||
public void AgentOpInfo(OctopusMessage octopusMessage){
|
public void AgentOpInfo(OctopusMessage octopusMessage){
|
||||||
|
|
||||||
|
// 构造结果OM
|
||||||
|
octopusMessage.setAc_time(TimeUtils.currentTime());
|
||||||
|
octopusMessage.setResult(agentServerInfo);
|
||||||
|
|
||||||
|
// 发送相应的OM至 OctopusToServer 中
|
||||||
|
oMessageToServerSender.send(octopusMessage);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public void AgentOpVersion(OctopusMessage octopusMessage){
|
public void AgentOpVersion(OctopusMessage octopusMessage){
|
||||||
|
|||||||
@@ -1,12 +1,26 @@
|
|||||||
package io.wdd.agent.agent;
|
package io.wdd.agent.agent;
|
||||||
|
|
||||||
|
|
||||||
|
import io.wdd.agent.config.utils.AgentCommonThreadPool;
|
||||||
|
import io.wdd.agent.executor.CommandExecutor;
|
||||||
import io.wdd.agent.executor.FunctionExecutor;
|
import io.wdd.agent.executor.FunctionExecutor;
|
||||||
|
import io.wdd.agent.message.OMessageToServerSender;
|
||||||
import io.wdd.common.beans.agent.AgentOperationMessage;
|
import io.wdd.common.beans.agent.AgentOperationMessage;
|
||||||
|
import io.wdd.common.beans.rabbitmq.OctopusMessage;
|
||||||
|
import io.wdd.common.utils.TimeUtils;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.data.redis.core.RedisTemplate;
|
||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
|
|
||||||
import javax.annotation.Resource;
|
import javax.annotation.Resource;
|
||||||
|
import java.time.Duration;
|
||||||
|
import java.time.LocalDateTime;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
import static io.wdd.common.beans.status.OctopusStatusMessage.ALL_AGENT_STATUS_REDIS_KEY;
|
||||||
|
|
||||||
@Service
|
@Service
|
||||||
@Slf4j
|
@Slf4j
|
||||||
@@ -15,6 +29,15 @@ public class AgentRebootUpdateService {
|
|||||||
@Resource
|
@Resource
|
||||||
FunctionExecutor functionExecutor;
|
FunctionExecutor functionExecutor;
|
||||||
|
|
||||||
|
@Resource
|
||||||
|
CommandExecutor commandExecutor;
|
||||||
|
|
||||||
|
@Resource
|
||||||
|
OMessageToServerSender toServerSender;
|
||||||
|
|
||||||
|
@Resource
|
||||||
|
RedisTemplate redisTemplate;
|
||||||
|
|
||||||
public void exAgentReboot(AgentOperationMessage operationMessage) {
|
public void exAgentReboot(AgentOperationMessage operationMessage) {
|
||||||
|
|
||||||
}
|
}
|
||||||
@@ -22,4 +45,91 @@ public class AgentRebootUpdateService {
|
|||||||
public void exAgentUpdate(AgentOperationMessage operationMessage) {
|
public void exAgentUpdate(AgentOperationMessage operationMessage) {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void exAgentShutdown(OctopusMessage octopusMessage, AgentOperationMessage operationMessage) {
|
||||||
|
|
||||||
|
LocalDateTime now = TimeUtils.currentTime();
|
||||||
|
LocalDateTime operationTime = operationMessage.getOperationTime();
|
||||||
|
Duration duration = Duration.between(
|
||||||
|
now,
|
||||||
|
operationTime
|
||||||
|
);
|
||||||
|
|
||||||
|
long seconds = duration.toSeconds();
|
||||||
|
|
||||||
|
// 发送消息至RabbitMQ中
|
||||||
|
octopusMessage.setAc_time(now);
|
||||||
|
octopusMessage.setResult(
|
||||||
|
String.format(
|
||||||
|
"OctopusAgent [ %s ] 将会在 [ %s ] 秒之后关机",
|
||||||
|
octopusMessage.getUuid(),
|
||||||
|
seconds
|
||||||
|
)
|
||||||
|
);
|
||||||
|
toServerSender.send(
|
||||||
|
octopusMessage
|
||||||
|
);
|
||||||
|
|
||||||
|
ExecutorService pool = AgentCommonThreadPool.pool;
|
||||||
|
|
||||||
|
// 休眠,阻塞当前线程
|
||||||
|
if (seconds > 0) {
|
||||||
|
|
||||||
|
pool.submit(
|
||||||
|
() -> {
|
||||||
|
|
||||||
|
// keep this to local
|
||||||
|
OctopusMessage message = octopusMessage;
|
||||||
|
|
||||||
|
try {
|
||||||
|
TimeUnit.SECONDS.sleep(seconds);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
|
||||||
|
// 执行实际的关机操作
|
||||||
|
doShutdownAgent(message);
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
} else {
|
||||||
|
// 不需要等待,直接关机
|
||||||
|
doShutdownAgent(octopusMessage);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
private void doShutdownAgent(OctopusMessage octopusMessage) {
|
||||||
|
|
||||||
|
ArrayList<String> shutdownCommand = new ArrayList<>(
|
||||||
|
List.of(
|
||||||
|
"systemctl",
|
||||||
|
"poweroff"
|
||||||
|
)
|
||||||
|
);
|
||||||
|
|
||||||
|
//发送消息至Redis中, 修改Agent的状态
|
||||||
|
redisTemplate
|
||||||
|
.opsForHash()
|
||||||
|
.put(
|
||||||
|
ALL_AGENT_STATUS_REDIS_KEY,
|
||||||
|
octopusMessage.getUuid(),
|
||||||
|
"0"
|
||||||
|
);
|
||||||
|
|
||||||
|
|
||||||
|
// 执行关机操作
|
||||||
|
log.error(
|
||||||
|
"开始关闭OctopusAgent! 时间为 [ {} ]",
|
||||||
|
TimeUtils.currentTimeString()
|
||||||
|
);
|
||||||
|
String streamKey = octopusMessage.getUuid() + "-Status";
|
||||||
|
|
||||||
|
// 最终执行关机操作
|
||||||
|
commandExecutor.execute(
|
||||||
|
streamKey,
|
||||||
|
shutdownCommand
|
||||||
|
);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -66,6 +66,10 @@ public class OMHandlerAgent extends AbstractOctopusMessageHandler {
|
|||||||
// 收集Agent的核心Info信息
|
// 收集Agent的核心Info信息
|
||||||
agentOperationInfoService.AgentOpInfo(octopusMessage);
|
agentOperationInfoService.AgentOpInfo(octopusMessage);
|
||||||
|
|
||||||
|
} else if (opType.equals(AgentOperationType.SHUTDOWN)) {
|
||||||
|
|
||||||
|
// 关闭Agent的接口
|
||||||
|
agentRebootUpdateService.exAgentShutdown(octopusMessage, operationMessage);
|
||||||
} else {
|
} else {
|
||||||
// operation unknown
|
// operation unknown
|
||||||
log.error("Command Agent Operation Unknown! ");
|
log.error("Command Agent Operation Unknown! ");
|
||||||
|
|||||||
@@ -29,9 +29,6 @@ public class OMHandlerStatus extends AbstractOctopusMessageHandler {
|
|||||||
@Resource
|
@Resource
|
||||||
AgentStatusCollector agentStatusCollector;
|
AgentStatusCollector agentStatusCollector;
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean handle(OctopusMessage octopusMessage) {
|
public boolean handle(OctopusMessage octopusMessage) {
|
||||||
|
|
||||||
|
|||||||
@@ -75,7 +75,7 @@ public class CommandExecutor {
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
public int processExecute(String streamKey, ProcessBuilder processBuilder) {
|
private int processExecute(String streamKey, ProcessBuilder processBuilder) {
|
||||||
|
|
||||||
processBuilder.redirectErrorStream(true);
|
processBuilder.redirectErrorStream(true);
|
||||||
// processBuilder.inheritIO();
|
// processBuilder.inheritIO();
|
||||||
|
|||||||
@@ -23,7 +23,6 @@ public class AgentOperationMessage {
|
|||||||
* 需要执行的目标时间,
|
* 需要执行的目标时间,
|
||||||
*/
|
*/
|
||||||
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
|
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
|
||||||
@Deprecated
|
|
||||||
private LocalDateTime operationTime;
|
private LocalDateTime operationTime;
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -1,6 +1,7 @@
|
|||||||
package io.wdd.rpc.agent;
|
package io.wdd.rpc.agent;
|
||||||
|
|
||||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||||
|
import com.fasterxml.jackson.core.type.TypeReference;
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
import io.wdd.common.beans.agent.AgentOperationMessage;
|
import io.wdd.common.beans.agent.AgentOperationMessage;
|
||||||
import io.wdd.common.beans.agent.AgentOperationType;
|
import io.wdd.common.beans.agent.AgentOperationType;
|
||||||
@@ -25,6 +26,7 @@ import java.util.concurrent.ExecutorService;
|
|||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
import static io.wdd.rpc.init.ServerBootUpEnvironment.ALL_AGENT_TOPIC_NAME_SET;
|
||||||
import static io.wdd.rpc.init.ServerBootUpEnvironment.ALL_HEALTHY_AGENT_TOPIC_NAME_LIST;
|
import static io.wdd.rpc.init.ServerBootUpEnvironment.ALL_HEALTHY_AGENT_TOPIC_NAME_LIST;
|
||||||
import static io.wdd.rpc.message.handler.OctopusMessageHandler.AGENT_LATEST_VERSION;
|
import static io.wdd.rpc.message.handler.OctopusMessageHandler.AGENT_LATEST_VERSION;
|
||||||
import static io.wdd.rpc.message.handler.OctopusMessageHandler.OCTOPUS_MESSAGE_FROM_AGENT;
|
import static io.wdd.rpc.message.handler.OctopusMessageHandler.OCTOPUS_MESSAGE_FROM_AGENT;
|
||||||
@@ -100,12 +102,125 @@ public class OctopusAgentServiceImpl implements OctopusAgentService {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Map<String, ServerInfoVO> getAllAgentCoreInfo() {
|
public Map<String, ServerInfoVO> getAllAgentCoreInfo() {
|
||||||
return null;
|
|
||||||
|
HashMap<String, ServerInfoVO> result = new HashMap<>();
|
||||||
|
|
||||||
|
// 获取Agent的版本信息 -- 超时时间
|
||||||
|
// 发送OctopusMessage-Agent
|
||||||
|
// 从OctopusToServer中收集到所有Agent的版本信息
|
||||||
|
// 组装信息至集合中
|
||||||
|
LocalDateTime currentTime = TimeUtils.currentFormatTime();
|
||||||
|
|
||||||
|
buildAndSendToAllAgent(
|
||||||
|
AgentOperationType.INFO,
|
||||||
|
currentTime
|
||||||
|
);
|
||||||
|
|
||||||
|
CountDownLatch countDownLatch = new CountDownLatch(ALL_HEALTHY_AGENT_TOPIC_NAME_LIST.size());
|
||||||
|
|
||||||
|
CompletableFuture<Void> getAllAgentCoreInfoFuture = waitCollectAllAgentCoreInfo(
|
||||||
|
result,
|
||||||
|
currentTime,
|
||||||
|
countDownLatch
|
||||||
|
);
|
||||||
|
|
||||||
|
try {
|
||||||
|
// 超时等待5秒钟, 或者所有的Agent均已经完成上报
|
||||||
|
countDownLatch.await(
|
||||||
|
5,
|
||||||
|
TimeUnit.SECONDS
|
||||||
|
);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
log.warn("存在部分Agent没有上报 核心信息!");
|
||||||
|
} finally {
|
||||||
|
// 必须关闭释放线程
|
||||||
|
getAllAgentCoreInfoFuture.cancel(true);
|
||||||
|
}
|
||||||
|
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
private CompletableFuture<Void> waitCollectAllAgentCoreInfo(HashMap<String, ServerInfoVO> result, LocalDateTime startTime, CountDownLatch countDownLatch) {
|
||||||
|
|
||||||
|
ExecutorService pool = ServerCommonPool.pool;
|
||||||
|
|
||||||
|
// 从OCTOPUS_MESSAGE_FROM_AGENT中获取符合条件的信息
|
||||||
|
return CompletableFuture.runAsync(
|
||||||
|
() -> {
|
||||||
|
while (true) {
|
||||||
|
if (OCTOPUS_MESSAGE_FROM_AGENT.isEmpty()) {
|
||||||
|
// 开始收集等待 200ms
|
||||||
|
try {
|
||||||
|
TimeUnit.MILLISECONDS.sleep(50);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
// 返回,继续死循环
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
OctopusMessage message = OCTOPUS_MESSAGE_FROM_AGENT.poll();
|
||||||
|
|
||||||
|
// 获取到OM
|
||||||
|
// 判断信息是否是需要的类型
|
||||||
|
// 根据 init_time + OctopusMessageType + AgentOperationMessage
|
||||||
|
if (!judgyIsCurrentServerReplyMessage(
|
||||||
|
message,
|
||||||
|
startTime
|
||||||
|
)) {
|
||||||
|
|
||||||
|
// 不是当前应用需要的的OM,将信息放置与Cache队列的末尾
|
||||||
|
try {
|
||||||
|
OCTOPUS_MESSAGE_FROM_AGENT.put(message);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
// 返回,继续死循环
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 是当前需要的消息信息
|
||||||
|
try {
|
||||||
|
ServerInfoVO serverInfoVO = objectMapper.readValue(
|
||||||
|
(String) message.getResult(),
|
||||||
|
new TypeReference<ServerInfoVO>() {
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
result.put(
|
||||||
|
message.getUuid(),
|
||||||
|
serverInfoVO
|
||||||
|
);
|
||||||
|
|
||||||
|
} catch (JsonProcessingException e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
|
||||||
|
countDownLatch.countDown();
|
||||||
|
}
|
||||||
|
},
|
||||||
|
pool
|
||||||
|
);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String shutdownAgentDanger(String agentTopicName) {
|
public String shutdownAgentDanger(String agentTopicName) {
|
||||||
|
|
||||||
|
if (!ALL_AGENT_TOPIC_NAME_SET.contains(agentTopicName)) {
|
||||||
|
log.error("agentTopicName Error !");
|
||||||
|
return "agentTopicName Error!";
|
||||||
|
}
|
||||||
|
|
||||||
|
LocalDateTime formatTime = TimeUtils.currentFormatTime();
|
||||||
|
buildAndSendToAllAgent(
|
||||||
|
AgentOperationType.SHUTDOWN,
|
||||||
|
formatTime
|
||||||
|
);
|
||||||
|
|
||||||
|
// 是否需要检查相应的状态
|
||||||
|
|
||||||
|
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -4,6 +4,7 @@ import io.swagger.annotations.Api;
|
|||||||
import io.swagger.annotations.ApiOperation;
|
import io.swagger.annotations.ApiOperation;
|
||||||
import io.wdd.common.beans.response.R;
|
import io.wdd.common.beans.response.R;
|
||||||
import io.wdd.rpc.agent.OctopusAgentService;
|
import io.wdd.rpc.agent.OctopusAgentService;
|
||||||
|
import io.wdd.server.beans.vo.ServerInfoVO;
|
||||||
import org.springframework.web.bind.annotation.GetMapping;
|
import org.springframework.web.bind.annotation.GetMapping;
|
||||||
import org.springframework.web.bind.annotation.RequestMapping;
|
import org.springframework.web.bind.annotation.RequestMapping;
|
||||||
import org.springframework.web.bind.annotation.RestController;
|
import org.springframework.web.bind.annotation.RestController;
|
||||||
@@ -26,4 +27,12 @@ public class AgentController {
|
|||||||
return R.ok(octopusAgentService.getAllAgentVersion());
|
return R.ok(octopusAgentService.getAllAgentVersion());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@GetMapping("/coreInfo")
|
||||||
|
@ApiOperation("获取所有OctopusAgent的核心信息")
|
||||||
|
public R<Map<String, ServerInfoVO>> getAllAgentCoreInfo(){
|
||||||
|
|
||||||
|
return R.ok(octopusAgentService.getAllAgentCoreInfo());
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user