f3b1a85d by huangyf2

11 优化蓝牙扫描

1 parent 0e90a8e9
Showing 1 changed file with 231 additions and 52 deletions
......@@ -128,6 +128,8 @@ class BLEClient:
self._notification_callbacks = {} # 存储通知回调,用于清理
self._shutdown = False # 添加关闭标志
self._scanner = None # 当前扫描器实例
self._connecting = False # 连接状态标志
self._connect_task = None # 连接任务
def on_disconnect(self, client):
print("BLE连接断开,关闭WebSocket")
......@@ -160,14 +162,9 @@ class BLEClient:
def detection_callback(self, device, advertisement_data):
if any(service_uuid in advertisement_data.service_uuids for service_uuid in self.services):
# 优先使用广播包中的local_name
# 设备名称默认获取方式:优先使用广播包local_name,如果为空则使用device.name
device_name = advertisement_data.local_name if advertisement_data.local_name else (device.name if device.name else "")
# 丢弃名字为空的设备
if not device_name or device_name.strip() == "":
print(f"跳过名字为空的设备: {device.address}")
return None
self.target_device = (device, advertisement_data)
if not self.target_device:
print("未找到匹配设备")
......@@ -175,7 +172,7 @@ class BLEClient:
else:
device, adv_data = self.target_device
print("\n找到目标设备:")
print(f"设备名称(从广播包): {device_name}")
print(f"设备名称: {device_name} (优先广播包local_name,否则使用device.name)")
print(f"设备地址: {device.address}")
print(f"信号强度: {device.rssi} dBm")
print("\n广播信息:")
......@@ -200,6 +197,137 @@ class BLEClient:
except Exception:
pass
async def _background_connect(self, peripheral_id, request_id):
"""
后台执行BLE连接任务
优化策略:
1. 快速断开旧连接(不阻塞)
2. 等待资源释放
3. 执行连接(带超时和重试)
4. 验证连接状态
5. 发送状态通知
"""
try:
self._connecting = True
# 步骤1: 快速断开旧连接(带超时)
old_client = self.client
if old_client:
try:
if old_client.is_connected:
await asyncio.wait_for(old_client.disconnect(), timeout=1.0)
print(f"旧连接已断开: {peripheral_id}")
except asyncio.TimeoutError:
print(f"断开旧连接超时,强制清理: {peripheral_id}")
except Exception as e:
print(f"断开旧连接异常: {e}")
finally:
if old_client == self.client:
self.client = None
# 等待资源释放,避免冲突
await asyncio.sleep(0.3)
# 步骤2: 执行连接(带超时和重试机制)
client = None
max_retries = 2
connect_timeout = 8.0
for attempt in range(1, max_retries + 1):
try:
if attempt > 1:
print(f"连接重试 {attempt}/{max_retries}: {peripheral_id}")
await asyncio.sleep(0.5 * attempt) # 退避延迟
client = BleakClient(peripheral_id, timeout=10.0)
client.set_disconnected_callback(self.on_disconnect)
# 连接操作,带超时控制
await asyncio.wait_for(client.connect(), timeout=connect_timeout)
# 验证连接状态
if not client.is_connected:
raise Exception("连接后状态检查失败")
# 再次验证(等待一小段时间)
await asyncio.sleep(0.1)
if not client.is_connected:
raise Exception("连接状态验证失败")
# 连接成功
self.client = client
self._connecting = False
client = None
print(f"成功连接到设备: {peripheral_id}")
# 发送连接成功通知
success_response = json.dumps({
"jsonrpc": "2.0",
"method": "connectionStatus",
"params": {
"connected": True,
"peripheralId": peripheral_id
},
"id": request_id
})
await log_message("下发", success_response)
if self.websocket and not self.websocket.closed:
await self.websocket.send(success_response)
return
except asyncio.TimeoutError:
print(f"连接超时 ({connect_timeout}秒) - 尝试 {attempt}/{max_retries}: {peripheral_id}")
if client:
try:
asyncio.create_task(client.disconnect())
except Exception:
pass
client = None
if attempt == max_retries:
error_msg = f"连接BLE设备超时({connect_timeout}秒)"
break
continue
except Exception as e:
print(f"连接异常 - 尝试 {attempt}/{max_retries}: {e}")
if client:
try:
asyncio.create_task(client.disconnect())
except Exception:
pass
client = None
if attempt == max_retries:
error_msg = str(e)
break
continue
# 连接失败
self._connecting = False
error_response = json.dumps({
"jsonrpc": "2.0",
"method": "connectionStatus",
"params": {
"connected": False,
"error": error_msg,
"peripheralId": peripheral_id
},
"id": request_id
})
await log_message("下发", error_response)
if self.websocket and not self.websocket.closed:
await self.websocket.send(error_response)
except Exception as e:
self._connecting = False
print(f"后台连接未知错误: {e}")
finally:
# 清理未使用的client
if client and client != self.client:
try:
asyncio.create_task(client.disconnect())
except Exception:
pass
async def handle_client(self, websocket, path):
self.websocket = websocket
self._shutdown = False # 重置关闭标志
......@@ -234,24 +362,50 @@ class BLEClient:
# 双重扫描:快速扫描后若未发现,再进行扩展扫描;发现即停
phases = [("active", 3.0), ("passive", 6.0)]
found = False
scan_error = None
for phase_index, (scan_mode, duration) in enumerate(phases, start=1):
self.target_device = None
self._scanner = BleakScanner(scanning_mode=scan_mode)
self._scanner.register_detection_callback(self.detection_callback)
print(f"开始第{phase_index}阶段扫描(模式: {scan_mode}, 时长: {duration}s)...")
self._scanner = None
try:
await self._scanner.start()
# 创建扫描器
self._scanner = BleakScanner(scanning_mode=scan_mode)
self._scanner.register_detection_callback(self.detection_callback)
print(f"开始第{phase_index}阶段扫描(模式: {scan_mode}, 时长: {duration}s)...")
# 启动扫描
try:
await self._scanner.start()
except Exception as e:
print(f"扫描启动失败-阶段{phase_index}: {e}")
scan_error = str(e)
continue
# 轮询检查是否已找到,找到则提前停止
start_ts = time.time()
check_interval = 0.05 # 更频繁的检查,更快响应
while time.time() - start_ts < duration and not self.target_device:
await asyncio.sleep(0.1)
await asyncio.sleep(check_interval)
# 检查是否应该关闭
if self._shutdown:
break
except Exception as e:
print(f"扫描过程异常-阶段{phase_index}: {e}")
scan_error = str(e)
finally:
try:
await self._scanner.stop()
except Exception:
pass
self._scanner = None
# 确保扫描器被停止和清理
if self._scanner is not None:
try:
await self._scanner.stop()
except Exception as e:
print(f"扫描停止失败-阶段{phase_index}: {e}")
self._scanner = None
# 等待扫描完全停止
await asyncio.sleep(0.1)
# 检查是否找到设备
if self.target_device:
found = True
break
......@@ -260,13 +414,14 @@ class BLEClient:
if found:
device, adv_data = self.target_device
# 优先使用广播包中的local_name,如果没有则使用device.name
# 设备名称默认获取方式:优先使用广播包local_name,如果为空则使用device.name
device_name = adv_data.local_name if adv_data.local_name else (device.name if device.name else "")
discover_response = json.dumps({
"jsonrpc": "2.0",
"method": "didDiscoverPeripheral",
"params": {
"name": device_name,
"name": device_name, # 设备名称(优先广播包local_name,否则device.name)
"peripheralId": device.address,
"rssi": device.rssi
}
......@@ -285,39 +440,16 @@ class BLEClient:
await websocket.send(result_response)
elif method == "connect":
"""
优化连接策略:
1. 立即返回"连接中"状态,不阻塞WebSocket
2. 在后台异步执行连接(带重试机制)
3. 连接完成后通过connectionStatus通知客户端
"""
peripheral_id = params.get("peripheralId")
if peripheral_id:
# 如果已有连接,先断开
if self.client and self.client.is_connected:
try:
await self.client.disconnect()
except Exception:
pass
try:
self.client = BleakClient(peripheral_id, timeout=10.0) # 添加超时
self.client.set_disconnected_callback(self.on_disconnect)
await self.client.connect()
if self.client.is_connected:
response = json.dumps({
"jsonrpc": "2.0",
"result": None,
"id": request_id
})
await log_message("下发", response)
if not websocket.closed:
await websocket.send(response)
else:
error_response = json.dumps({
"jsonrpc": "2.0",
"result": None,
"id": request_id if request_id else 0
})
await log_message("下发", error_response)
if not websocket.closed:
await websocket.send(error_response)
except Exception as e:
# 验证设备地址格式
if len(peripheral_id) < 12:
error_response = json.dumps({
"jsonrpc": "2.0",
"result": None,
......@@ -326,8 +458,46 @@ class BLEClient:
await log_message("下发", error_response)
if not websocket.closed:
await websocket.send(error_response)
if self.client:
self.client = None
# 检查是否已有连接任务在进行
elif self._connecting:
# 已有连接任务,返回提示
response = json.dumps({
"jsonrpc": "2.0",
"result": None,
"id": request_id
})
await log_message("下发", response)
if not websocket.closed:
await websocket.send(response)
else:
# 取消之前的连接任务(如果存在)
if self._connect_task and not self._connect_task.done():
self._connect_task.cancel()
# 立即返回"连接中"状态(不等待实际连接完成)
response = json.dumps({
"jsonrpc": "2.0",
"result": {"connecting": True},
"id": request_id
})
await log_message("下发", response)
if not websocket.closed:
await websocket.send(response)
# 在后台启动连接任务(不阻塞)
self._connect_task = asyncio.create_task(
self._background_connect(peripheral_id, request_id)
)
else:
# 参数错误
error_response = json.dumps({
"jsonrpc": "2.0",
"result": None,
"id": request_id if request_id else 0
})
await log_message("下发", error_response)
if not websocket.closed:
await websocket.send(error_response)
elif method == "write":
service_id = params.get("serviceId")
......@@ -516,6 +686,15 @@ class BLEClient:
finally:
# 清理BLE连接和通知
self._shutdown = True
self._connecting = False
# 取消后台连接任务
if self._connect_task and not self._connect_task.done():
try:
self._connect_task.cancel()
except Exception:
pass
if self.client and self.client.is_connected:
try:
# 停止所有通知
......
Styling with Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!