aa901ba8 by huangyf2

6.8.2.py

1 parent 54ac76a2
import asyncio
import websockets
from bleak import BleakScanner, BleakClient
import json
import base64
import threading
from collections import defaultdict
# 添加线程锁以确保日志写入的原子性
write_lock = threading.Lock()
def log_message_sync(direction, message):
"""同步日志记录函数"""
log_entry = f"{direction}: {message}\n"
print(log_entry, end='') # 控制台仍然输出
with write_lock:
with open('b.log', 'a', encoding='utf-8') as f:
f.write(log_entry)
async def log_message(direction, message):
"""异步封装日志记录"""
loop = asyncio.get_event_loop()
await loop.run_in_executor(None, log_message_sync, direction, message)
class BLEClient:
def __init__(self):
self.target_device = None
self.client = None
self.services = []
self.optional_services = []
self.websocket = None
self.notification_records = defaultdict(lambda: (None, 0)) # 特征ID: (最后消息, 时间戳)
def on_disconnect(self, client):
print("BLE连接断开,关闭WebSocket")
if self.websocket and not self.websocket.closed:
asyncio.create_task(self.close_websocket())
async def close_websocket(self):
await self.websocket.close()
self.websocket = None
def detection_callback(self, device, advertisement_data):
if any(service_uuid in advertisement_data.service_uuids for service_uuid in self.services):
self.target_device = (device, advertisement_data)
if not self.target_device:
print("未找到匹配设备")
return
else:
device, adv_data = self.target_device
print("\n找到目标设备:")
print(f"设备名称: {device.name}")
print(f"设备地址: {device.address}")
print(f"信号强度: {device.rssi} dBm")
print("\n广播信息:")
print(f"服务UUID列表: {adv_data.service_uuids}")
print(f"制造商数据: {adv_data.manufacturer_data}")
print(f"服务数据: {adv_data.service_data}")
print(f"本地名称: {adv_data.local_name}")
return self.target_device
async def handle_client(self, websocket, path):
self.websocket = websocket
if path != "/scratch/ble":
await websocket.close(code=1003, reason="Path not allowed")
return
try:
async for message in websocket:
try:
await log_message("接收", message)
request = json.loads(message)
if request["jsonrpc"] != "2.0":
continue
method = request.get("method")
params = request.get("params", {})
request_id = request.get("id")
if method == "discover":
self.services = []
for filt in params.get("filters", [{}]):
self.services.extend(filt.get("services", []))
self.optional_services = params.get("optionalServices", [])
scanner = BleakScanner()
scanner.register_detection_callback(self.detection_callback)
max_retries = 3
found = False
for attempt in range(max_retries):
self.target_device = None
await scanner.start()
await asyncio.sleep(5)
await scanner.stop()
if self.target_device:
found = True
break
if attempt < max_retries - 1:
print(f"未找到设备,第{attempt+1}次重试...")
await asyncio.sleep(3)
if found:
device, adv_data = self.target_device
discover_response = json.dumps({
"jsonrpc": "2.0",
"method": "didDiscoverPeripheral",
"params": {
"name": device.name,
"peripheralId": device.address,
"rssi": device.rssi
}
})
await log_message("下发", discover_response)
await websocket.send(discover_response)
result_response = json.dumps({
"jsonrpc": "2.0",
"result": None,
"id": request_id
})
await log_message("下发", result_response)
await websocket.send(result_response)
elif method == "connect":
peripheral_id = params.get("peripheralId")
if peripheral_id:
self.client = BleakClient(peripheral_id)
self.client.set_disconnected_callback(self.on_disconnect)
await self.client.connect()
if self.client.is_connected:
response = json.dumps({
"jsonrpc": "2.0",
"result": None,
"id": request_id
})
await log_message("下发", response)
await websocket.send(response)
elif method == "write":
service_id = params.get("serviceId")
characteristic_id = params.get("characteristicId")
message = params.get("message")
encoding = params.get("encoding", "utf-8")
if all([service_id, characteristic_id, message]):
if encoding == "base64":
message_bytes = base64.b64decode(message)
else:
message_bytes = message.encode(encoding)
if "withResponse" in params:
response=params.get("withResponse")
await self.client.write_gatt_char(characteristic_id, message_bytes,response=False)
else:
await self.client.write_gatt_char(characteristic_id, message_bytes)
# await self.client.write_gatt_char(characteristic_id, message_bytes)
response = json.dumps({
"jsonrpc": "2.0",
"result": None,
"id": request_id
})
await log_message("下发", response)
await websocket.send(response)
elif method == "read":
service_id = params.get("serviceId")
characteristic_id = params.get("characteristicId")
if all([service_id, characteristic_id]):
data = await self.client.read_gatt_char(characteristic_id)
response = json.dumps({
"jsonrpc": "2.0",
"result": {
"serviceId": service_id,
"characteristicId": characteristic_id,
"message": base64.b64encode(data).decode("utf-8")
},
"id": request_id
})
await log_message("下发", response)
await websocket.send(response)
elif method == "startNotifications":
service_id = params.get("serviceId")
characteristic_id = params.get("characteristicId")
if all([service_id, characteristic_id]):
await self.client.start_notify(
characteristic_id,
self.notification_handler(websocket, service_id, characteristic_id)
)
response = json.dumps({
"jsonrpc": "2.0",
"result": None,
"id": request_id
})
await log_message("下发", response)
await websocket.send(response)
except json.JSONDecodeError:
error_msg = json.dumps({
"jsonrpc": "2.0",
"result": {"message": "Parse error"},
"id": None
})
await log_message("下发", error_msg)
except Exception as e:
error_msg = json.dumps({
"jsonrpc": "2.0",
"result": {"message": str(e)},
"id": request.get("id") if request else None
})
await log_message("下发", error_msg)
except websockets.exceptions.ConnectionClosed:
print("WebSocket连接关闭")
finally:
if self.client and self.client.is_connected:
await self.client.disconnect()
self.client = None
self.target_device = None
def notification_handler(self, websocket, service_id, characteristic_id):
async def callback(sender, data):
current_time = asyncio.get_event_loop().time()
last_message, last_time = self.notification_records[characteristic_id]
# 解码当前数据用于比较
current_message = base64.b64encode(data).decode('utf-8')
# 过滤逻辑
if current_message == last_message and (current_time - last_time) < 0.5:
return
# 更新记录
self.notification_records[characteristic_id] = (current_message, current_time)
response = json.dumps({
"jsonrpc": "2.0",
"method": "characteristicDidChange",
"params": {
"serviceId": service_id,
"characteristicId": characteristic_id,
"message": current_message
}
})
await log_message("下发", response)
await websocket.send(response)
return callback
async def main():
async with websockets.serve(
lambda websocket, path: BLEClient().handle_client(websocket, path),
"localhost", 20111
):
print("WebSocket服务已启动: ws://localhost:20111/scratch/ble")
print("日志文件路径: ./b.log")
await asyncio.Future()
if __name__ == "__main__":
asyncio.run(main())
\ No newline at end of file
# -*- coding: utf-8 -*-
import sys
import asyncio
import websockets
import json
import base64
import threading
from collections import defaultdict
import socket
import time
import platform
from typing import Optional, Dict, List, Callable, Any
# Windows 蓝牙 API 导入
if sys.platform.startswith('win32'):
import winrt.windows.foundation.collections as wfc
import winrt.windows.devices.bluetooth as bt
import winrt.windows.devices.bluetooth.advertisement as bt_adv
import winrt.windows.devices.bluetooth.genericattributeprofile as gatt
import winrt.windows.devices.enumeration as de
import winrt.windows.storage.streams as streams
import winrt.windows.foundation as wf
print("当前运行在Windows系统,使用 pywinrt 蓝牙 API")
USE_PYWINRT = True
else:
# 非 Windows 系统回退到 bleak
from bleak import BleakScanner, BleakClient
print("当前运行在非Windows系统,使用 bleak 蓝牙库")
USE_PYWINRT = False
# 添加线程锁以确保日志写入的原子性
write_lock = threading.Lock()
def log_message_sync(direction, message):
"""同步日志记录函数"""
log_entry = f"{direction}: {message}\n"
print(log_entry, end='') # 控制台仍然输出
with write_lock:
with open('b.log', 'a', encoding='utf-8') as f:
f.write(log_entry)
async def log_message(direction, message):
"""异步封装日志记录"""
loop = asyncio.get_event_loop()
await loop.run_in_executor(None, log_message_sync, direction, message)
def is_port_in_use(port, host='localhost'):
"""检查端口是否被占用"""
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
try:
s.bind((host, port))
return False
except socket.error:
return True
async def self_test(websocket):
"""发送自检测试消息"""
test_message = json.dumps({
"jsonrpc": "2.0",
"method": "ping",
"params": {"timestamp": int(time.time())},
"id": "test"
})
await log_message("自测", test_message)
await websocket.send(test_message)
try:
# 等待响应,设置超时
response = await asyncio.wait_for(websocket.recv(), timeout=5.0)
await log_message("自测响应", response)
return True
except asyncio.TimeoutError:
await log_message("自测", "自测超时,未收到响应")
return False
except Exception as e:
await log_message("自测", f"自测异常: {str(e)}")
return False
class BLEClient:
def __init__(self):
self.target_device = None
self.client = None
self.services = []
self.optional_services = []
self.websocket = None
self.notification_records = defaultdict(lambda: (None, 0)) # 特征ID: (最后消息, 时间戳)
# pywinrt 相关属性
self.device_watcher = None
self.bluetooth_device = None
self.gatt_device_service = None
self.characteristics = {}
self.notification_handlers = {}
# 设备发现回调
self.device_found_callback = None
self.scan_timeout = 10.0
self.connection_timeout = 10.0
self.operation_timeout = 5.0
# 连接状态管理
self.connection_state = {
'status': 'disconnected', # disconnected, connecting, connected, disconnecting, error
'device_id': None,
'device_name': None,
'connection_time': None,
'last_activity': None,
'error_count': 0,
'retry_count': 0,
'max_retries': 3,
'connection_quality': 'unknown', # excellent, good, fair, poor, unknown
'signal_strength': None,
'services_discovered': False,
'characteristics_discovered': False,
'notifications_active': set(),
'connection_history': [],
'error_history': []
}
# 状态变化回调
self.state_change_callbacks = []
self.connection_monitor_task = None
self.auto_reconnect = False
self.auto_reconnect_interval = 5.0
def on_disconnect(self, client):
print("BLE连接断开,关闭WebSocket")
if self.websocket and not self.websocket.closed:
asyncio.create_task(self.close_websocket())
async def close_websocket(self):
if self.websocket:
await self.websocket.close()
self.websocket = None
async def start_pywinrt_scan(self, target_services: List[str] = None) -> Optional[Dict]:
"""使用 pywinrt 进行蓝牙设备扫描"""
if not USE_PYWINRT:
raise RuntimeError("pywinrt 仅在 Windows 系统上可用")
try:
# 创建设备观察器
aqs_filter = "System.Devices.Aep.ProtocolId:=\"{bb7bb05e-5972-42b5-94fc-76eaa7084d49}\""
self.device_watcher = de.DeviceWatcher.create_from_aqs_filter(aqs_filter)
discovered_devices = []
def on_device_added(sender, device_info):
try:
device_name = device_info.name or "未知设备"
device_id = device_info.id
device_kind = device_info.kind
print(f"发现设备: {device_name}, ID: {device_id}")
# 检查是否包含目标服务
if target_services:
# 这里可以添加更复杂的服务过滤逻辑
pass
discovered_devices.append({
'name': device_name,
'id': device_id,
'kind': device_kind,
'info': device_info
})
except Exception as e:
print(f"处理设备信息时出错: {e}")
def on_device_updated(sender, device_info):
print(f"设备更新: {device_info.name}")
def on_device_removed(sender, device_info):
print(f"设备移除: {device_info.name}")
def on_enumeration_completed(sender, args):
print("设备枚举完成")
def on_stopped(sender, args):
print("设备扫描停止")
# 注册事件处理器
self.device_watcher.added += on_device_added
self.device_watcher.updated += on_device_updated
self.device_watcher.removed += on_device_removed
self.device_watcher.enumeration_completed += on_enumeration_completed
self.device_watcher.stopped += on_stopped
# 开始扫描
print("开始扫描蓝牙设备...")
self.device_watcher.start()
# 等待扫描完成或超时
start_time = time.time()
while (self.device_watcher.status != de.DeviceWatcherStatus.STOPPED and
time.time() - start_time < self.scan_timeout):
await asyncio.sleep(0.1)
# 停止扫描
if self.device_watcher.status == de.DeviceWatcherStatus.STARTED:
self.device_watcher.stop()
# 等待停止完成
while self.device_watcher.status != de.DeviceWatcherStatus.STOPPED:
await asyncio.sleep(0.1)
print(f"扫描完成,发现 {len(discovered_devices)} 个设备")
return discovered_devices
except Exception as e:
print(f"pywinrt 扫描出错: {e}")
return None
async def connect_pywinrt_device(self, device_id: str) -> bool:
"""使用 pywinrt 连接蓝牙设备"""
if not USE_PYWINRT:
raise RuntimeError("pywinrt 仅在 Windows 系统上可用")
try:
print(f"正在连接设备: {device_id}")
self.update_connection_state('connecting', device_id=device_id)
# 从设备ID获取蓝牙设备
self.bluetooth_device = await bt.BluetoothLEDevice.from_id_async(device_id)
if not self.bluetooth_device:
print("无法获取蓝牙设备对象")
self.record_error('device_not_found', f'无法获取设备对象: {device_id}')
self.update_connection_state('error')
return False
# 检查连接状态
if self.bluetooth_device.connection_status == bt.BluetoothConnectionStatus.CONNECTED:
print("设备已连接")
self.update_connection_state('connected',
device_id=device_id,
connection_time=time.time(),
services_discovered=False,
characteristics_discovered=False)
return True
# 等待连接建立
start_time = time.time()
while (self.bluetooth_device.connection_status != bt.BluetoothConnectionStatus.CONNECTED and
time.time() - start_time < self.connection_timeout):
await asyncio.sleep(0.1)
if self.bluetooth_device.connection_status == bt.BluetoothConnectionStatus.CONNECTED:
print("设备连接成功")
self.update_connection_state('connected',
device_id=device_id,
connection_time=time.time(),
services_discovered=False,
characteristics_discovered=False)
# 连接成功后自动发现服务
try:
await self.discover_services_pywinrt()
self.update_connection_state('connected', services_discovered=True)
except Exception as e:
print(f"服务发现失败: {e}")
self.record_error('service_discovery_failed', str(e))
return True
else:
print("设备连接失败")
self.record_error('connection_timeout', f'连接超时: {device_id}')
self.update_connection_state('error')
return False
except Exception as e:
print(f"连接设备时出错: {e}")
self.record_error('connection_exception', str(e))
self.update_connection_state('error')
return False
async def discover_services_pywinrt(self) -> List[Dict]:
"""使用 pywinrt 发现设备服务"""
if not USE_PYWINRT or not self.bluetooth_device:
return []
try:
services = []
gatt_result = await self.bluetooth_device.get_gatt_services_async()
if gatt_result.status == gatt.GattCommunicationStatus.SUCCESS:
for service in gatt_result.services:
service_info = {
'uuid': str(service.uuid),
'service': service
}
services.append(service_info)
print(f"发现服务: {service.uuid}")
return services
except Exception as e:
print(f"发现服务时出错: {e}")
return []
async def discover_characteristics_pywinrt(self, service_uuid: str) -> List[Dict]:
"""使用 pywinrt 发现特征"""
if not USE_PYWINRT or not self.bluetooth_device:
return []
try:
# 获取服务
gatt_result = await self.bluetooth_device.get_gatt_services_async()
if gatt_result.status != gatt.GattCommunicationStatus.SUCCESS:
return []
target_service = None
for service in gatt_result.services:
if str(service.uuid) == service_uuid:
target_service = service
break
if not target_service:
print(f"未找到服务: {service_uuid}")
return []
# 获取特征
characteristics = []
char_result = await target_service.get_characteristics_async()
if char_result.status == gatt.GattCommunicationStatus.SUCCESS:
for char in char_result.characteristics:
char_info = {
'uuid': str(char.uuid),
'characteristic': char,
'properties': {
'read': char.characteristic_properties & gatt.GattCharacteristicProperties.READ != 0,
'write': char.characteristic_properties & gatt.GattCharacteristicProperties.WRITE != 0,
'notify': char.characteristic_properties & gatt.GattCharacteristicProperties.NOTIFY != 0,
'indicate': char.characteristic_properties & gatt.GattCharacteristicProperties.INDICATE != 0
}
}
characteristics.append(char_info)
self.characteristics[str(char.uuid)] = char
print(f"发现特征: {char.uuid}, 属性: {char_info['properties']}")
return characteristics
except Exception as e:
print(f"发现特征时出错: {e}")
return []
async def write_characteristic_pywinrt(self, characteristic_uuid: str, data: bytes) -> bool:
"""使用 pywinrt 写入特征值"""
if not USE_PYWINRT or not self.bluetooth_device:
return False
try:
if characteristic_uuid not in self.characteristics:
print(f"特征未找到: {characteristic_uuid}")
return False
characteristic = self.characteristics[characteristic_uuid]
# 创建数据写入器
writer = streams.DataWriter()
writer.write_bytes(data)
buffer = writer.detach_buffer()
# 写入数据
result = await characteristic.write_value_async(buffer)
if result == gatt.GattCommunicationStatus.SUCCESS:
print(f"成功写入特征 {characteristic_uuid}: {data}")
return True
else:
print(f"写入特征失败: {result}")
return False
except Exception as e:
print(f"写入特征时出错: {e}")
return False
async def read_characteristic_pywinrt(self, characteristic_uuid: str) -> Optional[bytes]:
"""使用 pywinrt 读取特征值"""
if not USE_PYWINRT or not self.bluetooth_device:
return None
try:
if characteristic_uuid not in self.characteristics:
print(f"特征未找到: {characteristic_uuid}")
return None
characteristic = self.characteristics[characteristic_uuid]
# 读取数据
result = await characteristic.read_value_async()
if result.status == gatt.GattCommunicationStatus.SUCCESS:
# 读取数据
reader = streams.DataReader.from_buffer(result.value)
data = bytearray(reader.unconsumed_buffer_length)
reader.read_bytes(data)
print(f"成功读取特征 {characteristic_uuid}: {bytes(data)}")
return bytes(data)
else:
print(f"读取特征失败: {result.status}")
return None
except Exception as e:
print(f"读取特征时出错: {e}")
return None
async def start_notifications_pywinrt(self, characteristic_uuid: str, callback: Callable) -> bool:
"""使用 pywinrt 启动通知"""
if not USE_PYWINRT or not self.bluetooth_device:
return False
try:
if characteristic_uuid not in self.characteristics:
print(f"特征未找到: {characteristic_uuid}")
return False
characteristic = self.characteristics[characteristic_uuid]
# 检查特征是否支持通知
if not (characteristic.characteristic_properties & gatt.GattCharacteristicProperties.NOTIFY):
print(f"特征 {characteristic_uuid} 不支持通知")
return False
# 设置通知回调
def notification_handler(sender, args):
try:
# 读取通知数据
reader = streams.DataReader.from_buffer(args.characteristic_value)
data = bytearray(reader.unconsumed_buffer_length)
reader.read_bytes(data)
# 调用用户回调
asyncio.create_task(callback(bytes(data)))
except Exception as e:
print(f"处理通知时出错: {e}")
# 订阅通知
characteristic.value_changed += notification_handler
result = await characteristic.write_client_characteristic_configuration_descriptor_async(
gatt.GattClientCharacteristicConfigurationDescriptorValue.NOTIFY
)
if result == gatt.GattCommunicationStatus.SUCCESS:
self.notification_handlers[characteristic_uuid] = notification_handler
print(f"成功启动特征 {characteristic_uuid} 的通知")
return True
else:
print(f"启动通知失败: {result}")
return False
except Exception as e:
print(f"启动通知时出错: {e}")
return False
async def stop_notifications_pywinrt(self, characteristic_uuid: str) -> bool:
"""使用 pywinrt 停止通知"""
if not USE_PYWINRT or not self.bluetooth_device:
return False
try:
if characteristic_uuid not in self.characteristics:
print(f"特征未找到: {characteristic_uuid}")
return False
characteristic = self.characteristics[characteristic_uuid]
# 取消订阅通知
result = await characteristic.write_client_characteristic_configuration_descriptor_async(
gatt.GattClientCharacteristicConfigurationDescriptorValue.NONE
)
if result == gatt.GattCommunicationStatus.SUCCESS:
# 移除事件处理器
if characteristic_uuid in self.notification_handlers:
characteristic.value_changed -= self.notification_handlers[characteristic_uuid]
del self.notification_handlers[characteristic_uuid]
print(f"成功停止特征 {characteristic_uuid} 的通知")
return True
else:
print(f"停止通知失败: {result}")
return False
except Exception as e:
print(f"停止通知时出错: {e}")
return False
async def disconnect_pywinrt(self):
"""使用 pywinrt 断开连接"""
if not USE_PYWINRT or not self.bluetooth_device:
return
try:
print("正在断开 pywinrt 蓝牙连接...")
self.update_connection_state('disconnecting')
# 停止所有通知
for char_uuid in list(self.notification_handlers.keys()):
try:
await self.stop_notifications_pywinrt(char_uuid)
except Exception as e:
print(f"停止通知 {char_uuid} 时出错: {e}")
# 清理资源
self.characteristics.clear()
self.notification_handlers.clear()
self.bluetooth_device = None
# 更新状态
self.update_connection_state('disconnected',
device_id=None,
device_name=None,
connection_time=None,
services_discovered=False,
characteristics_discovered=False,
notifications_active=set())
print("已断开 pywinrt 蓝牙连接")
except Exception as e:
print(f"断开连接时出错: {e}")
self.record_error('disconnect_exception', str(e))
self.update_connection_state('error')
def is_connected_pywinrt(self) -> bool:
"""检查 pywinrt 连接状态"""
if not USE_PYWINRT or not self.bluetooth_device:
return False
return self.bluetooth_device.connection_status == bt.BluetoothConnectionStatus.CONNECTED
def set_timeouts(self, scan_timeout: float = None, connection_timeout: float = None, operation_timeout: float = None):
"""设置超时时间"""
if scan_timeout is not None:
self.scan_timeout = scan_timeout
if connection_timeout is not None:
self.connection_timeout = connection_timeout
if operation_timeout is not None:
self.operation_timeout = operation_timeout
def update_connection_state(self, status: str, **kwargs):
"""更新连接状态"""
old_status = self.connection_state['status']
self.connection_state['status'] = status
self.connection_state['last_activity'] = time.time()
# 更新其他状态信息
for key, value in kwargs.items():
if key in self.connection_state:
self.connection_state[key] = value
# 记录状态变化历史
state_change = {
'timestamp': time.time(),
'old_status': old_status,
'new_status': status,
'details': kwargs
}
self.connection_state['connection_history'].append(state_change)
# 保持历史记录在合理范围内
if len(self.connection_state['connection_history']) > 100:
self.connection_state['connection_history'] = self.connection_state['connection_history'][-50:]
# 触发状态变化回调
for callback in self.state_change_callbacks:
try:
callback(old_status, status, kwargs)
except Exception as e:
print(f"状态变化回调出错: {e}")
print(f"连接状态变化: {old_status} -> {status}")
def add_state_change_callback(self, callback: Callable):
"""添加状态变化回调"""
self.state_change_callbacks.append(callback)
def remove_state_change_callback(self, callback: Callable):
"""移除状态变化回调"""
if callback in self.state_change_callbacks:
self.state_change_callbacks.remove(callback)
def get_connection_state(self) -> Dict:
"""获取当前连接状态"""
return self.connection_state.copy()
def get_connection_quality(self) -> str:
"""评估连接质量"""
if not self.is_connected():
return 'disconnected'
# 基于错误计数和重试次数评估连接质量
error_count = self.connection_state['error_count']
retry_count = self.connection_state['retry_count']
if error_count == 0 and retry_count == 0:
return 'excellent'
elif error_count <= 2 and retry_count <= 1:
return 'good'
elif error_count <= 5 and retry_count <= 2:
return 'fair'
else:
return 'poor'
def is_connected(self) -> bool:
"""检查是否已连接"""
if USE_PYWINRT:
return (self.connection_state['status'] == 'connected' and
self.bluetooth_device and
self.bluetooth_device.connection_status == bt.BluetoothConnectionStatus.CONNECTED)
else:
return (self.connection_state['status'] == 'connected' and
self.client and
self.client.is_connected)
def is_connecting(self) -> bool:
"""检查是否正在连接"""
return self.connection_state['status'] == 'connecting'
def is_disconnecting(self) -> bool:
"""检查是否正在断开连接"""
return self.connection_state['status'] == 'disconnecting'
def has_error(self) -> bool:
"""检查是否有错误"""
return self.connection_state['status'] == 'error'
def get_connection_duration(self) -> float:
"""获取连接持续时间(秒)"""
if self.connection_state['connection_time']:
return time.time() - self.connection_state['connection_time']
return 0.0
def get_last_activity_duration(self) -> float:
"""获取最后活动时间(秒)"""
if self.connection_state['last_activity']:
return time.time() - self.connection_state['last_activity']
return 0.0
def record_error(self, error_type: str, error_message: str):
"""记录错误"""
self.connection_state['error_count'] += 1
error_record = {
'timestamp': time.time(),
'type': error_type,
'message': error_message,
'status': self.connection_state['status']
}
self.connection_state['error_history'].append(error_record)
# 保持错误历史在合理范围内
if len(self.connection_state['error_history']) > 50:
self.connection_state['error_history'] = self.connection_state['error_history'][-25:]
# 更新连接质量
self.connection_state['connection_quality'] = self.get_connection_quality()
print(f"记录错误: {error_type} - {error_message}")
def reset_error_count(self):
"""重置错误计数"""
self.connection_state['error_count'] = 0
self.connection_state['retry_count'] = 0
self.connection_state['connection_quality'] = self.get_connection_quality()
async def start_connection_monitor(self):
"""启动连接监控"""
if self.connection_monitor_task:
return
self.connection_monitor_task = asyncio.create_task(self._connection_monitor_loop())
async def stop_connection_monitor(self):
"""停止连接监控"""
if self.connection_monitor_task:
self.connection_monitor_task.cancel()
try:
await self.connection_monitor_task
except asyncio.CancelledError:
pass
self.connection_monitor_task = None
async def _connection_monitor_loop(self):
"""连接监控循环"""
while True:
try:
await asyncio.sleep(1.0) # 每秒检查一次
if self.connection_state['status'] == 'connected':
# 检查连接是否仍然有效
if not self.is_connected():
print("检测到连接丢失")
self.update_connection_state('error', error_type='connection_lost')
# 自动重连
if self.auto_reconnect:
await self._attempt_auto_reconnect()
elif self.connection_state['status'] == 'error' and self.auto_reconnect:
# 尝试自动重连
await self._attempt_auto_reconnect()
except asyncio.CancelledError:
break
except Exception as e:
print(f"连接监控出错: {e}")
await asyncio.sleep(5.0) # 出错时等待更长时间
async def _attempt_auto_reconnect(self):
"""尝试自动重连"""
if self.connection_state['retry_count'] >= self.connection_state['max_retries']:
print("达到最大重试次数,停止自动重连")
return
device_id = self.connection_state['device_id']
if not device_id:
print("没有设备ID,无法自动重连")
return
print(f"尝试自动重连 (第{self.connection_state['retry_count'] + 1}次)")
self.connection_state['retry_count'] += 1
await asyncio.sleep(self.auto_reconnect_interval)
if USE_PYWINRT:
success = await self.connect_pywinrt_device(device_id)
else:
# 非 Windows 系统的重连逻辑
if self.client:
await self.client.connect()
success = self.client.is_connected
if success:
print("自动重连成功")
self.reset_error_count()
else:
print("自动重连失败")
self.record_error('auto_reconnect_failed', f'重连尝试 {self.connection_state["retry_count"]} 失败')
def detection_callback(self, device, advertisement_data):
if any(service_uuid in advertisement_data.service_uuids for service_uuid in self.services):
self.target_device = (device, advertisement_data)
if not self.target_device:
print("未找到匹配设备")
return
else:
device, adv_data = self.target_device
print("\n找到目标设备:")
print(f"设备名称: {device.name}")
print(f"设备地址: {device.address}")
print(f"信号强度: {device.rssi} dBm")
print("\n广播信息:")
print(f"服务UUID列表: {adv_data.service_uuids}")
print(f"制造商数据: {adv_data.manufacturer_data}")
print(f"服务数据: {adv_data.service_data}")
print(f"本地名称: {adv_data.local_name}")
return self.target_device
async def handle_client(self, websocket, path):
self.websocket = websocket
if path != "/scratch/ble":
await websocket.close(code=1003, reason="Path not allowed")
return
try:
async for message in websocket:
try:
await log_message("接收", message)
request = json.loads(message)
if request["jsonrpc"] != "2.0":
continue
method = request.get("method")
params = request.get("params", {})
request_id = request.get("id")
if method == "discover":
self.services = []
for filt in params.get("filters", [{}]):
self.services.extend(filt.get("services", []))
self.optional_services = params.get("optionalServices", [])
if USE_PYWINRT:
# 使用 pywinrt 进行设备发现
discovered_devices = await self.start_pywinrt_scan(self.services)
if discovered_devices:
# 选择第一个设备(可以根据需要修改选择逻辑)
device_info = discovered_devices[0]
discover_response = json.dumps({
"jsonrpc": "2.0",
"method": "didDiscoverPeripheral",
"params": {
"name": device_info['name'],
"peripheralId": device_info['id'],
"rssi": -50 # pywinrt 不直接提供 RSSI,使用默认值
}
})
await log_message("下发", discover_response)
await websocket.send(discover_response)
result_response = json.dumps({
"jsonrpc": "2.0",
"result": None,
"id": request_id
})
await log_message("下发", result_response)
await websocket.send(result_response)
else:
# 未找到设备
error_response = json.dumps({
"jsonrpc": "2.0",
"error": {"code": -1, "message": "未找到匹配的蓝牙设备"},
"id": request_id
})
await log_message("下发", error_response)
await websocket.send(error_response)
else:
# 使用 bleak 进行设备发现(非 Windows 系统)
scanner = BleakScanner(scanning_mode="active")
scanner.register_detection_callback(self.detection_callback)
max_retries = 3
found = False
for attempt in range(max_retries):
self.target_device = None
await scanner.start()
await asyncio.sleep(5)
await scanner.stop()
if self.target_device:
found = True
break
if attempt < max_retries - 1:
print(f"未找到设备,第{attempt+1}次重试...")
await asyncio.sleep(3)
if found:
device, adv_data = self.target_device
discover_response = json.dumps({
"jsonrpc": "2.0",
"method": "didDiscoverPeripheral",
"params": {
"name": device.name,
"peripheralId": device.address,
"rssi": device.rssi
}
})
await log_message("下发", discover_response)
await websocket.send(discover_response)
result_response = json.dumps({
"jsonrpc": "2.0",
"result": None,
"id": request_id
})
await log_message("下发", result_response)
await websocket.send(result_response)
else:
error_response = json.dumps({
"jsonrpc": "2.0",
"error": {"code": -1, "message": "未找到匹配的蓝牙设备"},
"id": request_id
})
await log_message("下发", error_response)
await websocket.send(error_response)
elif method == "connect":
peripheral_id = params.get("peripheralId")
if peripheral_id:
if USE_PYWINRT:
# 使用 pywinrt 连接
success = await self.connect_pywinrt_device(peripheral_id)
if success:
response = json.dumps({
"jsonrpc": "2.0",
"result": None,
"id": request_id
})
await log_message("下发", response)
await websocket.send(response)
else:
error_response = json.dumps({
"jsonrpc": "2.0",
"error": {"code": -1, "message": "连接设备失败"},
"id": request_id
})
await log_message("下发", error_response)
await websocket.send(error_response)
else:
# 使用 bleak 连接
self.client = BleakClient(peripheral_id)
self.client.set_disconnected_callback(self.on_disconnect)
await self.client.connect()
if self.client.is_connected:
response = json.dumps({
"jsonrpc": "2.0",
"result": None,
"id": request_id
})
await log_message("下发", response)
await websocket.send(response)
else:
error_response = json.dumps({
"jsonrpc": "2.0",
"error": {"code": -1, "message": "连接设备失败"},
"id": request_id
})
await log_message("下发", error_response)
await websocket.send(error_response)
elif method == "write":
service_id = params.get("serviceId")
characteristic_id = params.get("characteristicId")
message = params.get("message")
encoding = params.get("encoding", "utf-8")
if all([service_id, characteristic_id, message]):
if encoding == "base64":
message_bytes = base64.b64decode(message)
print("write message_bytes", message_bytes)
else:
print("write message", message)
message_bytes = message.encode(encoding)
if USE_PYWINRT:
# 使用 pywinrt 写入
success = await self.write_characteristic_pywinrt(characteristic_id, message_bytes)
if success:
response = json.dumps({
"jsonrpc": "2.0",
"result": None,
"id": request_id
})
await log_message("下发", response)
await websocket.send(response)
else:
error_response = json.dumps({
"jsonrpc": "2.0",
"error": {"code": -1, "message": "写入特征失败"},
"id": request_id
})
await log_message("下发", error_response)
await websocket.send(error_response)
else:
# 使用 bleak 写入
await self.client.write_gatt_char(characteristic_id, message_bytes)
response = json.dumps({
"jsonrpc": "2.0",
"result": None,
"id": request_id
})
await log_message("下发", response)
await websocket.send(response)
elif method == "read":
service_id = params.get("serviceId")
characteristic_id = params.get("characteristicId")
if all([service_id, characteristic_id]):
if USE_PYWINRT:
# 使用 pywinrt 读取
data = await self.read_characteristic_pywinrt(characteristic_id)
if data is not None:
print('read-data', data)
response = json.dumps({
"jsonrpc": "2.0",
"result": {
"serviceId": service_id,
"characteristicId": characteristic_id,
"message": base64.b64encode(data).decode("utf-8")
},
"id": request_id
})
await log_message("下发", response)
await websocket.send(response)
else:
error_response = json.dumps({
"jsonrpc": "2.0",
"error": {"code": -1, "message": "读取特征失败"},
"id": request_id
})
await log_message("下发", error_response)
await websocket.send(error_response)
else:
# 使用 bleak 读取
data = await self.client.read_gatt_char(characteristic_id)
print('read-data', data)
response = json.dumps({
"jsonrpc": "2.0",
"result": {
"serviceId": service_id,
"characteristicId": characteristic_id,
"message": base64.b64encode(data).decode("utf-8")
},
"id": request_id
})
await log_message("下发", response)
await websocket.send(response)
elif method == "startNotifications":
service_id = params.get("serviceId")
characteristic_id = params.get("characteristicId")
if all([service_id, characteristic_id]):
if USE_PYWINRT:
# 使用 pywinrt 启动通知
async def pywinrt_notification_callback(data: bytes):
await self.pywinrt_notification_handler(websocket, service_id, characteristic_id, data)
success = await self.start_notifications_pywinrt(characteristic_id, pywinrt_notification_callback)
if success:
response = json.dumps({
"jsonrpc": "2.0",
"result": None,
"id": request_id
})
await log_message("下发", response)
await websocket.send(response)
else:
error_response = json.dumps({
"jsonrpc": "2.0",
"error": {"code": -1, "message": "启动通知失败"},
"id": request_id
})
await log_message("下发", error_response)
await websocket.send(error_response)
else:
# 使用 bleak 启动通知
await self.client.start_notify(
characteristic_id,
self.notification_handler(websocket, service_id, characteristic_id)
)
response = json.dumps({
"jsonrpc": "2.0",
"result": None,
"id": request_id
})
await log_message("下发", response)
await websocket.send(response)
elif method == "discoverServices":
# 发现设备服务
if USE_PYWINRT and self.bluetooth_device:
services = await self.discover_services_pywinrt()
response = json.dumps({
"jsonrpc": "2.0",
"result": {"services": [s['uuid'] for s in services]},
"id": request_id
})
await log_message("下发", response)
await websocket.send(response)
else:
error_response = json.dumps({
"jsonrpc": "2.0",
"error": {"code": -1, "message": "服务发现功能仅在 Windows 系统上可用"},
"id": request_id
})
await log_message("下发", error_response)
await websocket.send(error_response)
elif method == "discoverCharacteristics":
# 发现服务特征
service_id = params.get("serviceId")
if service_id and USE_PYWINRT and self.bluetooth_device:
characteristics = await self.discover_characteristics_pywinrt(service_id)
response = json.dumps({
"jsonrpc": "2.0",
"result": {
"characteristics": [
{
"uuid": char['uuid'],
"properties": char['properties']
} for char in characteristics
]
},
"id": request_id
})
await log_message("下发", response)
await websocket.send(response)
else:
error_response = json.dumps({
"jsonrpc": "2.0",
"error": {"code": -1, "message": "特征发现功能仅在 Windows 系统上可用"},
"id": request_id
})
await log_message("下发", error_response)
await websocket.send(error_response)
elif method == "disconnect":
# 断开连接
if USE_PYWINRT:
await self.disconnect_pywinrt()
else:
if self.client and self.client.is_connected:
await self.client.disconnect()
self.client = None
response = json.dumps({
"jsonrpc": "2.0",
"result": None,
"id": request_id
})
await log_message("下发", response)
await websocket.send(response)
elif method == "getConnectionStatus":
# 获取连接状态
if USE_PYWINRT:
is_connected = self.is_connected_pywinrt()
else:
is_connected = self.client and self.client.is_connected
response = json.dumps({
"jsonrpc": "2.0",
"result": {"connected": is_connected},
"id": request_id
})
await log_message("下发", response)
await websocket.send(response)
elif method == "setTimeouts":
# 设置超时时间
scan_timeout = params.get("scanTimeout")
connection_timeout = params.get("connectionTimeout")
operation_timeout = params.get("operationTimeout")
self.set_timeouts(scan_timeout, connection_timeout, operation_timeout)
response = json.dumps({
"jsonrpc": "2.0",
"result": {
"scanTimeout": self.scan_timeout,
"connectionTimeout": self.connection_timeout,
"operationTimeout": self.operation_timeout
},
"id": request_id
})
await log_message("下发", response)
await websocket.send(response)
elif method == "ping":
# 处理ping请求,返回pong响应
response = json.dumps({
"jsonrpc": "2.0",
"result": {"pong": True, "timestamp": int(time.time())},
"id": request_id
})
await log_message("下发", response)
await websocket.send(response)
except json.JSONDecodeError:
error_msg = json.dumps({
"jsonrpc": "2.0",
"result": {"message": "Parse error"},
"id": None
})
await log_message("下发", error_msg)
except Exception as e:
error_msg = json.dumps({
"jsonrpc": "2.0",
"result": {"message": str(e)},
"id": request.get("id") if request else None
})
await log_message("下发", error_msg)
except websockets.exceptions.ConnectionClosed:
print("WebSocket连接关闭")
finally:
# 清理连接
if USE_PYWINRT:
await self.disconnect_pywinrt()
else:
if self.client and self.client.is_connected:
await self.client.disconnect()
self.client = None
self.target_device = None
async def pywinrt_notification_handler(self, websocket, service_id, characteristic_id, data: bytes):
"""pywinrt 通知处理器"""
try:
current_time = asyncio.get_event_loop().time()
last_message, last_time = self.notification_records[characteristic_id]
# 解码当前数据用于比较
current_message = base64.b64encode(data).decode('utf-8')
print('pywinrt_notification_handler current_message', data)
# 过滤逻辑
if current_message == last_message and (current_time - last_time) < 0.5:
return
# 更新记录
self.notification_records[characteristic_id] = (current_message, current_time)
response = json.dumps({
"jsonrpc": "2.0",
"method": "characteristicDidChange",
"params": {
"serviceId": service_id,
"characteristicId": characteristic_id,
"message": current_message
}
})
await log_message("下发", response)
await websocket.send(response)
except Exception as e:
print(f"pywinrt 通知处理出错: {e}")
def notification_handler(self, websocket, service_id, characteristic_id):
async def callback(sender, data):
current_time = asyncio.get_event_loop().time()
last_message, last_time = self.notification_records[characteristic_id]
# 解码当前数据用于比较
current_message = base64.b64encode(data).decode('utf-8')
print('notification_handler current_message',base64.b64decode(current_message))
# 过滤逻辑
if current_message == last_message and (current_time - last_time) < 0.5:
return
# 更新记录
self.notification_records[characteristic_id] = (current_message, current_time)
response = json.dumps({
"jsonrpc": "2.0",
"method": "characteristicDidChange",
"params": {
"serviceId": service_id,
"characteristicId": characteristic_id,
"message": current_message
}
})
await log_message("下发", response)
await websocket.send(response)
return callback
async def check_port_and_start_server(port=20111, host='localhost'):
"""检查端口并启动服务器"""
if is_port_in_use(port, host):
print(f"错误: 端口 {port} 已被占用,无法启动服务")
return False
print(f"端口 {port} 可用,正在启动服务...")
server = await websockets.serve(
lambda websocket, path: BLEClient().handle_client(websocket, path),
host, port
)
print(f"WebSocket服务已启动: ws://{host}:{port}/scratch/ble")
print("日志文件路径: ./b.log")
# 执行自检测试
try:
async with websockets.connect(f"ws://{host}:{port}/scratch/ble") as websocket:
print("正在执行自检测试...")
test_result = await self_test(websocket)
if test_result:
print("自检测试成功: 服务正常运行")
else:
print("自检测试失败: 服务可能存在问题")
except Exception as e:
print(f"自检测试异常: {str(e)}")
return server
async def main():
server = await check_port_and_start_server()
if server:
await asyncio.Future() # 保持服务器运行
if __name__ == "__main__":
asyncio.run(main())
\ No newline at end of file
......@@ -186,7 +186,7 @@ class BLEClient:
"result": None,
"id": request_id
})
await log_message("下发", response)
# await log_message("下发", response)
await websocket.send(response)
elif method == "write":
......@@ -198,9 +198,12 @@ class BLEClient:
if all([service_id, characteristic_id, message]):
if encoding == "base64":
message_bytes = base64.b64decode(message)
print("write message_bytes",message_bytes)
else:
print("write message",message)
message_bytes = message.encode(encoding)
await self.client.write_gatt_char(characteristic_id, message_bytes)
response = json.dumps({
"jsonrpc": "2.0",
......@@ -216,6 +219,7 @@ class BLEClient:
if all([service_id, characteristic_id]):
data = await self.client.read_gatt_char(characteristic_id)
print('read-data',base64.decode(data))
response = json.dumps({
"jsonrpc": "2.0",
"result": {
......@@ -285,6 +289,7 @@ class BLEClient:
# 解码当前数据用于比较
current_message = base64.b64encode(data).decode('utf-8')
print('notification_handler current_message',base64.b64decode(current_message))
# 过滤逻辑
if current_message == last_message and (current_time - last_time) < 0.5:
......
# -*- coding: utf-8 -*-
import sys
import asyncio
import websockets
from bleak import BleakScanner, BleakClient
import json
import base64
import threading
from collections import defaultdict
import socket
import time
import platform
# 方法1:通过sys模块快速判断(推荐)
if sys.platform.startswith('win32'):
import winrt.windows.foundation.collections # noqa
import winrt.windows.devices.bluetooth # noqa
import winrt.windows.devices.bluetooth.advertisement # noq
print("当前运行在Windows系统")
elif sys.platform.startswith('linux'):
print("当前运行在Linux系统")
elif sys.platform.startswith('darwin'):
print("当前运行在macOS系统")
# 添加线程锁以确保日志写入的原子性
write_lock = threading.Lock()
def log_message_sync(direction, message):
"""同步日志记录函数"""
log_entry = f"{direction}: {message}\n"
print(log_entry, end='') # 控制台仍然输出
with write_lock:
with open('b.log', 'a', encoding='utf-8') as f:
f.write(log_entry)
async def log_message(direction, message):
"""异步封装日志记录"""
loop = asyncio.get_event_loop()
await loop.run_in_executor(None, log_message_sync, direction, message)
def is_port_in_use(port, host='localhost'):
"""检查端口是否被占用"""
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
try:
s.bind((host, port))
return False
except socket.error:
return True
async def self_test(websocket):
"""发送自检测试消息"""
test_message = json.dumps({
"jsonrpc": "2.0",
"method": "ping",
"params": {"timestamp": int(time.time())},
"id": "test"
})
await log_message("自测", test_message)
await websocket.send(test_message)
try:
# 等待响应,设置超时
response = await asyncio.wait_for(websocket.recv(), timeout=5.0)
await log_message("自测响应", response)
return True
except asyncio.TimeoutError:
await log_message("自测", "自测超时,未收到响应")
return False
except Exception as e:
await log_message("自测", f"自测异常: {str(e)}")
return False
class BLEClient:
def __init__(self):
self.target_device = None
self.client = None
self.services = []
self.optional_services = []
self.websocket = None
self.notification_records = defaultdict(lambda: (None, 0)) # 特征ID: (最后消息, 时间戳)
def on_disconnect(self, client):
print("BLE连接断开,关闭WebSocket")
if self.websocket and not self.websocket.closed:
asyncio.create_task(self.close_websocket())
async def close_websocket(self):
await self.websocket.close()
self.websocket = None
def detection_callback(self, device, advertisement_data):
if any(service_uuid in advertisement_data.service_uuids for service_uuid in self.services):
self.target_device = (device, advertisement_data)
if not self.target_device:
print("未找到匹配设备")
return
else:
device, adv_data = self.target_device
print("\n找到目标设备:")
print(f"设备名称: {device.name}")
print(f"设备地址: {device.address}")
print(f"信号强度: {device.rssi} dBm")
print("\n广播信息:")
print(f"服务UUID列表: {adv_data.service_uuids}")
print(f"制造商数据: {adv_data.manufacturer_data}")
print(f"服务数据: {adv_data.service_data}")
print(f"本地名称: {adv_data.local_name}")
return self.target_device
async def handle_client(self, websocket, path):
self.websocket = websocket
if path != "/scratch/ble":
await websocket.close(code=1003, reason="Path not allowed")
return
try:
async for message in websocket:
try:
await log_message("接收", message)
request = json.loads(message)
if request["jsonrpc"] != "2.0":
continue
method = request.get("method")
params = request.get("params", {})
request_id = request.get("id")
if method == "discover":
self.services = []
for filt in params.get("filters", [{}]):
self.services.extend(filt.get("services", []))
self.optional_services = params.get("optionalServices", [])
scanner = BleakScanner(scanning_mode="active")
scanner.register_detection_callback(self.detection_callback)
max_retries = 3
found = False
for attempt in range(max_retries):
self.target_device = None
await scanner.start()
await asyncio.sleep(5)
await scanner.stop()
if self.target_device:
found = True
break
if attempt < max_retries - 1:
print(f"未找到设备,第{attempt+1}次重试...")
await asyncio.sleep(3)
if found:
device, adv_data = self.target_device
discover_response = json.dumps({
"jsonrpc": "2.0",
"method": "didDiscoverPeripheral",
"params": {
"name": device.name,
"peripheralId": device.address,
"rssi": device.rssi
}
})
await log_message("下发", discover_response)
await websocket.send(discover_response)
result_response = json.dumps({
"jsonrpc": "2.0",
"result": None,
"id": request_id
})
await log_message("下发", result_response)
await websocket.send(result_response)
elif method == "connect":
peripheral_id = params.get("peripheralId")
if peripheral_id:
self.client = BleakClient(peripheral_id)
self.client.set_disconnected_callback(self.on_disconnect)
await self.client.connect()
if self.client.is_connected:
response = json.dumps({
"jsonrpc": "2.0",
"result": None,
"id": request_id
})
# await log_message("下发", response)
await websocket.send(response)
elif method == "write":
service_id = params.get("serviceId")
characteristic_id = params.get("characteristicId")
message = params.get("message")
encoding = params.get("encoding", "utf-8")
if all([service_id, characteristic_id, message]):
if encoding == "base64":
message_bytes = base64.b64decode(message)
print("write message_bytes",message_bytes)
else:
print("write message",message)
message_bytes = message.encode(encoding)
await self.client.write_gatt_char(characteristic_id, message_bytes)
response = json.dumps({
"jsonrpc": "2.0",
"result": None,
"id": request_id
})
await log_message("下发", response)
await websocket.send(response)
elif method == "read":
service_id = params.get("serviceId")
characteristic_id = params.get("characteristicId")
if all([service_id, characteristic_id]):
data = await self.client.read_gatt_char(characteristic_id)
print('read-data',base64.decode(data))
response = json.dumps({
"jsonrpc": "2.0",
"result": {
"serviceId": service_id,
"characteristicId": characteristic_id,
"message": base64.b64encode(data).decode("utf-8")
},
"id": request_id
})
await log_message("下发", response)
await websocket.send(response)
elif method == "startNotifications":
service_id = params.get("serviceId")
characteristic_id = params.get("characteristicId")
if all([service_id, characteristic_id]):
await self.client.start_notify(
characteristic_id,
self.notification_handler(websocket, service_id, characteristic_id)
)
response = json.dumps({
"jsonrpc": "2.0",
"result": None,
"id": request_id
})
await log_message("下发", response)
await websocket.send(response)
elif method == "ping":
# 处理ping请求,返回pong响应
response = json.dumps({
"jsonrpc": "2.0",
"result": {"pong": True, "timestamp": int(time.time())},
"id": request_id
})
await log_message("下发", response)
await websocket.send(response)
except json.JSONDecodeError:
error_msg = json.dumps({
"jsonrpc": "2.0",
"result": {"message": "Parse error"},
"id": None
})
await log_message("下发", error_msg)
except Exception as e:
error_msg = json.dumps({
"jsonrpc": "2.0",
"result": {"message": str(e)},
"id": request.get("id") if request else None
})
await log_message("下发", error_msg)
except websockets.exceptions.ConnectionClosed:
print("WebSocket连接关闭")
finally:
if self.client and self.client.is_connected:
await self.client.disconnect()
self.client = None
self.target_device = None
def notification_handler(self, websocket, service_id, characteristic_id):
async def callback(sender, data):
current_time = asyncio.get_event_loop().time()
last_message, last_time = self.notification_records[characteristic_id]
# 解码当前数据用于比较
current_message = base64.b64encode(data).decode('utf-8')
print('notification_handler current_message',base64.b64decode(current_message))
# 过滤逻辑
if current_message == last_message and (current_time - last_time) < 0.5:
return
# 更新记录
self.notification_records[characteristic_id] = (current_message, current_time)
response = json.dumps({
"jsonrpc": "2.0",
"method": "characteristicDidChange",
"params": {
"serviceId": service_id,
"characteristicId": characteristic_id,
"message": current_message
}
})
await log_message("下发", response)
await websocket.send(response)
return callback
async def check_port_and_start_server(port=20111, host='localhost'):
"""检查端口并启动服务器"""
if is_port_in_use(port, host):
print(f"错误: 端口 {port} 已被占用,无法启动服务")
return False
print(f"端口 {port} 可用,正在启动服务...")
server = await websockets.serve(
lambda websocket, path: BLEClient().handle_client(websocket, path),
host, port
)
print(f"WebSocket服务已启动: ws://{host}:{port}/scratch/ble")
print("日志文件路径: ./b.log")
# 执行自检测试
try:
async with websockets.connect(f"ws://{host}:{port}/scratch/ble") as websocket:
print("正在执行自检测试...")
test_result = await self_test(websocket)
if test_result:
print("自检测试成功: 服务正常运行")
else:
print("自检测试失败: 服务可能存在问题")
except Exception as e:
print(f"自检测试异常: {str(e)}")
return server
async def main():
server = await check_port_and_start_server()
if server:
await asyncio.Future() # 保持服务器运行
if __name__ == "__main__":
asyncio.run(main())
\ No newline at end of file
# -*- coding: utf-8 -*-
import sys
import asyncio
import websockets
import json
import base64
import threading
from collections import defaultdict
import socket
import time
import platform
# 在导入其他模块前处理Windows特有的COM初始化问题
if sys.platform.startswith('win32'):
print("Windows系统: 处理COM初始化")
try:
from bleak.backends.winrt.util import uninitialize_sta, allow_sta, assert_mta
# 取消其他库可能设置的STA初始化
uninitialize_sta()
# 允许STA模式运行(需要确保有Windows事件循环)
allow_sta()
except ImportError:
print("警告: 无法导入Windows特有的bleak工具")
except Exception as e:
print(f"Windows初始化错误: {str(e)}")
# 导入其他模块
from bleak import BleakScanner, BleakClient
# 添加线程锁以确保日志写入的原子性
write_lock = threading.Lock()
def log_message_sync(direction, message):
"""同步日志记录函数"""
log_entry = f"{direction}: {message}\n"
print(log_entry, end='') # 控制台仍然输出
with write_lock:
with open('b.log', 'a', encoding='utf-8') as f:
f.write(log_entry)
async def log_message(direction, message):
"""异步封装日志记录"""
loop = asyncio.get_event_loop()
await loop.run_in_executor(None, log_message_sync, direction, message)
def is_port_in_use(port, host='localhost'):
"""检查端口是否被占用"""
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
try:
s.bind((host, port))
return False
except socket.error:
return True
async def self_test(websocket):
"""发送自检测试消息"""
test_message = json.dumps({
"jsonrpc": "2.0",
"method": "ping",
"params": {"timestamp": int(time.time())},
"id": "test"
})
await log_message("自测", test_message)
await websocket.send(test_message)
try:
# 等待响应,设置超时
response = await asyncio.wait_for(websocket.recv(), timeout=5.0)
await log_message("自测响应", response)
return True
except asyncio.TimeoutError:
await log_message("自测", "自测超时,未收到响应")
return False
except Exception as e:
await log_message("自测", f"自测异常: {str(e)}")
return False
class BLEClient:
def __init__(self):
self.target_device = None
self.client = None
self.services = []
self.optional_services = []
self.websocket = None
self.notification_records = defaultdict(lambda: (None, 0)) # 特征ID: (最后消息, 时间戳)
self.is_windows = sys.platform.startswith('win32')
def on_disconnect(self, client):
print("BLE连接断开,关闭WebSocket")
if self.websocket and not self.websocket.closed:
asyncio.create_task(self.close_websocket())
async def close_websocket(self):
await self.websocket.close()
self.websocket = None
def detection_callback(self, device, advertisement_data):
if any(service_uuid in advertisement_data.service_uuids for service_uuid in self.services):
self.target_device = (device, advertisement_data)
if not self.target_device:
print("未找到匹配设备")
return
else:
device, adv_data = self.target_device
print("\n找到目标设备:")
print(f"设备名称: {device.name}")
print(f"设备地址: {device.address}")
print(f"信号强度: {device.rssi} dBm")
print("\n广播信息:")
print(f"服务UUID列表: {adv_data.service_uuids}")
print(f"制造商数据: {adv_data.manufacturer_data}")
print(f"服务数据: {adv_data.service_data}")
print(f"本地名称: {adv_data.local_name}")
return self.target_device
async def create_bleak_client(self, address):
"""创建BleakClient,考虑Windows平台的特性"""
if self.is_windows:
print("Windows系统: 使用带地址类型的客户端")
winrt_args = {"address_type": "public"} # 或 "random"
return BleakClient(address, winrt=winrt_args)
else:
return BleakClient(address)
async def handle_client(self, websocket, path):
self.websocket = websocket
if path != "/scratch/ble":
await websocket.close(code=1003, reason="Path not allowed")
return
try:
async for message in websocket:
try:
await log_message("接收", message)
request = json.loads(message)
if request["jsonrpc"] != "2.0":
continue
method = request.get("method")
params = request.get("params", {})
request_id = request.get("id")
if method == "discover":
self.services = []
for filt in params.get("filters", [{}]):
self.services.extend(filt.get("services", []))
self.optional_services = params.get("optionalServices", [])
# Windows使用特定扫描模式配置
scanning_args = {"scanning_mode": "active"}
scanner = BleakScanner(scanning_mode="active")
scanner.register_detection_callback(self.detection_callback)
max_retries = 3
found = False
for attempt in range(max_retries):
self.target_device = None
await scanner.start()
await asyncio.sleep(5)
await scanner.stop()
if self.target_device:
found = True
break
if attempt < max_retries - 1:
print(f"未找到设备,第{attempt+1}次重试...")
await asyncio.sleep(3)
if found:
device, adv_data = self.target_device
discover_response = json.dumps({
"jsonrpc": "2.0",
"method": "didDiscoverPeripheral",
"params": {
"name": device.name,
"peripheralId": device.address,
"rssi": device.rssi
}
})
await log_message("下发", discover_response)
await websocket.send(discover_response)
result_response = json.dumps({
"jsonrpc": "2.0",
"result": None,
"id": request_id
})
await log_message("下发", result_response)
await websocket.send(result_response)
elif method == "connect":
peripheral_id = params.get("peripheralId")
if peripheral_id:
self.client = await self.create_bleak_client(peripheral_id)
self.client.set_disconnected_callback(self.on_disconnect)
await self.client.connect()
if self.client.is_connected:
response = json.dumps({
"jsonrpc": "2.0",
"result": None,
"id": request_id
})
await websocket.send(response)
elif method == "write":
service_id = params.get("serviceId")
characteristic_id = params.get("characteristicId")
message = params.get("message")
encoding = params.get("encoding", "utf-8")
if all([service_id, characteristic_id, message]):
if encoding == "base64":
message_bytes = base64.b64decode(message)
else:
message_bytes = message.encode(encoding)
await self.client.write_gatt_char(characteristic_id, message_bytes)
response = json.dumps({
"jsonrpc": "2.0",
"result": None,
"id": request_id
})
await log_message("下发", response)
await websocket.send(response)
elif method == "read":
service_id = params.get("serviceId")
characteristic_id = params.get("characteristicId")
if all([service_id, characteristic_id]):
data = await self.client.read_gatt_char(characteristic_id)
response = json.dumps({
"jsonrpc": "2.0",
"result": {
"serviceId": service_id,
"characteristicId": characteristic_id,
"message": base64.b64encode(data).decode("utf-8")
},
"id": request_id
})
await log_message("下发", response)
await websocket.send(response)
elif method == "startNotifications":
service_id = params.get("serviceId")
characteristic_id = params.get("characteristicId")
if all([service_id, characteristic_id]):
await self.client.start_notify(
characteristic_id,
self.notification_handler(websocket, service_id, characteristic_id)
)
response = json.dumps({
"jsonrpc": "2.0",
"result": None,
"id": request_id
})
await log_message("下发", response)
await websocket.send(response)
elif method == "ping":
# 处理ping请求,返回pong响应
response = json.dumps({
"jsonrpc": "2.0",
"result": {"pong": True, "timestamp": int(time.time())},
"id": request_id
})
await log_message("下发", response)
await websocket.send(response)
except json.JSONDecodeError:
error_msg = json.dumps({
"jsonrpc": "2.0",
"error": {"message": "Parse error", "code": -32700},
"id": None
})
await log_message("下发", error_msg)
await websocket.send(error_msg)
except Exception as e:
error_msg = json.dumps({
"jsonrpc": "2.0",
"error": {"message": str(e), "code": -32000},
"id": request.get("id") if request else None
})
await log_message("下发", error_msg)
await websocket.send(error_msg)
except websockets.exceptions.ConnectionClosed:
print("WebSocket连接关闭")
finally:
if self.client and self.client.is_connected:
await self.client.disconnect()
self.client = None
self.target_device = None
def notification_handler(self, websocket, service_id, characteristic_id):
async def callback(sender, data):
current_time = asyncio.get_event_loop().time()
last_message, last_time = self.notification_records[characteristic_id]
# 解码当前数据用于比较
current_message = base64.b64encode(data).decode('utf-8')
# 过滤逻辑
if current_message == last_message and (current_time - last_time) < 0.5:
return
# 更新记录
self.notification_records[characteristic_id] = (current_message, current_time)
response = json.dumps({
"jsonrpc": "2.0",
"method": "characteristicDidChange",
"params": {
"serviceId": service_id,
"characteristicId": characteristic_id,
"message": current_message
}
})
await log_message("下发", response)
await websocket.send(response)
return callback
async def check_port_and_start_server(port=20111, host='localhost'):
"""检查端口并启动服务器"""
if is_port_in_use(port, host):
print(f"错误: 端口 {port} 已被占用,无法启动服务")
return False
print(f"端口 {port} 可用,正在启动服务...")
server = await websockets.serve(
lambda websocket, path: BLEClient().handle_client(websocket, path),
host, port
)
print(f"WebSocket服务已启动: ws://{host}:{port}/scratch/ble")
print("日志文件路径: ./b.log")
# 执行自检测试
try:
async with websockets.connect(f"ws://{host}:{port}/scratch/ble") as websocket:
print("正在执行自检测试...")
test_result = await self_test(websocket)
if test_result:
print("自检测试成功: 服务正常运行")
else:
print("自检测试失败: 服务可能存在问题")
except Exception as e:
print(f"自检测试异常: {str(e)}")
return server
async def main():
# Windows上保证事件循环设置正确
if sys.platform.startswith('win32'):
import asyncio
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
server = await check_port_and_start_server()
if server:
await asyncio.Future() # 保持服务器运行
if __name__ == "__main__":
asyncio.run(main())
\ No newline at end of file
Styling with Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!