"""MQTT data-fusion client.

Subscribes to a sensor-data topic and a fusion-parameter topic, queues the
incoming sensor messages, periodically runs them through ``DataFusion`` and
publishes the fused result back to the broker.
"""

# NOTE: the import order below is intentionally the same as in the original
# file.  The three star imports may rebind names, so reordering them relative
# to the other imports could change which object a name refers to.
import random
import threading
from queue import Queue
from paho.mqtt import client as mqtt_client
import numpy as np
import time
import logging
import os
import datetime
from KF_V2 import *
from utils import *
from config import *
import argparse
import json
import yaml


def load_mqtt_config():
    """Load the ``mqtt`` and ``topics`` sections of the YAML configuration.

    The file path is taken from the ``CONFIG_PATH`` environment variable and
    falls back to ``config.yaml`` in the working directory.

    Returns:
        A ``(mqtt, topics)`` tuple holding the two configuration sections.

    Raises:
        OSError: if the configuration file cannot be opened.
        KeyError: if either section is missing from the file.
    """
    config_path = os.getenv('CONFIG_PATH', 'config.yaml')
    with open(config_path, 'r', encoding='utf-8') as f:
        config = yaml.safe_load(f)
    return config['mqtt'], config['topics']


def _log_exception(prefix, exc, level=logging.ERROR):
    """Log *exc* as three records: message, exception type and stack trace.

    Must be called from inside an ``except`` block so that ``exc_info=True``
    can pick up the active exception.
    """
    logging.log(level, f"{prefix}: {str(exc)}")
    logging.log(level, f"Exception type: {type(exc).__name__}")
    logging.log(level, "Stack trace:", exc_info=True)


# Broker address and credentials come from the YAML configuration only; they
# must not be hard-coded in this file.
mqtt_config, topics_config = load_mqtt_config()

# NOTE(review): only 101 distinct ids are possible; two instances that draw
# the same id would presumably disconnect each other - confirm this is
# acceptable for the deployment.
client_id = f'python-mqtt-{random.randint(0, 100)}'

# Command-line options.
parser = argparse.ArgumentParser(description='处理命令行参数')
# -t/--task_id: str, default "+" (accept fusion parameters for any task).
parser.add_argument('-t', '--task_id', type=str, default="+", help='任务ID')
# -g/--gate: int, default 30.
parser.add_argument('-g', '--gate', type=int, default=30, help='门限值')
# -i/--interval: float, default 1.0; used as the fusion period in seconds.
parser.add_argument('-i', '--interval', type=float, default=1.0, help='时间间隔')
args = parser.parse_args()

# Module-level DataFusion instance.  fusion_runner() builds its own container;
# this one is kept because code outside this file may reference it.
fusion_instance = DataFusion(
    gate=args.gate,
    interval=args.interval,
)

# Current task id; replaced by on_message() when new run parameters arrive.
task_id = "10087"

# First path segment of the configured mqtt_topic (e.g. "bridge").
base_path = topics_config['mqtt_topic'].split('/')[0]

# Identifiers used to build the topic names below.
providerCode = "DP74b4ef9fb4aaf269"
fusionCode = "DPZYLY"
deviceType = "5ga"
fusionType = "fusion"
deviceId = "10580005"
fusionId = "554343465692430336"
sensor_id_list = ["80103"]

# Device-data topic built from base_path.
topic = f"{base_path}/{providerCode}/device_data/{deviceType}/{deviceId}"

# Everything in the configured sensor_topic that precedes "FU_PAM"
# (e.g. "fromcheck/DP74b4ef9fb4aaf269/device_data/").
base_topic = topics_config['sensor_topic'].split('FU_PAM')[0]

# Sensor-data topic to subscribe to.
subscribe_topic = f"{base_topic}5ga/10000000000000"

# Topic for fusion results.
# NOTE(review): publish() builds its own topic with a "bridge/" prefix and the
# live task_id, so this value is not what is actually published to - confirm
# which prefix is intended.
publish_topic = f"fromcheck/{fusionCode}/device_data/{fusionType}/{task_id}"

# Topic on which updated run parameters are received.
fusion_parameters_topic = topics_config['sensor_topic']

# Queue of (topic, payload) tuples waiting to be fused.
data_pool = Queue()
# Most recent raw run-parameter payload (None until one is received).
run_parameter = None
# Fusion period in seconds.
interval = args.interval

# Reference point handed to Pipeline.
# NOTE(review): the original comment described this as (latitude, longitude),
# but the values look like (longitude, latitude) - confirm against Pipeline.
reference_point = (104.08, 30.51)

# Message parsing / encoding helper.
pipe = Pipeline(fusion_parameters_topic=topics_config['sensor_topic'],
                reference_point=reference_point)
# Topic fragment identifying parameter messages for the selected task.
fusion_code = "FU_PAM/" + args.task_id


def setup_logging():
    """Configure console logging plus a dated error-only log file.

    INFO and above go to the console.  ERROR and above are additionally
    written to ``logs/mqtt_connection_<YYYYMMDD>_error.log``.  The file
    handler is attached to the root logger because every call site in this
    file logs through the root logger; attached only to the separate
    ``error_logger`` it would never receive a record.
    """
    os.makedirs('logs', exist_ok=True)

    current_time = datetime.datetime.now()
    error_log_filename = f'logs/mqtt_connection_{current_time.strftime("%Y%m%d")}_error.log'

    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s',
        handlers=[
            logging.StreamHandler()
        ]
    )

    root_logger = logging.getLogger()
    target_path = os.path.abspath(error_log_filename)
    # Avoid stacking duplicate file handlers if this function is called again.
    already_attached = any(
        isinstance(handler, logging.FileHandler)
        and handler.baseFilename == target_path
        for handler in root_logger.handlers
    )
    if not already_attached:
        error_handler = logging.FileHandler(error_log_filename, encoding='utf-8')
        error_handler.setLevel(logging.ERROR)
        error_handler.setFormatter(
            logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
        root_logger.addHandler(error_handler)

    # Kept for backward compatibility: records sent to this named logger
    # propagate to the root logger and therefore reach the file as well.
    error_logger = logging.getLogger('error_logger')
    error_logger.setLevel(logging.ERROR)


def connect_mqtt() -> mqtt_client.Client:
    """Create an MQTT client and connect it, retrying until it succeeds.

    A fresh client is built for every attempt; failed attempts are logged and
    retried after 5 seconds.  Retrying is done in a loop rather than by
    recursion so a long broker outage cannot exhaust the call stack.

    Returns:
        The client whose ``connect()`` call succeeded.  Its network loop has
        not been started yet.
    """
    will_topic = f"fromcheck/{fusionCode}/status/{task_id}"
    will_payload = "offline"

    def on_connect(client, userdata, flags, rc):
        if rc == 0:
            logging.info("Successfully connected to MQTT Broker")
            logging.info(f"Client ID: {client_id}")
            logging.info(f"Broker: {mqtt_config['broker']}:{mqtt_config['port']}")
            # Announce the online state on every (re)connect; otherwise the
            # retained "offline" will message stays in place after an
            # unexpected disconnect.
            client.publish(will_topic, "online", qos=1, retain=True)
            # The session is clean, so every subscription has to be renewed
            # here, including the sensor-data topic.
            client.subscribe(subscribe_topic)
            client.subscribe(fusion_parameters_topic)
            logging.info(f"Subscribed to fusion parameters topic: {fusion_parameters_topic}")
            if hasattr(pipe, 'topics'):
                for sub_topic in pipe.topics:
                    client.subscribe(sub_topic)
                    logging.info(f"Subscribed to topic: {sub_topic}")
        else:
            logging.error(f"Failed to connect, return code: {rc} ({DISCONNECT_REASONS.get(rc, '未知错误')})")

    def on_disconnect(client, userdata, rc):
        current_time = datetime.datetime.now()
        reason = DISCONNECT_REASONS.get(rc, "未知错误")
        logging.warning(f"Disconnected from MQTT Broker at {current_time.strftime('%Y-%m-%d %H:%M:%S')}")
        logging.warning(f"Disconnect reason code: {rc} - {reason}")

        if rc != 0:
            logging.error("Unexpected disconnection. Attempting to reconnect...")
            try:
                client.reconnect()
                logging.info("Reconnection successful")
            except Exception as e:
                current_time = datetime.datetime.now()
                _log_exception(
                    f"Reconnection failed at {current_time.strftime('%Y-%m-%d %H:%M:%S')}",
                    e,
                )

    while True:
        client = mqtt_client.Client(client_id, clean_session=True)
        client.username_pw_set(mqtt_config['username'], mqtt_config['password'])

        # NOTE(review): these two attribute assignments are kept from the
        # original; whether the installed paho version reads them is not
        # visible here.  The keepalive is therefore also passed to connect().
        client.keepalive = 60
        client.socket_timeout = 30
        # Automatic reconnect delay between 1 and 60 seconds.
        client.reconnect_delay_set(min_delay=1, max_delay=60)

        # Last-will message published by the broker if this client vanishes.
        client.will_set(will_topic, will_payload, qos=1, retain=True)

        client.on_connect = on_connect
        client.on_disconnect = on_disconnect

        try:
            client.connect(mqtt_config['broker'], mqtt_config['port'], keepalive=60)
        except Exception as e:
            _log_exception("Initial connection failed", e)
            time.sleep(5)
            continue

        return client


def subscribe(client: mqtt_client.Client):
    """Subscribe to the sensor and parameter topics and install ``on_message``.

    Messages whose topic contains ``"FU_PAM"`` are treated as run-parameter
    updates; every other message is queued in ``data_pool`` for fusion.
    """
    def on_message(client, userdata, msg):
        global run_parameter
        global task_id
        try:
            logging.info(f"Received message on topic: {msg.topic}")
            logging.info(f"Message payload: {msg.payload.decode()}")

            if "FU_PAM" in msg.topic:
                # Parameter update: honour it only for the selected task, or
                # for any task when the wildcard "+" was given.
                if args.task_id == '+' or fusion_code in msg.topic:
                    new_run_parameter = msg.payload.decode()
                    if run_parameter != new_run_parameter:
                        logging.info(f"Run parameter updated from {run_parameter} to {new_run_parameter}")
                        run_parameter = new_run_parameter
                        new_topics = pipe.extract_parms(run_parameter)
                        logging.info(f"Extracted topics: {new_topics}")
                        # Subscribe to the topics named by the new parameters.
                        client.subscribe(new_topics)
                        logging.info(f"Subscribed to new topics: {new_topics}")
                        logging.info('===========new run_parameter!===============')
                        task_id = pipe.task_id
            else:
                data_pool.put((msg.topic, msg.payload))
        except Exception as e:
            _log_exception("Error processing message", e)

    # Both subscriptions use QoS 0.
    subscribe_topics = [(subscribe_topic, 0), (fusion_parameters_topic, 0)]
    client.subscribe(subscribe_topics)
    client.on_message = on_message


def publish(client, message):
    """Publish *message* on the fusion topic, retrying with back-off.

    Up to three attempts are made.  The waits between attempts are 1 s and
    then 2 s.  Each successful publish is appended to ``log.txt``.

    Args:
        client: connected MQTT client.
        message: payload to publish (the JSON string from ``data_fusion``).
    """
    max_retries = 3
    retry_delay = 1  # initial retry delay in seconds

    def do_publish():
        # Built per attempt so that a task_id change is picked up immediately.
        publish_topic = f"bridge/{fusionCode}/device_data/fusion/{task_id}"
        try:
            result = client.publish(publish_topic, message)
            status = result.rc
            if status == 0:
                current_time = datetime.datetime.now()
                formatted_time = current_time.strftime('%Y-%m-%d %H:%M:%S')
                with open('log.txt', 'a', encoding='utf-8') as log_file:
                    log_file.write('=====================\n')
                    log_file.write(f"Send message to topic {publish_topic}\n")
                    log_file.write(f"time: {formatted_time}\n")
                    log_file.write(f"{message}\n")
                return True
            logging.error(f"Failed to send message to topic {publish_topic}, status: {status}")
            return False
        except Exception as e:
            _log_exception("Error publishing message", e)
            return False

    for attempt in range(max_retries):
        if do_publish():
            return
        if attempt < max_retries - 1:  # no wait after the final attempt
            logging.warning(f"Retrying publish in {retry_delay} seconds...")
            time.sleep(retry_delay)
            retry_delay *= 2  # exponential back-off for the next wait

    logging.error(f"Failed to publish message after {max_retries} attempts")


def data_fusion(fusion_container):
    """Drain ``data_pool``, fuse the queued messages and return JSON.

    Args:
        fusion_container: object whose ``run()`` method performs the fusion.

    Returns:
        The fused result as a JSON string (4-space indent), or ``None`` when
        no queued message contained any ``objects``.
    """
    data_list = []

    # Pull everything that is currently queued.
    while not data_pool.empty():
        data_now = data_pool.get()
        processed_data = pipe.process_json_data(data_now[1])
        # Keep only messages that actually carry objects.
        if processed_data and processed_data.get("objects"):
            data_list.append(processed_data)

    if not data_list:
        return None

    current_time = datetime.datetime.now()
    formatted_time = current_time.strftime('%Y-%m-%d %H:%M:%S')
    with open('Data_log.txt', 'a', encoding='utf-8') as log_file:
        log_file.write('=====================\n')
        log_file.write(f"Get message \n")
        log_file.write(f"time: {formatted_time}\n")
        log_file.write(f"{data_list}\n")

    sensor_data = pipe.data_encoder(data_list)
    logging.info(sensor_data)
    filtered_results = fusion_container.run(sensor_data)
    decoded_results = pipe.data_decoder(filtered_results)
    return json.dumps(decoded_results, indent=4)


def fusion_runner(client, stop_event=None):
    """Run the periodic fusion/publish loop until *stop_event* is set.

    Every ``interval`` seconds the queued data is fused and published.  The
    MQTT connection is checked every 30 seconds and before each fusion run.

    Args:
        client: MQTT client used for publishing.
        stop_event: optional ``threading.Event``; setting it ends the loop.
            When omitted the loop runs forever, as before.
    """
    if stop_event is None:
        stop_event = threading.Event()

    pre_run_parameter = run_parameter
    last_run_time = time.time()
    last_health_check = time.time()
    health_check_interval = 30  # seconds between connection health checks
    max_idle_wait = 0.1  # upper bound on one idle wait, in seconds
    fusion_container = DataFusion(args.gate, args.interval)

    def check_connection():
        if not client.is_connected():
            logging.warning("MQTT client disconnected during fusion_runner")
            try:
                client.reconnect()
                logging.info("Successfully reconnected in fusion_runner")
                return True
            except Exception as e:
                _log_exception("Reconnection failed in fusion_runner", e)
                return False
        return True

    while not stop_event.is_set():
        try:
            # Wait until the next run is due (capped so a stop request is
            # noticed quickly) instead of spinning at full CPU.
            time_until_run = interval - (time.time() - last_run_time)
            stop_event.wait(max(0.0, min(time_until_run, max_idle_wait)))
            if stop_event.is_set():
                break

            current_time = time.time()

            # Periodic health check.
            if current_time - last_health_check >= health_check_interval:
                if not check_connection():
                    stop_event.wait(5)  # connection is down: back off
                    continue
                last_health_check = current_time

            # Fusion and publishing.
            if current_time - last_run_time >= interval:
                if not check_connection():
                    stop_event.wait(1)  # do not hammer the broker
                    continue

                last_run_time = current_time
                if run_parameter != pre_run_parameter:
                    fusion_parms = pipe.extract_fusion_parms(run_parameter)
                    fusion_container.set_parameter(fusion_parms)
                    pre_run_parameter = run_parameter

                processed_data = data_fusion(fusion_container)
                if processed_data:
                    publish(client, processed_data)

        except Exception as e:
            _log_exception("Error in fusion_runner", e)
            stop_event.wait(1)


def run():
    """Application entry point: connect, start the fusion thread, loop.

    If the MQTT loop raises, the fusion thread belonging to that client is
    told to stop and everything is rebuilt after 5 seconds, so restarts do
    not accumulate worker threads.
    """
    setup_logging()
    logging.info("Starting MQTT client application")

    while True:  # outer loop handles a complete loss of the client
        stop_event = threading.Event()
        restart_delay = 0
        try:
            client = connect_mqtt()
            subscribe(client)

            logging.info("Starting fusion_runner thread")
            fusion_runner_thread = threading.Thread(
                target=fusion_runner,
                args=(client, stop_event),
                daemon=True,
            )
            fusion_runner_thread.start()

            logging.info("Starting MQTT loop")
            client.loop_forever()
        except Exception as e:
            _log_exception("Critical error in main loop", e, level=logging.CRITICAL)
            logging.info("Restarting in 5 seconds...")
            restart_delay = 5
        finally:
            # Stop the worker bound to this client before building a new one.
            stop_event.set()

        if restart_delay:
            time.sleep(restart_delay)


if __name__ == '__main__':
    run()