[agent]-[executor] optimize execution code
This commit is contained in:
@@ -1,4 +1,4 @@
|
||||
#!/bin/bash
|
||||
#!/bin/zsh
|
||||
|
||||
RED="31m" ## 姨妈红
|
||||
GREEN="32m" ## 水鸭青
|
||||
@@ -348,26 +348,29 @@ GenerateSystemInfo() {
|
||||
machineNumber=99
|
||||
fi
|
||||
|
||||
export serverName="${city}-${hostArch}-${machineNumber}"
|
||||
export serverIpPbV4="$public_ipv4"
|
||||
export serverIpInV4=""
|
||||
export serverIpPbV6=""
|
||||
export serverIpInV6=""
|
||||
export location="$city $region $country"
|
||||
export provider="$org"
|
||||
export managePort="$(netstat -ntulp | grep sshd | grep -w tcp | awk '{print$4}' | cut -d":" -f2)"
|
||||
export cpuCore="$cores @ $freq MHz"
|
||||
export cpuBrand="$cpuName"
|
||||
export memoryTotal="$tram"
|
||||
export diskTotal="$disk_total_size"
|
||||
export diskUsage="$disk_used_size"
|
||||
export archInfo="$arch ($lbit Bit)"
|
||||
export osInfo="$opsy"
|
||||
export osKernelInfo="$kern"
|
||||
export tcpControl="$tcpctrl"
|
||||
export virtualization="$virt"
|
||||
export ioSpeed="$ioavg MB/s"
|
||||
export machineId="$(cat /host/etc/machine-id)"
|
||||
|
||||
cat >current-env.txt <<EOF
|
||||
serverName="${city}-${hostArch}-${machineNumber}"
|
||||
serverIpPbV4="$public_ipv4"
|
||||
serverIpInV4=""
|
||||
serverIpPbV6=""
|
||||
serverIpInV6=""
|
||||
location="$city $region $country"
|
||||
provider="$org"
|
||||
managePort="$(netstat -ntulp | grep sshd | grep -w tcp | awk '{print$4}' | cut -d":" -f2)"
|
||||
cpuCore="$cores @ $freq MHz"
|
||||
cpuBrand="$cpuName"
|
||||
memoryTotal="$tram"
|
||||
diskTotal="$disk_total_size"
|
||||
diskUsage="$disk_used_size"
|
||||
archInfo="$arch ($lbit Bit)"
|
||||
osInfo="$opsy"
|
||||
osKernelInfo="$kern"
|
||||
tcpControl="$tcpctrl"
|
||||
virtualization="$virt"
|
||||
ioSpeed="$ioavg MB/s"
|
||||
machineId="$(cat /host/etc/machine-id)"
|
||||
EOF
|
||||
|
||||
FunctionEnd
|
||||
}
|
||||
|
||||
20
agent/current-env.txt
Normal file
20
agent/current-env.txt
Normal file
@@ -0,0 +1,20 @@
|
||||
serverName="Chengdu-amd64-98"
|
||||
serverIpPbV4="183.220.149.17"
|
||||
serverIpInV4=""
|
||||
serverIpPbV6=""
|
||||
serverIpInV6=""
|
||||
location="Chengdu Sichuan CN"
|
||||
provider="AS139080 The Internet Data Center of Sichuan Mobile Communication Company Limited"
|
||||
managePort="22"
|
||||
cpuCore="12 @ 4299.998 MHz"
|
||||
cpuBrand="Intel(R) Core(TM) i7-8700 CPU @ 3.20GHz"
|
||||
memoryTotal="7.6 GB"
|
||||
diskTotal="914.9 GB"
|
||||
diskUsage="12.3 GB"
|
||||
archInfo="x86_64 (64 Bit)"
|
||||
osInfo="Ubuntu 20.04.5 LTS"
|
||||
osKernelInfo="5.4.0-135-generic"
|
||||
tcpControl="cubic"
|
||||
virtualization="Dedicated"
|
||||
ioSpeed=" MB/s"
|
||||
machineId=""
|
||||
@@ -1,5 +1,7 @@
|
||||
package io.wdd.agent.config.message.handler;
|
||||
|
||||
import com.fasterxml.jackson.core.type.TypeReference;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import io.wdd.agent.executor.CommandExecutor;
|
||||
import io.wdd.agent.executor.FunctionExecutor;
|
||||
import io.wdd.common.beans.executor.ExecutionMessage;
|
||||
@@ -9,6 +11,8 @@ import org.springframework.stereotype.Component;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import static io.wdd.agent.executor.function.CollectAllExecutorFunction.ALL_FUNCTION_MAP;
|
||||
|
||||
@Component
|
||||
@@ -20,6 +24,10 @@ public class OMHandlerExecutor extends AbstractOctopusMessageHandler {
|
||||
@Resource
|
||||
FunctionExecutor functionExecutor;
|
||||
|
||||
@Resource
|
||||
ObjectMapper objectMapper;
|
||||
|
||||
|
||||
@Override
|
||||
public boolean handle(OctopusMessage octopusMessage) {
|
||||
|
||||
@@ -27,18 +35,29 @@ public class OMHandlerExecutor extends AbstractOctopusMessageHandler {
|
||||
return next.handle(octopusMessage);
|
||||
}
|
||||
|
||||
ExecutionMessage executionMessage = (ExecutionMessage) octopusMessage.getContent();
|
||||
String executionType = executionMessage.getType();
|
||||
try {
|
||||
|
||||
if (ALL_FUNCTION_MAP.containsKey(executionType)) {
|
||||
// execute the exist function
|
||||
functionExecutor.execute(executionMessage);
|
||||
ExecutionMessage executionMessage = objectMapper.readValue((String) octopusMessage.getContent(), new TypeReference<ExecutionMessage>() {
|
||||
});
|
||||
|
||||
} else {
|
||||
// handle command
|
||||
commandExecutor.execute(executionMessage);
|
||||
System.out.println("executionMessage = " + executionMessage);
|
||||
|
||||
String executionType = executionMessage.getType();
|
||||
|
||||
if (ALL_FUNCTION_MAP.containsKey(executionType)) {
|
||||
// execute the exist function
|
||||
functionExecutor.execute(executionMessage);
|
||||
|
||||
} else {
|
||||
// handle command
|
||||
commandExecutor.execute(executionMessage);
|
||||
}
|
||||
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
|
||||
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -44,10 +44,10 @@ public class OMHandlerInit extends AbstractOctopusMessageHandler {
|
||||
|
||||
|
||||
// 2. send PassThroughTopicName successful info to the server
|
||||
String success = String.format("Octopus Agent [ %s ] has successfully PassThroughTopicName with server [ %s ] !", agentServerInfo, octopusMessage);
|
||||
String success = String.format("[Octopus Agent] - [ %s ] has successfully PassThroughTopicName with server [ %s ] !", agentServerInfo.getServerName(), octopusMessage.getUuid());
|
||||
|
||||
octopusMessage.setResult(success);
|
||||
log.info(success);
|
||||
// log.info(success);
|
||||
|
||||
toServerMessage.send(octopusMessage);
|
||||
|
||||
|
||||
@@ -71,6 +71,7 @@ public class CommandExecutor {
|
||||
// cache log lines
|
||||
logToArrayListCache.cacheLog(streamKey, process.getInputStream());
|
||||
|
||||
// start to send the result log
|
||||
streamSender.startToWaitLog(streamKey);
|
||||
|
||||
// log.warn("start---------------------------------------------");
|
||||
@@ -80,6 +81,8 @@ public class CommandExecutor {
|
||||
|
||||
// a command shell don't understand how long it actually take
|
||||
processResult = process.waitFor();
|
||||
|
||||
// end send logs
|
||||
streamSender.endWaitLog(streamKey);
|
||||
|
||||
log.info("current shell command {} result is {}", processBuilder.command(), processResult);
|
||||
@@ -119,7 +122,7 @@ public class CommandExecutor {
|
||||
TimeUnit.SECONDS.sleep(1);
|
||||
|
||||
// clear the log Cache Thread scope
|
||||
logToArrayListCache.getCommandCachedLog(streamKey).clear();
|
||||
logToArrayListCache.getExecutionCmdCachedLogArrayList(streamKey).clear();
|
||||
|
||||
// clear the stream sender
|
||||
streamSender.clearLocalCache(streamKey);
|
||||
|
||||
@@ -33,11 +33,15 @@ import java.util.concurrent.TimeUnit;
|
||||
public class StreamSender {
|
||||
|
||||
public static String TEST_STREAM_JAVA = "test-stream-java";
|
||||
|
||||
@Resource
|
||||
RedisTemplate redisTemplate;
|
||||
|
||||
@Resource
|
||||
LogToArrayListCache logToArrayListCache;
|
||||
|
||||
private final HashMap<String, StreamSenderEntity> AllNeededStreamSender = new HashMap<>(16);
|
||||
|
||||
private final ArrayList<String> cacheLogList = new ArrayList<>(256);
|
||||
|
||||
private static ByteBuffer currentTimeByteBuffer() {
|
||||
@@ -66,7 +70,13 @@ public class StreamSender {
|
||||
|
||||
if (!AllNeededStreamSender.containsKey(streamKey)) {
|
||||
|
||||
StreamSenderEntity streamSender = StreamSenderEntity.builder().cachedCommandLog(logToArrayListCache.getCommandCachedLog(streamKey)).waitToSendLog(true).startIndex(0).streamKey(streamKey).build();
|
||||
StreamSenderEntity streamSender = StreamSenderEntity
|
||||
.builder()
|
||||
.cachedCommandLog(logToArrayListCache.getExecutionCmdCachedLogArrayList(streamKey))
|
||||
.waitToSendLog(true)
|
||||
.startIndex(0)
|
||||
.streamKey(streamKey)
|
||||
.build();
|
||||
|
||||
AllNeededStreamSender.put(streamKey, streamSender);
|
||||
|
||||
@@ -74,7 +84,7 @@ public class StreamSender {
|
||||
|
||||
TimeUnit.SECONDS.sleep(2);
|
||||
if (AllNeededStreamSender.get(streamKey).isWaitToSendLog()) {
|
||||
log.info("stream sender wait 1 s to send message");
|
||||
log.info("stream sender wait 2 s to send message");
|
||||
AllNeededStreamSender.get(streamKey).setWaitToSendLog(false);
|
||||
batchSendLog(streamKey);
|
||||
}
|
||||
@@ -85,14 +95,14 @@ public class StreamSender {
|
||||
StreamSenderEntity streamSenderEntity = AllNeededStreamSender.get(streamKey);
|
||||
streamSenderEntity.setWaitToSendLog(false);
|
||||
|
||||
batchSendLog(streamKey);
|
||||
|
||||
batchSendLog(streamKey);
|
||||
}
|
||||
|
||||
public void batchSendLog(String streamKey) {
|
||||
StreamSenderEntity streamSenderEntity = AllNeededStreamSender.get(streamKey);
|
||||
|
||||
log.info("batch send log == {}", streamSenderEntity);
|
||||
//log.info("batch send log == {}", streamSenderEntity);
|
||||
|
||||
ArrayList<String> cachedCommandLog = streamSenderEntity.getCachedCommandLog();
|
||||
|
||||
@@ -103,11 +113,13 @@ public class StreamSender {
|
||||
|
||||
List<String> content = cachedCommandLog.subList(startIndex, endIndex);
|
||||
|
||||
// System.out.println("content = " + content);
|
||||
// only send when cached log is not empty
|
||||
if (content.size() > 0) {
|
||||
this.send(streamKey, content);
|
||||
}
|
||||
|
||||
this.send(streamKey, content);
|
||||
// for next time
|
||||
startIndex = endIndex;
|
||||
streamSenderEntity.setStartIndex(endIndex);
|
||||
}
|
||||
|
||||
public boolean send(String streamKey, String content) {
|
||||
@@ -154,7 +166,6 @@ public class StreamSender {
|
||||
|
||||
MapRecord mapRecord = StreamRecords.mapBacked(fakeData).withStreamKey(TEST_STREAM_JAVA);
|
||||
|
||||
|
||||
redisTemplate.opsForStream().add(mapRecord);
|
||||
|
||||
TimeUnit.MILLISECONDS.sleep(200);
|
||||
|
||||
@@ -11,6 +11,9 @@ import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
|
||||
/**
|
||||
* utils to cache store the command execution logs
|
||||
*/
|
||||
@Component
|
||||
@Slf4j
|
||||
public class LogToArrayListCache {
|
||||
@@ -26,10 +29,11 @@ public class LogToArrayListCache {
|
||||
|
||||
public void cacheLog(String streamKey, InputStream commandLogStream) {
|
||||
|
||||
ArrayList<String> commandCachedLog = this.getCommandCachedLog(streamKey);
|
||||
ArrayList<String> commandCachedLog = this.getExecutionCmdCachedLogArrayList(streamKey);
|
||||
|
||||
// log.info(String.valueOf(commandCachedLog));
|
||||
|
||||
// read from input stream and store to the cacheArrayList
|
||||
new BufferedReader(new InputStreamReader(commandLogStream))
|
||||
.lines()
|
||||
.forEach(
|
||||
@@ -39,14 +43,14 @@ public class LogToArrayListCache {
|
||||
log.info("current streamKey is {} and CacheLog is {}", streamKey, commandCachedLog);
|
||||
}
|
||||
|
||||
public ArrayList<String> getCommandCachedLog(String streamKey) {
|
||||
public ArrayList<String> getExecutionCmdCachedLogArrayList(String streamKey) {
|
||||
|
||||
int keyToIndex = this.hashStreamKeyToIndex(streamKey);
|
||||
int keyToIndex = this.hashStreamKeyToCachedArrayListIndex(streamKey);
|
||||
|
||||
return CachedCommandLog.get(keyToIndex);
|
||||
}
|
||||
|
||||
private int hashStreamKeyToIndex(String streamKey) {
|
||||
private int hashStreamKeyToCachedArrayListIndex(String streamKey) {
|
||||
|
||||
int size = CachedCommandLog.size();
|
||||
|
||||
|
||||
@@ -88,7 +88,7 @@ public class CollectSystemInfo implements ApplicationContextAware {
|
||||
@PostConstruct
|
||||
private void getInjectServerInfo(){
|
||||
|
||||
log.info("Starting getInjectServerInfo");
|
||||
log.info("Octopus Agent -- Starting getInjectServerInfo");
|
||||
|
||||
agentServerInfo = (AgentServerInfo) context.getBean("agentServerInfo");
|
||||
|
||||
@@ -96,11 +96,12 @@ public class CollectSystemInfo implements ApplicationContextAware {
|
||||
throw new MyRuntimeException(" Collect server info error !");
|
||||
}
|
||||
|
||||
log.info("host server info has been collected == {}", agentServerInfo);
|
||||
//log.info("host server info has been collected == {}", agentServerInfo);
|
||||
|
||||
// start to send message to Octopus Server
|
||||
octopusAgentInitService.SendInfoToServer(agentServerInfo);
|
||||
log.info("PassThroughTopicName server info has been send to octopus server !");
|
||||
|
||||
//log.info("PassThroughTopicName server info has been send to octopus server !");
|
||||
|
||||
}
|
||||
|
||||
|
||||
@@ -51,9 +51,10 @@ public class OctopusAgentInitService {
|
||||
* listen to the PassThroughTopicName queue from octopus server
|
||||
*
|
||||
* @param message 该方法不需要手动调用,Spring会自动运行这个监听方法
|
||||
* <p>
|
||||
*
|
||||
* 注意:如果该监听方法正常结束,那么Spring会自动确认消息
|
||||
* 如果出现异常,则Spring不会确认消息,该消息一直存在于消息队列中
|
||||
*
|
||||
* @RabbitListener : 用于标记当前方法是一个RabbitMQ的消息监听方法,可以持续性的自动接收消息
|
||||
*/
|
||||
@SneakyThrows
|
||||
@@ -86,6 +87,7 @@ public class OctopusAgentInitService {
|
||||
throw new MyRuntimeException(" Handle Octopus Message Error !");
|
||||
}
|
||||
|
||||
|
||||
} catch (Exception e) {
|
||||
|
||||
// reject the message
|
||||
@@ -93,12 +95,15 @@ public class OctopusAgentInitService {
|
||||
// long deliveryTag, boolean requeue
|
||||
// channel.basicReject(deliveryTag,true);
|
||||
|
||||
log.error("Octopus Agent Initialization Error, please check !");
|
||||
log.info("waiting for 5 seconds");
|
||||
|
||||
// 这里只是便于出现死循环时查看
|
||||
TimeUnit.SECONDS.sleep(5);
|
||||
|
||||
throw new MyRuntimeException("Octopus Agent Initialization Error, please check !");
|
||||
}
|
||||
|
||||
// handle init message ok
|
||||
// ack the info
|
||||
channel.basicAck(deliveryTag, false);
|
||||
}
|
||||
|
||||
@@ -46,7 +46,7 @@ public class GenOctopusRabbitMQConnection {
|
||||
// generate the ne topic queue for unique agent
|
||||
String agentTopicName = octopusMessage.getUuid();
|
||||
|
||||
// reboot judgyment of existing exchange
|
||||
// reboot judgement of existing exchange
|
||||
QueueInformation queueInfo = rabbitAdmin.getQueueInfo(agentTopicName);
|
||||
|
||||
if (ObjectUtils.isNotEmpty(queueInfo) && queueInfo.getConsumerCount() > 0 ) {
|
||||
@@ -89,7 +89,6 @@ public class GenOctopusRabbitMQConnection {
|
||||
|
||||
OctopusMessage octopusMessage;
|
||||
|
||||
|
||||
try {
|
||||
octopusMessage = objectMapper.readValue(message.getBody(), OctopusMessage.class);
|
||||
|
||||
|
||||
@@ -66,11 +66,13 @@ public class ToServerMessage {
|
||||
// set PassThroughTopicName agent register ttl
|
||||
InitMessagePostProcessor initMessagePostProcessor = new InitMessagePostProcessor(defaultInitRegisterTimeOut);
|
||||
|
||||
log.info("send INIT AgentServerInfo to Server = {}", agentServerInfo);
|
||||
log.info("[Octopus Agent] - send INIT AgentServerInfo to Server = {}", agentServerInfo);
|
||||
|
||||
// send the register server info to EXCHANGE:INIT_EXCHANGE QUEUE: init_to_server
|
||||
try {
|
||||
|
||||
rabbitTemplate.convertAndSend(initRabbitMqConnector.INIT_EXCHANGE, initRabbitMqConnector.INIT_TO_SERVER_KEY, objectMapper.writeValueAsBytes(agentServerInfo), initMessagePostProcessor);
|
||||
|
||||
} catch (JsonProcessingException e) {
|
||||
log.error("Failed to send INIT message to Server ! = {}", agentServerInfo);
|
||||
throw new RuntimeException(e);
|
||||
|
||||
@@ -2,11 +2,11 @@ spring:
|
||||
application:
|
||||
name: octopus-agent
|
||||
profiles:
|
||||
active: k3s
|
||||
active: local
|
||||
cloud:
|
||||
nacos:
|
||||
config:
|
||||
group: k3s
|
||||
group: local
|
||||
config-retry-time: 3000
|
||||
file-extension: yaml
|
||||
max-retry: 3
|
||||
@@ -16,7 +16,7 @@ spring:
|
||||
timeout: 5000
|
||||
config-long-poll-timeout: 5000
|
||||
extension-configs:
|
||||
- group: k3s
|
||||
data-id: common-k3s.yaml
|
||||
- group: local
|
||||
data-id: common-local.yaml
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user