we.py 15 KB
import sys
import asyncio
import json
import base64
import threading
from PyQt5.QtWidgets import QApplication, QMainWindow, QTextEdit, QPushButton, QVBoxLayout, QWidget
from PyQt5.QtCore import QThread, pyqtSignal
from bleak import BleakScanner, BleakClient
import websockets

# 添加线程锁以确保日志写入的原子性
write_lock = threading.Lock()

def log_message_sync(direction, message, log_signal=None):
    """同步日志记录函数(支持信号传递)"""
    log_entry = f"{direction}: {message}\n"
    print(log_entry, end='')  # 控制台输出
    with write_lock:
        with open('a.log', 'a', encoding='utf-8') as f:
            f.write(log_entry)
    if log_signal:
        log_signal.emit(log_entry)  # 发射信号到GUI

async def log_message(direction, message, log_signal=None):
    """异步封装日志记录"""
    loop = asyncio.get_event_loop()
    await loop.run_in_executor(None, log_message_sync, direction, message, log_signal)

class BLEClient:
    def __init__(self, log_signal=None):
        self.target_device = None
        self.client = None
        self.services = []
        self.optional_services = []
        self.websocket = None
        self.log_signal = log_signal  # 绑定日志信号

    def on_disconnect(self, client):
        print("BLE连接断开,关闭WebSocket")
        if self.websocket and not self.websocket.closed:
            asyncio.create_task(self.close_websocket())

    async def close_websocket(self):
        await self.websocket.close()
        self.websocket = None

    def detection_callback(self, device, advertisement_data):
        if any(service_uuid in advertisement_data.service_uuids for service_uuid in self.services):
            self.target_device = (device, advertisement_data)
            if not self.target_device:
                print("未找到匹配设备")
                return
            else:
                device, adv_data = self.target_device
                print("\n找到目标设备:")
                print(f"设备名称: {device.name}")
                print(f"设备地址: {device.address}")
                print(f"信号强度: {device.rssi} dBm")
                print("\n广播信息:")
                print(f"服务UUID列表: {adv_data.service_uuids}")
                print(f"制造商数据: {adv_data.manufacturer_data}")
                print(f"服务数据: {adv_data.service_data}")
                print(f"本地名称: {adv_data.local_name}")
            return self.target_device

    async def handle_client(self, websocket, path):
        self.websocket = websocket
        if path != "/scratch/ble":
            await websocket.close(code=1003, reason="Path not allowed")
            return

        try:
            async for message in websocket:
                try:
                    # 记录接收到的消息
                    await log_message("接收", message, self.log_signal)

                    request = json.loads(message)
                    if request["jsonrpc"] != "2.0":
                        response = json.dumps({
                            "jsonrpc": "2.0",
                            "result": {"message": "Invalid Request"},
                            "id": request.get("id", None)
                        })
                        await log_message("下发", response, self.log_signal)
                        await websocket.send(response)
                        continue

                    method = request.get("method")
                    params = request.get("params", {})
                    request_id = request.get("id")

                    if method == "discover":
                        self.services = params.get("filters", [{}])[0].get("services", [])
                        self.optional_services = params.get("optionalServices", [])

                        scanner = BleakScanner()
                        scanner.register_detection_callback(self.detection_callback)

                        # 添加重试机制
                        max_retries = 3
                        found = False
                        for attempt in range(max_retries):
                            self.target_device = None  # 重置目标设备
                            await scanner.start()
                            await asyncio.sleep(5)
                            await scanner.stop()

                            if self.target_device:
                                found = True
                                break

                            if attempt < max_retries - 1:  # 最后一次不等待
                                print(f"未找到设备,第{attempt+1}次重试...")
                                await asyncio.sleep(3)

                        if not found:
                            continue

                        device, adv_data = self.target_device
                        discover_response = json.dumps({
                            "jsonrpc": "2.0",
                            "method": "didDiscoverPeripheral",
                            "params": {
                                "name": device.name,
                                "peripheralId": device.address,
                                "rssi": device.rssi
                            }
                        })
                        await log_message("下发", discover_response, self.log_signal)
                        await websocket.send(discover_response)

                        result_response = json.dumps({
                            "jsonrpc": "2.0",
                            "result": None,
                            "id": request_id
                        })
                        await log_message("下发", result_response, self.log_signal)
                        await websocket.send(result_response)

                    elif method == "connect":
                        peripheral_id = params.get("peripheralId")
                        if not peripheral_id:
                            response = json.dumps({
                                "jsonrpc": "2.0",
                                "result": {"message": "Invalid params"},
                                "id": request_id
                            })
                            await log_message("下发", response, self.log_signal)
                            await websocket.send(response)
                            continue

                        self.client = BleakClient(peripheral_id)
                        self.client.set_disconnected_callback(self.on_disconnect)
                        await self.client.connect()
                        if self.client.is_connected:
                            print(f"已连接至设备: {peripheral_id}")
                            response = json.dumps({
                                "jsonrpc": "2.0",
                                "result": None,
                                "id": request_id
                            })
                            await log_message("下发", response, self.log_signal)
                            await websocket.send(response)
                        else:
                            response = json.dumps({
                                "jsonrpc": "2.0",
                                "result": {"message": "Failed to connect"},
                                "id": request_id
                            })
                            await log_message("下发", response, self.log_signal)
                            await websocket.send(response)

                    elif method == "write":
                        service_id = params.get("serviceId")
                        characteristic_id = params.get("characteristicId")
                        message = params.get("message")
                        encoding = params.get("encoding", "utf-8")

                        if not all([service_id, characteristic_id, message]):
                            response = json.dumps({
                                "jsonrpc": "2.0",
                                "result": {"message": "Invalid params"},
                                "id": request_id
                            })
                            await log_message("下发", response, self.log_signal)
                            await websocket.send(response)
                            continue

                        if encoding == "base64":
                            message_bytes = base64.b64decode(message)
                        else:
                            message_bytes = message.encode(encoding)

                        await self.client.write_gatt_char(characteristic_id, message_bytes)
                        response = json.dumps({
                            "jsonrpc": "2.0",
                            "result": None,
                            "id": request_id
                        })
                        await log_message("下发", response, self.log_signal)
                        await websocket.send(response)

                    elif method == "read":
                        service_id = params.get("serviceId")
                        characteristic_id = params.get("characteristicId")

                        if not all([service_id, characteristic_id]):
                            response = json.dumps({
                                "jsonrpc": "2.0",
                                "result": {"message": "Invalid params"},
                                "id": request_id
                            })
                            await log_message("下发", response, self.log_signal)
                            await websocket.send(response)
                            continue

                        data = await self.client.read_gatt_char(characteristic_id)
                        response = json.dumps({
                            "jsonrpc": "2.0",
                            "result": {
                                "serviceId": service_id,
                                "characteristicId": characteristic_id,
                                "message": base64.b64encode(data).decode("utf-8")
                            },
                            "id": request_id
                        })
                        await log_message("下发", response, self.log_signal)
                        await websocket.send(response)

                    elif method == "startNotifications":
                        service_id = params.get("serviceId")
                        characteristic_id = params.get("characteristicId")

                        if not all([service_id, characteristic_id]):
                            response = json.dumps({
                                "jsonrpc": "2.0",
                                "result": {"message": "Invalid params"},
                                "id": request_id
                            })
                            await log_message("下发", response, self.log_signal)
                            await websocket.send(response)
                            continue

                        await self.client.start_notify(
                            characteristic_id,
                            self.notification_handler(websocket, service_id, characteristic_id)
                        )
                        response = json.dumps({
                            "jsonrpc": "2.0",
                            "result": None,
                            "id": request_id
                        })
                        await log_message("下发", response, self.log_signal)
                        await websocket.send(response)

                    else:
                        response = json.dumps({
                            "jsonrpc": "2.0",
                            "result": {"message": "Method not found"},
                            "id": request_id
                        })
                        await log_message("下发", response, self.log_signal)
                        await websocket.send(response)

                except json.JSONDecodeError:
                    response = json.dumps({
                        "jsonrpc": "2.0",
                        "result": {"message": "Parse error"},
                        "id": None
                    })
                    await log_message("下发", response, self.log_signal)
                    await websocket.send(response)
                except Exception as e:
                    response = json.dumps({
                        "jsonrpc": "2.0",
                        "result": {"message": str(e)},
                        "id": request_id
                    })
                    await log_message("下发", response, self.log_signal)
                    await websocket.send(response)

        except websockets.exceptions.ConnectionClosedOK:
            print("WebSocket客户端正常断开")
        except websockets.exceptions.ConnectionClosedError as e:
            print(f"WebSocket客户端异常断开: {e.code} - {e.reason}")
        finally:
            if self.client and self.client.is_connected:
                await self.client.disconnect()
                print("BLE设备已主动断开")
                self.client = None
                self.target_device = None

    def notification_handler(self, websocket, service_id, characteristic_id):
        async def callback(sender, data):
            response = json.dumps({
                "jsonrpc": "2.0",
                "method": "characteristicDidChange",
                "params": {
                    "serviceId": service_id,
                    "characteristicId": characteristic_id,
                    "message": base64.b64encode(data).decode("utf-8")
                }
            })
            await log_message("下发", response, self.log_signal)
            await websocket.send(response)
        return callback

class WebSocketThread(QThread):
    log_signal = pyqtSignal(str)

    def run(self):
        async def start_server():
            ble_client = BLEClient(self.log_signal)  # 绑定日志信号
            async with websockets.serve(
                lambda websocket, path: ble_client.handle_client(websocket, path),
                "localhost", 20111
            ):
                self.log_signal.emit("WebSocket服务已启动: ws://localhost:20111/scratch/ble")
                await asyncio.Future()  # 永久运行

        asyncio.run(start_server())

class MainWindow(QMainWindow):
    def __init__(self):
        super().__init__()

        self.setWindowTitle("BLE WebSocket Server")
        self.setGeometry(100, 100, 600, 400)

        self.text_edit = QTextEdit(self)
        self.text_edit.setReadOnly(True)

        self.start_button = QPushButton("启动服务", self)
        self.start_button.clicked.connect(self.start_server)

        layout = QVBoxLayout()
        layout.addWidget(self.text_edit)
        layout.addWidget(self.start_button)

        container = QWidget()
        container.setLayout(layout)
        self.setCentralWidget(container)

        self.websocket_thread = WebSocketThread()
        self.websocket_thread.log_signal.connect(self.text_edit.append)

    def start_server(self):
        self.websocket_thread.start()

if __name__ == "__main__":
    app = QApplication(sys.argv)
    window = MainWindow()
    window.show()
    sys.exit(app.exec_())