388 lines
14 KiB
Python
388 lines
14 KiB
Python
import random
|
||
import threading
|
||
from queue import Queue
|
||
from paho.mqtt import client as mqtt_client
|
||
import numpy as np
|
||
import time
|
||
import logging
|
||
import os
|
||
import datetime
|
||
from KF_V2 import *
|
||
from utils import *
|
||
from config import *
|
||
import argparse
|
||
import json
|
||
import yaml
|
||
|
||
# 首先加载yaml配置
|
||
def load_mqtt_config():
    """Load the MQTT connection settings and the topic templates from YAML.

    The file path is read from the ``CONFIG_PATH`` environment variable and
    falls back to ``config.yaml`` in the current working directory.

    Returns:
        A ``(mqtt, topics)`` tuple holding the ``mqtt`` and ``topics``
        sections of the file.

    Raises:
        OSError: The file cannot be opened.
        TypeError: The file does not contain a mapping (for example it is
            empty).
        KeyError: The ``mqtt`` or ``topics`` section is missing.
    """
    config_path = os.getenv('CONFIG_PATH', 'config.yaml')

    # Decode explicitly as UTF-8 so the result does not depend on the
    # platform's default locale encoding.
    with open(config_path, 'r', encoding='utf-8') as f:
        config = yaml.safe_load(f)

    if not isinstance(config, dict):
        raise TypeError(
            f"{config_path}: expected a mapping at the top level, "
            f"got {type(config).__name__}"
        )

    missing = [section for section in ('mqtt', 'topics') if section not in config]
    if missing:
        raise KeyError(f"{config_path}: missing section(s): {', '.join(missing)}")

    return config['mqtt'], config['topics']
|
||
|
||
# Load the MQTT connection settings and the topic templates from the YAML file.
mqtt_config, topics_config = load_mqtt_config()

# The broker address, port and credentials come from ``mqtt_config``; they are
# deliberately not repeated in this file, not even as commented-out values.
# The random suffix range is wide so that two instances are unlikely to pick
# the same client ID (a broker keeps only one session per client ID).
client_id = f'python-mqtt-{random.randint(0, 999999)}'

# Command-line options.
parser = argparse.ArgumentParser(description='处理命令行参数')

# -t/--task_id: task ID as a string. The default "+" makes on_message accept
# the run parameters of every task.
parser.add_argument('-t', '--task_id', type=str, default="+", help='任务ID')

# -g/--gate: integer gate value handed to DataFusion, default 30.
parser.add_argument('-g', '--gate', type=int, default=30, help='门限值')

# -i/--interval: seconds between two fusion cycles, float, default 1.0.
parser.add_argument('-i', '--interval', type=float, default=1.0, help='时间间隔')

args = parser.parse_args()

# Module-level DataFusion instance.
# NOTE(review): fusion_runner() builds its own DataFusion and nothing in this
# file reads ``fusion_instance``; kept because other modules may import it.
fusion_instance = DataFusion(
    gate=args.gate,
    interval=args.interval,
)

# Task ID used in the status and publish topics until on_message replaces it
# with ``pipe.task_id``.
task_id = "10087"

# First path segment of the configured mqtt_topic.
base_path = topics_config['mqtt_topic'].split('/')[0]

# Identifiers used to build the topics below.
providerCode = "DP74b4ef9fb4aaf269"
fusionCode = "DPZYLY"
deviceType = "5ga"
fusionType = "fusion"
deviceId = "10580005"
fusionId = "554343465692430336"
sensor_id_list = ["80103"]

# Device-data topic built from base_path.
topic = f"{base_path}/{providerCode}/device_data/{deviceType}/{deviceId}"

# Everything in the configured sensor_topic that precedes "FU_PAM".
base_topic = topics_config['sensor_topic'].split('FU_PAM')[0]

# Sensor data topic subscribed at start-up: "FU_PAM" is replaced by "5ga" and
# the trailing part by a concrete ID.
subscribe_topic = f"{base_topic}5ga/10000000000000"

# Topic for fusion results, built from the task ID known at import time.
# NOTE(review): publish() builds its own "bridge/..." topic on every call and
# does not read this value.
publish_topic = f"fromcheck/{fusionCode}/device_data/{fusionType}/{task_id}"

# Topic on which updated run parameters arrive.
fusion_parameters_topic = topics_config['sensor_topic']

# Queue of (topic, payload) tuples filled by on_message and drained by
# data_fusion.
data_pool = Queue()

# Latest run-parameter payload received; None until the first one arrives.
run_parameter = None

# Seconds between two fusion cycles.
interval = args.interval

# Reference point PO as (longitude, latitude): 104.08 is outside the valid
# latitude range, so the first element has to be the longitude.
reference_point = (104.08, 30.51)

# Data-processing helper shared by the callbacks and the fusion thread.
pipe = Pipeline(fusion_parameters_topic=topics_config['sensor_topic'], reference_point=reference_point)

# Topic fragment that identifies the run parameters of the selected task.
fusion_code = "FU_PAM/"+args.task_id
|
||
|
||
# 设置日志记录
|
||
def setup_logging():
    """Configure console logging and a dated error-log file.

    * The root logger is configured at INFO level with a console handler.
    * Records of level ERROR and above that reach the root logger are also
      written to ``logs/mqtt_connection_<YYYYMMDD>_error.log``. The rest of
      this file logs through the module-level ``logging.*`` functions, i.e.
      through the root logger, so the file handler has to live there.
    * The ``error_logger`` logger is kept at ERROR level; its records
      propagate to the root logger and therefore reach the same handlers.

    Calling the function again does not add a second handler for the same
    file.
    """
    # Create the log directory if it does not exist yet.
    os.makedirs('logs', exist_ok=True)

    # One error file per calendar day.
    date_stamp = datetime.datetime.now().strftime("%Y%m%d")
    error_log_filename = f'logs/mqtt_connection_{date_stamp}_error.log'
    log_format = '%(asctime)s - %(levelname)s - %(message)s'

    # Console output for everything from INFO upwards.
    logging.basicConfig(
        level=logging.INFO,
        format=log_format,
        handlers=[logging.StreamHandler()],
    )

    # Kept for code that asks for this logger by name.
    error_logger = logging.getLogger('error_logger')
    error_logger.setLevel(logging.ERROR)

    # Attach the error file to the root logger, once per file.
    root_logger = logging.getLogger()
    target_path = os.path.abspath(error_log_filename)
    already_attached = any(
        isinstance(handler, logging.FileHandler)
        and handler.baseFilename == target_path
        for handler in root_logger.handlers
    )
    if not already_attached:
        # UTF-8 because some log messages in this file contain non-ASCII text.
        error_handler = logging.FileHandler(error_log_filename, encoding='utf-8')
        error_handler.setLevel(logging.ERROR)
        error_handler.setFormatter(logging.Formatter(log_format))
        root_logger.addHandler(error_handler)
|
||
|
||
def connect_mqtt() -> "mqtt_client.Client":
    """Create the MQTT client, install its callbacks and connect to the broker.

    The connection attempt is repeated every 5 seconds until it succeeds, so
    the function only returns once ``connect()`` has gone through.

    Later reconnects are left to ``loop_forever()`` (started in ``run()``),
    which reconnects on its own using the delays set with
    ``reconnect_delay_set``. ``on_disconnect`` therefore only logs.

    Because the session is clean, ``on_connect`` subscribes to every topic
    again and republishes the retained "online" status after each
    successful (re)connect.

    Returns:
        The connected client. Its network loop has not been started yet.
    """
    keepalive_seconds = 60
    retry_seconds = 5

    # Retained status topic; it also carries the last-will "offline" message.
    will_topic = f"fromcheck/{fusionCode}/status/{task_id}"
    will_payload = "offline"

    def on_connect(client, userdata, flags, rc):
        if rc != 0:
            logging.error(f"Failed to connect, return code: {rc} ({DISCONNECT_REASONS.get(rc, '未知错误')})")
            return

        logging.info("Successfully connected to MQTT Broker")
        logging.info(f"Client ID: {client_id}")
        logging.info(f"Broker: {mqtt_config['broker']}:{mqtt_config['port']}")

        # clean_session=True: the broker keeps no subscriptions across
        # connections, so all of them are requested again here.
        client.subscribe(subscribe_topic)
        logging.info(f"Subscribed to topic: {subscribe_topic}")
        client.subscribe(fusion_parameters_topic)
        logging.info(f"Subscribed to fusion parameters topic: {fusion_parameters_topic}")
        for extra_topic in getattr(pipe, 'topics', None) or ():
            client.subscribe(extra_topic)
            logging.info(f"Subscribed to topic: {extra_topic}")

        # Overwrite a retained "offline" will left by an earlier connection.
        client.publish(will_topic, "online", qos=1, retain=True)

    def on_disconnect(client, userdata, rc):
        current_time = datetime.datetime.now()
        reason = DISCONNECT_REASONS.get(rc, "未知错误")

        logging.warning(f"Disconnected from MQTT Broker at {current_time.strftime('%Y-%m-%d %H:%M:%S')}")
        logging.warning(f"Disconnect reason code: {rc} - {reason}")

        if rc != 0:
            # No reconnect() here: loop_forever() reconnects right after this
            # callback returns, and a second attempt from inside the callback
            # would open a connection that is dropped again immediately.
            logging.error("Unexpected disconnection. Attempting to reconnect...")

    client = mqtt_client.Client(client_id, clean_session=True)
    client.username_pw_set(mqtt_config['username'], mqtt_config['password'])

    # NOTE(review): kept from the original; confirm that the installed paho
    # version actually reads a ``socket_timeout`` attribute.
    client.socket_timeout = 30
    # Automatic reconnect delay grows from 1 to 60 seconds.
    client.reconnect_delay_set(min_delay=1, max_delay=60)

    # Last-will message the broker publishes if the connection is lost.
    client.will_set(will_topic, will_payload, qos=1, retain=True)

    client.on_connect = on_connect
    client.on_disconnect = on_disconnect

    # Retry in a loop (not by recursion) so that a long broker outage cannot
    # exhaust the call stack.
    while True:
        try:
            client.connect(
                mqtt_config['broker'],
                mqtt_config['port'],
                keepalive=keepalive_seconds,
            )
            return client
        except Exception as e:
            logging.error(f"Initial connection failed: {str(e)}")
            logging.error(f"Exception type: {type(e).__name__}")
            logging.error("Stack trace:", exc_info=True)
            time.sleep(retry_seconds)
|
||
|
||
def subscribe(client: "mqtt_client.Client"):
    """Subscribe to the start-up topics and install the message handler.

    Messages whose topic contains ``"FU_PAM"`` are run-parameter updates;
    every other message is queued in ``data_pool`` as ``(topic, payload)``
    for the fusion thread.

    Args:
        client: The connected MQTT client.
    """

    def on_message(client, userdata, msg):
        global run_parameter
        global task_id
        try:
            logging.info(f"Received message on topic: {msg.topic}")
            payload_text = msg.payload.decode()
            logging.info(f"Message payload: {payload_text}")

            # Sensor data: hand the raw payload to the fusion thread.
            if "FU_PAM" not in msg.topic:
                data_pool.put((msg.topic, msg.payload))
                return

            # Run parameters of another task are ignored unless "+" was
            # given on the command line.
            if args.task_id != '+' and fusion_code not in msg.topic:
                return

            # Nothing to do when the same parameters are delivered again.
            if run_parameter == payload_text:
                return

            logging.info(f"Run parameter updated from {run_parameter} to {payload_text}")
            run_parameter = payload_text
            new_topics = pipe.extract_parms(run_parameter)
            logging.info(f"Extracted topics: {new_topics}")

            if new_topics:
                client.subscribe(new_topics)
                logging.info(f"Subscribed to new topics: {new_topics}")
            else:
                # Do not send a subscribe request that names no topic.
                logging.warning("Run parameter contains no topics to subscribe to")

            logging.info('===========new run_parameter!===============')
            task_id = pipe.task_id
        except Exception as e:
            logging.error(f"Error processing message: {str(e)}")
            logging.error(f"Exception type: {type(e).__name__}")
            logging.error("Stack trace:", exc_info=True)

    # Both start-up subscriptions use QoS 0.
    subscribe_topics = [(subscribe_topic, 0), (fusion_parameters_topic, 0)]
    client.subscribe(subscribe_topics)
    client.on_message = on_message
|
||
|
||
|
||
def publish(client, message):
    """Publish one fusion result, retrying with exponential back-off.

    The topic is ``bridge/<fusionCode>/device_data/fusion/<task_id>`` and is
    rebuilt on every attempt from the current module-level values. Up to
    three attempts are made, waiting 1 s and then 2 s between them. Each
    accepted message is appended to ``log.txt``; a failure to write that
    file is logged but does not count as a publish failure.

    Args:
        client: MQTT client used for publishing.
        message: Payload to publish.

    Returns:
        None. A final failure is reported through the log only.
    """
    max_retries = 3
    retry_delay = 1  # seconds before the first retry, doubled after each retry

    def do_publish():
        publish_topic = f"bridge/{fusionCode}/device_data/fusion/{task_id}"
        try:
            status = client.publish(publish_topic, message).rc
        except Exception as e:
            logging.error(f"Error publishing message: {str(e)}")
            logging.error(f"Exception type: {type(e).__name__}")
            logging.error("Stack trace:", exc_info=True)
            return False

        if status != 0:
            logging.error(f"Failed to send message to topic {publish_topic}, status: {status}")
            return False

        # The message has been accepted by the client at this point. The
        # audit log is written separately so that an I/O error here cannot
        # trigger a retry and send the same message again.
        formatted_time = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        try:
            with open('log.txt', 'a', encoding='utf-8') as log_file:
                log_file.write('=====================\n')
                log_file.write(f"Send message to topic {publish_topic}\n")
                log_file.write(f"time: {formatted_time}\n")
                log_file.write(f"{message}\n")
        except OSError as e:
            logging.error(f"Could not write publish log: {e}")
        return True

    for attempt in range(max_retries):
        if do_publish():
            return
        if attempt < max_retries - 1:  # no wait after the last attempt
            logging.warning(f"Retrying publish in {retry_delay} seconds...")
            time.sleep(retry_delay)
            retry_delay *= 2  # exponential back-off

    logging.error(f"Failed to publish message after {max_retries} attempts")
|
||
|
||
|
||
def data_fusion(fusion_container):
    """Drain the data pool, fuse the queued frames and return them as JSON.

    Every queued payload is passed through ``pipe.process_json_data``; only
    results that carry a non-empty ``"objects"`` entry are kept. When at
    least one frame was kept, the batch is appended to ``Data_log.txt``.
    The kept frames are then encoded, run through ``fusion_container`` and
    decoded again.

    Args:
        fusion_container: Object whose ``run`` method performs the fusion.

    Returns:
        The decoded fusion result serialised as an indented JSON string.
    """
    frames = []

    # Take everything that is queued right now.
    while not data_pool.empty():
        raw_payload = data_pool.get()[1]
        frame = pipe.process_json_data(raw_payload)
        if not frame:
            continue
        # Frames without objects carry nothing to fuse.
        if frame.get("objects"):
            frames.append(frame)

    # The log file is only touched when something was received.
    if frames:
        stamp = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        with open('Data_log.txt', 'a') as log_file:
            log_file.write('=====================\n')
            log_file.write("Get message \n")
            log_file.write(f"time: {stamp}\n")
            log_file.write(f"{frames}\n")

    encoded = pipe.data_encoder(frames)
    logging.info(encoded)
    fused = fusion_container.run(encoded)
    decoded = pipe.data_decoder(fused)
    return json.dumps(decoded, indent=4)
|
||
|
||
|
||
def fusion_runner(client):
    """Run the fusion cycle forever; intended as the target of a thread.

    Once per ``interval`` seconds the function applies any new run
    parameter to its own ``DataFusion`` instance, calls ``data_fusion`` and
    publishes the result. Every 30 seconds, and before each cycle, it
    checks the MQTT connection. Between cycles the thread sleeps instead of
    spinning, and after a failed reconnect it waits 5 seconds before trying
    again. Any exception inside a cycle is logged and followed by a
    1-second pause.

    Args:
        client: MQTT client used for publishing and connection checks.
    """
    health_check_interval = 30  # seconds between connection health checks
    reconnect_backoff = 5       # seconds to wait after a failed reconnect
    longest_nap = 1.0           # upper bound for one idle sleep, in seconds

    applied_run_parameter = run_parameter
    last_run_time = time.time()
    last_health_check = time.time()
    fusion_container = DataFusion(args.gate, args.interval)

    def check_connection():
        """Return True when connected or when a reconnect attempt went through."""
        if client.is_connected():
            return True

        logging.warning("MQTT client disconnected during fusion_runner")
        try:
            # NOTE(review): run() drives loop_forever() on the same client
            # from another thread, and that loop reconnects as well; confirm
            # that the two reconnect paths cannot interfere.
            client.reconnect()
            logging.info("Successfully reconnected in fusion_runner")
            return True
        except Exception as e:
            logging.error(f"Reconnection failed in fusion_runner: {str(e)}")
            logging.error(f"Exception type: {type(e).__name__}")
            logging.error("Stack trace:", exc_info=True)
            return False

    while True:
        try:
            current_time = time.time()

            # Periodic health check.
            if current_time - last_health_check >= health_check_interval:
                if not check_connection():
                    time.sleep(reconnect_backoff)
                    continue
                last_health_check = current_time

            # Fusion cycle.
            if current_time - last_run_time >= interval:
                if not check_connection():
                    # Back off instead of retrying in a tight loop.
                    time.sleep(reconnect_backoff)
                    continue

                last_run_time = current_time

                # Read the shared value once: on_message may replace it at
                # any time, and the value marked as applied must be the one
                # that was actually applied.
                latest_run_parameter = run_parameter
                if latest_run_parameter != applied_run_parameter:
                    fusion_parms = pipe.extract_fusion_parms(latest_run_parameter)
                    fusion_container.set_parameter(fusion_parms)
                    applied_run_parameter = latest_run_parameter

                processed_data = data_fusion(fusion_container)
                if processed_data:
                    publish(client, processed_data)

            # Sleep until the next cycle is due. The nap is capped so the
            # health check stays responsive, and never zero so the thread
            # always yields to the MQTT network loop.
            remaining = last_run_time + interval - time.time()
            time.sleep(min(max(remaining, 0.001), longest_nap))

        except Exception as e:
            logging.error(f"Error in fusion_runner: {str(e)}")
            logging.error(f"Exception type: {type(e).__name__}")
            logging.error("Stack trace:", exc_info=True)
            time.sleep(1)
|
||
|
||
def run():
    """Start the application and keep the MQTT network loop running.

    The client and the ``fusion_runner`` thread are created once and then
    reused. ``fusion_runner`` never returns and keeps the client it was
    given, so creating a new client and a new thread after every failure
    would leave several threads draining ``data_pool`` and publishing at
    the same time. After a failure the existing client is reconnected
    instead, and the thread is only started again if it is no longer alive.
    """
    setup_logging()
    logging.info("Starting MQTT client application")

    client = None
    fusion_runner_thread = None

    while True:
        try:
            if client is None:
                # Keep the client only after subscribing has succeeded, so a
                # failure here leads to a completely fresh attempt.
                fresh_client = connect_mqtt()
                subscribe(fresh_client)
                client = fresh_client
            else:
                # Reopen the connection of the existing client before its
                # network loop is entered again.
                client.reconnect()

            if fusion_runner_thread is None or not fusion_runner_thread.is_alive():
                logging.info("Starting fusion_runner thread")
                fusion_runner_thread = threading.Thread(
                    target=fusion_runner,
                    args=(client,),
                    daemon=True,
                )
                fusion_runner_thread.start()

            logging.info("Starting MQTT loop")
            client.loop_forever()
        except Exception as e:
            logging.critical(f"Critical error in main loop: {str(e)}")
            logging.critical(f"Exception type: {type(e).__name__}")
            logging.critical("Stack trace:", exc_info=True)

        # Reached after an error and also if the network loop returns.
        logging.info("Restarting in 5 seconds...")
        time.sleep(5)
|
||
|
||
|
||
# Call run() only when this file is executed as a script.
if __name__ == '__main__':
    run()
|