we.py 14.7 KB
# -*- coding: utf-8 -*--
import asyncio
import websockets
from bleak import BleakScanner, BleakClient
import json
import base64
import threading
from tray_icon import start_tray_icon
import os
import sys

def resource_path(relative_path):
    try:
        base_path = sys._MEIPASS  # 打包后运行时的临时路径
    except AttributeError:
        base_path = os.path.dirname(os.path.abspath(__file__))  # 开发环境下的路径
    # 使用 os.path.join 拼接路径,并通过 os.path.abspath 转换为绝对路径
    return os.path.abspath(os.path.join(base_path, relative_path))

def resource_path(relative_path):
    """获取资源文件的绝对路径,适用于打包后的程序"""
    try:
        base_path = sys._MEIPASS  # 打包后运行时的临时路径
    except AttributeError:
        base_path = os.path.abspath(".")  # 开发环境下的路径
    return os.path.join(base_path, relative_path)

# 获取图标路径
icon_path = resource_path(resource_path("data/app.ico"))

# 调用系统托盘图标函数
start_tray_icon(icon_path)

# 添加线程锁以确保日志写入的原子性
write_lock = threading.Lock()

def log_message_sync(direction, message):
    """同步日志记录函数"""
    log_entry = f"{direction}: {message}\n"
    print(log_entry, end='')  # 控制台仍然输出
    with write_lock:
        with open('a.log', 'a', encoding='utf-8') as f:
            f.write(log_entry)

async def log_message(direction, message):
    """异步封装日志记录"""
    loop = asyncio.get_event_loop()
    await loop.run_in_executor(None, log_message_sync, direction, message)

class BLEClient:
    def __init__(self):
        self.target_device = None
        self.client = None
        self.services = []
        self.optional_services = []
        self.websocket = None

    def on_disconnect(self, client):
        print("BLE连接断开,关闭WebSocket")
        if self.websocket and not self.websocket.closed:
            asyncio.create_task(self.close_websocket())

    async def close_websocket(self):
        await self.websocket.close()
        self.websocket = None

    def detection_callback(self, device, advertisement_data):
        if any(service_uuid in advertisement_data.service_uuids for service_uuid in self.services):
            self.target_device = (device, advertisement_data)
            if not self.target_device:
                print("未找到匹配设备")
                return
            else:
                device, adv_data = self.target_device
                print("\n找到目标设备:")
                print(f"设备名称: {device.name}")
                print(f"设备地址: {device.address}")
                print(f"信号强度: {device.rssi} dBm")
                print("\n广播信息:")
                print(f"服务UUID列表: {adv_data.service_uuids}")
                print(f"制造商数据: {adv_data.manufacturer_data}")
                print(f"服务数据: {adv_data.service_data}")
                print(f"本地名称: {adv_data.local_name}")
            return self.target_device

    async def handle_client(self, websocket, path):
        self.websocket = websocket
        if path != "/scratch/ble":
            await websocket.close(code=1003, reason="Path not allowed")
            return

        try:
            async for message in websocket:
                try:
                    # 记录接收到的消息
                    await log_message("接收", message)

                    request = json.loads(message)
                    if request["jsonrpc"] != "2.0":
                        response = json.dumps({
                            "jsonrpc": "2.0",
                            "result": {"message": "Invalid Request"},
                            "id": request.get("id", None)
                        })
                        await log_message("下发", response)
                        await websocket.send(response)
                        continue

                    method = request.get("method")
                    params = request.get("params", {})
                    request_id = request.get("id")

                    if method == "discover":
                        self.services = params.get("filters", [{}])[0].get("services", [])
                        self.optional_services = params.get("optionalServices", [])

                        scanner = BleakScanner()
                        scanner.register_detection_callback(self.detection_callback)
                        
                        # 添加重试机制
                        max_retries = 3
                        found = False
                        for attempt in range(max_retries):
                            self.target_device = None  # 重置目标设备
                            await scanner.start()
                            await asyncio.sleep(5)
                            await scanner.stop()
                            
                            if self.target_device:
                                found = True
                                break
                            
                            if attempt < max_retries - 1:  # 最后一次不等待
                                print(f"未找到设备,第{attempt+1}次重试...")
                                await asyncio.sleep(3)

                        if not found:
                            # response = json.dumps({
                            #     "jsonrpc": "2.0",
                            #     "result": {"message": "Device not found"},
                            #     "id": request_id
                            # })
                            # await log_message("下发", response)
                            # await websocket.send(response)
                            continue
                        
                        device, adv_data = self.target_device
                        discover_response = json.dumps({
                            "jsonrpc": "2.0",
                            "method": "didDiscoverPeripheral",
                            "params": {
                                "name": device.name,
                                "peripheralId": device.address,
                                "rssi": device.rssi
                            }
                        })
                        await log_message("下发", discover_response)
                        await websocket.send(discover_response)

                        result_response = json.dumps({
                            "jsonrpc": "2.0",
                            "result": None,
                            "id": request_id
                        })
                        await log_message("下发", result_response)
                        await websocket.send(result_response)
                    
                    elif method == "connect":
                        peripheral_id = params.get("peripheralId")
                        if not peripheral_id:
                            response = json.dumps({
                                "jsonrpc": "2.0",
                                "result": {"message": "Invalid params"},
                                "id": request_id
                            })
                            await log_message("下发", response)
                            await websocket.send(response)
                            continue

                        self.client = BleakClient(peripheral_id)
                        self.client.set_disconnected_callback(self.on_disconnect)
                        await self.client.connect()
                        if self.client.is_connected:
                            print(f"已连接至设备: {peripheral_id}")
                            response = json.dumps({
                                "jsonrpc": "2.0",
                                "result": None,
                                "id": request_id
                            })
                            await log_message("下发", response)
                            await websocket.send(response)
                        else:
                            response = json.dumps({
                                "jsonrpc": "2.0",
                                "result": {"message": "Failed to connect"},
                                "id": request_id
                            })
                            await log_message("下发", response)
                            await websocket.send(response)

                    elif method == "write":
                        service_id = params.get("serviceId")
                        characteristic_id = params.get("characteristicId")
                        message = params.get("message")
                        encoding = params.get("encoding", "utf-8")

                        if not all([service_id, characteristic_id, message]):
                            response = json.dumps({
                                "jsonrpc": "2.0",
                                "result": {"message": "Invalid params"},
                                "id": request_id
                            })
                            await log_message("下发", response)
                            await websocket.send(response)
                            continue

                        if encoding == "base64":
                            message_bytes = base64.b64decode(message)
                        else:
                            message_bytes = message.encode(encoding)

                        await self.client.write_gatt_char(characteristic_id, message_bytes)
                        response = json.dumps({
                            "jsonrpc": "2.0",
                            "result": None,
                            "id": request_id
                        })
                        await log_message("下发", response)
                        await websocket.send(response)

                    elif method == "read":
                        service_id = params.get("serviceId")
                        characteristic_id = params.get("characteristicId")

                        if not all([service_id, characteristic_id]):
                            response = json.dumps({
                                "jsonrpc": "2.0",
                                "result": {"message": "Invalid params"},
                                "id": request_id
                            })
                            await log_message("下发", response)
                            await websocket.send(response)
                            continue

                        data = await self.client.read_gatt_char(characteristic_id)
                        response = json.dumps({
                            "jsonrpc": "2.0",
                            "result": {
                                "serviceId": service_id,
                                "characteristicId": characteristic_id,
                                "message": base64.b64encode(data).decode("utf-8")
                            },
                            "id": request_id
                        })
                        await log_message("下发", response)
                        await websocket.send(response)

                    elif method == "startNotifications":
                        service_id = params.get("serviceId")
                        characteristic_id = params.get("characteristicId")

                        if not all([service_id, characteristic_id]):
                            response = json.dumps({
                                "jsonrpc": "2.0",
                                "result": {"message": "Invalid params"},
                                "id": request_id
                            })
                            await log_message("下发", response)
                            await websocket.send(response)
                            continue

                        await self.client.start_notify(
                            characteristic_id,
                            self.notification_handler(websocket, service_id, characteristic_id)
                        )
                        response = json.dumps({
                            "jsonrpc": "2.0",
                            "result": None,
                            "id": request_id
                        })
                        await log_message("下发", response)
                        await websocket.send(response)

                    else:
                        response = json.dumps({
                            "jsonrpc": "2.0",
                            "result": {"message": "Method not found"},
                            "id": request_id
                        })
                        await log_message("下发", response)
                        await websocket.send(response)

                except json.JSONDecodeError:
                    response = json.dumps({
                        "jsonrpc": "2.0",
                        "result": {"message": "Parse error"},
                        "id": None
                    })
                    await log_message("下发", response)
                    await websocket.send(response)
                except Exception as e:
                    response = json.dumps({
                        "jsonrpc": "2.0",
                        "result": {"message": str(e)},
                        "id": request_id
                    })
                    await log_message("下发", response)
                    await websocket.send(response)

        except websockets.exceptions.ConnectionClosedOK:
            print("WebSocket客户端正常断开")
        except websockets.exceptions.ConnectionClosedError as e:
            print(f"WebSocket客户端异常断开: {e.code} - {e.reason}")
        finally:
            if self.client and self.client.is_connected:
                await self.client.disconnect()
                print("BLE设备已主动断开")
                self.client = None
                self.target_device = None

    def notification_handler(self, websocket, service_id, characteristic_id):
        async def callback(sender, data):
            response = json.dumps({
                "jsonrpc": "2.0",
                "method": "characteristicDidChange",
                "params": {
                    "serviceId": service_id,
                    "characteristicId": characteristic_id,
                    "message": base64.b64encode(data).decode("utf-8")
                }
            })
            await log_message("下发", response)
            await websocket.send(response)
        return callback

async def main():
    async with websockets.serve(
        lambda websocket, path: BLEClient().handle_client(websocket, path),
        "localhost", 20111
    ):
        print("WebSocket服务已启动: ws://localhost:20111/scratch/ble")
        print("日志文件路径: ./b.log")
        await asyncio.Future()  # 永久运行

if __name__ == "__main__":
    asyncio.run(main())