Files
2025-03-27 16:09:20 +08:00

374 lines
13 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import subprocess
import paho.mqtt.client as mqtt
import json
import time
import threading
import logging
from config import *
import datetime
import schedule # 需要先安装: pip install schedule
import yaml
# 读取yaml配置
def load_mqtt_config():
config_path = os.getenv('CONFIG_PATH', 'config.yaml')
with open(config_path, 'r') as f:
config = yaml.safe_load(f)
return config['mqtt'], config['topics']
# 获取MQTT和topics配置
mqtt_config, topics_config = load_mqtt_config()
# 设置日志配置
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('check.log'),
logging.StreamHandler()
]
)
# 存储运行中的任务及其配置
running_tasks = {}
task_configs = {}
# 启动 Dev_Fusion.py 的命令模板
fusion_command_template = f"nohup python Dev_Fusion.py -t {{task_id}} -g {DEV_FUSION_G} -i {DEV_FUSION_I} > /dev/null 2> error.log &"
# 日志文件夹路径
log_folder = "tasklog"
os.makedirs(log_folder, exist_ok=True)
# 创建全局锁
task_lock = threading.Lock()
def compare_configs(old_config, new_config):
"""
比较两个配置是否有实质性差异
返回 True 表示有差异,需要重启
返回 False 表示无差异,只需转发
"""
try:
# 1. 检查 devices 列表
old_devices = old_config.get('devices', [])
new_devices = new_config.get('devices', [])
if len(old_devices) != len(new_devices):
return True
# 为每个设备创建一个关键信息元组进行比较
def get_device_key(device):
return (
device.get('device_id'),
device.get('device_topic'),
device.get('device_type'),
device.get('reference_point')
)
old_device_keys = {get_device_key(d) for d in old_devices}
new_device_keys = {get_device_key(d) for d in new_devices}
# 如果设备的关键信息有变化,需要重启
if old_device_keys != new_device_keys:
return True
# 2. 检查参考点
old_ref = old_config.get('reference_point')
new_ref = new_config.get('reference_point')
if old_ref != new_ref:
return True
# 3. 其他参数(如 sampling_rate的变化不需要重启
logging.info("No critical configuration changes detected")
return False
except Exception as e:
logging.error(f"Error comparing configs: {str(e)}")
return True # 出错时视为有差异,安全起见重启实例
def stop_task(task_id):
"""停止指定的任务实例"""
try:
if task_id in running_tasks:
process = running_tasks[task_id]
# 使用 pkill 命令终止对应的 Python 进程
subprocess.run(f"pkill -f 'python.*Dev_Fusion.py.*-t {task_id}'", shell=True)
process.wait(timeout=5) # 等待进程结束
del running_tasks[task_id]
del task_configs[task_id]
logging.info(f"Task {task_id} stopped successfully")
except Exception as e:
logging.error(f"Error stopping task {task_id}: {str(e)}")
# 多线程处理函数
def handle_task(client, task_id, payload):
try:
with task_lock: # 使用锁保护共享资源
data = json.loads(payload)
sensor_topic = topics_config['sensor_topic'].replace("+", task_id)
# 记录配置更新
log_file = os.path.join(log_folder, f"received_tasklog_{task_id}.txt")
current_time = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
def log_config_update(action):
with open(log_file, "a") as f:
f.write(f"\n=== Configuration Update at {current_time} ===\n")
f.write(f"Task ID: {task_id}\n")
f.write(f"MQTT_TOPIC: {topics_config['mqtt_topic']}\n")
f.write(f"Payload: {payload}\n")
f.write(f"Action: {action}\n")
f.write("=" * 50 + "\n")
# 检查任务是否已经在运行
if task_id in running_tasks:
# 检查是否有存储的配置
if task_id in task_configs:
# 比较新旧配置
if compare_configs(task_configs[task_id], data):
logging.info(f"Configuration changed for task {task_id}, restarting...")
stop_task(task_id)
log_config_update("Configuration changed, restarting instance")
start_new_instance(client, task_id, payload, data)
else:
# 配置无变化,只转发消息
logging.info(f"No configuration change for task {task_id}, forwarding message")
log_config_update("Message forwarded (no critical changes)")
client.publish(sensor_topic, payload)
else:
# 没有存储的配置,存储新配置并转发
logging.info(f"No stored config for task {task_id}, storing first config")
task_configs[task_id] = data
log_config_update("First config stored and forwarded")
client.publish(sensor_topic, payload)
else:
# 任务不存在,启动新实例
log_config_update("New instance started")
start_new_instance(client, task_id, payload, data)
except Exception as e:
logging.error(f"Error handling task {task_id}: {str(e)}")
def start_new_instance(client, task_id, payload, config):
"""启动新的 Dev_Fusion 实例"""
try:
# 启动 Dev_Fusion.py 实例
fusion_command = fusion_command_template.format(task_id=task_id)
process = subprocess.Popen(fusion_command, shell=True)
running_tasks[task_id] = process
task_configs[task_id] = config
logging.info(f"Dev_Fusion.py started successfully for Task ID {task_id}")
# 保存日志,使用追加模式
log_file = os.path.join(log_folder, f"received_tasklog_{task_id}.txt")
current_time = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
with open(log_file, "a") as f: # 使用 "a" 模式追加内容
f.write(f"\n=== Configuration Update at {current_time} ===\n")
f.write(f"Task ID: {task_id}\n")
f.write(f"MQTT_TOPIC: {topics_config['mqtt_topic']}\n")
f.write(f"Payload: {payload}\n")
# 记录是否触发了重启
f.write("Action: New instance started\n")
f.write("=" * 50 + "\n")
# 等待实例启动
time.sleep(0.5)
# 发送配置
sensor_topic = topics_config['sensor_topic'].replace("+", task_id)
client.publish(sensor_topic, payload)
logging.info(f"Configuration sent to {sensor_topic}")
except Exception as e:
logging.error(f"Error starting new instance for task {task_id}: {str(e)}")
if task_id in running_tasks:
del running_tasks[task_id]
del task_configs[task_id]
# MQTT 回调函数
def on_connect(client, userdata, flags, rc):
if rc == 0:
logging.info("Connected to MQTT broker")
client.subscribe(topics_config['mqtt_topic']) # 使用yaml中的topic
else:
logging.error(f"Connection failed with code {rc}: {DISCONNECT_REASONS.get(rc, 'Unknown error')}")
def on_message(client, userdata, msg):
try:
payload = msg.payload.decode("utf-8")
logging.info(f"Received message on topic {msg.topic}")
data = json.loads(payload)
task_id = data.get("task_id")
if task_id:
thread = threading.Thread(target=handle_task, args=(client, task_id, payload))
thread.start()
else:
logging.warning("Received message without task_id")
except json.JSONDecodeError:
logging.error("Received message is not valid JSON")
except Exception as e:
logging.error(f"Error processing message: {str(e)}")
def check_running_instances():
"""检查系统中已经运行的 Dev_Fusion 实例"""
try:
# 使用 ps 命令查找运行中的 Dev_Fusion.py 实例
result = subprocess.run("ps aux | grep 'python.*Dev_Fusion.py' | grep -v grep",
shell=True, capture_output=True, text=True)
found_instances = []
for line in result.stdout.splitlines():
# 从命令行参数中提取 task_id
if '-t' in line:
parts = line.split()
for i, part in enumerate(parts):
if part == '-t' and i + 1 < len(parts):
task_id = parts[i + 1]
pid = parts[1] # 进程 ID 通常在第二列
found_instances.append((task_id, pid))
for task_id, pid in found_instances:
logging.info(f"Found running instance for task {task_id}, pid: {pid}")
# 读取该任务的最新配置
config = read_latest_config(task_id)
if config:
# 将已运行的实例添加到 running_tasks
running_tasks[task_id] = subprocess.Popen(['echo', ''], stdout=subprocess.PIPE)
running_tasks[task_id].pid = int(pid)
task_configs[task_id] = config
logging.info(
f"Successfully loaded config for task {task_id} from tasklog/received_tasklog_{task_id}.txt")
else:
logging.warning(f"No valid config found for task {task_id}, stopping instance...")
subprocess.run(f"pkill -f 'python.*Dev_Fusion.py.*-t {task_id}'", shell=True)
logging.info(f"Stopped instance {task_id} due to missing config")
logging.info(f"Finished checking instances. Loaded {len(running_tasks)} tasks with valid configs")
except Exception as e:
logging.error(f"Error checking running instances: {str(e)}")
def read_latest_config(task_id):
"""读取指定任务的最新配置"""
try:
log_file = os.path.join(log_folder, f"received_tasklog_{task_id}.txt")
if not os.path.exists(log_file):
logging.error(f"No log file found for task {task_id}")
return None
with open(log_file, 'r') as f:
content = f.read()
# 按配置更新块分割
updates = content.split('=== Configuration Update at')
if not updates:
return None
# 获取最后一个更新块
latest_update = updates[-1]
# 提取 Payload
payload_start = latest_update.find('Payload: ') + len('Payload: ')
payload_end = latest_update.find('\nAction:')
if payload_end == -1: # 如果没有 Action 行
payload_end = latest_update.find('\n===')
if payload_start > 0 and payload_end > payload_start:
payload = latest_update[payload_start:payload_end].strip()
return json.loads(payload)
return None
except Exception as e:
logging.error(f"Error reading latest config for task {task_id}: {str(e)}")
return None
def restart_all_instances():
"""重启所有运行中的实例"""
logging.info("Scheduled restart: Beginning restart of all instances")
# 复制当前运行的任务列表,因为我们会修改 running_tasks
tasks_to_restart = list(running_tasks.keys())
for task_id in tasks_to_restart:
try:
# 读取最新配置
config = read_latest_config(task_id)
if not config:
logging.error(f"Could not find latest config for task {task_id}, skipping restart")
continue
# 停止当前实例
logging.info(f"Stopping task {task_id} for scheduled restart")
stop_task(task_id)
# 将配置转换为 JSON 字符串
payload = json.dumps(config)
# 启动新实例
logging.info(f"Starting new instance for task {task_id} with latest config")
start_new_instance(mqtt_client, task_id, payload, config)
except Exception as e:
logging.error(f"Error restarting task {task_id}: {str(e)}")
def setup_scheduled_restart(restart_time="03:00"):
"""设置定时重启任务"""
schedule.every().day.at(restart_time).do(restart_all_instances)
def run_schedule():
while True:
schedule.run_pending()
time.sleep(30) # 每30秒检查一次
# 启动调度器线程
scheduler_thread = threading.Thread(target=run_schedule, daemon=True)
scheduler_thread.start()
def main():
global mqtt_client # 添加全局变量以在重启时使用
# 在启动时检查已运行的实例
check_running_instances()
# 创建 MQTT 客户端
mqtt_client = mqtt.Client()
mqtt_client.on_connect = on_connect
mqtt_client.on_message = on_message
mqtt_client.username_pw_set(mqtt_config['username'], mqtt_config['password'])
# 设置定时重启默认凌晨3点
setup_scheduled_restart()
while True:
try:
mqtt_client.connect(mqtt_config['broker'], mqtt_config['port'], 60)
mqtt_client.loop_forever()
except Exception as e:
logging.error(f"MQTT connection error: {str(e)}")
time.sleep(5)
if __name__ == "__main__":
main()