1fb31449 by huangyf2

日志异步处理,增加

1 parent aa901ba8
Showing 1 changed file with 292 additions and 88 deletions
# -*- coding: utf-8 -*-
"""
高性能BLE客户端 - 优化版本
修复了以下问题:
1. 内存泄漏 - 每次连接创建新实例导致资源无法释放
2. 日志阻塞 - 实现异步日志队列
3. 通知失效 - 完善连接状态检查和清理
4. WebSocket保活 - 添加ping/pong机制
5. 错误处理 - 全面增强异常捕获和处理
"""
import sys
import asyncio
import websockets
......@@ -9,6 +18,7 @@ import threading
from collections import defaultdict
import socket
import time
import traceback
import platform
......@@ -22,21 +32,58 @@ elif sys.platform.startswith('linux'):
print("当前运行在Linux系统")
elif sys.platform.startswith('darwin'):
print("当前运行在macOS系统")
# 添加线程锁以确保日志写入的原子性
# 高性能日志系统
write_lock = threading.Lock()
_log_queue = None
_log_task = None
_log_dropped_count = 0
def log_message_sync(direction, message):
"""同步日志记录函数"""
log_entry = f"{direction}: {message}\n"
print(log_entry, end='') # 控制台仍然输出
with write_lock:
with open('b.log', 'a', encoding='utf-8') as f:
f.write(log_entry)
try:
log_entry = f"{direction}: {message}\n"
print(log_entry, end='') # 控制台仍然输出
# 截断过长消息,避免写入过大的日志
if len(log_entry) > 4000:
log_entry = log_entry[:4000] + "... [truncated]\n"
with write_lock:
with open('b.log', 'a', encoding='utf-8') as f:
f.write(log_entry)
except Exception:
pass # 日志失败不应影响主流程
async def _log_writer_loop():
"""后台日志写入循环"""
global _log_queue
while True:
try:
item = await _log_queue.get()
if item is None:
break
direction, message = item
log_message_sync(direction, message)
except asyncio.CancelledError:
break
except Exception:
await asyncio.sleep(0.05)
async def log_message(direction, message):
"""异步封装日志记录"""
loop = asyncio.get_event_loop()
await loop.run_in_executor(None, log_message_sync, direction, message)
"""异步日志记录,非阻塞"""
global _log_queue, _log_task, _log_dropped_count
# 首次调用时初始化
if _log_queue is None:
_log_queue = asyncio.Queue(maxsize=1000)
_log_task = asyncio.create_task(_log_writer_loop())
try:
_log_queue.put_nowait((direction, message))
except asyncio.QueueFull:
_log_dropped_count += 1
if _log_dropped_count % 100 == 1:
print(f"警告: 日志队列已满,已丢弃 {_log_dropped_count} 条日志")
def is_port_in_use(port, host='localhost'):
"""检查端口是否被占用"""
......@@ -77,7 +124,9 @@ class BLEClient:
self.services = []
self.optional_services = []
self.websocket = None
self.notification_records = defaultdict(lambda: (None, 0)) # 特征ID: (最后消息, 时间戳)
self.notification_records = defaultdict(lambda: (None, 0.0)) # 特征ID: (最后消息, 时间戳)
self._notification_callbacks = {} # 存储通知回调,用于清理
self._shutdown = False # 添加关闭标志
def on_disconnect(self, client):
print("BLE连接断开,关闭WebSocket")
......@@ -85,8 +134,13 @@ class BLEClient:
asyncio.create_task(self.close_websocket())
async def close_websocket(self):
await self.websocket.close()
self.websocket = None
if self.websocket:
try:
await self.websocket.close()
except Exception as e:
print(f"关闭WebSocket时出错: {e}")
finally:
self.websocket = None
def detection_callback(self, device, advertisement_data):
if any(service_uuid in advertisement_data.service_uuids for service_uuid in self.services):
......@@ -176,18 +230,44 @@ class BLEClient:
elif method == "connect":
peripheral_id = params.get("peripheralId")
if peripheral_id:
self.client = BleakClient(peripheral_id)
self.client.set_disconnected_callback(self.on_disconnect)
await self.client.connect()
# 如果已有连接,先断开
if self.client and self.client.is_connected:
try:
await self.client.disconnect()
except Exception:
pass
if self.client.is_connected:
response = json.dumps({
try:
self.client = BleakClient(peripheral_id, timeout=10.0) # 添加超时
self.client.set_disconnected_callback(self.on_disconnect)
await self.client.connect()
if self.client.is_connected:
response = json.dumps({
"jsonrpc": "2.0",
"result": None,
"id": request_id
})
await log_message("下发", response)
await websocket.send(response)
else:
error_response = json.dumps({
"jsonrpc": "2.0",
"error": {"code": -1, "message": "连接失败"},
"id": request_id
})
await log_message("下发", error_response)
await websocket.send(error_response)
except Exception as e:
error_response = json.dumps({
"jsonrpc": "2.0",
"result": None,
"error": {"code": -1, "message": f"连接异常: {str(e)}"},
"id": request_id
})
# await log_message("下发", response)
await websocket.send(response)
await log_message("下发", error_response)
await websocket.send(error_response)
if self.client:
self.client = None
elif method == "write":
service_id = params.get("serviceId")
......@@ -195,59 +275,117 @@ class BLEClient:
message = params.get("message")
encoding = params.get("encoding", "utf-8")
if all([service_id, characteristic_id, message]):
if encoding == "base64":
message_bytes = base64.b64decode(message)
print("write message_bytes",message_bytes)
else:
print("write message",message)
message_bytes = message.encode(encoding)
await self.client.write_gatt_char(characteristic_id, message_bytes)
response = json.dumps({
if all([service_id, characteristic_id, message]) and self.client and self.client.is_connected:
try:
if encoding == "base64":
message_bytes = base64.b64decode(message)
print("write message_bytes",message_bytes)
else:
print("write message",message)
message_bytes = message.encode(encoding)
await self.client.write_gatt_char(characteristic_id, message_bytes)
response = json.dumps({
"jsonrpc": "2.0",
"result": None,
"id": request_id
})
await log_message("下发", response)
await websocket.send(response)
except Exception as e:
error_response = json.dumps({
"jsonrpc": "2.0",
"error": {"code": -1, "message": f"写入失败: {str(e)}"},
"id": request_id
})
await log_message("下发", error_response)
await websocket.send(error_response)
else:
error_response = json.dumps({
"jsonrpc": "2.0",
"result": None,
"error": {"code": -1, "message": "未连接或参数不全"},
"id": request_id
})
await log_message("下发", response)
await websocket.send(response)
await log_message("下发", error_response)
await websocket.send(error_response)
elif method == "read":
service_id = params.get("serviceId")
characteristic_id = params.get("characteristicId")
if all([service_id, characteristic_id]):
data = await self.client.read_gatt_char(characteristic_id)
print('read-data',base64.decode(data))
response = json.dumps({
if all([service_id, characteristic_id]) and self.client and self.client.is_connected:
try:
data = await self.client.read_gatt_char(characteristic_id)
print('read-data',data)
response = json.dumps({
"jsonrpc": "2.0",
"result": {
"serviceId": service_id,
"characteristicId": characteristic_id,
"message": base64.b64encode(data).decode("utf-8")
},
"id": request_id
})
await log_message("下发", response)
await websocket.send(response)
except Exception as e:
error_response = json.dumps({
"jsonrpc": "2.0",
"error": {"code": -1, "message": f"读取失败: {str(e)}"},
"id": request_id
})
await log_message("下发", error_response)
await websocket.send(error_response)
else:
error_response = json.dumps({
"jsonrpc": "2.0",
"result": {
"serviceId": service_id,
"characteristicId": characteristic_id,
"message": base64.b64encode(data).decode("utf-8")
},
"error": {"code": -1, "message": "未连接或参数不全"},
"id": request_id
})
await log_message("下发", response)
await websocket.send(response)
await log_message("下发", error_response)
await websocket.send(error_response)
elif method == "startNotifications":
service_id = params.get("serviceId")
characteristic_id = params.get("characteristicId")
if all([service_id, characteristic_id]):
await self.client.start_notify(
characteristic_id,
self.notification_handler(websocket, service_id, characteristic_id)
)
response = json.dumps({
if all([service_id, characteristic_id]) and self.client and self.client.is_connected:
try:
# 如果已经监听,先停止旧的
if characteristic_id in self._notification_callbacks:
try:
await self.client.stop_notify(characteristic_id)
except Exception:
pass
# 创建新的通知处理器
callback = self.notification_handler(websocket, service_id, characteristic_id)
await self.client.start_notify(characteristic_id, callback)
self._notification_callbacks[characteristic_id] = callback
response = json.dumps({
"jsonrpc": "2.0",
"result": None,
"id": request_id
})
await log_message("下发", response)
await websocket.send(response)
except Exception as e:
error_response = json.dumps({
"jsonrpc": "2.0",
"error": {"code": -1, "message": f"启动通知失败: {str(e)}"},
"id": request_id
})
await log_message("下发", error_response)
await websocket.send(error_response)
else:
error_response = json.dumps({
"jsonrpc": "2.0",
"result": None,
"error": {"code": -1, "message": "未连接或参数不全"},
"id": request_id
})
await log_message("下发", response)
await websocket.send(response)
await log_message("下发", error_response)
await websocket.send(error_response)
elif method == "ping":
# 处理ping请求,返回pong响应
......@@ -259,56 +397,104 @@ class BLEClient:
await log_message("下发", response)
await websocket.send(response)
except json.JSONDecodeError:
except json.JSONDecodeError as e:
error_msg = json.dumps({
"jsonrpc": "2.0",
"result": {"message": "Parse error"},
"id": None
"error": {"code": -32700, "message": "Parse error", "data": str(e)},
"id": request.get("id") if request else None
})
await log_message("下发", error_msg)
try:
await websocket.send(error_msg)
except Exception:
pass
except Exception as e:
# 记录完整错误堆栈
error_trace = traceback.format_exc()
print(f"处理请求时出错: {error_trace}")
error_msg = json.dumps({
"jsonrpc": "2.0",
"result": {"message": str(e)},
"error": {"code": -32603, "message": "Internal error", "data": str(e)},
"id": request.get("id") if request else None
})
await log_message("下发", error_msg)
try:
await websocket.send(error_msg)
except Exception:
pass
except websockets.exceptions.ConnectionClosed:
print("WebSocket连接关闭")
finally:
# 清理BLE连接和通知
self._shutdown = True
if self.client and self.client.is_connected:
await self.client.disconnect()
self.client = None
self.target_device = None
try:
# 停止所有通知
for characteristic_id in list(self._notification_callbacks.keys()):
try:
await self.client.stop_notify(characteristic_id)
except Exception as e:
print(f"停止通知失败 {characteristic_id}: {e}")
# 断开连接
await self.client.disconnect()
except Exception as e:
print(f"断开BLE连接失败: {e}")
finally:
self.client = None
self.target_device = None
self._notification_callbacks.clear()
self.notification_records.clear()
def notification_handler(self, websocket, service_id, characteristic_id):
async def callback(sender, data):
current_time = asyncio.get_event_loop().time()
last_message, last_time = self.notification_records[characteristic_id]
# 解码当前数据用于比较
current_message = base64.b64encode(data).decode('utf-8')
print('notification_handler current_message',base64.b64decode(current_message))
# 过滤逻辑
if current_message == last_message and (current_time - last_time) < 0.5:
return
# 更新记录
self.notification_records[characteristic_id] = (current_message, current_time)
response = json.dumps({
"jsonrpc": "2.0",
"method": "characteristicDidChange",
"params": {
"serviceId": service_id,
"characteristicId": characteristic_id,
"message": current_message
}
})
await log_message("下发", response)
await websocket.send(response)
try:
# 检查websocket是否已关闭
if self.websocket is None or self.websocket.closed or self._shutdown:
return
current_time = time.time() # 使用time.time()而不是event_loop.time()
last_message, last_time = self.notification_records[characteristic_id]
# 解码当前数据用于比较
current_message = base64.b64encode(data).decode('utf-8')
print('notification_handler current_message',data)
# 过滤逻辑 - 只在0.5秒内且消息完全相同时跳过
if current_message == last_message and (current_time - last_time) < 0.5:
return
# 更新记录
self.notification_records[characteristic_id] = (current_message, current_time)
response = json.dumps({
"jsonrpc": "2.0",
"method": "characteristicDidChange",
"params": {
"serviceId": service_id,
"characteristicId": characteristic_id,
"message": current_message
}
})
# 使用非阻塞日志
try:
await log_message("下发", response)
except Exception as e:
print(f"日志写入失败: {e}")
# 发送响应
try:
await websocket.send(response)
except websockets.exceptions.ConnectionClosed:
print("WebSocket连接已关闭,停止发送通知")
self._shutdown = True
except Exception as e:
print(f"发送通知失败: {e}")
except Exception as e:
print(f"通知处理器出错: {e}")
return callback
async def check_port_and_start_server(port=20111, host='localhost'):
......@@ -320,7 +506,11 @@ async def check_port_and_start_server(port=20111, host='localhost'):
print(f"端口 {port} 可用,正在启动服务...")
server = await websockets.serve(
lambda websocket, path: BLEClient().handle_client(websocket, path),
host, port
host, port,
ping_interval=20, # 每20秒发送ping
ping_timeout=10, # 等待pong超时10秒
max_size=2 * 1024 * 1024, # 最大消息大小2MB
close_timeout=5, # 关闭超时5秒
)
print(f"WebSocket服务已启动: ws://{host}:{port}/scratch/ble")
......@@ -343,7 +533,21 @@ async def check_port_and_start_server(port=20111, host='localhost'):
async def main():
server = await check_port_and_start_server()
if server:
await asyncio.Future() # 保持服务器运行
try:
await asyncio.Future() # 保持服务器运行
finally:
# 清理日志系统
global _log_queue, _log_task
if _log_queue is not None:
try:
await _log_queue.put(None)
except Exception:
pass
if _log_task is not None:
try:
await _log_task
except Exception:
pass
if __name__ == "__main__":
asyncio.run(main())
\ No newline at end of file
......
Styling with Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!