[ server ] [ scheduler ]- script scheduler fix bugs - 1
This commit is contained in:
@@ -21,12 +21,14 @@ import java.util.List;
|
|||||||
public class LogToArrayListCache {
|
public class LogToArrayListCache {
|
||||||
|
|
||||||
// concurrent command execute logs
|
// concurrent command execute logs
|
||||||
public static List<ArrayList<String>> CachedCommandLog = List.of(
|
public static List<ArrayList<String>> CachedCommandLog = new ArrayList<>(
|
||||||
new ArrayList<>(256),
|
List.of(
|
||||||
new ArrayList<>(256),
|
new ArrayList<>(256),
|
||||||
new ArrayList<>(256),
|
new ArrayList<>(256),
|
||||||
new ArrayList<>(256),
|
new ArrayList<>(256),
|
||||||
new ArrayList<>(256)
|
new ArrayList<>(256),
|
||||||
|
new ArrayList<>(256)
|
||||||
|
)
|
||||||
);
|
);
|
||||||
|
|
||||||
public void cacheLog(String streamKey, Process process) {
|
public void cacheLog(String streamKey, Process process) {
|
||||||
@@ -69,7 +71,14 @@ public class LogToArrayListCache {
|
|||||||
return CachedCommandLog.get(keyToIndex);
|
return CachedCommandLog.get(keyToIndex);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
private int hashStreamKeyToCachedArrayListIndex(String streamKey) {
|
private int hashStreamKeyToCachedArrayListIndex(String streamKey) {
|
||||||
|
int size = CachedCommandLog.size();
|
||||||
|
return Math.abs(streamKey.hashCode() % size);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
private int hashStreamKeyToCachedArrayListIndexWithProblem(String streamKey) {
|
||||||
|
|
||||||
int size = CachedCommandLog.size();
|
int size = CachedCommandLog.size();
|
||||||
int result = Math.abs(streamKey.hashCode() % size);
|
int result = Math.abs(streamKey.hashCode() % size);
|
||||||
|
|||||||
@@ -14,6 +14,8 @@ import org.springframework.web.bind.annotation.*;
|
|||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
|
import static io.wdd.rpc.scheduler.service.status.MonitorAllAgentStatus.ALL_AGENT_TOPIC_NAME_SET;
|
||||||
|
|
||||||
@RestController
|
@RestController
|
||||||
@Api(value = "定时任务控制中心的Controller")
|
@Api(value = "定时任务控制中心的Controller")
|
||||||
@RequestMapping(value = "/octopus/server/scheduler")
|
@RequestMapping(value = "/octopus/server/scheduler")
|
||||||
@@ -35,6 +37,8 @@ public class SchedulerController {
|
|||||||
@ApiParam(name = "scheduleScript") @RequestBody() ScriptSchedulerVO scriptSchedulerVO
|
@ApiParam(name = "scheduleScript") @RequestBody() ScriptSchedulerVO scriptSchedulerVO
|
||||||
) {
|
) {
|
||||||
|
|
||||||
|
ALL_AGENT_TOPIC_NAME_SET.add("Chengdu-amd64-98-98066f");
|
||||||
|
|
||||||
List<String> resultList = octopusQuartzService.createScriptScheduledMission(scriptSchedulerVO);
|
List<String> resultList = octopusQuartzService.createScriptScheduledMission(scriptSchedulerVO);
|
||||||
|
|
||||||
return R.ok(resultList);
|
return R.ok(resultList);
|
||||||
@@ -50,6 +54,7 @@ public class SchedulerController {
|
|||||||
@GetMapping(value = "/queryAllMission")
|
@GetMapping(value = "/queryAllMission")
|
||||||
public R<List<Map<String, Object>>> queryAllQuartzMission() {
|
public R<List<Map<String, Object>>> queryAllQuartzMission() {
|
||||||
|
|
||||||
|
|
||||||
return R.ok(octopusQuartzService.queryAllMission());
|
return R.ok(octopusQuartzService.queryAllMission());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -15,7 +15,7 @@ import java.util.List;
|
|||||||
@SuperBuilder(toBuilder = true)
|
@SuperBuilder(toBuilder = true)
|
||||||
public class ScriptSchedulerDTO extends ScriptSchedulerVO{
|
public class ScriptSchedulerDTO extends ScriptSchedulerVO{
|
||||||
|
|
||||||
List<List<String>> commandList;
|
List<List<String>> completeCommandList;
|
||||||
|
|
||||||
List<String> targetMachineList;
|
List<String> targetMachineList;
|
||||||
|
|
||||||
|
|||||||
@@ -25,6 +25,7 @@ import javax.annotation.PostConstruct;
|
|||||||
import javax.annotation.Resource;
|
import javax.annotation.Resource;
|
||||||
import java.time.LocalDateTime;
|
import java.time.LocalDateTime;
|
||||||
import java.util.*;
|
import java.util.*;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
import static io.wdd.rpc.scheduler.beans.ScriptSchedulerVO.SCHEDULE_MISSION_GROUP_NAME;
|
import static io.wdd.rpc.scheduler.beans.ScriptSchedulerVO.SCHEDULE_MISSION_GROUP_NAME;
|
||||||
@@ -70,13 +71,18 @@ public class QuartzSchedulerServiceImpl implements QuartzSchedulerService {
|
|||||||
);
|
);
|
||||||
this.addMission(
|
this.addMission(
|
||||||
AgentScriptSchedulerJob.class,
|
AgentScriptSchedulerJob.class,
|
||||||
scriptSchedulerVO.getSchedulerUuid(),
|
scriptSchedulerDTO.getSchedulerUuid(),
|
||||||
SCHEDULE_MISSION_GROUP_NAME,
|
SCHEDULE_MISSION_GROUP_NAME,
|
||||||
1,
|
1, // 立即开始本次任务
|
||||||
scriptSchedulerVO.getScriptContent(),
|
scriptSchedulerDTO.getCronExpress(),
|
||||||
dataMap
|
dataMap
|
||||||
);
|
);
|
||||||
|
|
||||||
|
try {
|
||||||
|
TimeUnit.SECONDS.sleep(2);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
// persistent the script scheduled mission
|
// persistent the script scheduled mission
|
||||||
// dto should store more info
|
// dto should store more info
|
||||||
ScriptSchedulerPO scriptSchedulerPO = convertToScriptSchedulerPO(scriptSchedulerDTO);
|
ScriptSchedulerPO scriptSchedulerPO = convertToScriptSchedulerPO(scriptSchedulerDTO);
|
||||||
@@ -99,7 +105,7 @@ public class QuartzSchedulerServiceImpl implements QuartzSchedulerService {
|
|||||||
.builder()
|
.builder()
|
||||||
.cronExpress(dto.getCronExpress())
|
.cronExpress(dto.getCronExpress())
|
||||||
.schedulerUuid(dto.getSchedulerUuid())
|
.schedulerUuid(dto.getSchedulerUuid())
|
||||||
.scriptContent(String.valueOf(dto.getCommandList()))
|
.scriptContent(String.valueOf(dto.getCompleteCommandList()))
|
||||||
.targetMachine(String.valueOf(dto.getTargetMachineList()))
|
.targetMachine(String.valueOf(dto.getTargetMachineList()))
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
@@ -141,10 +147,10 @@ public class QuartzSchedulerServiceImpl implements QuartzSchedulerService {
|
|||||||
throw new MyRuntimeException("cron express wrong !");
|
throw new MyRuntimeException("cron express wrong !");
|
||||||
}
|
}
|
||||||
|
|
||||||
// 归一化 commandList
|
// 归一化 completeCommandList
|
||||||
List<List<String>> commandList = FunctionReader.ReadStringToCommandList(scriptSchedulerVO.getScriptContent());
|
List<List<String>> completeCommandList = FunctionReader.ReadStringToCommandList(scriptSchedulerVO.getScriptContent());
|
||||||
if (commandList.size() == 0) {
|
if (completeCommandList.size() == 0) {
|
||||||
throw new MyRuntimeException("commandList parse wrong !");
|
throw new MyRuntimeException("completeCommandList parse wrong !");
|
||||||
}
|
}
|
||||||
|
|
||||||
// 执行机器目标归一化
|
// 执行机器目标归一化
|
||||||
@@ -155,11 +161,9 @@ public class QuartzSchedulerServiceImpl implements QuartzSchedulerService {
|
|||||||
throw new MyRuntimeException("target machine wrong !");
|
throw new MyRuntimeException("target machine wrong !");
|
||||||
}
|
}
|
||||||
ArrayList<String> targetMachineList = new ArrayList<>();
|
ArrayList<String> targetMachineList = new ArrayList<>();
|
||||||
targetMachineList
|
for (String s : targetMachineSplit) {
|
||||||
.stream()
|
targetMachineList.add(s);
|
||||||
.forEach(
|
}
|
||||||
targetMachineList::add
|
|
||||||
);
|
|
||||||
|
|
||||||
// 生成DTO对象
|
// 生成DTO对象
|
||||||
ScriptSchedulerDTO dto = new ScriptSchedulerDTO();
|
ScriptSchedulerDTO dto = new ScriptSchedulerDTO();
|
||||||
@@ -167,7 +171,7 @@ public class QuartzSchedulerServiceImpl implements QuartzSchedulerService {
|
|||||||
scriptSchedulerVO);
|
scriptSchedulerVO);
|
||||||
|
|
||||||
// 设置属性值
|
// 设置属性值
|
||||||
dto.setCommandList(null);
|
dto.setCompleteCommandList(completeCommandList);
|
||||||
dto.setTargetMachineList(targetMachineList);
|
dto.setTargetMachineList(targetMachineList);
|
||||||
|
|
||||||
// 生成 scheduler uuid
|
// 生成 scheduler uuid
|
||||||
@@ -292,7 +296,7 @@ public class QuartzSchedulerServiceImpl implements QuartzSchedulerService {
|
|||||||
// 触发器key
|
// 触发器key
|
||||||
|
|
||||||
// uniform the start time
|
// uniform the start time
|
||||||
if (ObjectUtils.isEmpty(startTime) || startTime == 0) {
|
if (ObjectUtils.isEmpty(startTime)) {
|
||||||
startTime = 1;
|
startTime = 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -3,7 +3,6 @@ package io.wdd.rpc.scheduler.service.script;
|
|||||||
|
|
||||||
import io.wdd.rpc.execute.service.CoreExecutionService;
|
import io.wdd.rpc.execute.service.CoreExecutionService;
|
||||||
import io.wdd.rpc.scheduler.beans.ScriptSchedulerDTO;
|
import io.wdd.rpc.scheduler.beans.ScriptSchedulerDTO;
|
||||||
import lombok.extern.log4j.Log4j;
|
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
|
|
||||||
@@ -22,7 +21,7 @@ public class AgentApplyScheduledScript {
|
|||||||
|
|
||||||
public void apply(ScriptSchedulerDTO scriptSchedulerDTO) {
|
public void apply(ScriptSchedulerDTO scriptSchedulerDTO) {
|
||||||
|
|
||||||
List<List<String>> completeCommandList = scriptSchedulerDTO.getCommandList();
|
List<List<String>> completeCommandList = scriptSchedulerDTO.getCompleteCommandList();
|
||||||
List<String> targetMachineList = scriptSchedulerDTO.getTargetMachineList();
|
List<String> targetMachineList = scriptSchedulerDTO.getTargetMachineList();
|
||||||
|
|
||||||
List<String> resultKeyList = coreExecutionService.SendCommandToAgentComplete(
|
List<String> resultKeyList = coreExecutionService.SendCommandToAgentComplete(
|
||||||
|
|||||||
Reference in New Issue
Block a user