# s6.8.2.py 11.5 KB
import asyncio
import websockets
from bleak import BleakScanner, BleakClient
import json
import base64
import threading
from collections import defaultdict

# Thread lock held around each append to the log file so that concurrent
# writers cannot interleave their entries.
write_lock = threading.Lock()

def log_message_sync(direction, message):
    """Echo one log line to the console and append it to ``b.log``.

    Args:
        direction: Label placed before the colon in the log line.
        message: Text placed after the colon.

    The line is formatted as ``"<direction>: <message>"`` followed by a
    newline. This function blocks on file I/O.
    """
    line = f"{direction}: {message}\n"
    # ``line`` already ends with a newline, so suppress the one print adds.
    print(line, end='')
    # Hold the module-level lock for the whole open/write/close sequence.
    with write_lock, open('b.log', 'a', encoding='utf-8') as log_file:
        log_file.write(line)

async def log_message(direction, message):
    """Run :func:`log_message_sync` in the default executor and await it.

    Args:
        direction: Label placed before the colon in the log line.
        message: Text placed after the colon.

    The blocking console/file write happens on a worker thread so the event
    loop is not stalled while the log file is written.
    """
    # Inside a coroutine a loop is always running; get_running_loop() is the
    # documented way to obtain it and never creates a new loop implicitly.
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(None, log_message_sync, direction, message)

class BLEClient:
    """Bridge one WebSocket JSON-RPC session to a BLE peripheral via Bleak.

    The session understands the JSON-RPC 2.0 methods ``discover``,
    ``connect``, ``write``, ``read`` and ``startNotifications`` and pushes
    ``didDiscoverPeripheral`` / ``characteristicDidChange`` notifications
    back to the WebSocket peer. (The method names and the ``/scratch/ble``
    path suggest the Scratch Link BLE protocol -- TODO confirm.)

    Attributes:
        target_device: ``(device, advertisement_data)`` of the last matching
            advertisement seen while scanning, or ``None``.
        client: The ``BleakClient`` created by ``connect``, or ``None``.
        services: Service UUIDs collected from the ``discover`` filters.
        optional_services: ``optionalServices`` from the ``discover`` request.
        websocket: The WebSocket connection currently being served.
        notification_records: Per-characteristic ``(last_message, timestamp)``
            used to drop rapidly repeated identical notifications.
    """

    # Error codes pre-defined by the JSON-RPC 2.0 specification.
    _PARSE_ERROR = -32700
    _INTERNAL_ERROR = -32603

    # Discovery timing: number of scan rounds, length of each round and the
    # pause between rounds (seconds).
    _SCAN_ATTEMPTS = 3
    _SCAN_SECONDS = 5
    _RETRY_DELAY_SECONDS = 3

    # Identical notifications arriving closer together than this are dropped.
    _DUPLICATE_WINDOW_SECONDS = 0.5

    def __init__(self):
        self.target_device = None
        self.client = None
        self.services = []
        self.optional_services = []
        self.websocket = None
        # characteristic id -> (last message, timestamp)
        self.notification_records = defaultdict(lambda: (None, 0))
        # Keeps the pending close task referenced so it cannot be garbage
        # collected before it has run.
        self._close_task = None

    # ------------------------------------------------------------------
    # BLE callbacks
    # ------------------------------------------------------------------

    def on_disconnect(self, client):
        """Bleak disconnected-callback: close the WebSocket when BLE drops.

        Args:
            client: The client object passed by Bleak (unused).
        """
        print("BLE连接断开,关闭WebSocket")
        websocket = self.websocket
        # Not every websockets connection class exposes ``closed``; treat a
        # missing attribute as "still open" because close() is idempotent.
        if websocket is not None and not getattr(websocket, "closed", False):
            self._close_task = asyncio.create_task(self.close_websocket())

    async def close_websocket(self):
        """Close the current WebSocket, if any, and forget it."""
        websocket, self.websocket = self.websocket, None
        # Guard against a second close task running after the first one has
        # already cleared the attribute.
        if websocket is not None:
            await websocket.close()

    @staticmethod
    def _rssi(device, advertisement_data):
        """Return the signal strength for an advertisement, or ``None``.

        Prefers ``advertisement_data.rssi`` and falls back to the deprecated
        ``device.rssi`` so both attribute layouts are supported.
        """
        rssi = getattr(advertisement_data, "rssi", None)
        if rssi is None:
            rssi = getattr(device, "rssi", None)
        return rssi

    def detection_callback(self, device, advertisement_data):
        """Scanner callback: remember a device advertising a wanted service.

        Args:
            device: Device object supplied by the scanner.
            advertisement_data: Advertisement payload supplied by the scanner.

        Returns:
            ``(device, advertisement_data)`` when at least one UUID in
            ``self.services`` is advertised, otherwise ``None``.
        """
        # UUIDs are case-insensitive; normalise both sides before comparing.
        advertised = {str(uuid).lower() for uuid in advertisement_data.service_uuids}
        if not any(str(wanted).lower() in advertised for wanted in self.services):
            return None

        self.target_device = (device, advertisement_data)
        print("\n找到目标设备:")
        print(f"设备名称: {device.name}")
        print(f"设备地址: {device.address}")
        print(f"信号强度: {self._rssi(device, advertisement_data)} dBm")
        print("\n广播信息:")
        print(f"服务UUID列表: {advertisement_data.service_uuids}")
        print(f"制造商数据: {advertisement_data.manufacturer_data}")
        print(f"服务数据: {advertisement_data.service_data}")
        print(f"本地名称: {advertisement_data.local_name}")
        return self.target_device

    # ------------------------------------------------------------------
    # Outgoing message helpers
    # ------------------------------------------------------------------

    async def _send(self, websocket, payload):
        """Serialize ``payload`` to JSON, log it as outgoing and send it."""
        text = json.dumps(payload)
        await log_message("下发", text)
        await websocket.send(text)

    async def _send_result(self, websocket, request_id, result=None):
        """Send a JSON-RPC success response for ``request_id``."""
        await self._send(websocket, {
            "jsonrpc": "2.0",
            "result": result,
            "id": request_id,
        })

    async def _send_error(self, websocket, request_id, code, message):
        """Send a JSON-RPC error response (``error`` member, not ``result``)."""
        await self._send(websocket, {
            "jsonrpc": "2.0",
            "error": {"code": code, "message": message},
            "id": request_id,
        })

    @staticmethod
    def _encode_payload(message, encoding):
        """Turn the ``message`` of a write request into bytes.

        ``encoding == "base64"`` decodes base64 text; any other value is
        used as a text codec name for ``str.encode``.
        """
        if encoding == "base64":
            return base64.b64decode(message)
        return message.encode(encoding)

    # ------------------------------------------------------------------
    # Per-method request handlers
    # ------------------------------------------------------------------

    async def _handle_discover(self, websocket, params, request_id):
        """Scan for a device advertising one of the requested services."""
        self.services = [
            service
            for filt in params.get("filters", [{}])
            for service in filt.get("services", [])
        ]
        self.optional_services = params.get("optionalServices", [])

        # Passing the callback to the constructor replaces the deprecated
        # register_detection_callback() call.
        scanner = BleakScanner(detection_callback=self.detection_callback)

        found = False
        for attempt in range(self._SCAN_ATTEMPTS):
            self.target_device = None
            await scanner.start()
            try:
                await asyncio.sleep(self._SCAN_SECONDS)
            finally:
                # Always stop the scanner, even if the wait is cancelled.
                await scanner.stop()

            if self.target_device:
                found = True
                break

            if attempt < self._SCAN_ATTEMPTS - 1:
                print(f"未找到设备,第{attempt+1}次重试...")
                await asyncio.sleep(self._RETRY_DELAY_SECONDS)

        if not found:
            # NOTE(review): as before, no reply is sent when nothing was
            # found -- confirm whether the peer expects one.
            print("未找到匹配设备")
            return

        device, adv_data = self.target_device
        await self._send(websocket, {
            "jsonrpc": "2.0",
            "method": "didDiscoverPeripheral",
            "params": {
                "name": device.name,
                "peripheralId": device.address,
                "rssi": self._rssi(device, adv_data),
            },
        })
        await self._send_result(websocket, request_id)

    async def _handle_connect(self, websocket, params, request_id):
        """Connect to ``params["peripheralId"]`` and acknowledge on success."""
        peripheral_id = params.get("peripheralId")
        if not peripheral_id:
            return

        # The constructor keyword replaces the deprecated
        # set_disconnected_callback() call.
        self.client = BleakClient(
            peripheral_id, disconnected_callback=self.on_disconnect
        )
        await self.client.connect()
        if self.client.is_connected:
            await self._send_result(websocket, request_id)

    async def _handle_write(self, websocket, params, request_id):
        """Write ``params["message"]`` to a characteristic and acknowledge."""
        service_id = params.get("serviceId")
        characteristic_id = params.get("characteristicId")
        payload = params.get("message")
        encoding = params.get("encoding", "utf-8")
        if not all([service_id, characteristic_id, payload]):
            return

        data = self._encode_payload(payload, encoding)
        if "withResponse" in params:
            # Honour the caller's flag; it used to be read and then ignored
            # in favour of a hard-coded False.
            await self.client.write_gatt_char(
                characteristic_id, data, response=bool(params["withResponse"])
            )
        else:
            await self.client.write_gatt_char(characteristic_id, data)
        await self._send_result(websocket, request_id)

    async def _handle_read(self, websocket, params, request_id):
        """Read a characteristic and return its value base64-encoded."""
        service_id = params.get("serviceId")
        characteristic_id = params.get("characteristicId")
        if not all([service_id, characteristic_id]):
            return

        data = await self.client.read_gatt_char(characteristic_id)
        await self._send_result(websocket, request_id, {
            "serviceId": service_id,
            "characteristicId": characteristic_id,
            "message": base64.b64encode(data).decode("utf-8"),
        })

    async def _handle_start_notifications(self, websocket, params, request_id):
        """Subscribe to a characteristic and acknowledge the request."""
        service_id = params.get("serviceId")
        characteristic_id = params.get("characteristicId")
        if not all([service_id, characteristic_id]):
            return

        await self.client.start_notify(
            characteristic_id,
            self.notification_handler(websocket, service_id, characteristic_id),
        )
        await self._send_result(websocket, request_id)

    # ------------------------------------------------------------------
    # WebSocket entry point
    # ------------------------------------------------------------------

    async def handle_client(self, websocket, path):
        """Serve one WebSocket connection until it closes.

        Args:
            websocket: The accepted WebSocket connection.
            path: Request path; anything other than ``/scratch/ble`` is
                rejected with close code 1003.

        Each incoming text frame is logged, parsed as JSON and dispatched by
        its ``method``. Frames that are not JSON-RPC 2.0 objects, or that name
        an unknown method, are ignored. Failures while handling a request are
        reported to the peer as JSON-RPC error responses. When the connection
        ends, a still-connected BLE client is disconnected.
        """
        self.websocket = websocket
        if path != "/scratch/ble":
            await websocket.close(code=1003, reason="Path not allowed")
            return

        handlers = {
            "discover": self._handle_discover,
            "connect": self._handle_connect,
            "write": self._handle_write,
            "read": self._handle_read,
            "startNotifications": self._handle_start_notifications,
        }

        try:
            async for raw_message in websocket:
                # Reset per message so the error path never sees a stale or
                # unbound request.
                request = None
                try:
                    await log_message("接收", raw_message)
                    request = json.loads(raw_message)

                    if not isinstance(request, dict) or request.get("jsonrpc") != "2.0":
                        continue

                    handler = handlers.get(request.get("method"))
                    if handler is None:
                        continue

                    # ``"params": null`` is treated like missing params.
                    await handler(
                        websocket, request.get("params") or {}, request.get("id")
                    )

                except json.JSONDecodeError:
                    await self._send_error(
                        websocket, None, self._PARSE_ERROR, "Parse error"
                    )
                except websockets.exceptions.ConnectionClosed:
                    # Nothing can be reported on a closed socket; let the
                    # outer handler end the session.
                    raise
                except Exception as exc:
                    request_id = request.get("id") if isinstance(request, dict) else None
                    await self._send_error(
                        websocket, request_id, self._INTERNAL_ERROR, str(exc)
                    )

        except websockets.exceptions.ConnectionClosed:
            print("WebSocket连接关闭")
        finally:
            if self.client and self.client.is_connected:
                await self.client.disconnect()
            self.client = None
            self.target_device = None

    def notification_handler(self, websocket, service_id, characteristic_id):
        """Build the notify callback for one characteristic.

        Args:
            websocket: Connection that receives ``characteristicDidChange``.
            service_id: Service id echoed back in each notification.
            characteristic_id: Characteristic id echoed back and used as the
                key for duplicate suppression.

        Returns:
            An ``async`` callback ``(sender, data)`` that forwards the value
            base64-encoded, skipping a value identical to the previous one
            when it arrives within ``_DUPLICATE_WINDOW_SECONDS``.
        """
        async def callback(sender, data):
            now = asyncio.get_running_loop().time()
            last_message, last_time = self.notification_records[characteristic_id]

            # Compare in encoded form; this is also what gets sent.
            current_message = base64.b64encode(data).decode('utf-8')

            if (
                current_message == last_message
                and (now - last_time) < self._DUPLICATE_WINDOW_SECONDS
            ):
                return

            self.notification_records[characteristic_id] = (current_message, now)

            try:
                await self._send(websocket, {
                    "jsonrpc": "2.0",
                    "method": "characteristicDidChange",
                    "params": {
                        "serviceId": service_id,
                        "characteristicId": characteristic_id,
                        "message": current_message,
                    },
                })
            except websockets.exceptions.ConnectionClosed:
                # The peer went away while notifications were still arriving;
                # there is nobody left to deliver this value to.
                return
        return callback

async def main():
    """Start the WebSocket server on localhost:20111 and serve forever.

    Every accepted connection gets its own ``BLEClient`` instance.
    """
    async def handler(websocket, path=None):
        # Depending on the websockets version the handler is called either
        # with (connection, path) or with the connection alone. In the
        # latter case read the path from the connection itself.
        if path is None:
            request = getattr(websocket, "request", None)
            path = getattr(request, "path", None) or getattr(websocket, "path", None)
        await BLEClient().handle_client(websocket, path)

    async with websockets.serve(handler, "localhost", 20111):
        print("WebSocket服务已启动: ws://localhost:20111/scratch/ble")
        print("日志文件路径: ./b.log")
        # A future that never completes keeps the server context open.
        await asyncio.Future()

# Script entry point: run the server coroutine only when this file is
# executed directly, not when it is imported.
if __name__ == "__main__":
    asyncio.run(main())