From 2e97f9326db810865774c805e0b2e2fa0b38de0b Mon Sep 17 00:00:00 2001 From: IceDerce Date: Wed, 14 Dec 2022 13:54:48 +0800 Subject: [PATCH] [agent]-[executor] optimize the command log collect algrothnm --- agent/agent-application-run-on-linux.sh | 394 ++++++++++++++++++ .../config/beans/executor/CommandLog.java | 2 +- .../beans/executor/StreamSenderEntity.java | 25 ++ .../wdd/agent/executor/CommandExecutor.java | 40 +- .../wdd/agent/executor/FunctionExecutor.java | 4 +- .../agent/executor/redis/StreamSender.java | 113 ++++- .../executor/thread/DaemonLogThread.java | 52 +-- .../executor/thread/LogToArrayListCache.java | 56 +++ .../java/io/wdd/source/shell/install-nginx.sh | 18 +- .../io/wdd/source/shell/lib/wdd-lib-env.sh | 4 + .../io/wdd/source/shell/lib/wdd-lib-log.sh | 4 +- 11 files changed, 645 insertions(+), 67 deletions(-) create mode 100755 agent/agent-application-run-on-linux.sh create mode 100644 agent/src/main/java/io/wdd/agent/config/beans/executor/StreamSenderEntity.java create mode 100644 agent/src/main/java/io/wdd/agent/executor/thread/LogToArrayListCache.java create mode 100644 source/src/main/java/io/wdd/source/shell/lib/wdd-lib-env.sh diff --git a/agent/agent-application-run-on-linux.sh b/agent/agent-application-run-on-linux.sh new file mode 100755 index 0000000..0267992 --- /dev/null +++ b/agent/agent-application-run-on-linux.sh @@ -0,0 +1,394 @@ +#!/bin/bash + +RED="31m" ## 姨妈红 +GREEN="32m" ## 水鸭青 +YELLOW="33m" ## 鸭屎黄 +PURPLE="35m" ## 基佬紫 +BLUE="36m" ## 天依蓝 +BlinkGreen="32;5m" ##闪烁的绿色 +BlinkRed="31;5m" ##闪烁的红色 +BackRed="41m" ## 背景红色 +SplitLine="----------------------" #会被sys函数中的方法重写 + +hostArchVersion="" +hostArch="" +#### CollectSystemInfo #### +serverName="" +serverIpPbV4="" +serverIpInV4="" +serverIpPbV6="" +serverIpInV6="" +location="" +provider="" +managePort="" +cpuBrand="" +cpuCore="" +memoryTotal="" +diskTotal="" +diskUsage="" +osInfo="" +osKernelInfo="" +tcpControl="" +virtualization="" +ioSpeed="" +machineId="" + +### tmp usage +ioavg="" +public_ipv4="" +country="" +region="" +city="" +org="" +#### CollectSystemInfo #### + +######## 颜色函数方法很精妙 ############ +colorEcho() { + # shellcheck disable=SC2145 + echo -e "\033[${1}${@:2}\033[0m" 1>&2 +} +# 判断命令是否存在 +command_exists() { + local cmd="$1" + if eval type type >/dev/null 2>&1; then + eval type "$cmd" >/dev/null 2>&1 + elif command >/dev/null 2>&1; then + command -v "$cmd" >/dev/null 2>&1 + else + which "$cmd" >/dev/null 2>&1 + fi + local rt=$? + return ${rt} +} + +FunctionStart() { + colorEcho ${PURPLE} ${SplitLine} + colorEcho ${PURPLE} ${SplitLine} + echo "" +} + +FunctionSuccess() { + colorEcho ${GREEN} ${SplitLine} + echo "" +} + +FunctionEnd() { + echo "" + colorEcho ${BlinkGreen} ${SplitLine} + echo "" + echo "" +} +####### 获取系统版本及64位或32位信息 +check_sys() { + # 获取当前终端的宽度,动态调整分割线的长度 + shellwidth=$(stty size | awk '{print $2}') + if [[ $shellwidth -gt 1 ]]; then + SplitLine=$(yes "-" | sed ${shellwidth}'q' | tr -d '\n') + fi + + sys_bit=$(uname -m) + case $sys_bit in + i[36]86) + os_bit="32" + hostArch="386" + ;; + x86_64) + os_bit="64" + hostArch="amd64" + ;; + *armv6*) + os_bit="arm" + hostArch="arm6" + ;; + *armv7*) + os_bit="arm" + hostArch="arm7" + ;; + *aarch64* | *armv8*) + os_bit="arm64" + hostArch="arm64" + ;; + *) + colorEcho ${RED} " + 哈哈……这个 辣鸡脚本 不支持你的系统。 (-_-) \n + 备注: 仅支持 Ubuntu 16+ / Debian 8+ / CentOS 7+ 系统 + " && exit 1 + ;; + esac + +} + +GoIOTest() { + (LANG=C dd if=/dev/zero of=benchtest_$$ bs=512k count=$1 conv=fdatasync && rm -f benchtest_$$) 2>&1 | awk -F, '{io=$NF} END { print io}' | sed 's/^[ \t]*//;s/[ \t]*$//' +} + +calc_size() { + local raw=$1 + local total_size=0 + local num=1 + local unit="KB" + if ! [[ ${raw} =~ ^[0-9]+$ ]]; then + echo "" + return + fi + if [ "${raw}" -ge 1073741824 ]; then + num=1073741824 + unit="TB" + elif [ "${raw}" -ge 1048576 ]; then + num=1048576 + unit="GB" + elif [ "${raw}" -ge 1024 ]; then + num=1024 + unit="MB" + elif [ "${raw}" -eq 0 ]; then + echo "${total_size}" + return + fi + total_size=$(awk 'BEGIN{printf "%.1f", '$raw' / '$num'}') + echo "${total_size} ${unit}" +} + +GethostArchINfo() { + [ -f /etc/redhat-release ] && awk '{print $0}' /etc/redhat-release && return + [ -f /etc/os-release ] && awk -F'[= "]' '/PRETTY_NAME/{print $3,$4,$5}' /etc/os-release && return + [ -f /etc/lsb-release ] && awk -F'[="]+' '/DESCRIPTION/{print $2}' /etc/lsb-release && return +} + +StartIOTest() { + + FunctionStart + colorEcho ${BLUE} "start IO speed test !" + + freespace=$(df -m . | awk 'NR==2 {print $4}') + if [ -z "${freespace}" ]; then + freespace=$(df -m . | awk 'NR==3 {print $3}') + fi + if [ ${freespace} -gt 1024 ]; then + writemb=2048 + io1=$(GoIOTest ${writemb}) + colorEcho $YELLOW "I/O Speed(1st run) : $io1)" + io2=$(GoIOTest ${writemb}) + colorEcho $YELLOW "I/O Speed(2st run) : $io2)" + io3=$(GoIOTest ${writemb}) + colorEcho $YELLOW "I/O Speed(3st run) : $io3)" + ioraw1=$(echo $io1 | awk 'NR==1 {print $1}') + [ "$(echo $io1 | awk 'NR==1 {print $2}')" == "GB/s" ] && ioraw1=$(awk 'BEGIN{print '$ioraw1' * 1024}') + ioraw2=$(echo $io2 | awk 'NR==1 {print $1}') + [ "$(echo $io2 | awk 'NR==1 {print $2}')" == "GB/s" ] && ioraw2=$(awk 'BEGIN{print '$ioraw2' * 1024}') + ioraw3=$(echo $io3 | awk 'NR==1 {print $1}') + [ "$(echo $io3 | awk 'NR==1 {print $2}')" == "GB/s" ] && ioraw3=$(awk 'BEGIN{print '$ioraw3' * 1024}') + ioall=$(awk 'BEGIN{print '$ioraw1' + '$ioraw2' + '$ioraw3'}') + ioavg=$(awk 'BEGIN{printf "%.1f", '$ioall' / 3}') + colorEcho $YELLOW "I/O Speed(average) : $ioavg MB/s)" + else + echo " $(_red "Not enough space for I/O Speed test!")" + fi + + FunctionSuccess + FunctionEnd +} + +Check_Virtualization() { + + FunctionStart + colorEcho ${BLUE} "start to check host virtualization !" + + command_exists "dmesg" && virtualx="$(dmesg 2>/dev/null)" + if command_exists "dmidecode"; then + sys_manu="$(dmidecode -s system-manufacturer 2>/dev/null)" + sys_product="$(dmidecode -s system-product-name 2>/dev/null)" + sys_ver="$(dmidecode -s system-version 2>/dev/null)" + else + sys_manu="" + sys_product="" + sys_ver="" + fi + if grep -qa docker /proc/1/cgroup; then + virt="Docker" + elif grep -qa lxc /proc/1/cgroup; then + virt="LXC" + elif grep -qa container=lxc /proc/1/environ; then + virt="LXC" + elif [[ -f /proc/user_beancounters ]]; then + virt="OpenVZ" + elif [[ "${virtualx}" == *kvm-clock* ]]; then + virt="KVM" + elif [[ "${sys_product}" == *KVM* ]]; then + virt="KVM" + elif [[ "${cpuName}" == *KVM* ]]; then + virt="KVM" + elif [[ "${cpuName}" == *QEMU* ]]; then + virt="KVM" + elif [[ "${virtualx}" == *"VMware Virtual Platform"* ]]; then + virt="VMware" + elif [[ "${sys_product}" == *"VMware Virtual Platform"* ]]; then + virt="VMware" + elif [[ "${virtualx}" == *"Parallels Software International"* ]]; then + virt="Parallels" + elif [[ "${virtualx}" == *VirtualBox* ]]; then + virt="VirtualBox" + elif [[ -e /proc/xen ]]; then + if grep -q "control_d" "/proc/xen/capabilities" 2>/dev/null; then + virt="Xen-Dom0" + else + virt="Xen-DomU" + fi + elif [ -f "/sys/hypervisor/type" ] && grep -q "xen" "/sys/hypervisor/type"; then + virt="Xen" + elif [[ "${sys_manu}" == *"Microsoft Corporation"* ]]; then + if [[ "${sys_product}" == *"Virtual Machine"* ]]; then + if [[ "${sys_ver}" == *"7.0"* || "${sys_ver}" == *"Hyper-V" ]]; then + virt="Hyper-V" + else + virt="Microsoft Virtual Machine" + fi + fi + else + virt="Dedicated" + fi + + FunctionSuccess + FunctionEnd +} + +GetIpv4Info() { + + FunctionStart + colorEcho ${BLUE} "start to get system public ip info !" + + org="$(wget -q -T10 -O- ipinfo.io/org)" + city="$(wget -q -T10 -O- ipinfo.io/city)" + country="$(wget -q -T10 -O- ipinfo.io/country)" + region="$(wget -q -T10 -O- ipinfo.io/region)" + public_ipv4="$(wget -q -T10 -O- ipinfo.io/ip)" + + FunctionSuccess + FunctionEnd + +} + +GenerateSystemInfo() { + FunctionStart + colorEcho $BLUE "start to collect system info !" + + check_sys + + cpuName=$(awk -F: '/model name/ {name=$2} END {print name}' /proc/cpuinfo | sed 's/^[ \t]*//;s/[ \t]*$//') + cores=$(awk -F: '/processor/ {core++} END {print core}' /proc/cpuinfo) + freq=$(awk -F'[ :]' '/cpu MHz/ {print $4;exit}' /proc/cpuinfo) + ccache=$(awk -F: '/cache size/ {cache=$2} END {print cache}' /proc/cpuinfo | sed 's/^[ \t]*//;s/[ \t]*$//') + cpu_aes=$(grep -i 'aes' /proc/cpuinfo) + cpu_virt=$(grep -Ei 'vmx|svm' /proc/cpuinfo) + tram=$( + LANG=C + free | awk '/Mem/ {print $2}' + ) + tram=$(calc_size $tram) + uram=$( + LANG=C + free | awk '/Mem/ {print $3}' + ) + uram=$(calc_size $uram) + swap=$( + LANG=C + free | awk '/Swap/ {print $2}' + ) + swap=$(calc_size $swap) + uswap=$( + LANG=C + free | awk '/Swap/ {print $3}' + ) + uswap=$(calc_size $uswap) + up=$(awk '{a=$1/86400;b=($1%86400)/3600;c=($1%3600)/60} {printf("%d days, %d hour %d min\n",a,b,c)}' /proc/uptime) + if command_exists "w"; then + load=$( + LANG=C + w | head -1 | awk -F'load average:' '{print $2}' | sed 's/^[ \t]*//;s/[ \t]*$//' + ) + elif command_exists "uptime"; then + load=$( + LANG=C + uptime | head -1 | awk -F'load average:' '{print $2}' | sed 's/^[ \t]*//;s/[ \t]*$//' + ) + fi + opsy=$(GethostArchINfo) + arch=$(uname -m) + if command_exists "getconf"; then + lbit=$(getconf LONG_BIT) + else + echo ${arch} | grep -q "64" && lbit="64" || lbit="32" + fi + kern=$(uname -r) + disk_total_size=$( + LANG=C + df -t simfs -t ext2 -t ext3 -t ext4 -t btrfs -t xfs -t vfat -t ntfs -t swap --total 2>/dev/null | grep total | awk '{ print $2 }' + ) + disk_total_size=$(calc_size $disk_total_size) + disk_used_size=$( + LANG=C + df -t simfs -t ext2 -t ext3 -t ext4 -t btrfs -t xfs -t vfat -t ntfs -t swap --total 2>/dev/null | grep total | awk '{ print $3 }' + ) + disk_used_size=$(calc_size $disk_used_size) + tcpctrl=$(sysctl net.ipv4.tcp_congestion_control | awk -F ' ' '{print $3}') + + FunctionSuccess + + # todo +# StartIOTest + + GetIpv4Info + + Check_Virtualization + + local machineNumber="" + + if [[ $(cat /etc/hostname | cut -d"-" -f 3 | grep -c '^[0-9][0-9]') -gt 0 ]]; then + machineNumber=$(cat /etc/hostname | cut -d"-" -f 3) + else + machineNumber=99 + fi + + export serverName="${city}-${hostArch}-${machineNumber}" + export serverIpPbV4="$public_ipv4" + export serverIpInV4="" + export serverIpPbV6="" + export serverIpInV6="" + export location="$city $region $country" + export provider="$org" + export managePort="$(netstat -ntulp | grep sshd | grep -w tcp | awk '{print$4}' | cut -d":" -f2)" + export cpuCore="$cores @ $freq MHz" + export cpuBrand="$cpuName" + export memoryTotal="$tram" + export diskTotal="$disk_total_size" + export diskUsage="$disk_used_size" + export archInfo="$arch ($lbit Bit)" + export osInfo="$opsy" + export osKernelInfo="$kern" + export tcpControl="$tcpctrl" + export virtualization="$virt" + export ioSpeed="$ioavg MB/s" + export machineId="$(cat /host/etc/machine-id)" + + FunctionEnd +} + +PrintEnv(){ + + FunctionStart + + env + + FunctionEnd +} + +main() { + + GenerateSystemInfo + + PrintEnv + + FunctionEnd + +} + +main \ No newline at end of file diff --git a/agent/src/main/java/io/wdd/agent/config/beans/executor/CommandLog.java b/agent/src/main/java/io/wdd/agent/config/beans/executor/CommandLog.java index 48a61e2..f9a8722 100644 --- a/agent/src/main/java/io/wdd/agent/config/beans/executor/CommandLog.java +++ b/agent/src/main/java/io/wdd/agent/config/beans/executor/CommandLog.java @@ -18,6 +18,6 @@ public class CommandLog { // @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss") private String lineTime; - private String lineContend; + private Object lineContend; } diff --git a/agent/src/main/java/io/wdd/agent/config/beans/executor/StreamSenderEntity.java b/agent/src/main/java/io/wdd/agent/config/beans/executor/StreamSenderEntity.java new file mode 100644 index 0000000..ff28870 --- /dev/null +++ b/agent/src/main/java/io/wdd/agent/config/beans/executor/StreamSenderEntity.java @@ -0,0 +1,25 @@ +package io.wdd.agent.config.beans.executor; + + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; +import lombok.experimental.SuperBuilder; + +import java.util.ArrayList; + +@Data +@AllArgsConstructor +@NoArgsConstructor +@SuperBuilder(toBuilder = true) +public class StreamSenderEntity { + + private String streamKey; + + private ArrayList cachedCommandLog; + + private boolean waitToSendLog; + + private int startIndex; + +} diff --git a/agent/src/main/java/io/wdd/agent/executor/CommandExecutor.java b/agent/src/main/java/io/wdd/agent/executor/CommandExecutor.java index a2d4abf..09501c1 100644 --- a/agent/src/main/java/io/wdd/agent/executor/CommandExecutor.java +++ b/agent/src/main/java/io/wdd/agent/executor/CommandExecutor.java @@ -2,9 +2,9 @@ package io.wdd.agent.executor; import com.google.common.io.ByteStreams; import io.wdd.agent.executor.redis.StreamSender; -import io.wdd.agent.executor.thread.DaemonLogThread; -import io.wdd.agent.executor.thread.LogToStreamSender; +import io.wdd.agent.executor.thread.LogToArrayListCache; import io.wdd.common.beans.executor.ExecutionMessage; +import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.springframework.context.annotation.Configuration; @@ -12,6 +12,7 @@ import javax.annotation.Resource; import java.io.*; import java.nio.ByteBuffer; import java.util.List; +import java.util.concurrent.TimeUnit; @Configuration @@ -21,6 +22,8 @@ public class CommandExecutor { @Resource StreamSender streamSender; + @Resource + LogToArrayListCache logToArrayListCache; /** * handle command from octopus server @@ -52,28 +55,26 @@ public class CommandExecutor { processBuilder.redirectErrorStream(true); // processBuilder.inheritIO(); -// processBuilder.directory(new File(System.getProperty("user.home"))); - + processBuilder.directory(new File(System.getProperty("user.home"))); int processResult = 233; try { Process process = processBuilder.start(); + // cache log lines + logToArrayListCache.cacheLog(streamKey, process.getInputStream()); + streamSender.startToWaitLog(streamKey); - LogToStreamSender toStreamSender = new LogToStreamSender(streamKey, process.getInputStream(), streamSender::send); - DaemonLogThread.start(toStreamSender); - - log.warn("---------------------------------------------"); - new BufferedReader(new InputStreamReader(process.getInputStream())).lines() - .map( - String::valueOf - ).forEach(System.out::println); - log.warn("---------------------------------------------"); +// log.warn("start---------------------------------------------"); +// new BufferedReader(new InputStreamReader(process.getInputStream())).lines() +// .forEach(System.out::println); +// log.warn("end ---------------------------------------------"); // a command shell don't understand how long it actually take processResult = process.waitFor(); + streamSender.endWaitLog(streamKey); log.info("current shell command {} result is {}", processBuilder.command(), processResult); @@ -105,4 +106,17 @@ public class CommandExecutor { } + @SneakyThrows + public void clearCommandCache(String streamKey) { + + // wait + TimeUnit.SECONDS.sleep(1); + + // clear the log Cache Thread scope + logToArrayListCache.getCommandCachedLog(streamKey).clear(); + + // clear the stream sender + streamSender.clearLocalCache(streamKey); + + } } diff --git a/agent/src/main/java/io/wdd/agent/executor/FunctionExecutor.java b/agent/src/main/java/io/wdd/agent/executor/FunctionExecutor.java index 4cdd432..eb4fefc 100644 --- a/agent/src/main/java/io/wdd/agent/executor/FunctionExecutor.java +++ b/agent/src/main/java/io/wdd/agent/executor/FunctionExecutor.java @@ -57,15 +57,15 @@ public class FunctionExecutor { Iterator> iterator = commandList.iterator(); while (iterator.hasNext()) { - int execute = commandExecutor.execute(streamKey, iterator.next()); if (execute != 0) { log.error("command list execute failed !"); break; } - } + + commandExecutor.clearCommandCache(streamKey); } diff --git a/agent/src/main/java/io/wdd/agent/executor/redis/StreamSender.java b/agent/src/main/java/io/wdd/agent/executor/redis/StreamSender.java index 0a487c2..8f02696 100644 --- a/agent/src/main/java/io/wdd/agent/executor/redis/StreamSender.java +++ b/agent/src/main/java/io/wdd/agent/executor/redis/StreamSender.java @@ -2,6 +2,8 @@ package io.wdd.agent.executor.redis; import io.wdd.agent.config.beans.executor.CommandLog; +import io.wdd.agent.config.beans.executor.StreamSenderEntity; +import io.wdd.agent.executor.thread.LogToArrayListCache; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.apache.commons.beanutils.BeanUtils; @@ -11,16 +13,17 @@ import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.connection.stream.MapRecord; import org.springframework.data.redis.connection.stream.RecordId; import org.springframework.data.redis.connection.stream.StreamRecords; -import org.springframework.data.redis.connection.stream.StringRecord; import org.springframework.data.redis.core.RedisTemplate; import javax.annotation.Resource; -import java.lang.reflect.InvocationTargetException; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.time.LocalDateTime; import java.time.ZoneId; import java.time.format.DateTimeFormatter; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; @@ -31,6 +34,82 @@ public class StreamSender { @Resource RedisTemplate redisTemplate; + @Resource + LogToArrayListCache logToArrayListCache; + + + private HashMap AllNeededStreamSender = new HashMap<>(16); + + + private ArrayList cacheLogList = new ArrayList<>(256); + + public void startToWaitLog(String streamKey) throws InterruptedException { + + if (!AllNeededStreamSender.containsKey(streamKey)) { + + StreamSenderEntity streamSender = StreamSenderEntity.builder() + .cachedCommandLog(logToArrayListCache.getCommandCachedLog(streamKey)) + .waitToSendLog(true) + .startIndex(0) + .streamKey(streamKey).build(); + + AllNeededStreamSender.put(streamKey, streamSender); + + } + + TimeUnit.SECONDS.sleep(2); + if (AllNeededStreamSender.get(streamKey).isWaitToSendLog()) { + log.info("stream sender wait 1 s to send message"); + AllNeededStreamSender.get(streamKey).setWaitToSendLog(false); + batchSendLog(streamKey); + } + } + + public void endWaitLog(String streamKey){ + + StreamSenderEntity streamSenderEntity = AllNeededStreamSender.get(streamKey); + streamSenderEntity.setWaitToSendLog(false); + + batchSendLog(streamKey); + + } + + public void batchSendLog(String streamKey) { + StreamSenderEntity streamSenderEntity = AllNeededStreamSender.get(streamKey); + + log.info("batch send log == {}", streamSenderEntity); + + ArrayList cachedCommandLog = streamSenderEntity.getCachedCommandLog(); + +// System.out.println("cachedCommandLog = " + cachedCommandLog); + + int startIndex = streamSenderEntity.getStartIndex(); + int endIndex = cachedCommandLog.size(); + + List content = cachedCommandLog.subList(startIndex, endIndex); + +// System.out.println("content = " + content); + + this.send(streamKey, content); + // for next time + startIndex = endIndex; + } + + public boolean send(String streamKey, String content){ + + return this.send(streamKey, List.of(content)); + + } + + private boolean send(String streamKey, List content) { + + HashMap> map = new HashMap<>(16); + + map.put(currentTimeString(), content); + + return doSendLogToStream(streamKey, map); + } + private static ByteBuffer currentTimeByteBuffer(){ byte[] timeBytes = LocalDateTime.now(ZoneId.of("UTC+8")).format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")).getBytes(StandardCharsets.UTF_8); @@ -43,33 +122,22 @@ public class StreamSender { return LocalDateTime.now(ZoneId.of("UTC+8")).format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); } + private boolean doSendLogToStream(String streamKey, HashMap map) { - public static String TEST_STREAM_JAVA = "test-stream-java"; + log.info("redis stream sender message is {}", map); + MapRecord> stringRecord = StreamRecords.mapBacked(map).withStreamKey(streamKey); - public boolean send(String streamKey, String content){ + RecordId recordId = redisTemplate.opsForStream().add(stringRecord); - CommandLog commandLog = new CommandLog(currentTimeString(), content); - Map map = null; - try { - map = BeanUtils.describe(commandLog); - } catch (IllegalAccessException | InvocationTargetException | NoSuchMethodException e) { - throw new RuntimeException(e); - } +// log.info("redis send recordId is {}",recordId); - log.info("redis stream sender message is {}", map); - - StringRecord stringRecord = StreamRecords.string(map).withStreamKey(streamKey); - - RecordId recordId = redisTemplate.opsForStream().add(stringRecord); - - log.info("redis send recordId is {}",recordId); - - return ObjectUtils.isNotEmpty(recordId); + return ObjectUtils.isNotEmpty(recordId); } + public static String TEST_STREAM_JAVA = "test-stream-java"; @SneakyThrows public void test(){ @@ -93,8 +161,6 @@ public class StreamSender { } - - } @SneakyThrows @@ -107,4 +173,7 @@ public class StreamSender { return map; } + public void clearLocalCache(String streamKey) { + AllNeededStreamSender.remove(streamKey); + } } diff --git a/agent/src/main/java/io/wdd/agent/executor/thread/DaemonLogThread.java b/agent/src/main/java/io/wdd/agent/executor/thread/DaemonLogThread.java index 016e7dd..07fb10a 100644 --- a/agent/src/main/java/io/wdd/agent/executor/thread/DaemonLogThread.java +++ b/agent/src/main/java/io/wdd/agent/executor/thread/DaemonLogThread.java @@ -1,30 +1,32 @@ package io.wdd.agent.executor.thread; import com.google.common.util.concurrent.ThreadFactoryBuilder; +import io.wdd.common.beans.response.R; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.ThreadFactory; +import java.util.concurrent.*; -public class DaemonLogThread { - - private static final ExecutorService executorService; - - static { - - ThreadFactory daemonLogThread = new ThreadFactoryBuilder() - .setDaemon(true) - .setNameFormat("DaemonLogThread") - .setPriority(1) - .build(); - - executorService = Executors.newSingleThreadExecutor(daemonLogThread); - - } - - public static Future start(Runnable logToSenderTask) { - - return executorService.submit(logToSenderTask); - } -} +//public class DaemonLogThread { +// +// private static final ExecutorService executorService; +// +// static { +// +// ThreadFactory daemonLogThread = new ThreadFactoryBuilder() +// .setDaemon(true) +// .setNameFormat("BackendToRedisThread") +// .setPriority(1) +// .build(); +// +// executorService = Executors.newSingleThreadExecutor(daemonLogThread); +// +// } +// +// public static Future start(Runnable backendToRedisStream) { +// +// return executorService.submit(backendToRedisStream); +// } +// +// public static void stop(Runnable backendToRedisStream) { +// executorService.shutdownNow(); +// } +//} diff --git a/agent/src/main/java/io/wdd/agent/executor/thread/LogToArrayListCache.java b/agent/src/main/java/io/wdd/agent/executor/thread/LogToArrayListCache.java new file mode 100644 index 0000000..f12a7dd --- /dev/null +++ b/agent/src/main/java/io/wdd/agent/executor/thread/LogToArrayListCache.java @@ -0,0 +1,56 @@ +package io.wdd.agent.executor.thread; + + +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +import java.io.BufferedReader; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.util.ArrayList; +import java.util.List; + + +@Component +@Slf4j +public class LogToArrayListCache { + + // concurrent command execute logs + public static List> CachedCommandLog = List.of( + new ArrayList<>(256), + new ArrayList<>(256), + new ArrayList<>(256), + new ArrayList<>(256), + new ArrayList<>(256) + ); + + public void cacheLog(String streamKey, InputStream commandLogStream) { + + ArrayList commandCachedLog = this.getCommandCachedLog(streamKey); + +// log.info(String.valueOf(commandCachedLog)); + + new BufferedReader(new InputStreamReader(commandLogStream)) + .lines() + .forEach( + commandCachedLog::add + ); + + log.info("current streamKey is {} and CacheLog is {}", streamKey, commandCachedLog); + } + + public ArrayList getCommandCachedLog(String streamKey) { + + int keyToIndex = this.hashStreamKeyToIndex(streamKey); + + return CachedCommandLog.get(keyToIndex); + } + + private int hashStreamKeyToIndex(String streamKey) { + + int size = CachedCommandLog.size(); + + return Math.abs(streamKey.hashCode() % size); + } + +} diff --git a/source/src/main/java/io/wdd/source/shell/install-nginx.sh b/source/src/main/java/io/wdd/source/shell/install-nginx.sh index 624bb60..4d37b00 100644 --- a/source/src/main/java/io/wdd/source/shell/install-nginx.sh +++ b/source/src/main/java/io/wdd/source/shell/install-nginx.sh @@ -1,8 +1,22 @@ #!/bin/bash -. "C:\Users\wdd\IdeaProjects\ProjectOctopus\source\src\main\java\io\wdd\source\shell\lib\wdd-lib-log.sh" +. /root/IdeaProjects/ProjectOctopus/source/src/main/java/io/wdd/source/shell/lib/wdd-lib-log.sh + +. /root/IdeaProjects/ProjectOctopus/source/src/main/java/io/wdd/source/shell/lib/wdd-lib-env.sh -log "wdd is awesome !" \ No newline at end of file +log "wdd is awesome !" +error "error message" + +debug "debug message" +info "woshinibaba!" + + +debug "-------------------" + +log "env TEST_ENV is $(env | grep TEST_ENV)" + +TEST_ENV=cccc +log "env TEST_ENV is $(env | grep TEST_ENV)" \ No newline at end of file diff --git a/source/src/main/java/io/wdd/source/shell/lib/wdd-lib-env.sh b/source/src/main/java/io/wdd/source/shell/lib/wdd-lib-env.sh new file mode 100644 index 0000000..5d8c45e --- /dev/null +++ b/source/src/main/java/io/wdd/source/shell/lib/wdd-lib-env.sh @@ -0,0 +1,4 @@ +#!/bin/bash + + +export TEST_ENV="${TEST_ENV}_.txt" \ No newline at end of file diff --git a/source/src/main/java/io/wdd/source/shell/lib/wdd-lib-log.sh b/source/src/main/java/io/wdd/source/shell/lib/wdd-lib-log.sh index c7c0f6d..2decece 100644 --- a/source/src/main/java/io/wdd/source/shell/lib/wdd-lib-log.sh +++ b/source/src/main/java/io/wdd/source/shell/lib/wdd-lib-log.sh @@ -37,7 +37,7 @@ stderr_print() { # None ######################### log() { - stderr_print "${CYAN}${MODULE:-} ${MAGENTA}$(date "+%T.%2N ")${RESET}${*}" + stderr_print "${CYAN}${MODULE:-} ${MAGENTA}$(date "+%Y-%m-%d %H:%M:%S.%2N ")${RESET}${*}" } ######################## # Log an 'info' message @@ -80,7 +80,7 @@ error() { ######################### debug() { # 'is_boolean_yes' is defined in libvalidations.sh, but depends on this file so we cannot source it - local bool="${BITNAMI_DEBUG:-false}" + local bool="${BITNAMI_DEBUG:-true}" # comparison is performed without regard to the case of alphabetic characters shopt -s nocasematch if [[ "$bool" = 1 || "$bool" =~ ^(yes|true)$ ]]; then