374 lines
13 KiB
Python
374 lines
13 KiB
Python
import os
|
||
import subprocess
|
||
import paho.mqtt.client as mqtt
|
||
import json
|
||
import time
|
||
import threading
|
||
import logging
|
||
from config import *
|
||
import datetime
|
||
import schedule # 需要先安装: pip install schedule
|
||
import yaml
|
||
|
||
|
||
# 读取yaml配置
|
||
def load_mqtt_config():
    """Load the MQTT connection settings and topic names from the YAML config.

    The file path is read from the ``CONFIG_PATH`` environment variable and
    falls back to ``config.yaml`` (relative to the working directory).

    Returns:
        A ``(mqtt, topics)`` tuple holding the ``mqtt`` and ``topics``
        sections of the parsed document.

    Raises:
        OSError: If the config file cannot be opened.
        KeyError: If the ``mqtt`` or ``topics`` section is missing.
    """
    config_path = os.getenv('CONFIG_PATH', 'config.yaml')
    # Decode explicitly as UTF-8 so that non-ASCII values are read the same
    # way regardless of the platform's locale encoding.
    with open(config_path, 'r', encoding='utf-8') as f:
        config = yaml.safe_load(f)
    return config['mqtt'], config['topics']
|
||
|
||
|
||
# Load the MQTT connection settings and the topic names once, at import time.
mqtt_config, topics_config = load_mqtt_config()

# Send log records both to check.log and to the console.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
    handlers=[logging.FileHandler('check.log'), logging.StreamHandler()],
)

# Registries shared by all handlers: task_id -> process handle, and
# task_id -> the parsed configuration stored for that task.
running_tasks, task_configs = {}, {}

# Global lock guarding the registries above.
task_lock = threading.Lock()

# Shell command template used to launch Dev_Fusion.py. Only {task_id} is left
# as a placeholder for str.format(); the other values are filled in here.
fusion_command_template = f"nohup python Dev_Fusion.py -t {{task_id}} -g {DEV_FUSION_G} -i {DEV_FUSION_I} > /dev/null 2> error.log &"

# Folder holding the per-task configuration logs; created if it is missing.
log_folder = "tasklog"
os.makedirs(log_folder, exist_ok=True)
|
||
|
||
|
||
def compare_configs(old_config, new_config):
    """Decide whether a configuration change requires restarting the instance.

    Only the fields that are treated as critical are compared: each device's
    ``device_id``, ``device_topic``, ``device_type`` and ``reference_point``,
    plus the top-level ``reference_point``. Other parameters (for example
    ``sampling_rate``) may change without a restart. The order of the
    devices does not matter.

    Args:
        old_config: The configuration stored for the running instance.
        new_config: The newly received configuration.

    Returns:
        True if a critical field differs (restart needed), False if there is
        no critical difference (forwarding the message is enough). If the
        comparison itself fails, True is returned so the instance is
        restarted to be safe.
    """
    def get_device_key(device):
        # The fields of one device that matter for the restart decision.
        return (
            device.get('device_id'),
            device.get('device_topic'),
            device.get('device_type'),
            device.get('reference_point'),
        )

    try:
        # 1. Compare the device lists; a missing or null list counts as empty.
        old_devices = old_config.get('devices') or []
        new_devices = new_config.get('devices') or []

        if len(old_devices) != len(new_devices):
            return True

        # Pair every old device with a not-yet-matched new device using plain
        # equality. Unlike collecting the keys in a set, this works when
        # reference_point is a list or dict (unhashable) and it does not
        # collapse duplicate devices.
        unmatched = [get_device_key(d) for d in new_devices]
        for device in old_devices:
            try:
                unmatched.remove(get_device_key(device))
            except ValueError:
                return True

        # 2. Compare the top-level reference point.
        if old_config.get('reference_point') != new_config.get('reference_point'):
            return True

        # 3. Changes to any other parameter do not require a restart.
        logging.info("No critical configuration changes detected")
        return False

    except Exception as e:
        logging.error(f"Error comparing configs: {str(e)}")
        return True  # On error assume a difference and restart, to be safe.
|
||
|
||
|
||
def stop_task(task_id):
    """Stop the Dev_Fusion instance registered for ``task_id``.

    Does nothing when the task is not in ``running_tasks``. Otherwise the
    matching Dev_Fusion.py process is terminated with ``pkill -f`` and the
    task is removed from ``running_tasks`` and ``task_configs``. Errors are
    logged, never raised.

    Args:
        task_id: Identifier of the task whose instance should be stopped.
    """
    import re  # local import: ``re`` is not imported at file level

    if task_id not in running_tasks:
        return

    process = running_tasks[task_id]
    try:
        # Escape the id and require it to be followed by a space or the end
        # of the command line, so stopping task "1" cannot also kill task
        # "12". Passing an argument list (no shell) keeps an id received
        # over MQTT from being interpreted as shell syntax.
        pattern = f"python.*Dev_Fusion.py.*-t {re.escape(str(task_id))}( |$)"
        subprocess.run(["pkill", "-f", pattern])
        process.wait(timeout=5)  # wait for the process to finish
        logging.info(f"Task {task_id} stopped successfully")
    except Exception as e:
        logging.error(f"Error stopping task {task_id}: {str(e)}")
    finally:
        # Always drop the bookkeeping entries, even if waiting failed. pop()
        # tolerates a task that has no stored configuration.
        running_tasks.pop(task_id, None)
        task_configs.pop(task_id, None)
|
||
|
||
|
||
# 多线程处理函数
|
||
def handle_task(client, task_id, payload):
    """Process one task configuration message.

    Started in its own thread by ``on_message``. While holding ``task_lock``
    the payload is parsed, an entry is appended to the task's log file and
    one of the following happens:

    * task not running: a new instance is started;
    * task running without a stored config: the config is stored and the
      message is forwarded to the task's sensor topic;
    * task running and a critical field changed: the instance is restarted;
    * task running and nothing critical changed: the message is forwarded.

    Errors are logged, never raised.

    Args:
        client: MQTT client used to publish to the sensor topic.
        task_id: Identifier of the task taken from the payload.
        payload: The raw message text (a JSON document).
    """
    # JSON allows a numeric task_id. Normalise it to str so that building the
    # topic with str.replace() cannot fail and the registry keys have the
    # same type as the ones check_running_instances() parses from ``ps``.
    task_id = str(task_id)

    try:
        with task_lock:  # the lock protects the shared registries
            data = json.loads(payload)
            sensor_topic = topics_config['sensor_topic'].replace("+", task_id)

            log_file = os.path.join(log_folder, f"received_tasklog_{task_id}.txt")
            current_time = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')

            def log_config_update(action):
                # Append one update block; read_latest_config() parses this format.
                with open(log_file, "a") as f:
                    f.write(f"\n=== Configuration Update at {current_time} ===\n")
                    f.write(f"Task ID: {task_id}\n")
                    f.write(f"MQTT_TOPIC: {topics_config['mqtt_topic']}\n")
                    f.write(f"Payload: {payload}\n")
                    f.write(f"Action: {action}\n")
                    f.write("=" * 50 + "\n")

            if task_id not in running_tasks:
                # Unknown task: start a new instance.
                log_config_update("New instance started")
                start_new_instance(client, task_id, payload, data)
            elif task_id not in task_configs:
                # Running but nothing stored yet: store the config and forward.
                logging.info(f"No stored config for task {task_id}, storing first config")
                task_configs[task_id] = data
                log_config_update("First config stored and forwarded")
                client.publish(sensor_topic, payload)
            elif compare_configs(task_configs[task_id], data):
                # A critical field changed: restart the instance.
                logging.info(f"Configuration changed for task {task_id}, restarting...")
                stop_task(task_id)
                log_config_update("Configuration changed, restarting instance")
                start_new_instance(client, task_id, payload, data)
            else:
                # Nothing critical changed: just forward the message.
                logging.info(f"No configuration change for task {task_id}, forwarding message")
                log_config_update("Message forwarded (no critical changes)")
                client.publish(sensor_topic, payload)

    except Exception as e:
        logging.error(f"Error handling task {task_id}: {str(e)}")
|
||
|
||
|
||
def start_new_instance(client, task_id, payload, config):
    """Start a new Dev_Fusion instance for ``task_id`` and send it its config.

    The instance is launched through the shell from
    ``fusion_command_template`` and registered in ``running_tasks`` and
    ``task_configs``. An update block is appended to the task's log file
    and, after a short pause, the payload is published to the task's sensor
    topic. On any error the task is removed from both registries and the
    error is logged, not raised.

    Args:
        client: MQTT client used to publish the configuration.
        task_id: Identifier of the task.
        payload: The raw configuration message (JSON text) to publish.
        config: The parsed configuration stored for later comparisons.
    """
    import shlex  # local import: ``shlex`` is not imported at file level

    try:
        # The command runs with shell=True and task_id is taken from a
        # received MQTT message, so it must be quoted to keep it from being
        # interpreted as shell syntax. Ordinary ids are left unchanged.
        fusion_command = fusion_command_template.format(
            task_id=shlex.quote(str(task_id)))
        process = subprocess.Popen(fusion_command, shell=True)
        running_tasks[task_id] = process
        task_configs[task_id] = config

        logging.info(f"Dev_Fusion.py started successfully for Task ID {task_id}")

        # Append an update block to the task's log file.
        log_file = os.path.join(log_folder, f"received_tasklog_{task_id}.txt")
        current_time = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        with open(log_file, "a") as f:
            f.write(f"\n=== Configuration Update at {current_time} ===\n")
            f.write(f"Task ID: {task_id}\n")
            f.write(f"MQTT_TOPIC: {topics_config['mqtt_topic']}\n")
            f.write(f"Payload: {payload}\n")
            f.write("Action: New instance started\n")
            f.write("=" * 50 + "\n")

        # Give the instance a moment to start before sending the config.
        time.sleep(0.5)

        sensor_topic = topics_config['sensor_topic'].replace("+", str(task_id))
        client.publish(sensor_topic, payload)
        logging.info(f"Configuration sent to {sensor_topic}")

    except Exception as e:
        logging.error(f"Error starting new instance for task {task_id}: {str(e)}")
        # pop() cannot raise, whichever of the two entries was already set.
        running_tasks.pop(task_id, None)
        task_configs.pop(task_id, None)
|
||
|
||
|
||
# MQTT 回调函数
|
||
def on_connect(client, userdata, flags, rc):
    """Connect callback installed on the MQTT client.

    A result code of 0 is logged as a successful connection and the client
    subscribes to the configured ``mqtt_topic``. Any other code is logged
    as an error together with its description from ``DISCONNECT_REASONS``.
    """
    if rc != 0:
        logging.error(f"Connection failed with code {rc}: {DISCONNECT_REASONS.get(rc, 'Unknown error')}")
        return

    logging.info("Connected to MQTT broker")
    # The topic name comes from the YAML configuration.
    client.subscribe(topics_config['mqtt_topic'])
|
||
|
||
|
||
def on_message(client, userdata, msg):
    """Message callback installed on the MQTT client.

    Decodes the payload as UTF-8 JSON and, when it carries a truthy
    ``task_id``, hands the message to ``handle_task`` in a new thread.
    Messages without a ``task_id`` are logged as a warning; invalid JSON
    and any other failure are logged as errors. Nothing is raised.
    """
    try:
        payload = msg.payload.decode("utf-8")
        logging.info(f"Received message on topic {msg.topic}")

        task_id = json.loads(payload).get("task_id")
        if not task_id:
            logging.warning("Received message without task_id")
            return

        # Each task message is processed in its own worker thread.
        worker = threading.Thread(target=handle_task, args=(client, task_id, payload))
        worker.start()

    except json.JSONDecodeError:
        logging.error("Received message is not valid JSON")
    except Exception as e:
        logging.error(f"Error processing message: {str(e)}")
|
||
|
||
|
||
def check_running_instances():
    """Adopt Dev_Fusion instances that are already running on the system.

    The process list is scanned for ``Dev_Fusion.py`` command lines and the
    task id is read from the argument following ``-t``. For every instance
    found, the latest configuration is read from the task's log file:

    * if a configuration is found, the instance is registered in
      ``running_tasks`` and ``task_configs``;
    * otherwise the instance is killed.

    Errors are logged, never raised.
    """
    import re  # local import: ``re`` is not imported at file level

    try:
        # List the running Dev_Fusion.py processes.
        result = subprocess.run("ps aux | grep 'python.*Dev_Fusion.py' | grep -v grep",
                                shell=True, capture_output=True, text=True)

        # Collect (task_id, pid) pairs; the pid is taken from the second column.
        found_instances = []
        for line in result.stdout.splitlines():
            parts = line.split()
            for i, part in enumerate(parts):
                if part == '-t' and i + 1 < len(parts):
                    found_instances.append((parts[i + 1], parts[1]))

        for task_id, pid in found_instances:
            logging.info(f"Found running instance for task {task_id}, pid: {pid}")

            # Read the latest configuration logged for this task.
            config = read_latest_config(task_id)
            if config:
                # running_tasks stores Popen handles, so a placeholder handle
                # is created and its pid is pointed at the adopted process.
                # The placeholder child is reaped immediately and writes to
                # DEVNULL, so it leaves neither a zombie nor an open pipe.
                handle = subprocess.Popen(['echo', ''], stdout=subprocess.DEVNULL)
                handle.wait()
                handle.pid = int(pid)
                running_tasks[task_id] = handle
                task_configs[task_id] = config
                logging.info(
                    f"Successfully loaded config for task {task_id} from tasklog/received_tasklog_{task_id}.txt")
            else:
                logging.warning(f"No valid config found for task {task_id}, stopping instance...")
                # Escape the id and anchor it so only this task's process is
                # matched; the argument list avoids shell interpretation.
                pattern = f"python.*Dev_Fusion.py.*-t {re.escape(task_id)}( |$)"
                subprocess.run(["pkill", "-f", pattern])
                logging.info(f"Stopped instance {task_id} due to missing config")

        logging.info(f"Finished checking instances. Loaded {len(running_tasks)} tasks with valid configs")

    except Exception as e:
        logging.error(f"Error checking running instances: {str(e)}")
|
||
|
||
|
||
def read_latest_config(task_id):
    """Read the most recent configuration logged for ``task_id``.

    The task's log file consists of blocks that start with
    ``=== Configuration Update at``. The JSON text after ``Payload: `` in
    the last block is parsed and returned.

    Args:
        task_id: Identifier of the task whose log file is read.

    Returns:
        The parsed configuration, or None when the log file does not exist,
        the last block has no usable payload, or reading/parsing fails
        (failures are logged).
    """
    try:
        log_file = os.path.join(log_folder, f"received_tasklog_{task_id}.txt")
        if not os.path.exists(log_file):
            logging.error(f"No log file found for task {task_id}")
            return None

        with open(log_file, 'r') as f:
            content = f.read()

        # The text after the last block header is the most recent update.
        latest_update = content.split('=== Configuration Update at')[-1]

        # Test the result of find() itself: adding the marker length first
        # would hide the -1 "not found" value.
        marker = 'Payload: '
        marker_pos = latest_update.find(marker)
        if marker_pos == -1:
            logging.error(f"No payload found in latest log entry for task {task_id}")
            return None
        payload_start = marker_pos + len(marker)

        # The payload ends at the Action line or, if there is none, at the
        # closing separator. Only look after the start of the payload.
        payload_end = latest_update.find('\nAction:', payload_start)
        if payload_end == -1:
            payload_end = latest_update.find('\n===', payload_start)

        if payload_end > payload_start:
            return json.loads(latest_update[payload_start:payload_end].strip())

        return None
    except Exception as e:
        logging.error(f"Error reading latest config for task {task_id}: {str(e)}")
        return None
|
||
|
||
|
||
def restart_all_instances():
    """Restart every registered instance with its latest logged configuration.

    Registered as the scheduled job by ``setup_scheduled_restart``. For each
    task in ``running_tasks`` the latest configuration is read from the
    task's log file; the instance is then stopped and started again with
    that configuration. Tasks without a readable configuration are skipped.
    Errors are logged per task and do not stop the remaining restarts.
    """
    logging.info("Scheduled restart: Beginning restart of all instances")

    # Work on a snapshot of the ids, because the registry is modified below.
    with task_lock:
        tasks_to_restart = list(running_tasks)

    for task_id in tasks_to_restart:
        try:
            # handle_task() changes the same registries under task_lock from
            # its worker threads, so each restart takes the lock as well.
            # stop_task() and start_new_instance() do not acquire it
            # themselves, so holding it here cannot deadlock.
            with task_lock:
                # Read the latest configuration.
                config = read_latest_config(task_id)
                if not config:
                    logging.error(f"Could not find latest config for task {task_id}, skipping restart")
                    continue

                # Stop the current instance.
                logging.info(f"Stopping task {task_id} for scheduled restart")
                stop_task(task_id)

                # Serialise the configuration back to JSON text.
                payload = json.dumps(config)

                # Start the new instance.
                logging.info(f"Starting new instance for task {task_id} with latest config")
                start_new_instance(mqtt_client, task_id, payload, config)

        except Exception as e:
            logging.error(f"Error restarting task {task_id}: {str(e)}")
|
||
|
||
|
||
def setup_scheduled_restart(restart_time="03:00"):
    """Schedule the daily restart of all instances and start the scheduler.

    Args:
        restart_time: Time of day passed to ``schedule`` for the daily
            ``restart_all_instances`` job.
    """
    schedule.every().day.at(restart_time).do(restart_all_instances)

    def run_schedule():
        # Poll for due jobs every 30 seconds, forever.
        while True:
            schedule.run_pending()
            time.sleep(30)

    # Run the polling loop in a daemon thread.
    threading.Thread(target=run_schedule, daemon=True).start()
|
||
|
||
|
||
def main():
    """Entry point: adopt running instances, then serve MQTT messages forever.

    Creates the MQTT client, stores it in the global ``mqtt_client`` (which
    ``restart_all_instances`` uses), installs the callbacks and credentials,
    schedules the daily restart and then connects and runs the network loop.
    A connection error is logged and followed by a 5 second pause before the
    next attempt.
    """
    global mqtt_client  # shared with restart_all_instances()

    # Pick up instances that are already running before handling messages.
    check_running_instances()

    # Create and configure the MQTT client.
    mqtt_client = mqtt.Client()
    mqtt_client.username_pw_set(mqtt_config['username'], mqtt_config['password'])
    mqtt_client.on_message = on_message
    mqtt_client.on_connect = on_connect

    # Daily restart at the default time.
    setup_scheduled_restart()

    while True:
        try:
            mqtt_client.connect(mqtt_config['broker'], mqtt_config['port'], 60)
            mqtt_client.loop_forever()
        except Exception as e:
            logging.error(f"MQTT connection error: {str(e)}")
            time.sleep(5)
|
||
|
||
|
||
# Start the supervisor only when this file is executed as a script.
if __name__ == "__main__":
    main()