[agent][executor]- bug - 9

This commit is contained in:
zeaslity
2023-02-26 11:53:25 +08:00
parent c7b0b3cb67
commit b37a4e1d9c
2 changed files with 185 additions and 145 deletions

View File

@@ -8,8 +8,6 @@ import io.wdd.agent.executor.thread.CommandExecLogCache;
import io.wdd.agent.message.OMessageToServerSender;
import io.wdd.common.beans.executor.ExecutionMessage;
import io.wdd.common.beans.rabbitmq.OctopusMessage;
import io.wdd.common.handler.MyRuntimeException;
import io.wdd.common.utils.TimeUtils;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Configuration;
@@ -19,12 +17,9 @@ import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@Configuration
@@ -135,6 +130,7 @@ public class CommandExecutor {
));
// 缓存 命令处理日志
// 如果是特别简单的命令,必须要放在此处才可以,否则会导致 无法收集
commandExecLogCache.cacheLog(
streamKey,
process
@@ -193,12 +189,11 @@ public class CommandExecutor {
System.out.println("process isAlive = " + process.isAlive());
// 任务提前执行结束,或者超过了最长等待时间
// 判断命令是否正确处理完成
if (!commandExecComplete) {
log.warn(
"Command [ {} ] stuck for {} s, destroy the command process!",
"任务 [ {} ]执行超过了最长等待时间 {} , destroy the command process!",
process
.info()
.commandLine()
@@ -217,65 +212,38 @@ public class CommandExecutor {
.commandLine()
.get()
);
//process.destroy();
try {
/*byte[] bytes = process
.getInputStream()
.readAllBytes();
String s = new String(
bytes,
StandardCharsets.UTF_8
);
log.debug(
"从process中获取到的 所有字符内容为 {}",
s
);*/
// 关闭这个命令执行的inputStream
log.debug(
"关闭Process [ {} ]命令执行的inputStream",
process.info()
);
process
.getInputStream()
.close();
} catch (IOException e) {
throw new RuntimeException(e);
}
try {
Process exitProcess = process
.onExit()
.get(
commandExecWaitTimeout,
TimeUnit.SECONDS
);
commandExecLogCache.debugProcessStreams(exitProcess);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
throw new MyRuntimeException(e);
}
}
// 关停任务执行的缓存日志收集 BufferedReader 否则无法终止
commandExecLogCache.StopExecLogBufferedReader(streamKey);
}
// 日志操作,如果需要显示回传,需要将日志发送回相应的
/*collectCommandLogAndRepeat(
streamKey,
needResultReplay,
octopusMessage
);*/
// 异步执行日志发送工作
//commandExecLogCache.CollectAndSendExecLog(streamKey, needResultReplay, octopusMessage);
// 执行到这里,说明整个任务流程结束(超时结束)
log.debug(
"命令 [ {} ] 执行全流程结束!",
process
"命令 [ {} ] 执行全流程结束! 开始释放所有资源",
process.info()
);
// 释放所有的资源
try {
process
.getInputStream()
.close();
process
.getOutputStream()
.close();
process
.getErrorStream()
.close();
} catch (IOException e) {
throw new RuntimeException(e);
}
if (process.isAlive()) {
// shutdown the process
process.destroyForcibly();
@@ -284,45 +252,7 @@ public class CommandExecutor {
}
/**
* 日志回传操作
* 默认发送值Redis中
* 如果需要持久化则需要发送至RabbitMQ中
*
* @param streamKey
* @param needResultReplay
* @param octopusMessage
*/
private void collectCommandLogAndRepeat(String streamKey, boolean needResultReplay, OctopusMessage octopusMessage) {
// 获取到命令执行的结果, 此操作会清除掉缓存
ArrayList<String> commandExecLogCachedLog = commandExecLogCache.getCacheLog(streamKey);
log.debug(
"从缓存中获取到的命令执行日志为: {}",
commandExecLogCachedLog
);
// 简单的发送到StreamSender
simpleStreamSender.sendLog(
streamKey,
commandExecLogCachedLog
);
// 需要恢复相应的消息
if (needResultReplay) {
log.debug("需要准确回复执行命令结果");
// 构造相应的数据
octopusMessage.setAc_time(TimeUtils.currentFormatTime());
octopusMessage.setResult(commandExecLogCachedLog);
// 发送返回执行完成的 OM结果
toServerSender.send(octopusMessage);
}
}
@Deprecated
private ByteBuffer cvToByteBuffer(InputStream inputStream) throws IOException {

View File

@@ -1,13 +1,19 @@
package io.wdd.agent.executor.thread;
import io.wdd.agent.config.utils.AgentCommonThreadPool;
import io.wdd.agent.executor.reply.SimpleStreamSender;
import io.wdd.agent.message.OMessageToServerSender;
import io.wdd.common.beans.rabbitmq.OctopusMessage;
import io.wdd.common.handler.MyRuntimeException;
import io.wdd.common.utils.TimeUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.ObjectUtils;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.HashMap;
@@ -26,7 +32,17 @@ public class CommandExecLogCache {
*/
private static final ArrayList<String> EmptyCommandLog = new ArrayList<>();
// 存储命令执行缓存日志
public static HashMap<String, ArrayList<String>> CachedCommandLogMap = new HashMap<>();
private static HashMap<String, ArrayList<String>> CachedCommandLogMap = new HashMap<>();
/**
* 存储每一个任务对应的 日志读取BufferedReader
*/
private static HashMap<String, BufferedReader> CommandLogReaderMap = new HashMap<>();
@Resource
SimpleStreamSender simpleStreamSender;
@Resource
OMessageToServerSender toServerSender;
/**
* 缓存命令执行日志
@@ -57,13 +73,152 @@ public class CommandExecLogCache {
}
/**
* 实际执行命令缓存操作
*
* @param streamKey
*/
private void doCacheLog(String streamKey, Process process) {
ArrayList<String> commandCachedLog = new ArrayList<>(128);
String execCommandString = String.format(
"execution command are => [ %s ]",
process
.info()
.commandLine()
.get()
);
String execTimeString = String.format(
"execution time is => [ %s ]",
TimeUtils.currentTimeString()
);
String execResultString = String.format(
"execution result is => [ %s ]",
process.exitValue()
);
// add the command
commandCachedLog.add(execCommandString);
commandCachedLog.add(execTimeString);
commandCachedLog.add(execResultString);
commandCachedLog.add("--------------- command result are as below --------------------");
commandCachedLog.add("");
log.debug("doCacheLog 开始从process的结果中获取日志缓存");
// read from input stream and store to the cacheArrayList
BufferedReader bufferedReader = new BufferedReader(
new InputStreamReader(
process.getInputStream()
)
);
// 缓存这个 日志读取器
CommandLogReaderMap.put(
streamKey,
bufferedReader
);
// !! 此处会阻塞
// 阻塞读取命令执行日志的输出流
bufferedReader
.lines()
.forEach(
commandCachedLog::add
);
log.debug(
"命令代码 [ {} ] 的执行日志内容为 {} ",
streamKey,
commandCachedLog
);
}
/**
* 对于一些没有中止的任务,必须要手动将读取的 InputStream流关闭
* 否则部分任务的日志无法收集
*/
public void StopExecLogBufferedReader(String streamKey) {
BufferedReader bufferedReader = CommandLogReaderMap.get(streamKey);
if (ObjectUtils.isNotEmpty(bufferedReader)) {
try {
log.debug("开始关停任务 [ {} ]对应的日志读取后端",
streamKey);
bufferedReader.close();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
/**
* 日志回传操作
* 默认发送值Redis中
* 如果需要持久化则需要发送至RabbitMQ中
*
* @param streamKey
* @param needResultReplay
* @param octopusMessage
*/
public void CollectAndSendExecLog(String streamKey, boolean needResultReplay, OctopusMessage octopusMessage) {
// 日志操作如果需要显示回传需要将日志发送回相应的MQ中
// 使用异步的方式
AgentCommonThreadPool.pool.submit(
() ->
collectCommandLogAndRepeat(
streamKey,
needResultReplay,
octopusMessage
)
);
}
private void collectCommandLogAndRepeat(String streamKey, boolean needResultReplay, OctopusMessage octopusMessage) {
// 获取到命令执行的结果, 此操作会清除掉缓存
ArrayList<String> commandExecLogCachedLog = GetAndCleanExecCacheLog(streamKey);
log.debug(
"从缓存中获取到的命令执行日志为: {}",
commandExecLogCachedLog
);
// 简单的发送到StreamSender
simpleStreamSender.sendLog(
streamKey,
commandExecLogCachedLog
);
// 需要 回复 相应的消息
if (needResultReplay) {
log.debug("需要准确回复执行命令结果");
// 构造相应的数据
octopusMessage.setAc_time(TimeUtils.currentFormatTime());
octopusMessage.setResult(commandExecLogCachedLog);
// 发送返回执行完成的 OM结果
toServerSender.send(octopusMessage);
}
}
/**
* 获取缓存的 命令执行缓存日志
*
* @param streamKey
* @return
*/
public ArrayList<String> getCacheLog(String streamKey) {
private ArrayList<String> GetAndCleanExecCacheLog(String streamKey) {
// 获取
ArrayList<String> execLogCacheArrayList = CachedCommandLogMap.getOrDefault(
@@ -72,7 +227,7 @@ public class CommandExecLogCache {
);
// 清除Key
//CachedCommandLogMap.remove(streamKey);
CachedCommandLogMap.remove(streamKey);
return execLogCacheArrayList;
}
@@ -113,51 +268,6 @@ public class CommandExecLogCache {
}
/**
* 实际执行命令缓存操作
*
* @param streamKey
*/
private void doCacheLog(String streamKey, Process process) {
ArrayList<String> commandCachedLog = new ArrayList<>(128);
String execCommandString = String.format(
"execution command are => [ %s ]",
process
.info()
.commandLine()
.get()
);
String execTimeString = String.format(
"execution time is => [ %s ]",
TimeUtils.currentTimeString()
);
// add the command
commandCachedLog.add("");
commandCachedLog.add(execCommandString);
commandCachedLog.add(execTimeString);
commandCachedLog.add("--------------- command result are as below --------------------");
commandCachedLog.add("");
log.debug("doCacheLog 开始从process的结果中获取日志缓存");
// read from input stream and store to the cacheArrayList
new BufferedReader(new InputStreamReader(process.getInputStream()))
.lines()
.forEach(
commandCachedLog::add
);
log.debug(
"current streamKey is [ {} ] and CacheLog is [ {} ]",
streamKey,
commandCachedLog
);
}
/**
* 获取到命令执行日志 缓存存储实际的ArrayList
*