# we1.py (11.8 KB)
import sys
import asyncio
import websockets
from bleak import BleakScanner, BleakClient
import json
import base64
import threading
from PyQt5.QtWidgets import QApplication, QMainWindow, QTextEdit, QVBoxLayout, QWidget
from PyQt5.QtCore import QThread, pyqtSignal

# Module-wide lock so that log entries are appended to the log file one
# writer at a time (each write is atomic with respect to other threads).
write_lock = threading.Lock()

def log_message_sync(direction, message):
    """Record one log line on the console and in ``b.log`` (blocking).

    The line has the form ``"<direction>: <message>"`` followed by a newline.
    It is echoed to standard output first and then appended to ``b.log``
    (UTF-8) while holding the module-level ``write_lock``.

    Args:
        direction: Label placed before the colon.
        message: Text placed after the colon.
    """
    line = "{}: {}\n".format(direction, message)
    # The console still gets every entry; the line already ends in a newline.
    print(line, end="")
    with write_lock, open("b.log", "a", encoding="utf-8") as log_file:
        log_file.write(line)

async def log_message(direction, message):
    """Log asynchronously by running ``log_message_sync`` in the default executor.

    The blocking console/file write is off-loaded to the event loop's default
    executor so the calling coroutine does not block the loop on file I/O.

    Args:
        direction: Label passed through to ``log_message_sync``.
        message: Text passed through to ``log_message_sync``.
    """
    # Inside a coroutine the running loop is the one we want;
    # get_running_loop() is the preferred accessor over get_event_loop().
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(None, log_message_sync, direction, message)

class BLEClient:
    """Bridge one JSON-RPC 2.0 WebSocket session to a single BLE peripheral.

    The session understands the methods ``discover``, ``connect``, ``write``,
    ``read`` and ``startNotifications``. Other methods are ignored.
    """

    # Scan tuning used by the ``discover`` method.
    MAX_SCAN_RETRIES = 3       # number of scan attempts before giving up
    SCAN_SECONDS = 5           # how long each scan attempt listens
    RETRY_DELAY_SECONDS = 3    # pause between unsuccessful attempts

    def __init__(self):
        """Create a session with no device, no BLE client and no WebSocket."""
        # (device, advertisement_data) of the most recent matching advertisement.
        self.target_device = None
        # BleakClient for the connected peripheral, or None before ``connect``.
        self.client = None
        # Service UUIDs collected from the filters of the last ``discover``.
        self.services = []
        self.optional_services = []
        # WebSocket currently served by ``handle_client``.
        self.websocket = None

    def on_disconnect(self, client):
        """BLE disconnect callback: close the WebSocket if it is still open.

        Args:
            client: The client object passed by the BLE library (unused).
        """
        print("BLE连接断开,关闭WebSocket")
        if self.websocket and not self.websocket.closed:
            asyncio.create_task(self.close_websocket())

    async def close_websocket(self):
        """Close the session's WebSocket, if any, and forget it.

        Safe to call when no WebSocket is set: the reference is detached
        first, so a session that was already cleared is a no-op.
        """
        websocket, self.websocket = self.websocket, None
        if websocket is not None:
            await websocket.close()

    def detection_callback(self, device, advertisement_data):
        """Scanner callback: remember a device advertising a wanted service.

        Args:
            device: The discovered device object.
            advertisement_data: Its advertisement payload.

        Returns:
            The ``(device, advertisement_data)`` tuple stored in
            ``self.target_device`` when at least one UUID in ``self.services``
            is advertised, otherwise ``None`` (and nothing is stored).
        """
        advertised = advertisement_data.service_uuids
        if not any(service_uuid in advertised for service_uuid in self.services):
            return None

        self.target_device = (device, advertisement_data)
        print("\n找到目标设备:")
        print(f"设备名称: {device.name}")
        print(f"设备地址: {device.address}")
        print(f"信号强度: {device.rssi} dBm")
        print("\n广播信息:")
        print(f"服务UUID列表: {advertisement_data.service_uuids}")
        print(f"制造商数据: {advertisement_data.manufacturer_data}")
        print(f"服务数据: {advertisement_data.service_data}")
        print(f"本地名称: {advertisement_data.local_name}")
        return self.target_device

    async def handle_client(self, websocket, path):
        """Serve one WebSocket connection until it closes.

        Connections on any path other than ``/scratch/ble`` are closed with
        code 1003. Every incoming message is logged, parsed as JSON and
        dispatched by its ``method``. Messages whose ``jsonrpc`` member is
        not ``"2.0"`` are skipped. Failures are reported back to the client
        as JSON-RPC error responses. When the connection ends, a connected
        BLE client is disconnected.

        Args:
            websocket: The accepted WebSocket connection.
            path: Request path of the connection.
        """
        self.websocket = websocket
        if path != "/scratch/ble":
            await websocket.close(code=1003, reason="Path not allowed")
            return

        try:
            async for message in websocket:
                # Reset per message so the error path never sees an unbound
                # name or the request of a previous message.
                request = None
                try:
                    await log_message("接收", message)
                    request = json.loads(message)
                    if not isinstance(request, dict):
                        raise ValueError("Request must be a JSON object")
                    if request.get("jsonrpc") != "2.0":
                        continue  # skip unsupported protocol versions
                    await self._dispatch(websocket, request)
                except json.JSONDecodeError:
                    await self._send(
                        websocket,
                        self._error_response(None, -32700, "Parse error"),
                    )
                except websockets.exceptions.ConnectionClosed:
                    # Nothing can be sent on a closed socket; let the outer
                    # handler end the session.
                    raise
                except Exception as e:
                    request_id = request.get("id") if isinstance(request, dict) else None
                    await self._send(
                        websocket,
                        self._error_response(request_id, -32603, str(e)),
                    )
        except websockets.exceptions.ConnectionClosed:
            print("WebSocket连接关闭")
        finally:
            if self.client and self.client.is_connected:
                await self.client.disconnect()
            self.client = None
            self.target_device = None

    def notification_handler(self, websocket, service_id, characteristic_id):
        """Build a notification callback that forwards value changes.

        Args:
            websocket: Connection the notifications are sent on.
            service_id: Service identifier echoed in each notification.
            characteristic_id: Characteristic identifier echoed in each
                notification.

        Returns:
            An async ``callback(sender, data)`` that sends a
            ``characteristicDidChange`` message with ``data`` base64-encoded.
        """
        async def callback(sender, data):
            await self._send(websocket, {
                "jsonrpc": "2.0",
                "method": "characteristicDidChange",
                "params": {
                    "serviceId": service_id,
                    "characteristicId": characteristic_id,
                    "message": base64.b64encode(data).decode("utf-8"),
                },
            })
        return callback

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _result_response(request_id, result=None):
        """Return a JSON-RPC success response object for ``request_id``."""
        return {"jsonrpc": "2.0", "result": result, "id": request_id}

    @staticmethod
    def _error_response(request_id, code, message):
        """Return a JSON-RPC error response object for ``request_id``."""
        return {
            "jsonrpc": "2.0",
            "error": {"code": code, "message": message},
            "id": request_id,
        }

    async def _send(self, websocket, payload):
        """Serialize ``payload`` to JSON, log it as outgoing, then send it."""
        text = json.dumps(payload)
        await log_message("下发", text)
        await websocket.send(text)

    def _require_client(self):
        """Return the BLE client, raising ``RuntimeError`` if none exists yet."""
        if self.client is None:
            raise RuntimeError("BLE peripheral is not connected")
        return self.client

    async def _dispatch(self, websocket, request):
        """Route a parsed request to its method handler; ignore unknown methods."""
        handlers = {
            "discover": self._handle_discover,
            "connect": self._handle_connect,
            "write": self._handle_write,
            "read": self._handle_read,
            "startNotifications": self._handle_start_notifications,
        }
        handler = handlers.get(request.get("method"))
        if handler is not None:
            await handler(websocket, request.get("params", {}), request.get("id"))

    async def _handle_discover(self, websocket, params, request_id):
        """Scan (with retries) for a device advertising a requested service.

        On success a ``didDiscoverPeripheral`` notification is sent, followed
        by a ``null`` result for the request. If no device is found after all
        attempts, nothing is sent.
        """
        # Merge the services of every filter into one list to match against.
        self.services = []
        for filt in params.get("filters", [{}]):
            self.services.extend(filt.get("services", []))
        self.optional_services = params.get("optionalServices", [])

        scanner = BleakScanner()
        scanner.register_detection_callback(self.detection_callback)

        found = False
        for attempt in range(self.MAX_SCAN_RETRIES):
            self.target_device = None
            await scanner.start()
            try:
                await asyncio.sleep(self.SCAN_SECONDS)
            finally:
                # Always stop the scanner, even if the wait is interrupted.
                await scanner.stop()

            if self.target_device:
                found = True
                break

            if attempt < self.MAX_SCAN_RETRIES - 1:
                print(f"未找到设备,第{attempt+1}次重试...")
                await asyncio.sleep(self.RETRY_DELAY_SECONDS)

        if not found:
            return

        device, _adv_data = self.target_device
        await self._send(websocket, {
            "jsonrpc": "2.0",
            "method": "didDiscoverPeripheral",
            "params": {
                "name": device.name,
                "peripheralId": device.address,
                "rssi": device.rssi,
            },
        })
        await self._send(websocket, self._result_response(request_id))

    async def _handle_connect(self, websocket, params, request_id):
        """Connect to ``params["peripheralId"]`` and acknowledge on success."""
        peripheral_id = params.get("peripheralId")
        if not peripheral_id:
            return

        self.client = BleakClient(peripheral_id)
        self.client.set_disconnected_callback(self.on_disconnect)
        await self.client.connect()

        if self.client.is_connected:
            await self._send(websocket, self._result_response(request_id))

    async def _handle_write(self, websocket, params, request_id):
        """Write ``params["message"]`` to a characteristic and acknowledge.

        The message is base64-decoded when ``encoding`` is ``"base64"``;
        otherwise it is encoded with the named text encoding (default UTF-8).
        """
        service_id = params.get("serviceId")
        characteristic_id = params.get("characteristicId")
        payload = params.get("message")
        encoding = params.get("encoding", "utf-8")

        if not all([service_id, characteristic_id, payload]):
            return

        if encoding == "base64":
            payload_bytes = base64.b64decode(payload)
        else:
            payload_bytes = payload.encode(encoding)

        await self._require_client().write_gatt_char(characteristic_id, payload_bytes)
        await self._send(websocket, self._result_response(request_id))

    async def _handle_read(self, websocket, params, request_id):
        """Read a characteristic and reply with its value base64-encoded."""
        service_id = params.get("serviceId")
        characteristic_id = params.get("characteristicId")

        if not all([service_id, characteristic_id]):
            return

        data = await self._require_client().read_gatt_char(characteristic_id)
        await self._send(websocket, self._result_response(request_id, {
            "serviceId": service_id,
            "characteristicId": characteristic_id,
            "message": base64.b64encode(data).decode("utf-8"),
        }))

    async def _handle_start_notifications(self, websocket, params, request_id):
        """Subscribe to a characteristic's notifications and acknowledge."""
        service_id = params.get("serviceId")
        characteristic_id = params.get("characteristicId")

        if not all([service_id, characteristic_id]):
            return

        await self._require_client().start_notify(
            characteristic_id,
            self.notification_handler(websocket, service_id, characteristic_id),
        )
        await self._send(websocket, self._result_response(request_id))

class WebSocketServerThread(QThread):
    """Qt thread that runs the WebSocket server on its own asyncio event loop.

    Status lines are published through ``log_signal``.
    """

    log_signal = pyqtSignal(str)

    def run(self):
        """Thread body: serve on ``localhost:20111`` until the loop ends."""

        def new_session(websocket, path):
            # Every connection is handled by its own BLEClient instance.
            return BLEClient().handle_client(websocket, path)

        async def serve_forever():
            server = websockets.serve(new_session, "localhost", 20111)
            async with server:
                self.log_signal.emit("WebSocket服务已启动: ws://localhost:20111/scratch/ble")
                self.log_signal.emit("日志文件路径: ./b.log")
                # A future that is never resolved keeps the server running.
                await asyncio.Future()

        asyncio.run(serve_forever())

class MainWindow(QMainWindow):
    """Main window: a read-only log view fed by the WebSocket server thread."""

    def __init__(self):
        """Construct the window and build its UI."""
        super().__init__()
        self.initUI()

    def initUI(self):
        """Lay out the log view and start the background server thread."""
        self.setWindowTitle("BLE WebSocket Server")
        self.setGeometry(100, 100, 800, 600)

        # Read-only text area that displays status messages.
        log_view = QTextEdit(self)
        log_view.setReadOnly(True)
        self.text_edit = log_view

        vbox = QVBoxLayout()
        vbox.addWidget(self.text_edit)

        central = QWidget()
        central.setLayout(vbox)
        self.setCentralWidget(central)

        # Connect the signal before starting so no early message is missed.
        server_thread = WebSocketServerThread()
        self.websocket_thread = server_thread
        server_thread.log_signal.connect(self.update_log)
        server_thread.start()

    def update_log(self, message):
        """Append ``message`` as a new line in the log view."""
        self.text_edit.append(message)

if __name__ == "__main__":
    # Script entry point: create the Qt application, show the main window and
    # exit the process with the status returned by the Qt event loop.
    app = QApplication(sys.argv)
    window = MainWindow()
    window.show()
    exit_status = app.exec_()
    sys.exit(exit_status)