424 lines
18 KiB
Python
424 lines
18 KiB
Python
import datetime
|
||
from transformation import *
|
||
import json
|
||
import numpy as np
|
||
|
||
|
||
class Pipeline:
|
||
def __init__(self, fusion_parameters_topic,reference_point):
|
||
self.fusion_parameters_topic = fusion_parameters_topic
|
||
self.task_id = '554343465692430336'
|
||
self.reference_point = reference_point
|
||
# self.deviceId = deviceId
|
||
self.sensor_id_list = ["10000000000"]
|
||
self.fusionCode = 'DPZYLY'
|
||
self.publish_topic = f"bridge/{self.fusionCode}/device_data/fusion/{self.task_id}"
|
||
self.priority_dict = {"10000000000":1}
|
||
self.uavInfo_bucket = dict()
|
||
self.target_bowl = dict()
|
||
self.device_info_dict = dict()
|
||
self.device_type_mapping = {
|
||
"5ga": 0,
|
||
"radar": 1,
|
||
"spec": 2,
|
||
"oe": 3,
|
||
"cm": 4,
|
||
"dec": 5,
|
||
"ifr": 6,
|
||
"cv": 7,
|
||
"isrs": 8,
|
||
"aoa": 9,
|
||
"tdoa": 10,
|
||
"dcd": 11,
|
||
"direct": 100,
|
||
"rtk": 101,
|
||
"rid": 102,
|
||
"fusion": 1000,
|
||
"other": 999 # 假设 'other' 对应于未知设备类型
|
||
}
|
||
self.device_type_speedrank = {
|
||
"radar": 1,
|
||
"spec": 2,
|
||
"oe": 3,
|
||
"cm": 4,
|
||
"dec": 5,
|
||
"ifr": 6,
|
||
"cv": 7,
|
||
"isrs": 8,
|
||
"aoa": 9,
|
||
"tdoa": 10,
|
||
"dcd": 13,
|
||
"direct": 12,
|
||
"5ga": 11,
|
||
"rid": 14,
|
||
"rtk": 15,
|
||
"other": 0 # 假设 'other' 对应于未知设备类型
|
||
}
|
||
|
||
import json
|
||
|
||
def process_json_data(self, json_data):
|
||
"""
|
||
将 JSON 数据转换为字典,并添加 X 和 Y 属性。
|
||
"""
|
||
data_dict = json.loads(json_data)
|
||
|
||
# 安全访问 'ptTime' 键
|
||
pt_time = data_dict.get('ptTime')
|
||
if pt_time is not None:
|
||
print(pt_time)
|
||
else:
|
||
print("Key 'ptTime' not found in data_dict.")
|
||
|
||
# 安全访问 'objects' 键
|
||
objects = data_dict.get('objects')
|
||
if objects is None:
|
||
print("Key 'objects' not found in data_dict.")
|
||
return data_dict # 如果 'objects' 键不存在,直接返回原始字典或根据需要进行其他处理
|
||
|
||
# 如果 'objects' 键存在,继续处理
|
||
for record in objects:
|
||
# 检查 'latitude' 和 'longitude' 键是否存在于 record 中
|
||
if 'latitude' in record and 'longitude' in record:
|
||
lat = record['latitude']
|
||
lon = record['longitude']
|
||
X, Y = convert_to_cartesian(lat, lon, self.reference_point)
|
||
record['X'] = X
|
||
record['Y'] = Y
|
||
else:
|
||
print("Record is missing 'latitude' or 'longitude' keys.")
|
||
|
||
return data_dict
|
||
|
||
def data_encoder(self, data_list):
|
||
"""
|
||
生成数据矩阵和 ID 列表。
|
||
"""
|
||
sensor_data = []
|
||
for sensor_id in self.sensor_id_list:
|
||
temp = {'data_matrix': [],
|
||
'id_list': [],
|
||
'deviceId': sensor_id,
|
||
'latest_time': [],
|
||
'priority':1}
|
||
for record in data_list:
|
||
if record.get('noteData'):
|
||
obj = record['noteData']
|
||
obj['objectId'] = obj['uasId']
|
||
obj['deviceId'] = obj["extension"]['deviceId']
|
||
record['objects'] = [obj]
|
||
|
||
if record['deviceId'] == sensor_id:
|
||
temp['priority'] = self.priority_dict[sensor_id]
|
||
if record.get('objects'):
|
||
for obj in record['objects']:
|
||
if obj['objectId'] in temp['id_list']:
|
||
position = temp['id_list'].index(obj['objectId'])
|
||
if int(record['ptTime']) > int(temp['latest_time'][position]):
|
||
temp['data_matrix'][position] = [obj['X'], obj['Y'], obj['altitude']]
|
||
else:
|
||
temp['data_matrix'].append([obj['X'], obj['Y'], obj['altitude']])
|
||
temp['id_list'].append(obj['objectId'])
|
||
temp['latest_time'].append(record['ptTime'])
|
||
# 把扩展地段写入
|
||
if obj.get('extension'):
|
||
B_id = str(record['deviceId'])+str(obj['objectId'])
|
||
self.uavInfo_bucket[B_id] = obj['extension']
|
||
# 如果对象有speed字段,将其添加到extension中
|
||
if obj.get('speed'):
|
||
self.uavInfo_bucket[B_id]['speed'] = obj['speed']
|
||
# 如果对象有height字段,也存储它
|
||
if obj.get('height'):
|
||
self.uavInfo_bucket[B_id]['height'] = obj['height']
|
||
|
||
# 写入到数据字典中
|
||
temp['data_matrix'] = np.array(temp['data_matrix'])
|
||
sensor_data.append(temp)
|
||
return sensor_data
|
||
|
||
def process_extension(self, target):
|
||
# 定义一个字典,包含给定的键值对
|
||
extension = {
|
||
"objectType": 30,
|
||
"uavSN": "Un-known",
|
||
"uavModel": "Un-known",
|
||
"pilotLat": 0.0,
|
||
"pilotLon": 0.0,
|
||
"speedX": 0.0,
|
||
"speedY": 0.0,
|
||
"speedZ": 0.0,
|
||
"time": 0.0,
|
||
"born_time": 0.0
|
||
}
|
||
|
||
# 从target_bowl获取历史值
|
||
if target['objectId'] in self.target_bowl.keys():
|
||
extension = self.target_bowl[target['objectId']]
|
||
|
||
result_source = target['source']
|
||
# 对数据进行更新
|
||
for source in result_source:
|
||
id = str(source[0]) + str(source[1])
|
||
if self.uavInfo_bucket.get(id):
|
||
for key, value in self.uavInfo_bucket[id].items():
|
||
# 只有当新值是有效值时才更新
|
||
if value not in ["Un-known", 0.0, None, "Unknown", "DJI Mavic"]:
|
||
extension[key] = value
|
||
|
||
extension['born_time'] = int(target['born_time'])
|
||
|
||
# 更新target_bowl以保持状态
|
||
self.target_bowl[target['objectId']] = extension
|
||
|
||
return extension
|
||
|
||
def data_decoder(self, filtered_results):
|
||
"""
|
||
解码过滤后的结果。
|
||
"""
|
||
current_time = datetime.datetime.now()
|
||
timestamp = int(current_time.timestamp() * 1000)
|
||
combined_objects = []
|
||
for target in filtered_results:
|
||
X = target['X']
|
||
Y = target['Y']
|
||
Z = target['Z'] # 这里的Z实际上是altitude
|
||
lat, lon = convert_to_geodetic(X, Y, self.reference_point)
|
||
extension = self.process_extension(target)
|
||
extension['time'] = int(timestamp)
|
||
extension['born_time'] = int(int(target['born_time']) / 1000) # 毫秒单位数据
|
||
|
||
new_origin_source = []
|
||
for source in target['source']:
|
||
device_id, object_id = source
|
||
# 从 device_info_dict 获取设备缩写
|
||
device_abbreviation = self.device_info_dict.get(device_id, {}).get('device_type', 'other')
|
||
# 使用映射字典获取设备类型
|
||
device_type = self.device_type_mapping.get(device_abbreviation, 999)
|
||
new_origin_source.append(f"{device_type}_{device_id}_{object_id}")
|
||
|
||
# 根据优先级顺序选择速度
|
||
highest_priority_speed = None
|
||
highest_priority = float('inf')
|
||
|
||
for source in target['source']:
|
||
device_id, object_id = source
|
||
B_id = str(device_id) + str(object_id)
|
||
if self.uavInfo_bucket.get(B_id):
|
||
device_type = self.device_info_dict.get(device_id, {}).get('device_type', 'other')
|
||
priority = self.device_type_speedrank.get(device_type, float('inf'))
|
||
|
||
if priority < highest_priority:
|
||
highest_priority = priority
|
||
# 获取速度并进行单位转换
|
||
speed = self.uavInfo_bucket[B_id].get('speed', target['speed'])
|
||
if device_type == "5ga": # 如果设备类型是5ga,进行转换
|
||
speed = speed / 3.6 # 从 km/h 转换为 m/s
|
||
highest_priority_speed = speed
|
||
|
||
# 确保 highest_priority_speed 是从设备获取的速度
|
||
if highest_priority_speed is None:
|
||
# 如果没有找到当前速度,查找历史记录中的速度
|
||
for obj in reversed(combined_objects):
|
||
if obj["objectId"] == target['objectId']:
|
||
highest_priority_speed = obj.get("speed")
|
||
break
|
||
|
||
if highest_priority_speed is None:
|
||
print(f"Warning: No speed found for target {target['objectId']}, using default target speed.")
|
||
new_speed = target['speed']
|
||
else:
|
||
new_speed = highest_priority_speed
|
||
else:
|
||
new_speed = highest_priority_speed
|
||
|
||
# Debug 输出,检查速度来源
|
||
print(f"Selected speed for target {target['objectId']}: {new_speed} from device with priority {highest_priority}")
|
||
|
||
# 获取height字段
|
||
height = None
|
||
for source in target['source']:
|
||
device_id, object_id = source
|
||
B_id = str(device_id) + str(object_id)
|
||
if self.uavInfo_bucket.get(B_id):
|
||
if self.uavInfo_bucket[B_id].get('height'):
|
||
height = self.uavInfo_bucket[B_id]['height']
|
||
break
|
||
|
||
# 如果当前没有获取到height,查找历史记录中的height
|
||
if height is None:
|
||
for obj in reversed(combined_objects):
|
||
if obj["objectId"] == target['objectId']:
|
||
prev_height = obj.get("height")
|
||
if prev_height is not None: # 如果找到有效的历史height
|
||
height = prev_height
|
||
break
|
||
|
||
# 如果仍然没有找到height,保持上一次的最新历史height
|
||
if height is None and combined_objects:
|
||
for obj in reversed(combined_objects):
|
||
if obj["objectId"] == target['objectId']:
|
||
height = obj.get("height")
|
||
break
|
||
|
||
temp = {
|
||
# "msg_cnt":result['msg_cnt'],#增加msg_cnt用于检测有无丢包
|
||
"objectId": target['objectId'],
|
||
"X": X,
|
||
"Y": Y,
|
||
"height": height, # 使用当前height或历史height
|
||
"altitude": Z,
|
||
"speed": new_speed, # 使用优先级最高的速度
|
||
'latitude': lat,
|
||
'longitude': lon,
|
||
'sigma': target['sigma'],
|
||
"extension": {
|
||
"origin_source": new_origin_source, # 更新后的 origin_source
|
||
# 其他extension字段...
|
||
"objectType": extension.get('objectType', 0),
|
||
"uavSN": extension.get("uavSN", "Un-known"),
|
||
"uavModel": extension.get("uavModel", "Un-known"),
|
||
"pilotLat": extension.get("pilotLat", 0.0),
|
||
"pilotLon": extension.get("pilotLon", 0.0),
|
||
"speedX": 0.0, # 不再使用速度分量
|
||
"speedY": 0.0,
|
||
"speedZ": 0.0,
|
||
"time": int(timestamp),
|
||
"born_time": int(int(target['born_time']) / 1000),
|
||
},
|
||
"time": int(timestamp),
|
||
}
|
||
|
||
# 检查extension中的objectType是否已经被设置为非0值,如果是,则不再覆盖.
|
||
if extension.get('objectType', 0) != 0 or target['objectId'] not in [obj['objectId'] for obj in
|
||
combined_objects]:
|
||
temp["extension"]["objectType"] = extension.get('objectType', 0)
|
||
else:
|
||
# 查找combined_objects中相同objectId的objectType,如果不存在则使用0
|
||
existing_object_types = [obj["extension"].get('objectType', 0) for obj in combined_objects if
|
||
obj["objectId"] == target['objectId']]
|
||
if existing_object_types and existing_object_types[0] != 0:
|
||
temp["extension"]["objectType"] = existing_object_types[0]
|
||
else:
|
||
temp["extension"]["objectType"] = 0
|
||
|
||
# 检查并更新uavSN和uavModel
|
||
invalid_values = ["Un-known", 0.0, None, "Unknown", "DJI Mavic"]
|
||
|
||
# 检查uavSN是否为字母数字组合防止其他部分引入奇怪的值
|
||
current_sn = extension.get('uavSN', "Un-known")
|
||
if isinstance(current_sn, str):
|
||
has_letter = any(c.isalpha() for c in current_sn)
|
||
has_digit = any(c.isdigit() for c in current_sn)
|
||
if not (has_letter and has_digit):
|
||
# 先查找相同objectId的历史有效SN
|
||
for obj in reversed(combined_objects):
|
||
if obj["objectId"] == target['objectId']:
|
||
prev_sn = obj["extension"].get("uavSN", "Un-known")
|
||
if isinstance(prev_sn, str):
|
||
has_letter = any(c.isalpha() for c in prev_sn)
|
||
has_digit = any(c.isdigit() for c in prev_sn)
|
||
if has_letter and has_digit:
|
||
current_sn = prev_sn
|
||
break
|
||
temp["extension"]["uavSN"] = current_sn
|
||
temp["extension"]["uavModel"] = extension.get('uavModel', "Un-known")
|
||
|
||
combined_objects.append(temp)
|
||
|
||
data_processed = {
|
||
"deviceType": 1000,
|
||
"providerCode": "DPZYLY",
|
||
"deviceId": self.task_id,
|
||
"objects": combined_objects,
|
||
"ptTime": int(timestamp)
|
||
}
|
||
# 筛选有意义的数据
|
||
if data_processed and data_processed.get("objects") and len(data_processed["objects"]) > 0:
|
||
formatted_time = current_time.strftime('%Y-%m-%d %H:%M:%S')
|
||
with open('PB_log.txt', 'a') as log_file: # 以追加模式打开日志文件
|
||
log_file.write('=====================\n') # 写入分隔符
|
||
log_file.write(f"time: {formatted_time}\n") # 写入时间戳
|
||
log_file.write(f"data: {data_processed}\n")
|
||
return data_processed
|
||
|
||
def extract_parms(self, parm_data):
|
||
"""
|
||
提取参数。
|
||
"""
|
||
id_list = [] # 存储设备ID
|
||
priority_dict = {} # 存储设备优先级
|
||
device_info_dict = {} # 新增:存储设备详细信息的字典,用于后续拿到与
|
||
|
||
data_dict = json.loads(parm_data)
|
||
print(data_dict)
|
||
self.task_id = data_dict['task_id']
|
||
new_topics = [("fromcheck/DPZYLY/fly_data/rtk/#", 0)]
|
||
|
||
devices = data_dict['devices']
|
||
for device in devices:
|
||
device_id = device['device_id']
|
||
if device_id:
|
||
id_list.append(device_id)
|
||
new_topics.append((device["device_topic"], 0))
|
||
|
||
# 存储设备优先级,默认优先级为1
|
||
if device.get('priority'):
|
||
priority_dict[device_id] = device['priority']
|
||
else:
|
||
priority_dict[device_id] = 1
|
||
|
||
# 使用列表存储设备的详细信息(topic、type、sampling_rate),完成一对多
|
||
device_info_dict[device_id] = {
|
||
'device_topic': device['device_topic'],
|
||
'device_type': device['device_type'],
|
||
'sampling_rate': device['properties'].get('sampling_rate', 1) # 默认为None,如果没有提供
|
||
}
|
||
|
||
self.priority_dict = priority_dict
|
||
self.device_info_dict = device_info_dict # 将设备信息字典存储到实例变量中
|
||
self.sensor_id_list = id_list
|
||
|
||
# 处理参考点
|
||
if data_dict.get('reference_point'):
|
||
try:
|
||
original_reference_point = data_dict['reference_point']
|
||
if len(original_reference_point) == 2: # 确保是包含两个元素的元组或列表
|
||
self.reference_point = (
|
||
float(original_reference_point[0]) + 0,
|
||
float(original_reference_point[1]) + 0
|
||
)
|
||
else:
|
||
raise ValueError("Invalid reference_point structure. Must be a tuple or list with two elements.")
|
||
except Exception as e:
|
||
print(f"Error processing reference_point: {e}")
|
||
self.reference_point = None # 或者设置为某个默认值
|
||
|
||
return new_topics
|
||
|
||
def extract_fusion_parms(self,parm_data):
|
||
data_dict = json.loads(parm_data)
|
||
# 定义 fusion_dict 字典,包含需要从 data_dict 中提取的键
|
||
fusion_dict = {
|
||
"fusion_type": 1,
|
||
"gate": 1,
|
||
"interval": 1,
|
||
"show_thres": 0.4
|
||
}
|
||
|
||
# 检查 data_dict 中是否存在对应的键,并更新 fusion_dict 中的值
|
||
if "fusion_type" in data_dict:
|
||
fusion_dict["fusion_type"] = data_dict["fusion_type"]
|
||
|
||
if "gate" in data_dict:
|
||
fusion_dict["gate"] = data_dict["gate"]
|
||
|
||
if "interval" in data_dict:
|
||
fusion_dict["interval"] = data_dict["interval"]
|
||
|
||
if "show_thres" in data_dict:
|
||
fusion_dict["show_thres"] = data_dict["show_thres"]
|
||
|
||
# 返回更新后的 fusion_dict
|
||
return fusion_dict
|