[agent]-[executor] optimize execution code -2
This commit is contained in:
@@ -40,7 +40,7 @@ public class OMHandlerExecutor extends AbstractOctopusMessageHandler {
|
|||||||
ExecutionMessage executionMessage = objectMapper.readValue((String) octopusMessage.getContent(), new TypeReference<ExecutionMessage>() {
|
ExecutionMessage executionMessage = objectMapper.readValue((String) octopusMessage.getContent(), new TypeReference<ExecutionMessage>() {
|
||||||
});
|
});
|
||||||
|
|
||||||
System.out.println("executionMessage = " + executionMessage);
|
// System.out.println("executionMessage = " + executionMessage);
|
||||||
|
|
||||||
String executionType = executionMessage.getType();
|
String executionType = executionMessage.getType();
|
||||||
|
|
||||||
|
|||||||
@@ -14,7 +14,8 @@ import java.io.IOException;
|
|||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.Optional;
|
||||||
|
import java.util.concurrent.*;
|
||||||
|
|
||||||
|
|
||||||
@Configuration
|
@Configuration
|
||||||
@@ -27,6 +28,10 @@ public class CommandExecutor {
|
|||||||
@Resource
|
@Resource
|
||||||
LogToArrayListCache logToArrayListCache;
|
LogToArrayListCache logToArrayListCache;
|
||||||
|
|
||||||
|
int processMaxWaitSeconds = 60;
|
||||||
|
|
||||||
|
ExecutorService DaemonCommandProcess = Executors.newFixedThreadPool(1);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* handle command from octopus server
|
* handle command from octopus server
|
||||||
*
|
*
|
||||||
@@ -67,6 +72,13 @@ public class CommandExecutor {
|
|||||||
try {
|
try {
|
||||||
|
|
||||||
Process process = processBuilder.start();
|
Process process = processBuilder.start();
|
||||||
|
// start a backend thread to daemon the process
|
||||||
|
// wait for processMaxWaitSeconds and kill the process if it's still alived
|
||||||
|
DaemonCommandProcess.submit(
|
||||||
|
StopStuckCommandProcess(
|
||||||
|
process,
|
||||||
|
processMaxWaitSeconds
|
||||||
|
));
|
||||||
|
|
||||||
// cache log lines
|
// cache log lines
|
||||||
logToArrayListCache.cacheLog(streamKey, process.getInputStream());
|
logToArrayListCache.cacheLog(streamKey, process.getInputStream());
|
||||||
@@ -74,18 +86,13 @@ public class CommandExecutor {
|
|||||||
// start to send the result log
|
// start to send the result log
|
||||||
streamSender.startToWaitLog(streamKey);
|
streamSender.startToWaitLog(streamKey);
|
||||||
|
|
||||||
// log.warn("start---------------------------------------------");
|
// get the command result
|
||||||
// new BufferedReader(new InputStreamReader(process.getInputStream())).lines()
|
|
||||||
// .forEach(System.out::println);
|
|
||||||
// log.warn("end ---------------------------------------------");
|
|
||||||
|
|
||||||
// a command shell don't understand how long it actually take
|
|
||||||
processResult = process.waitFor();
|
processResult = process.waitFor();
|
||||||
|
|
||||||
// end send logs
|
// end send logs
|
||||||
streamSender.endWaitLog(streamKey);
|
streamSender.endWaitLog(streamKey);
|
||||||
|
|
||||||
log.info("current shell command {} result is {}", processBuilder.command(), processResult);
|
log.debug("current shell command {} result is {}", processBuilder.command(), processResult);
|
||||||
|
|
||||||
|
|
||||||
} catch (IOException | InterruptedException e) {
|
} catch (IOException | InterruptedException e) {
|
||||||
@@ -95,6 +102,29 @@ public class CommandExecutor {
|
|||||||
return processResult;
|
return processResult;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private Runnable StopStuckCommandProcess(Process process, int processMaxWaitSeconds) {
|
||||||
|
return () -> {
|
||||||
|
try {
|
||||||
|
|
||||||
|
|
||||||
|
log.debug("daemon thread start to wait for {} s for the result", processMaxWaitSeconds);
|
||||||
|
|
||||||
|
TimeUnit.SECONDS.sleep(processMaxWaitSeconds);
|
||||||
|
|
||||||
|
if (process.isAlive()) {
|
||||||
|
|
||||||
|
log.warn("Command [ {} ] stuck for {} s, destroy the command process !", process.info().commandLine().get(), processMaxWaitSeconds);
|
||||||
|
|
||||||
|
// shutdown the process
|
||||||
|
process.destroyForcibly();
|
||||||
|
}
|
||||||
|
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
private ByteBuffer cvToByteBuffer(InputStream inputStream) throws IOException {
|
private ByteBuffer cvToByteBuffer(InputStream inputStream) throws IOException {
|
||||||
|
|
||||||
byte[] toByteArray = ByteStreams.toByteArray(inputStream);
|
byte[] toByteArray = ByteStreams.toByteArray(inputStream);
|
||||||
|
|||||||
@@ -52,7 +52,7 @@ public class FunctionExecutor {
|
|||||||
|
|
||||||
// List<List<String>> commandList = functionReader.ReadFileToCommandList(functionFileName);
|
// List<List<String>> commandList = functionReader.ReadFileToCommandList(functionFileName);
|
||||||
|
|
||||||
log.info("all commands are {}", commandList);
|
log.info("[ Function Executor ] all commands are ==> {}", commandList);
|
||||||
|
|
||||||
Iterator<List<String>> iterator = commandList.iterator();
|
Iterator<List<String>> iterator = commandList.iterator();
|
||||||
|
|
||||||
|
|||||||
@@ -84,7 +84,7 @@ public class StreamSender {
|
|||||||
|
|
||||||
TimeUnit.SECONDS.sleep(2);
|
TimeUnit.SECONDS.sleep(2);
|
||||||
if (AllNeededStreamSender.get(streamKey).isWaitToSendLog()) {
|
if (AllNeededStreamSender.get(streamKey).isWaitToSendLog()) {
|
||||||
log.info("stream sender wait 2 s to send message");
|
log.debug("stream sender wait 2 s to send message");
|
||||||
AllNeededStreamSender.get(streamKey).setWaitToSendLog(false);
|
AllNeededStreamSender.get(streamKey).setWaitToSendLog(false);
|
||||||
batchSendLog(streamKey);
|
batchSendLog(streamKey);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -40,7 +40,7 @@ public class LogToArrayListCache {
|
|||||||
commandCachedLog::add
|
commandCachedLog::add
|
||||||
);
|
);
|
||||||
|
|
||||||
log.info("current streamKey is {} and CacheLog is {}", streamKey, commandCachedLog);
|
log.debug("current streamKey is {} and CacheLog is {}", streamKey, commandCachedLog);
|
||||||
}
|
}
|
||||||
|
|
||||||
public ArrayList<String> getExecutionCmdCachedLogArrayList(String streamKey) {
|
public ArrayList<String> getExecutionCmdCachedLogArrayList(String streamKey) {
|
||||||
|
|||||||
@@ -19,4 +19,8 @@ spring:
|
|||||||
- group: local
|
- group: local
|
||||||
data-id: common-local.yaml
|
data-id: common-local.yaml
|
||||||
|
|
||||||
|
debug: true
|
||||||
|
logging:
|
||||||
|
level:
|
||||||
|
io.wdd.agent:
|
||||||
|
debug
|
||||||
|
|||||||
7
index.html
Normal file
7
index.html
Normal file
@@ -0,0 +1,7 @@
|
|||||||
|
<html>
|
||||||
|
<head><title>Index of /octopus-agent/</title></head>
|
||||||
|
<body>
|
||||||
|
<h1>Index of /octopus-agent/</h1><hr><pre><a href="../">../</a>
|
||||||
|
<a href="octopus-agent-2022-12-26-16-00-00.jar">octopus-agent-2022-12-26-16-00-00.jar</a> 27-Dec-2022 07:01 46309416
|
||||||
|
</pre><hr></body>
|
||||||
|
</html>
|
||||||
Reference in New Issue
Block a user