Files
2025-03-27 16:09:20 +08:00

146 lines
4.4 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
import time
import random
from math import radians, degrees, sin, cos
from paho.mqtt import client as mqtt_client
import datetime
import numpy as np
from math import atan2, sqrt
# 坐标转换函数
def convert_to_cartesian(lat, lon, reference_point):
"""将经纬度转换为基于参考点的直角坐标,考虑地球椭球模型"""
# 地球椭球参数WGS84
a = 6378137.0 # 长半轴,单位:米
f = 1 / 298.257223563 # 扁率
e2 = 2 * f - f ** 2 # 第一偏心率平方
# 提取参考点坐标
ref_lat, ref_lon = reference_point
# 转换成弧度
lat_rad = radians(lat)
lon_rad = radians(lon)
ref_lat_rad = radians(ref_lat)
ref_lon_rad = radians(ref_lon)
# 计算曲率半径
N = a / sqrt(1 - e2 * sin(ref_lat_rad) ** 2) # 参考点处的卯酉圈曲率半径
# 计算基于参考点的平面直角坐标
delta_lon = lon_rad - ref_lon_rad
X = (N + 0) * cos(ref_lat_rad) * delta_lon
Y = (a * (1 - e2)) / (1 - e2 * sin(ref_lat_rad) ** 2) * (lat_rad - ref_lat_rad)
return X, Y
# 模拟数据生成函数
def generate_simulated_data(reference_point, radius_km, angle):
"""生成模拟数据,符合 Pipeline 处理需求"""
R = 6371000 # 地球半径(米)
# 将半径转换为弧度
radius = radius_km / R
# 计算参考点经纬度
lat0, lon0 = reference_point
# 计算新的点的经度和纬度
new_lat = lat0 + degrees(radius * cos(radians(angle)))
new_lon = lon0 + degrees(radius * sin(radians(angle)) / cos(radians(lat0)))
# 生成模拟 JSON 数据
mock_data = {
"deviceId": "80103",
"deviceType": 10,
"objects": [
{
"altitude": 150.0, # 模拟高度
"extension": {
"traceId": "00000000000001876",
"channel": "5756500000",
"objectType": 30,
"uavId": "UAS123456", # 新增字段,与 Pipeline 对应
"uavModel": "DJI Mini 3 Pro", # 模拟 UAV 型号
"deviceId": "80103" # 来源设备 ID
},
"height": 120.0, # 高度
"latitude": new_lat,
"longitude": new_lon,
"X": 0.0, # 预留字段,供转换函数填充
"Y": 0.0, # 预留字段,供转换函数填充
"speed": 15.0, # 模拟速度
"objectId": "AX0009", # 模拟目标 ID
"time": int(time.time() * 1000), # 当前时间戳(毫秒)
"source": [["sensor1", "UAS123456"]] # 模拟来源
}
],
"providerCode": "ZYLYTEST",
"ptTime": int(time.time() * 1000) # 当前时间戳(毫秒)
}
# 转换坐标
for obj in mock_data["objects"]:
lat, lon = obj["latitude"], obj["longitude"]
obj["X"], obj["Y"] = convert_to_cartesian(lat, lon, reference_point)
return json.dumps(mock_data, indent=4)
# MQTT 推送代码
broker = '192.168.36.234'
port = 37826
providerCode = "DP74b4ef9fb4aaf269"
deviceType = "5ga"
deviceId = "10580015"
topic = f"bridge/{providerCode}/device_data/{deviceType}/{deviceId}"
client_id = f'python-mqtt-{random.randint(0, 1000)}'
username = "cmlc"
password = "odD8#Ve7.B"
reference_point = (31.880000, 117.240000) # 经度和纬度
radius = 1500 # 半径,单位:米
def connect_mqtt():
"""连接 MQTT Broker"""
def on_connect(client, userdata, flags, rc):
if rc == 0:
print("Connected to MQTT Broker!")
else:
print(f"Failed to connect, return code {rc}")
client = mqtt_client.Client(client_id)
client.on_connect = on_connect
client.username_pw_set(username, password)
client.connect(broker, port)
return client
def publish(client):
"""推送生成的模拟数据"""
msg_count = 0
angle = 0
while True:
time.sleep(1)
msg = generate_simulated_data(reference_point, radius, angle)
result = client.publish(topic, msg)
status = result.rc
if status == 0:
print(f"Send `{msg_count}` to topic `{topic}`")
else:
print(f"Failed to send message to topic {topic}")
msg_count += 1
angle += 1
def run():
client = connect_mqtt()
client.loop_start()
publish(client)
if __name__ == '__main__':
run()