import datetime
# NOTE: the import order is kept exactly as in the original file. The star
# import may re-bind names, and `json` / `np` are imported after it so that
# they always refer to the real modules.
from transformation import *
import json
import numpy as np


class Pipeline:
    """Adapter between raw sensor messages and the fused output message.

    The class keeps per-task configuration (sensor ids, priorities, device
    metadata, reference point) and two caches:

    * ``uavInfo_bucket`` -- latest ``extension`` dict per
      ``str(deviceId) + str(objectId)`` key, filled by :meth:`data_encoder`.
    * ``target_bowl`` -- merged ``extension`` dict per fused ``objectId``,
      filled by :meth:`process_extension`.

    The fusion step itself is not part of this class; :meth:`data_encoder`
    prepares its input and :meth:`data_decoder` formats its output.
    """

    def __init__(self, fusion_parameters_topic, reference_point):
        """Create a pipeline with built-in default task settings.

        Args:
            fusion_parameters_topic: stored as-is on the instance.
            reference_point: passed to ``convert_to_cartesian`` /
                ``convert_to_geodetic``; presumably a (lat, lon) pair --
                TODO confirm against the ``transformation`` module.
        """
        self.fusion_parameters_topic = fusion_parameters_topic
        self.task_id = '554343465692430336'
        self.reference_point = reference_point
        self.sensor_id_list = ["10000000000"]
        self.fusionCode = 'DPZYLY'
        # NOTE(review): this topic is built once from the initial task_id and
        # is not refreshed when extract_parms() replaces task_id -- confirm
        # whether that is intended.
        self.publish_topic = f"bridge/{self.fusionCode}/device_data/fusion/{self.task_id}"
        self.priority_dict = {"10000000000": 1}
        self.uavInfo_bucket = dict()
        self.target_bowl = dict()
        self.device_info_dict = dict()
        # Device-type abbreviation -> numeric code used in "origin_source".
        self.device_type_mapping = {
            "5ga": 0,
            "radar": 1,
            "spec": 2,
            "oe": 3,
            "cm": 4,
            "dec": 5,
            "ifr": 6,
            "cv": 7,
            "isrs": 8,
            "aoa": 9,
            "tdoa": 10,
            "dcd": 11,
            "direct": 100,
            "rtk": 101,
            "rid": 102,
            "fusion": 1000,
            "other": 999,  # 'other' is assumed to stand for an unknown device type
        }
        # Device-type abbreviation -> rank used when choosing which source's
        # speed to report (a lower number wins).
        # NOTE(review): 'other' has rank 0 and therefore beats every known
        # device type -- confirm this is intended.
        self.device_type_speedrank = {
            "radar": 1,
            "spec": 2,
            "oe": 3,
            "cm": 4,
            "dec": 5,
            "ifr": 6,
            "cv": 7,
            "isrs": 8,
            "aoa": 9,
            "tdoa": 10,
            "dcd": 13,
            "direct": 12,
            "5ga": 11,
            "rid": 14,
            "rtk": 15,
            "other": 0,  # 'other' is assumed to stand for an unknown device type
        }

    def process_json_data(self, json_data):
        """Parse a JSON message and add ``X`` / ``Y`` to each object.

        Args:
            json_data: JSON text accepted by ``json.loads``.

        Returns:
            The decoded dict. When it has an ``objects`` list, every entry
            that carries both ``latitude`` and ``longitude`` gains ``X`` and
            ``Y`` keys computed by ``convert_to_cartesian`` relative to
            ``self.reference_point``. Entries without coordinates are left
            untouched and a message is printed.
        """
        data_dict = json.loads(json_data)

        # Tolerant access to 'ptTime': only report, never fail.
        pt_time = data_dict.get('ptTime')
        if pt_time is not None:
            print(pt_time)
        else:
            print("Key 'ptTime' not found in data_dict.")

        # Without 'objects' there is nothing to convert.
        objects = data_dict.get('objects')
        if objects is None:
            print("Key 'objects' not found in data_dict.")
            return data_dict

        for record in objects:
            if 'latitude' in record and 'longitude' in record:
                X, Y = convert_to_cartesian(
                    record['latitude'], record['longitude'], self.reference_point
                )
                record['X'] = X
                record['Y'] = Y
            else:
                print("Record is missing 'latitude' or 'longitude' keys.")

        return data_dict

    def data_encoder(self, data_list):
        """Build one position matrix and id list per configured sensor.

        For every id in ``self.sensor_id_list`` the records whose
        ``deviceId`` equals that id are scanned, and the ``[X, Y, altitude]``
        row with the greatest ``ptTime`` is kept for each ``objectId``.

        Side effects:
            * A record with a truthy ``noteData`` is rewritten in place so
              that ``record['objects']`` is a one-element list holding that
              note (with ``objectId`` / ``deviceId`` filled in).
            * ``self.uavInfo_bucket`` receives each object's ``extension``
              dict; truthy ``speed`` / ``height`` values are copied into it.

        Args:
            data_list: iterable of decoded message dicts.

        Returns:
            A list with one dict per sensor id, holding ``data_matrix``
            (``numpy.ndarray``), ``id_list``, ``deviceId``, ``latest_time``
            and ``priority``.
        """
        sensor_data = []
        for sensor_id in self.sensor_id_list:
            temp = {
                'data_matrix': [],
                'id_list': [],
                'deviceId': sensor_id,
                'latest_time': [],
                'priority': 1,
            }
            for record in data_list:
                # Normalise "noteData" messages into the common 'objects' shape.
                if record.get('noteData'):
                    obj = record['noteData']
                    obj['objectId'] = obj['uasId']
                    obj['deviceId'] = obj["extension"]['deviceId']
                    record['objects'] = [obj]

                if record['deviceId'] != sensor_id:
                    continue

                temp['priority'] = self.priority_dict[sensor_id]
                for obj in record.get('objects') or []:
                    object_id = obj['objectId']
                    if object_id in temp['id_list']:
                        position = temp['id_list'].index(object_id)
                        if int(record['ptTime']) > int(temp['latest_time'][position]):
                            temp['data_matrix'][position] = [obj['X'], obj['Y'], obj['altitude']]
                            # Remember the time of the row just stored;
                            # otherwise an older sample arriving later could
                            # overwrite this newer one.
                            temp['latest_time'][position] = record['ptTime']
                    else:
                        temp['data_matrix'].append([obj['X'], obj['Y'], obj['altitude']])
                        temp['id_list'].append(object_id)
                        temp['latest_time'].append(record['ptTime'])

                    # Cache the extension fields for later decoding.
                    if obj.get('extension'):
                        B_id = str(record['deviceId']) + str(object_id)
                        self.uavInfo_bucket[B_id] = obj['extension']
                        # Carry speed / height along inside the cached extension.
                        if obj.get('speed'):
                            self.uavInfo_bucket[B_id]['speed'] = obj['speed']
                        if obj.get('height'):
                            self.uavInfo_bucket[B_id]['height'] = obj['height']

            temp['data_matrix'] = np.array(temp['data_matrix'])
            sensor_data.append(temp)
        return sensor_data

    def process_extension(self, target):
        """Merge cached per-source extension fields for one fused target.

        Starts from the entry already stored in ``self.target_bowl`` for
        ``target['objectId']`` (or from a default template), then overlays
        every cached value from ``self.uavInfo_bucket`` that is not one of
        the placeholder values. The result is stored back in
        ``self.target_bowl``.

        Args:
            target: dict with ``objectId``, ``source`` (sequence of
                ``(device_id, object_id, ...)`` entries) and ``born_time``.

        Returns:
            The merged extension dict (the same object that is kept in
            ``self.target_bowl``).
        """
        placeholders = ["Un-known", 0.0, None, "Unknown", "DJI Mavic"]

        # Default template, used only the first time a target is seen.
        extension = {
            "objectType": 30,
            "uavSN": "Un-known",
            "uavModel": "Un-known",
            "pilotLat": 0.0,
            "pilotLon": 0.0,
            "speedX": 0.0,
            "speedY": 0.0,
            "speedZ": 0.0,
            "time": 0.0,
            "born_time": 0.0,
        }
        # Continue from the previously merged state when there is one.
        if target['objectId'] in self.target_bowl:
            extension = self.target_bowl[target['objectId']]

        for source in target['source']:
            bucket_key = str(source[0]) + str(source[1])
            cached = self.uavInfo_bucket.get(bucket_key)
            if not cached:
                continue
            for key, value in cached.items():
                # Only meaningful values may overwrite what is already known.
                if value not in placeholders:
                    extension[key] = value

        extension['born_time'] = int(target['born_time'])
        # Persist the merged state for the next call.
        self.target_bowl[target['objectId']] = extension
        return extension

    @staticmethod
    def _has_letter_and_digit(value):
        """Return True when *value* is a str with at least one letter and one digit."""
        return (
            isinstance(value, str)
            and any(c.isalpha() for c in value)
            and any(c.isdigit() for c in value)
        )

    def _select_speed(self, target, combined_objects):
        """Pick the speed to report for *target* (best-ranked source wins)."""
        best_speed = None
        best_rank = float('inf')
        for device_id, object_id in target['source']:
            info = self.uavInfo_bucket.get(str(device_id) + str(object_id))
            if not info:
                continue
            device_type = self.device_info_dict.get(device_id, {}).get('device_type', 'other')
            rank = self.device_type_speedrank.get(device_type, float('inf'))
            if rank < best_rank:
                best_rank = rank
                # Fall back to the fused speed only when the source has none.
                speed = info['speed'] if 'speed' in info else target['speed']
                if device_type == "5ga":
                    speed = speed / 3.6  # km/h -> m/s
                best_speed = speed

        if best_speed is None:
            # Reuse the speed of an earlier entry with the same id in this batch.
            for obj in reversed(combined_objects):
                if obj["objectId"] == target['objectId']:
                    best_speed = obj.get("speed")
                    break
            if best_speed is None:
                print(f"Warning: No speed found for target {target['objectId']}, using default target speed.")
                best_speed = target['speed']

        # Debug output: shows which rank supplied the speed.
        print(f"Selected speed for target {target['objectId']}: {best_speed} from device with priority {best_rank}")
        return best_speed

    def _select_height(self, target, combined_objects):
        """Pick the height for *target*: first source with one, else an earlier entry, else None."""
        for device_id, object_id in target['source']:
            info = self.uavInfo_bucket.get(str(device_id) + str(object_id))
            if info and info.get('height'):
                return info['height']
        for obj in reversed(combined_objects):
            if obj["objectId"] == target['objectId']:
                prev_height = obj.get("height")
                if prev_height is not None:
                    return prev_height
        return None

    def data_decoder(self, filtered_results):
        """Format fused targets as the outgoing message dict.

        Each target's planar position is converted back with
        ``convert_to_geodetic``, its extension is merged via
        :meth:`process_extension`, and speed / height / uavSN are resolved
        from the cached source data. When at least one object is produced
        the message is also appended to ``PB_log.txt``.

        Args:
            filtered_results: iterable of target dicts with ``X``, ``Y``,
                ``Z`` (used as altitude), ``objectId``, ``source``,
                ``born_time``, ``sigma`` and ``speed``.

        Returns:
            A dict with ``deviceType``, ``providerCode``, ``deviceId``,
            ``objects`` and ``ptTime`` (current time in epoch milliseconds).
        """
        current_time = datetime.datetime.now()
        timestamp = int(current_time.timestamp() * 1000)

        combined_objects = []
        for target in filtered_results:
            X = target['X']
            Y = target['Y']
            Z = target['Z']  # Z is actually the altitude
            lat, lon = convert_to_geodetic(X, Y, self.reference_point)

            extension = self.process_extension(target)
            extension['time'] = int(timestamp)
            # presumably born_time arrives in milliseconds -- TODO confirm
            born_time = int(int(target['born_time']) / 1000)
            extension['born_time'] = born_time

            # "<type code>_<device id>_<object id>" for every contributing source.
            new_origin_source = []
            for device_id, object_id in target['source']:
                device_abbreviation = self.device_info_dict.get(device_id, {}).get('device_type', 'other')
                device_type = self.device_type_mapping.get(device_abbreviation, 999)
                new_origin_source.append(f"{device_type}_{device_id}_{object_id}")

            new_speed = self._select_speed(target, combined_objects)
            height = self._select_height(target, combined_objects)

            # Keep a non-zero objectType from the extension; otherwise reuse
            # the one from the first earlier entry with the same objectId.
            object_type = extension.get('objectType', 0)
            if object_type == 0:
                earlier_types = [
                    obj["extension"].get('objectType', 0)
                    for obj in combined_objects
                    if obj["objectId"] == target['objectId']
                ]
                if earlier_types:
                    object_type = earlier_types[0] if earlier_types[0] != 0 else 0

            # A serial number must mix letters and digits; if the current one
            # does not, fall back to a valid one from an earlier entry.
            current_sn = extension.get('uavSN', "Un-known")
            if isinstance(current_sn, str) and not self._has_letter_and_digit(current_sn):
                for obj in reversed(combined_objects):
                    if obj["objectId"] == target['objectId']:
                        prev_sn = obj["extension"].get("uavSN", "Un-known")
                        if self._has_letter_and_digit(prev_sn):
                            current_sn = prev_sn
                            break

            temp = {
                "objectId": target['objectId'],
                "X": X,
                "Y": Y,
                "height": height,  # current height, or one from an earlier entry
                "altitude": Z,
                "speed": new_speed,  # speed from the best-ranked source
                'latitude': lat,
                'longitude': lon,
                'sigma': target['sigma'],
                "extension": {
                    "origin_source": new_origin_source,
                    "objectType": object_type,
                    "uavSN": current_sn,
                    "uavModel": extension.get('uavModel', "Un-known"),
                    "pilotLat": extension.get("pilotLat", 0.0),
                    "pilotLon": extension.get("pilotLon", 0.0),
                    "speedX": 0.0,  # speed components are no longer used
                    "speedY": 0.0,
                    "speedZ": 0.0,
                    "time": int(timestamp),
                    "born_time": born_time,
                },
                "time": int(timestamp),
            }
            combined_objects.append(temp)

        data_processed = {
            "deviceType": 1000,
            "providerCode": "DPZYLY",
            "deviceId": self.task_id,
            "objects": combined_objects,
            "ptTime": int(timestamp),
        }

        # Only log messages that actually carry objects.
        if combined_objects:
            formatted_time = current_time.strftime('%Y-%m-%d %H:%M:%S')
            # Explicit encoding so non-ASCII field values cannot break logging.
            with open('PB_log.txt', 'a', encoding='utf-8') as log_file:
                log_file.write('=====================\n')
                log_file.write(f"time: {formatted_time}\n")
                log_file.write(f"data: {data_processed}\n")

        return data_processed

    def extract_parms(self, parm_data):
        """Apply a task-parameter message and return the topics to subscribe to.

        Updates ``task_id``, ``priority_dict``, ``device_info_dict``,
        ``sensor_id_list`` and, when present, ``reference_point``.

        Args:
            parm_data: JSON text with ``task_id``, ``devices`` and optionally
                ``reference_point``.

        Returns:
            A list of ``(topic, 0)`` tuples: the fixed RTK topic followed by
            one entry per device with a truthy ``device_id``.
        """
        id_list = []            # device ids
        priority_dict = {}      # device id -> priority
        device_info_dict = {}   # device id -> topic / type / sampling rate

        data_dict = json.loads(parm_data)
        print(data_dict)
        self.task_id = data_dict['task_id']

        new_topics = [("fromcheck/DPZYLY/fly_data/rtk/#", 0)]
        for device in data_dict['devices']:
            device_id = device['device_id']
            if not device_id:
                continue
            id_list.append(device_id)
            new_topics.append((device["device_topic"], 0))
            # Priority defaults to 1 when missing or falsy.
            priority_dict[device_id] = device.get('priority') or 1
            # A device without a 'properties' section gets the default rate.
            properties = device.get('properties') or {}
            device_info_dict[device_id] = {
                'device_topic': device['device_topic'],
                'device_type': device['device_type'],
                'sampling_rate': properties.get('sampling_rate', 1),  # defaults to 1
            }

        self.priority_dict = priority_dict
        self.device_info_dict = device_info_dict
        self.sensor_id_list = id_list

        # Reference point: must be a two-element sequence of numbers.
        if data_dict.get('reference_point'):
            try:
                original_reference_point = data_dict['reference_point']
                if len(original_reference_point) == 2:
                    self.reference_point = (
                        float(original_reference_point[0]) + 0,
                        float(original_reference_point[1]) + 0,
                    )
                else:
                    raise ValueError("Invalid reference_point structure. Must be a tuple or list with two elements.")
            except (TypeError, ValueError, KeyError, IndexError, OverflowError) as e:
                print(f"Error processing reference_point: {e}")
                # NOTE(review): this discards a previously valid reference
                # point; consider keeping the old value or a default instead.
                self.reference_point = None

        return new_topics

    def extract_fusion_parms(self, parm_data):
        """Read fusion settings from a JSON message, falling back to defaults.

        Args:
            parm_data: JSON text that may contain ``fusion_type``, ``gate``,
                ``interval`` and ``show_thres``.

        Returns:
            A dict with exactly those four keys; keys missing from the
            message keep their defaults (1, 1, 1 and 0.4).
        """
        data_dict = json.loads(parm_data)
        fusion_dict = {
            "fusion_type": 1,
            "gate": 1,
            "interval": 1,
            "show_thres": 0.4,
        }
        # Override each default only when the message provides that key.
        for key in fusion_dict:
            if key in data_dict:
                fusion_dict[key] = data_dict[key]
        return fusion_dict