Files
WddSuperAgent/agent-common/SplitProject/ranjing-python-devfusion/Dev_Fusion.py
2025-03-27 16:09:20 +08:00

388 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import random
import threading
from queue import Queue
from paho.mqtt import client as mqtt_client
import numpy as np
import time
import logging
import os
import datetime
from KF_V2 import *
from utils import *
from config import *
import argparse
import json
import yaml
# 首先加载yaml配置
def load_mqtt_config():
config_path = os.getenv('CONFIG_PATH', 'config.yaml')
with open(config_path, 'r') as f:
config = yaml.safe_load(f)
return config['mqtt'], config['topics']
# 获取MQTT和topics配置
mqtt_config, topics_config = load_mqtt_config()
## =======================
# MQTT 代理地址
# broker = '192.168.36.234'
# port = 37826
# username = "cmlc"
# password = "odD8#Ve7.B"
client_id = f'python-mqtt-{random.randint(0, 100)}'
# 创建 ArgumentParser 对象
parser = argparse.ArgumentParser(description='处理命令行参数')
# 添加参数 task_id简称 t类型为 int默认值为 1
parser.add_argument('-t', '--task_id', type=str, default="+", help='任务ID')
# 添加参数 gate简称 g类型为 str默认值为 "default_gate"
parser.add_argument('-g', '--gate', type=int, default=30, help='门限值')
# 添加参数 interval简称 i类型为 float默认值为 1.0
parser.add_argument('-i', '--interval', type=float, default=1.0, help='时间间隔')
# 解析命令行参数
args = parser.parse_args()
# 实例化 DataFusion 类
fusion_instance = DataFusion(
gate=args.gate,
interval=args.interval,
)
global task_id
task_id = "10087"
# 从yaml的mqtt_topic中提取基础路径
base_path = topics_config['mqtt_topic'].split('/')[0] # 获取"bridge"
# 更新数据上报的主题格式
providerCode = "DP74b4ef9fb4aaf269"
fusionCode = "DPZYLY"
deviceType = "5ga"
fusionType = "fusion"
deviceId = "10580005"
fusionId = "554343465692430336"
sensor_id_list = ["80103"]
# 使用base_path构建topic
topic = f"{base_path}/{providerCode}/device_data/{deviceType}/{deviceId}"
# 从yaml的sensor_topic中提取基础路径
base_topic = topics_config['sensor_topic'].split('FU_PAM')[0] # 得到 "fromcheck/DP74b4ef9fb4aaf269/device_data/"
# 订阅主题 - 基于yaml格式构建
subscribe_topic = f"{base_topic}5ga/10000000000000" # 将FU_PAM替换为5ga,将+替换为具体ID
# 发布融合结果的主题
# fusionId的来源是下发任务时的ID
publish_topic = f"fromcheck/{fusionCode}/device_data/{fusionType}/{task_id}"
# 更新运行参数的主题
fusion_parameters_topic = topics_config['sensor_topic']
# 生成唯一的 client_id
# 数据池
data_pool = Queue()
run_parameter = None
interval = args.interval
# 定义参考点 PO纬度, 经度)
global reference_point
reference_point = (104.08, 30.51) # 参考点的经纬度
# 数据池
data_pool = Queue()
run_parameter = None
# 初始化数据处理类
pipe = Pipeline(fusion_parameters_topic=topics_config['sensor_topic'], reference_point=reference_point)
fusion_code = "FU_PAM/"+args.task_id
# 设置日志记录
def setup_logging():
# 创建logs目录如果不存在
if not os.path.exists('logs'):
os.makedirs('logs')
# 设置日志文件名(包含日期)
current_time = datetime.datetime.now()
error_log_filename = f'logs/mqtt_connection_{current_time.strftime("%Y%m%d")}_error.log'
# 配置总的日志记录器
logging.basicConfig(
level=logging.INFO, # 记录所有信息
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.StreamHandler() # 同时输出到控制台
]
)
# 配置错误日志记录器
error_logger = logging.getLogger('error_logger')
error_logger.setLevel(logging.ERROR)
# 创建文件处理器
error_handler = logging.FileHandler(error_log_filename)
error_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
# 添加处理器到错误日志记录器
error_logger.addHandler(error_handler)
def connect_mqtt() -> mqtt_client:
def on_connect(client, userdata, flags, rc):
if rc == 0:
logging.info("Successfully connected to MQTT Broker")
logging.info(f"Client ID: {client_id}")
logging.info(f"Broker: {mqtt_config['broker']}:{mqtt_config['port']}")
# 重新订阅主题
client.subscribe(fusion_parameters_topic)
logging.info(f"Subscribed to fusion parameters topic: {fusion_parameters_topic}")
if hasattr(pipe, 'topics'):
for topic in pipe.topics:
client.subscribe(topic)
logging.info(f"Subscribed to topic: {topic}")
else:
logging.error(f"Failed to connect, return code: {rc} ({DISCONNECT_REASONS.get(rc, '未知错误')})")
def on_disconnect(client, userdata, rc):
current_time = datetime.datetime.now()
reason = DISCONNECT_REASONS.get(rc, "未知错误")
logging.warning(f"Disconnected from MQTT Broker at {current_time.strftime('%Y-%m-%d %H:%M:%S')}")
logging.warning(f"Disconnect reason code: {rc} - {reason}")
if rc != 0:
logging.error("Unexpected disconnection. Attempting to reconnect...")
try:
client.reconnect()
logging.info("Reconnection successful")
except Exception as e:
current_time = datetime.datetime.now()
logging.error(f"Reconnection failed at {current_time.strftime('%Y-%m-%d %H:%M:%S')}: {str(e)}")
logging.error(f"Exception type: {type(e).__name__}")
logging.error(f"Stack trace:", exc_info=True)
client = mqtt_client.Client(client_id, clean_session=True)
client.username_pw_set(mqtt_config['username'], mqtt_config['password'])
# 设置保活时间和重试间隔
client.keepalive = 60 # 60秒的保活时间
client.socket_timeout = 30 # 30秒的socket超时
client.reconnect_delay_set(min_delay=1, max_delay=60) # 重连延迟在1-60秒之间
# 设置遗嘱消息last will message
will_topic = f"fromcheck/{fusionCode}/status/{task_id}"
will_payload = "offline"
client.will_set(will_topic, will_payload, qos=1, retain=True)
# 设置回调函数
client.on_connect = on_connect
client.on_disconnect = on_disconnect
try:
client.connect(mqtt_config['broker'], mqtt_config['port'])
except Exception as e:
logging.error(f"Initial connection failed: {str(e)}")
logging.error(f"Exception type: {type(e).__name__}")
logging.error(f"Stack trace:", exc_info=True)
time.sleep(5)
return connect_mqtt()
# 发送上线状态
client.publish(will_topic, "online", qos=1, retain=True)
return client
def subscribe(client: mqtt_client):
def on_message(client, userdata, msg):
try:
global run_parameter
global task_id
logging.info(f"Received message on topic: {msg.topic}")
logging.info(f"Message payload: {msg.payload.decode()}")
if "FU_PAM" in msg.topic:
if args.task_id == '+' or fusion_code in msg.topic:
new_run_parameter = msg.payload.decode()
if run_parameter != new_run_parameter:
logging.info(f"Run parameter updated from {run_parameter} to {new_run_parameter}")
run_parameter = new_run_parameter
new_topics = pipe.extract_parms(run_parameter)
logging.info(f"Extracted topics: {new_topics}")
client.subscribe(new_topics) # 重新更新订阅的数据
logging.info(f"Subscribed to new topics: {new_topics}")
logging.info('===========new run_parameter!===============')
current_time = datetime.datetime.now()
task_id = pipe.task_id
else:
data_pool.put((msg.topic, msg.payload))
except Exception as e:
logging.error(f"Error processing message: {str(e)}")
logging.error(f"Exception type: {type(e).__name__}")
logging.error(f"Stack trace:", exc_info=True)
subscribe_topics = [(subscribe_topic, 0), (fusion_parameters_topic, 0)] # 默认QoS为0
client.subscribe(subscribe_topics)
client.on_message = on_message
def publish(client, message):
global task_id
global fusionCode
max_retries = 3
retry_delay = 1 # 初始重试延迟(秒)
def do_publish():
publish_topic = f"bridge/{fusionCode}/device_data/fusion/{task_id}"
try:
result = client.publish(publish_topic, message)
status = result.rc
if status == 0:
current_time = datetime.datetime.now()
formatted_time = current_time.strftime('%Y-%m-%d %H:%M:%S')
with open('log.txt', 'a') as log_file:
log_file.write('=====================\n')
log_file.write(f"Send message to topic {publish_topic}\n")
log_file.write(f"time: {formatted_time}\n")
log_file.write(f"{message}\n")
return True
else:
logging.error(f"Failed to send message to topic {publish_topic}, status: {status}")
return False
except Exception as e:
logging.error(f"Error publishing message: {str(e)}")
logging.error(f"Exception type: {type(e).__name__}")
logging.error(f"Stack trace:", exc_info=True)
return False
# 实现重试逻辑
for attempt in range(max_retries):
if do_publish():
return
if attempt < max_retries - 1: # 如果不是最后一次尝试
retry_delay *= 2 # 指数退避
logging.warning(f"Retrying publish in {retry_delay} seconds...")
time.sleep(retry_delay)
logging.error(f"Failed to publish message after {max_retries} attempts")
def data_fusion(fusion_container):
global data_pool
data_list = []
# 从数据池中提取所有的数据
while not data_pool.empty():
data_now = data_pool.get()
processed_data = pipe.process_json_data(data_now[1])
# 筛选有意义的数据
if processed_data and processed_data.get("objects"): # 只记录有 objects 的数据
data_list.append(processed_data)
if data_list: # 只有当有数据时才写日志
current_time = datetime.datetime.now()
formatted_time = current_time.strftime('%Y-%m-%d %H:%M:%S')
with open('Data_log.txt', 'a') as log_file: # 以追加模式打开日志文件
log_file.write('=====================\n') # 写入分隔符
log_file.write(f"Get message \n")
log_file.write(f"time: {formatted_time}\n") # 写入分隔符
log_file.write(f"{data_list}\n") # 写入消息内容
sensor_data = pipe.data_encoder(data_list)
logging.info(sensor_data)
filtered_results = fusion_container.run(sensor_data)
processed_data = pipe.data_decoder(filtered_results)
processed_data = json.dumps(processed_data, indent=4)
return processed_data # 返回处理后的 JSON 字符串
def fusion_runner(client):
global run_parameter
pre_run_parameter = run_parameter
last_run_time = time.time()
last_health_check = time.time()
health_check_interval = 30 # 每30秒进行一次健康检查
fusion_container = DataFusion(args.gate, args.interval)
def check_connection():
if not client.is_connected():
logging.warning("MQTT client disconnected during fusion_runner")
try:
client.reconnect()
logging.info("Successfully reconnected in fusion_runner")
return True
except Exception as e:
logging.error(f"Reconnection failed in fusion_runner: {str(e)}")
logging.error(f"Exception type: {type(e).__name__}")
logging.error(f"Stack trace:", exc_info=True)
return False
return True
while True:
try:
current_time = time.time()
# 定期健康检查
if current_time - last_health_check >= health_check_interval:
if not check_connection():
time.sleep(5) # 如果连接失败等待5秒后继续
continue
last_health_check = current_time
# 数据处理和发送
if current_time - last_run_time >= interval:
if not check_connection():
continue
last_run_time = current_time
if run_parameter != pre_run_parameter:
fusion_parms = pipe.extract_fusion_parms(run_parameter)
fusion_container.set_parameter(fusion_parms)
pre_run_parameter= run_parameter
processed_data = data_fusion(fusion_container)
if processed_data:
publish(client, processed_data)
except Exception as e:
logging.error(f"Error in fusion_runner: {str(e)}")
logging.error(f"Exception type: {type(e).__name__}")
logging.error(f"Stack trace:", exc_info=True)
time.sleep(1)
def run():
# 初始化日志系统
setup_logging()
logging.error("Starting MQTT client application")
while True: # 添加外层循环来处理完全断开的情况
try:
client = connect_mqtt()
subscribe(client)
logging.info("Starting fusion_runner thread")
fusion_runner_thread = threading.Thread(target=fusion_runner, args=(client,), daemon=True)
fusion_runner_thread.start()
logging.info("Starting MQTT loop")
client.loop_forever()
except Exception as e:
logging.critical(f"Critical error in main loop: {str(e)}")
logging.critical(f"Exception type: {type(e).__name__}")
logging.critical(f"Stack trace:", exc_info=True)
logging.info("Restarting in 5 seconds...")
time.sleep(5)
if __name__ == '__main__':
run()