# s6.9.2.py_0706 (14.3 KB)
# -*- coding: utf-8 -*-
import sys
import asyncio
import websockets
from bleak import BleakScanner, BleakClient
import json
import base64
import threading
from collections import defaultdict
import socket
import time

import platform

# Method 1: quick platform check via the sys module (recommended)
if sys.platform.startswith('win32'):
    # NOTE(review): these WinRT modules are imported only for their side
    # effects (the names are never used, hence the noqa markers) --
    # presumably so they are loaded/bundled for the Windows BLE backend;
    # confirm before removing.
    import winrt.windows.foundation.collections  # noqa
    import winrt.windows.devices.bluetooth  # noqa
    import winrt.windows.devices.bluetooth.advertisement  # noqa
    print("当前运行在Windows系统")
elif sys.platform.startswith('linux'):
    print("当前运行在Linux系统")
elif sys.platform.startswith('darwin'):
    print("当前运行在macOS系统")
# Lock that keeps each log-file write atomic across threads
# (log_message_sync runs in executor threads).
write_lock = threading.Lock()

def log_message_sync(direction, message):
    """Write one ``direction: message`` line to stdout and append it to b.log.

    The file append is done while holding ``write_lock`` so that lines
    written from different threads never interleave.
    """
    line = "{}: {}\n".format(direction, message)
    # The line already ends with a newline, so suppress print's own.
    print(line, end='')
    with write_lock, open('b.log', 'a', encoding='utf-8') as log_file:
        log_file.write(line)

async def log_message(direction, message):
    """Log a message without blocking the event loop.

    Runs ``log_message_sync(direction, message)`` in the running loop's
    default executor and waits for it to finish.

    Args:
        direction: Label written before the message (e.g. sent/received).
        message: The text (or any printable object) to log.
    """
    # get_running_loop() is the recommended call inside a coroutine; it
    # never creates a loop implicitly the way get_event_loop() can.
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(None, log_message_sync, direction, message)

def is_port_in_use(port, host='localhost'):
    """Return True when a TCP socket cannot be bound to ``(host, port)``.

    The probe socket is always closed again before returning, so a free
    port stays free for the caller.
    """
    probe = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        probe.bind((host, port))
    except OSError:  # socket.error is an alias of OSError
        return True
    else:
        return False
    finally:
        probe.close()

async def self_test(websocket):
    """Send a JSON-RPC ``ping`` over *websocket* and wait for any reply.

    Returns:
        True if a message is received within 5 seconds; False on timeout
        or on any other exception (both cases are logged).
    """
    payload = {
        "jsonrpc": "2.0",
        "method": "ping",
        "params": {"timestamp": int(time.time())},
        "id": "test",
    }
    test_message = json.dumps(payload)
    await log_message("自测", test_message)
    await websocket.send(test_message)

    try:
        # Wait for the reply, but give up after five seconds.
        reply = await asyncio.wait_for(websocket.recv(), timeout=5.0)
        await log_message("自测响应", reply)
    except asyncio.TimeoutError:
        await log_message("自测", "自测超时,未收到响应")
        return False
    except Exception as exc:
        await log_message("自测", f"自测异常: {str(exc)}")
        return False
    return True

class BLEClient:
    """Bridge one WebSocket connection to a single BLE peripheral.

    Incoming WebSocket messages are JSON-RPC 2.0 requests (``discover``,
    ``connect``, ``write``, ``read``, ``startNotifications``, ``ping``).
    Each request is translated into bleak calls and answered on the same
    WebSocket; characteristic notifications are forwarded as
    ``characteristicDidChange`` messages.
    """

    # Scan policy used by the "discover" request.
    SCAN_ATTEMPTS = 3       # number of scan rounds before giving up
    SCAN_SECONDS = 5        # duration of one scan round
    SCAN_RETRY_DELAY = 3    # pause between two scan rounds
    # An identical notification arriving sooner than this (seconds) after
    # the last forwarded one is dropped.
    NOTIFY_DEDUP_SECONDS = 0.5

    # JSON-RPC method name -> name of the coroutine method handling it.
    _RPC_HANDLERS = {
        "discover": "_rpc_discover",
        "connect": "_rpc_connect",
        "write": "_rpc_write",
        "read": "_rpc_read",
        "startNotifications": "_rpc_start_notifications",
        "ping": "_rpc_ping",
    }

    def __init__(self):
        self.target_device = None       # (device, advertisement_data) of the last match
        self.client = None              # BleakClient once "connect" was requested
        self.services = []              # service UUIDs collected from the discover filters
        self.optional_services = []
        self.websocket = None
        # characteristic id -> (last forwarded message, loop timestamp)
        self.notification_records = defaultdict(lambda: (None, 0))

    def on_disconnect(self, client):
        """BLE disconnect callback: close the WebSocket if it is still open."""
        print("BLE连接断开,关闭WebSocket")
        if self.websocket and not self.websocket.closed:
            # Keep a reference so the task cannot be garbage-collected
            # before it has run.
            self._close_task = asyncio.create_task(self.close_websocket())

    async def close_websocket(self):
        """Close the current WebSocket (if any) and forget it."""
        websocket = self.websocket
        if websocket is None:
            return
        await websocket.close()
        self.websocket = None

    def detection_callback(self, device, advertisement_data):
        """Scanner callback: remember a device advertising a wanted service.

        Returns:
            The stored ``(device, advertisement_data)`` tuple when at least
            one UUID in ``self.services`` is advertised, otherwise None.
        """
        advertised = advertisement_data.service_uuids
        if not any(service_uuid in advertised for service_uuid in self.services):
            return None

        self.target_device = (device, advertisement_data)
        print("\n找到目标设备:")
        print(f"设备名称: {device.name}")
        print(f"设备地址: {device.address}")
        print(f"信号强度: {device.rssi} dBm")
        print("\n广播信息:")
        print(f"服务UUID列表: {advertisement_data.service_uuids}")
        print(f"制造商数据: {advertisement_data.manufacturer_data}")
        print(f"服务数据: {advertisement_data.service_data}")
        print(f"本地名称: {advertisement_data.local_name}")
        return self.target_device

    async def handle_client(self, websocket, path):
        """Serve one WebSocket connection until it closes.

        Connections on any path other than ``/scratch/ble`` are closed with
        code 1003. When the connection ends, a still-connected BLE client
        is disconnected.
        """
        self.websocket = websocket
        if path != "/scratch/ble":
            await websocket.close(code=1003, reason="Path not allowed")
            return

        try:
            async for message in websocket:
                await self._process_message(websocket, message)
        except websockets.exceptions.ConnectionClosed:
            print("WebSocket连接关闭")
        finally:
            if self.client and self.client.is_connected:
                await self.client.disconnect()
                self.client = None
                self.target_device = None

    async def _process_message(self, websocket, message):
        """Log, parse and dispatch one incoming message.

        Malformed input and handler failures are reported to the peer as
        JSON-RPC error responses instead of being dropped.
        """
        await log_message("接收", message)
        try:
            request = json.loads(message)
        except ValueError:
            # Covers json.JSONDecodeError and undecodable binary frames.
            await self._send_error(websocket, None, -32700, "Parse error")
            return

        if not isinstance(request, dict):
            await self._send_error(websocket, None, -32600, "Invalid Request")
            return
        if request.get("jsonrpc") != "2.0":
            return  # not a JSON-RPC 2.0 message: ignored

        request_id = request.get("id")
        try:
            handler_name = self._RPC_HANDLERS.get(request.get("method"))
            if handler_name is None:
                return  # unknown methods are ignored
            params = request.get("params") or {}
            await getattr(self, handler_name)(websocket, params, request_id)
        except Exception as e:
            # Boundary handler: report the failure to the peer. If the
            # socket itself is gone, the send below raises ConnectionClosed,
            # which handle_client deals with.
            await self._send_error(websocket, request_id, -32603, str(e))

    # ------------------------------------------------------------------
    # Payload helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _result_payload(request_id, result=None):
        """Build a JSON-RPC success response."""
        return {"jsonrpc": "2.0", "result": result, "id": request_id}

    @staticmethod
    def _error_payload(request_id, code, message):
        """Build a JSON-RPC error response."""
        return {
            "jsonrpc": "2.0",
            "error": {"code": code, "message": message},
            "id": request_id,
        }

    @staticmethod
    def _notification_payload(method, params):
        """Build a JSON-RPC notification (a request without an id)."""
        return {"jsonrpc": "2.0", "method": method, "params": params}

    @staticmethod
    def _encode_message(message, encoding):
        """Turn the ``message`` of a write request into raw bytes."""
        if encoding == "base64":
            return base64.b64decode(message)
        return message.encode(encoding)

    async def _send(self, websocket, payload, log=True):
        """Serialize *payload*, optionally log it, and send it."""
        text = json.dumps(payload)
        if log:
            await log_message("下发", text)
        await websocket.send(text)

    async def _send_error(self, websocket, request_id, code, message):
        """Log and send a JSON-RPC error response."""
        await self._send(websocket, self._error_payload(request_id, code, message))

    def _require_client(self):
        """Return the BLE client, or raise if "connect" was never requested."""
        if self.client is None:
            raise RuntimeError("Not connected to a peripheral")
        return self.client

    # ------------------------------------------------------------------
    # JSON-RPC method handlers
    # ------------------------------------------------------------------
    async def _rpc_discover(self, websocket, params, request_id):
        """Scan for a device advertising one of the requested services.

        On success a ``didDiscoverPeripheral`` notification is sent,
        followed by the (null) result. Nothing is sent when no device is
        found after all scan rounds.
        """
        self.services = []
        for filt in params.get("filters", [{}]):
            self.services.extend(filt.get("services", []))
        self.optional_services = params.get("optionalServices", [])

        scanner = BleakScanner(scanning_mode="active")
        scanner.register_detection_callback(self.detection_callback)

        for attempt in range(self.SCAN_ATTEMPTS):
            self.target_device = None
            await scanner.start()
            await asyncio.sleep(self.SCAN_SECONDS)
            await scanner.stop()

            if self.target_device:
                break

            if attempt < self.SCAN_ATTEMPTS - 1:
                print(f"未找到设备,第{attempt+1}次重试...")
                await asyncio.sleep(self.SCAN_RETRY_DELAY)

        if not self.target_device:
            return

        device, _adv_data = self.target_device
        await self._send(websocket, self._notification_payload(
            "didDiscoverPeripheral",
            {
                "name": device.name,
                "peripheralId": device.address,
                "rssi": device.rssi,
            },
        ))
        await self._send(websocket, self._result_payload(request_id))

    async def _rpc_connect(self, websocket, params, request_id):
        """Connect to ``params["peripheralId"]`` and acknowledge on success."""
        peripheral_id = params.get("peripheralId")
        if not peripheral_id:
            return

        self.client = BleakClient(peripheral_id)
        self.client.set_disconnected_callback(self.on_disconnect)
        await self.client.connect()

        if self.client.is_connected:
            # Unlike the other handlers, this response is sent without
            # being written to the log.
            await self._send(websocket, self._result_payload(request_id), log=False)

    async def _rpc_write(self, websocket, params, request_id):
        """Write ``params["message"]`` to the given characteristic."""
        service_id = params.get("serviceId")
        characteristic_id = params.get("characteristicId")
        message = params.get("message")
        encoding = params.get("encoding", "utf-8")

        if not all([service_id, characteristic_id, message]):
            return

        message_bytes = self._encode_message(message, encoding)
        if encoding == "base64":
            print("write message_bytes", message_bytes)
        else:
            print("write message", message)

        await self._require_client().write_gatt_char(characteristic_id, message_bytes)
        await self._send(websocket, self._result_payload(request_id))

    async def _read_characteristic(self, service_id, characteristic_id):
        """Read a characteristic and return the ``result`` object for it.

        The value is returned base64-encoded under ``"message"``.
        """
        data = await self._require_client().read_gatt_char(characteristic_id)
        # base64.decode() works on file objects; the raw bytes are what
        # should be shown here.
        print('read-data', bytes(data))
        return {
            "serviceId": service_id,
            "characteristicId": characteristic_id,
            "message": base64.b64encode(data).decode("utf-8"),
        }

    async def _rpc_read(self, websocket, params, request_id):
        """Read the given characteristic and send its value back."""
        service_id = params.get("serviceId")
        characteristic_id = params.get("characteristicId")

        if not all([service_id, characteristic_id]):
            return

        result = await self._read_characteristic(service_id, characteristic_id)
        await self._send(websocket, self._result_payload(request_id, result))

    async def _rpc_start_notifications(self, websocket, params, request_id):
        """Subscribe to notifications of the given characteristic."""
        service_id = params.get("serviceId")
        characteristic_id = params.get("characteristicId")

        if not all([service_id, characteristic_id]):
            return

        await self._require_client().start_notify(
            characteristic_id,
            self.notification_handler(websocket, service_id, characteristic_id),
        )
        await self._send(websocket, self._result_payload(request_id))

    async def _rpc_ping(self, websocket, params, request_id):
        """Answer a ping with a pong carrying the current Unix time."""
        result = {"pong": True, "timestamp": int(time.time())}
        await self._send(websocket, self._result_payload(request_id, result))

    def notification_handler(self, websocket, service_id, characteristic_id):
        """Create the notification callback for one characteristic.

        The returned coroutine function forwards each notification to
        *websocket* as a ``characteristicDidChange`` message, skipping a
        value identical to the last forwarded one if it arrives within
        ``NOTIFY_DEDUP_SECONDS``.
        """
        async def callback(sender, data):
            current_time = asyncio.get_running_loop().time()
            last_message, last_time = self.notification_records[characteristic_id]

            # Base64 text is both the wire format and the comparison key.
            current_message = base64.b64encode(data).decode('utf-8')
            print('notification_handler current_message', bytes(data))

            # Drop rapid duplicates.
            if (current_message == last_message
                    and (current_time - last_time) < self.NOTIFY_DEDUP_SECONDS):
                return

            self.notification_records[characteristic_id] = (current_message, current_time)

            await self._send(websocket, self._notification_payload(
                "characteristicDidChange",
                {
                    "serviceId": service_id,
                    "characteristicId": characteristic_id,
                    "message": current_message,
                },
            ))
        return callback

async def check_port_and_start_server(port=20111, host='localhost'):
    """Start the WebSocket server on ``host:port`` and run a self-test.

    Returns:
        False when the port is already taken; otherwise the server object
        returned by ``websockets.serve`` (returned even if the self-test
        fails or raises).
    """
    if is_port_in_use(port, host):
        print(f"错误: 端口 {port} 已被占用,无法启动服务")
        return False

    print(f"端口 {port} 可用,正在启动服务...")

    def spawn_handler(websocket, path):
        # Every connection gets its own BLEClient instance.
        return BLEClient().handle_client(websocket, path)

    server = await websockets.serve(spawn_handler, host, port)

    endpoint = f"ws://{host}:{port}/scratch/ble"
    print(f"WebSocket服务已启动: {endpoint}")
    print("日志文件路径: ./b.log")

    # Connect to ourselves once to verify that the server answers.
    try:
        async with websockets.connect(endpoint) as probe:
            print("正在执行自检测试...")
            passed = await self_test(probe)
            if passed:
                print("自检测试成功: 服务正常运行")
            else:
                print("自检测试失败: 服务可能存在问题")
    except Exception as exc:
        print(f"自检测试异常: {str(exc)}")

    return server

async def main():
    """Start the server and, if it came up, keep the event loop alive."""
    server = await check_port_and_start_server()
    if not server:
        return
    # A future that is never resolved keeps the server running forever.
    await asyncio.Future()

# Script entry point: run main() in a fresh event loop when executed directly.
if __name__ == "__main__":
    asyncio.run(main())