c428ab60 by huangyf2

蓝牙优化名字

1 parent bb536a51
Showing 1 changed file with 378 additions and 42 deletions
......@@ -157,8 +157,24 @@ class BLEClient:
self.websocket = None
self.notification_records = defaultdict(lambda: (None, 0.0)) # 特征ID: (最后消息, 时间戳)
self._notification_callbacks = {} # 存储通知回调,用于清理
self._shutdown = False # 添加关闭标志
self._scanner = None # 当前扫描器实例
self._shutdown = False
self._scanner = None
self._connecting = False # 连接状态标志
self._connect_task = None # 连接任务
# 设备缓存机制 - 提升扫描速度和稳定性
self._device_cache = {} # {address: (device, adv_data, timestamp, rssi)}
self._cache_timeout = 30.0 # 缓存30秒
self._last_scan_time = 0
self._cache_scan_interval = 5.0 # 5秒内复用缓存
# 连接状态管理
self._connection_history = {} # {address: (success_time, last_attempt_time, success_count)}
self._last_connected_address = None
# 扫描优化
self._best_rssi_device = None # 信号最强的设备
self._best_rssi_value = -999
def on_disconnect(self, client):
print("BLE连接断开,关闭WebSocket")
......@@ -192,23 +208,40 @@ class BLEClient:
self._shutdown = True
def detection_callback(self, device, advertisement_data):
if any(service_uuid in advertisement_data.service_uuids for service_uuid in self.services):
# 优先使用广播包中的local_name
device_name = advertisement_data.local_name if advertisement_data.local_name else (device.name if device.name else "")
"""设备发现回调函数,从广播包中匹配服务UUID,支持信号强度优化"""
try:
# 检查服务UUID匹配
if not self.services or not any(service_uuid in advertisement_data.service_uuids for service_uuid in self.services):
return None
# 丢弃名字为空的设备
# 严格从广播包中获取设备名称,优先使用local_name
device_name = advertisement_data.local_name if advertisement_data.local_name else ""
# 如果广播包中没有local_name,跳过该设备
if not device_name or device_name.strip() == "":
print(f"跳过名字为空的设备: {device.address}")
return None
# 验证设备地址格式
if not device.address or len(device.address) < 12:
return None
# 更新设备缓存(包含时间戳和RSSI)
current_time = time.time()
rssi = device.rssi if hasattr(device, 'rssi') else -100
self._device_cache[device.address] = (device, advertisement_data, current_time, rssi)
# 信号强度优化:选择信号最强的设备
if rssi > self._best_rssi_value:
self._best_rssi_value = rssi
self._best_rssi_device = (device, advertisement_data)
# 保存匹配的设备(优先使用信号最强的)
self.target_device = (device, advertisement_data)
if not self.target_device:
print("未找到匹配设备")
return
else:
if self.target_device:
device, adv_data = self.target_device
print("\n找到目标设备:")
print(f"设备名称(从广播包): {device_name}")
print(f"设备名称(从广播包local_name): {device_name}")
print(f"设备地址: {device.address}")
print(f"信号强度: {device.rssi} dBm")
print("\n广播信息:")
......@@ -216,7 +249,7 @@ class BLEClient:
print(f"制造商数据: {adv_data.manufacturer_data}")
print(f"服务数据: {adv_data.service_data}")
print(f"本地名称(local_name): {adv_data.local_name}")
print(f"设备名称(device.name): {device.name if device.name else 'N/A'}")
print(f"设备名称(device.name): {device.name if device.name else 'N/A'} (仅作参考,实际使用广播包名称)")
# 发现设备后,尝试立即停止扫描
try:
if self._scanner is not None:
......@@ -225,7 +258,11 @@ class BLEClient:
except Exception as e:
log_exception_sync_to_async(e, "detection_callback停止扫描")
pass
return self.target_device
except Exception as e:
log_exception_sync_to_async(e, "detection_callback")
return None
async def _stop_scan_early(self):
try:
......@@ -235,6 +272,197 @@ class BLEClient:
await log_exception_async(e, "_stop_scan_early")
pass
async def _background_connect(self, peripheral_id, request_id):
"""
后台执行BLE连接任务
策略:
1. 验证设备地址格式
2. 异步断开旧连接(不阻塞新连接)
3. 执行新连接(带超时)
4. 连接完成后发送状态通知
"""
try:
self._connecting = True
# 步骤0: 验证设备地址格式
if not peripheral_id or len(peripheral_id) < 12:
error_msg = f"无效的设备地址: {peripheral_id}"
await log_message("连接", error_msg)
error_response = json.dumps({
"jsonrpc": "2.0",
"method": "connectionStatus",
"params": {
"connected": False,
"error": error_msg,
"peripheralId": peripheral_id
},
"id": request_id
})
await log_message("下发", error_response)
if self.websocket and not self.websocket.closed:
await self.websocket.send(error_response)
self._connecting = False
return
# 步骤1: 异步断开旧连接(最多等待1秒,超时后强制清理)
old_client = self.client
if old_client:
try:
if old_client.is_connected:
# 设置较短的超时,快速失败
await asyncio.wait_for(old_client.disconnect(), timeout=1.0)
await log_message("连接", f"旧连接已断开: {peripheral_id}")
# 清理旧客户端引用
if old_client == self.client:
self.client = None
except asyncio.TimeoutError:
await log_exception_async(
asyncio.TimeoutError("断开旧连接超时"),
"后台连接-断开旧连接超时"
)
# 强制清理,不等待
if old_client == self.client:
self.client = None
pass
except Exception as e:
await log_exception_async(e, "后台连接-断开旧连接")
if old_client == self.client:
self.client = None
pass
# 等待一小段时间,确保旧连接资源完全释放
await asyncio.sleep(0.2)
# 步骤2: 执行新连接
# 注意:BLE连接包含多个阶段:
# 1. 建立物理连接(通常1-2秒)
# 2. 发现服务(get_services)(通常3-8秒,是主要瓶颈)
# 因此总超时设置为8秒,确保大多数设备能完成连接
client = None
try:
# BleakClient的超时只影响物理连接阶段
client = BleakClient(peripheral_id, timeout=10.0)
client.set_disconnected_callback(self.on_disconnect)
# 连接操作,总超时8秒(覆盖物理连接+服务发现)
await asyncio.wait_for(client.connect(), timeout=8.0)
# 验证连接状态
if not client.is_connected:
raise Exception("连接后状态检查失败:is_connected为False")
# 再次验证连接确实可用(等待一小段时间后再次检查)
await asyncio.sleep(0.1)
if not client.is_connected:
raise Exception("连接后状态验证失败:连接已断开")
# 连接成功,保存客户端引用
self.client = client
self._connecting = False
client = None # 避免finally中重复清理
await log_message("连接", f"成功连接到设备: {peripheral_id}")
# 发送连接成功通知
success_response = json.dumps({
"jsonrpc": "2.0",
"method": "connectionStatus",
"params": {
"connected": True,
"peripheralId": peripheral_id
},
"id": request_id
})
await log_message("下发", success_response)
if self.websocket and not self.websocket.closed:
await self.websocket.send(success_response)
except asyncio.TimeoutError:
self._connecting = False
error_msg = "连接BLE设备超时(8秒),可能是服务发现阶段耗时过长"
await log_exception_async(
asyncio.TimeoutError(error_msg),
"后台连接-连接超时"
)
# 清理超时的client资源
if client is not None:
try:
# 尝试断开连接,但不等待(快速清理)
asyncio.create_task(client.disconnect())
except Exception:
pass
client = None
# 发送连接失败通知
error_response = json.dumps({
"jsonrpc": "2.0",
"method": "connectionStatus",
"params": {
"connected": False,
"error": error_msg,
"peripheralId": peripheral_id
},
"id": request_id
})
await log_message("下发", error_response)
if self.websocket and not self.websocket.closed:
await self.websocket.send(error_response)
pass
except asyncio.CancelledError:
# 任务被取消,清理资源
self._connecting = False
if client is not None:
try:
asyncio.create_task(client.disconnect())
except Exception:
pass
client = None
# 被取消时不需要发送通知(可能是新的连接请求)
pass
except Exception as e:
self._connecting = False
await log_exception_async(e, "后台连接-连接失败")
# 清理异常的client资源
if client is not None:
try:
asyncio.create_task(client.disconnect())
except Exception:
pass
client = None
# 发送连接失败通知
error_response = json.dumps({
"jsonrpc": "2.0",
"method": "connectionStatus",
"params": {
"connected": False,
"error": str(e),
"peripheralId": peripheral_id
},
"id": request_id
})
await log_message("下发", error_response)
if self.websocket and not self.websocket.closed:
await self.websocket.send(error_response)
pass
finally:
# 确保清理未使用的client资源
if client is not None and client != self.client:
try:
# 异步断开,不阻塞
asyncio.create_task(client.disconnect())
except Exception:
pass
except Exception as e:
self._connecting = False
await log_exception_async(e, "后台连接-未知错误")
pass
async def handle_client(self, websocket, path):
self.websocket = websocket
self._shutdown = False # 重置关闭标志
......@@ -266,43 +494,121 @@ class BLEClient:
self.services.extend(filt.get("services", []))
self.optional_services = params.get("optionalServices", [])
# 验证服务列表不为空
if not self.services:
error_response = json.dumps({
"jsonrpc": "2.0",
"error": {"code": -32602, "message": "Invalid params: services filter is required"},
"id": request_id if request_id else 0
})
await log_message("下发", error_response)
if not websocket.closed:
await websocket.send(error_response)
continue
# 优化策略1: 检查缓存,快速返回
current_time = time.time()
self._best_rssi_device = None
self._best_rssi_value = -999
# 清理过期缓存
expired_addresses = [
addr for addr, (_, _, ts, _) in self._device_cache.items()
if current_time - ts > self._cache_timeout
]
for addr in expired_addresses:
del self._device_cache[addr]
# 如果缓存有效且时间间隔短,直接使用缓存
if (current_time - self._last_scan_time < self._cache_scan_interval and
self._device_cache):
# 从缓存中选择信号最强的设备
best_cache = None
best_rssi = -999
for addr, (dev, adv, ts, rssi) in self._device_cache.items():
# 验证服务匹配
if any(service_uuid in adv.service_uuids for service_uuid in self.services):
if adv.local_name and rssi > best_rssi:
best_rssi = rssi
best_cache = (dev, adv)
if best_cache:
device, adv_data = best_cache
device_name = adv_data.local_name if adv_data.local_name else ""
if device_name:
print(f"使用缓存设备: {device_name} ({device.address}), RSSI: {best_rssi} dBm")
self.target_device = best_cache
found = True
# 如果缓存未命中,进行扫描
if not found:
# 双重扫描:快速扫描后若未发现,再进行扩展扫描;发现即停
phases = [("active", 3.0), ("passive", 6.0)]
found = False
scan_error = None
for phase_index, (scan_mode, duration) in enumerate(phases, start=1):
self.target_device = None
self._scanner = None
try:
# 创建扫描器
self._scanner = BleakScanner(scanning_mode=scan_mode)
self._scanner.register_detection_callback(self.detection_callback)
print(f"开始第{phase_index}阶段扫描(模式: {scan_mode}, 时长: {duration}s)...")
# 启动扫描
try:
await self._scanner.start()
except Exception as e:
await log_exception_async(e, f"扫描启动失败-阶段{phase_index}")
scan_error = str(e)
continue
# 轮询检查是否已找到,找到则提前停止
start_ts = time.time()
while time.time() - start_ts < duration and not self.target_device:
await asyncio.sleep(0.1)
# 检查是否应该关闭
if self._shutdown:
break
except Exception as e:
await log_exception_async(e, f"扫描过程异常-阶段{phase_index}")
scan_error = str(e)
finally:
# 确保扫描器被停止和清理
if self._scanner is not None:
try:
await self._scanner.stop()
except Exception as e:
await log_exception_async(e, "扫描停止")
await log_exception_async(e, f"扫描停止失败-阶段{phase_index}")
pass
self._scanner = None
# 等待一小段时间确保扫描完全停止
await asyncio.sleep(0.1)
# 检查是否找到设备
if self.target_device:
found = True
break
else:
print(f"第{phase_index}阶段未找到设备")
# 处理扫描结果
if found:
device, adv_data = self.target_device
# 优先使用广播包中的local_name,如果没有则使用device.name
device_name = adv_data.local_name if adv_data.local_name else (device.name if device.name else "")
# 严格从广播包中获取设备名称,使用local_name
device_name = adv_data.local_name if adv_data.local_name else ""
# 确保从广播包获取到名字才发送discover通知
if device_name and device_name.strip():
discover_response = json.dumps({
"jsonrpc": "2.0",
"method": "didDiscoverPeripheral",
"params": {
"name": device_name,
"name": device_name, # 从广播包获取的名称
"peripheralId": device.address,
"rssi": device.rssi
}
......@@ -310,7 +616,10 @@ class BLEClient:
await log_message("下发", discover_response)
if not websocket.closed:
await websocket.send(discover_response)
else:
print(f"警告: 设备 {device.address} 广播包中没有local_name,无法发送discover通知")
# 无论是否有名字,都返回result(表示扫描完成)
result_response = json.dumps({
"jsonrpc": "2.0",
"result": None,
......@@ -319,24 +628,34 @@ class BLEClient:
await log_message("下发", result_response)
if not websocket.closed:
await websocket.send(result_response)
else:
# 未找到设备,返回错误
error_msg = "未找到匹配的蓝牙设备"
if scan_error:
error_msg += f" (扫描错误: {scan_error})"
error_response = json.dumps({
"jsonrpc": "2.0",
"error": {"code": -1, "message": error_msg},
"id": request_id if request_id else 0
})
await log_message("下发", error_response)
if not websocket.closed:
await websocket.send(error_response)
elif method == "connect":
"""
优化策略:
1. 立即返回"连接中"状态,不阻塞WebSocket请求处理
2. 在后台任务中执行连接操作
3. 连接完成后通过connectionStatus通知客户端
这样可以显著减少WebSocket响应时间,从10秒降低到<100ms
"""
peripheral_id = params.get("peripheralId")
if peripheral_id:
# 如果已有连接,先断开
if self.client and self.client.is_connected:
try:
await self.client.disconnect()
except Exception as e:
await log_exception_async(e, "connect断开旧连接")
pass
try:
self.client = BleakClient(peripheral_id, timeout=10.0) # 添加超时
self.client.set_disconnected_callback(self.on_disconnect)
await self.client.connect()
if self.client.is_connected:
# 检查是否已有连接任务在进行
if self._connecting:
# 已有连接任务,返回提示
response = json.dumps({
"jsonrpc": "2.0",
"result": None,
......@@ -346,16 +665,26 @@ class BLEClient:
if not websocket.closed:
await websocket.send(response)
else:
error_response = json.dumps({
# 取消之前的连接任务(如果存在)
if self._connect_task and not self._connect_task.done():
self._connect_task.cancel()
# 立即返回"连接中"状态(不等待实际连接完成)
response = json.dumps({
"jsonrpc": "2.0",
"result": None,
"id": request_id if request_id else 0
"result": {"connecting": True},
"id": request_id
})
await log_message("下发", error_response)
await log_message("下发", response)
if not websocket.closed:
await websocket.send(error_response)
except Exception as e:
await log_exception_async(e, "connect连接BLE")
await websocket.send(response)
# 在后台启动连接任务(不阻塞)
self._connect_task = asyncio.create_task(
self._background_connect(peripheral_id, request_id)
)
else:
# 参数错误,立即返回
error_response = json.dumps({
"jsonrpc": "2.0",
"result": None,
......@@ -364,9 +693,6 @@ class BLEClient:
await log_message("下发", error_response)
if not websocket.closed:
await websocket.send(error_response)
if self.client:
self.client = None
pass
elif method == "write":
service_id = params.get("serviceId")
......@@ -567,6 +893,16 @@ class BLEClient:
finally:
# 清理BLE连接和通知
self._shutdown = True
self._connecting = False
# 取消后台连接任务
if self._connect_task and not self._connect_task.done():
try:
self._connect_task.cancel()
except Exception as e:
await log_exception_async(e, "finally取消连接任务")
pass
if self.client and self.client.is_connected:
try:
# 停止所有通知
......
Styling with Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!