92bcaa4a by huangyf2

1

1 parent 61a6b1a6
dist
build
__pycache__
\ No newline at end of file
No preview for this file type
r'r'rv
\ No newline at end of file
This file is too large to display.
##
重要文件就是这个了
##
tray_icon.py 是来进行显示系统托盘图标的
##
主函数 we.py
使用了 注释和引用图标逻辑
# -*- coding: utf-8 -*--
import asyncio
import websockets
from bleak import BleakScanner, BleakClient
import json
import base64
import threading
from tray_icon import start_tray_icon
start_tray_icon("app.ico")
## 打包需要python3 然后
使用 pip cache dir 观察python3 的路径
安装环境
##
pip install pyinstaller
pip install bleak
pip install pystray pillow
##
打包指令 产生一个带窗口的程序 打包完后可以随便改exe 名字
C:\Users\Admin\AppData\Local\Packages\PythonSoftwareFoundation.Python.3.13_qbz5n2kfra8p0\LocalCache\local-packages\Python313\Scripts\pyinstaller.exe --onefile --icon=data\app.ico we.py --add-data "data;data"
##
端口占用
netstat -ano | findstr :201111
taskkill /PID /F 1234
##
app.ico 需要有特定格式 不能随便图片
##
运行时的路径 和打包后的路径 是不一致的 如何解决 见具体函数
icon_path = resource_path(resource_path("data/app.ico"))
import pystray
from PIL import Image, ImageDraw
import threading
def create_image(width, height, color1, color2):
image = Image.new("RGB", (width, height), color1)
dc = ImageDraw.Draw(image)
dc.rectangle((width // 2, 0, width, height // 2), fill=color2)
return image
def run_icon(icon_path):
image = Image.open(icon_path) # 加载自定义图标
icon = pystray.Icon("test_icon", image, "My System Tray Icon")
icon.run()
def start_tray_icon(icon_path):
threading.Thread(target=run_icon, args=(icon_path,), daemon=True).start()
\ No newline at end of file
# -*- coding: utf-8 -*--
import asyncio
import websockets
from bleak import BleakScanner, BleakClient
import json
import base64
import threading
from tray_icon import start_tray_icon
import os
import sys
def resource_path(relative_path):
try:
base_path = sys._MEIPASS # 打包后运行时的临时路径
except AttributeError:
base_path = os.path.dirname(os.path.abspath(__file__)) # 开发环境下的路径
# 使用 os.path.join 拼接路径,并通过 os.path.abspath 转换为绝对路径
return os.path.abspath(os.path.join(base_path, relative_path))
def resource_path(relative_path):
"""获取资源文件的绝对路径,适用于打包后的程序"""
try:
base_path = sys._MEIPASS # 打包后运行时的临时路径
except AttributeError:
base_path = os.path.abspath(".") # 开发环境下的路径
return os.path.join(base_path, relative_path)
# 获取图标路径
icon_path = resource_path(resource_path("data/app.ico"))
# 调用系统托盘图标函数
start_tray_icon(icon_path)
# 添加线程锁以确保日志写入的原子性
write_lock = threading.Lock()
def log_message_sync(direction, message):
"""同步日志记录函数"""
log_entry = f"{direction}: {message}\n"
print(log_entry, end='') # 控制台仍然输出
with write_lock:
with open('a.log', 'a', encoding='utf-8') as f:
f.write(log_entry)
async def log_message(direction, message):
"""异步封装日志记录"""
loop = asyncio.get_event_loop()
await loop.run_in_executor(None, log_message_sync, direction, message)
class BLEClient:
def __init__(self):
self.target_device = None
self.client = None
self.services = []
self.optional_services = []
self.websocket = None
def on_disconnect(self, client):
print("BLE连接断开,关闭WebSocket")
if self.websocket and not self.websocket.closed:
asyncio.create_task(self.close_websocket())
async def close_websocket(self):
await self.websocket.close()
self.websocket = None
def detection_callback(self, device, advertisement_data):
if any(service_uuid in advertisement_data.service_uuids for service_uuid in self.services):
self.target_device = (device, advertisement_data)
if not self.target_device:
print("未找到匹配设备")
return
else:
device, adv_data = self.target_device
print("\n找到目标设备:")
print(f"设备名称: {device.name}")
print(f"设备地址: {device.address}")
print(f"信号强度: {device.rssi} dBm")
print("\n广播信息:")
print(f"服务UUID列表: {adv_data.service_uuids}")
print(f"制造商数据: {adv_data.manufacturer_data}")
print(f"服务数据: {adv_data.service_data}")
print(f"本地名称: {adv_data.local_name}")
return self.target_device
async def handle_client(self, websocket, path):
self.websocket = websocket
if path != "/scratch/ble":
await websocket.close(code=1003, reason="Path not allowed")
return
try:
async for message in websocket:
try:
# 记录接收到的消息
await log_message("接收", message)
request = json.loads(message)
if request["jsonrpc"] != "2.0":
response = json.dumps({
"jsonrpc": "2.0",
"result": {"message": "Invalid Request"},
"id": request.get("id", None)
})
await log_message("下发", response)
await websocket.send(response)
continue
method = request.get("method")
params = request.get("params", {})
request_id = request.get("id")
if method == "discover":
self.services = params.get("filters", [{}])[0].get("services", [])
self.optional_services = params.get("optionalServices", [])
scanner = BleakScanner()
scanner.register_detection_callback(self.detection_callback)
# 添加重试机制
max_retries = 3
found = False
for attempt in range(max_retries):
self.target_device = None # 重置目标设备
await scanner.start()
await asyncio.sleep(5)
await scanner.stop()
if self.target_device:
found = True
break
if attempt < max_retries - 1: # 最后一次不等待
print(f"未找到设备,第{attempt+1}次重试...")
await asyncio.sleep(3)
if not found:
# response = json.dumps({
# "jsonrpc": "2.0",
# "result": {"message": "Device not found"},
# "id": request_id
# })
# await log_message("下发", response)
# await websocket.send(response)
continue
device, adv_data = self.target_device
discover_response = json.dumps({
"jsonrpc": "2.0",
"method": "didDiscoverPeripheral",
"params": {
"name": device.name,
"peripheralId": device.address,
"rssi": device.rssi
}
})
await log_message("下发", discover_response)
await websocket.send(discover_response)
result_response = json.dumps({
"jsonrpc": "2.0",
"result": None,
"id": request_id
})
await log_message("下发", result_response)
await websocket.send(result_response)
elif method == "connect":
peripheral_id = params.get("peripheralId")
if not peripheral_id:
response = json.dumps({
"jsonrpc": "2.0",
"result": {"message": "Invalid params"},
"id": request_id
})
await log_message("下发", response)
await websocket.send(response)
continue
self.client = BleakClient(peripheral_id)
self.client.set_disconnected_callback(self.on_disconnect)
await self.client.connect()
if self.client.is_connected:
print(f"已连接至设备: {peripheral_id}")
response = json.dumps({
"jsonrpc": "2.0",
"result": None,
"id": request_id
})
await log_message("下发", response)
await websocket.send(response)
else:
response = json.dumps({
"jsonrpc": "2.0",
"result": {"message": "Failed to connect"},
"id": request_id
})
await log_message("下发", response)
await websocket.send(response)
elif method == "write":
service_id = params.get("serviceId")
characteristic_id = params.get("characteristicId")
message = params.get("message")
encoding = params.get("encoding", "utf-8")
if not all([service_id, characteristic_id, message]):
response = json.dumps({
"jsonrpc": "2.0",
"result": {"message": "Invalid params"},
"id": request_id
})
await log_message("下发", response)
await websocket.send(response)
continue
if encoding == "base64":
message_bytes = base64.b64decode(message)
else:
message_bytes = message.encode(encoding)
await self.client.write_gatt_char(characteristic_id, message_bytes)
response = json.dumps({
"jsonrpc": "2.0",
"result": None,
"id": request_id
})
await log_message("下发", response)
await websocket.send(response)
elif method == "read":
service_id = params.get("serviceId")
characteristic_id = params.get("characteristicId")
if not all([service_id, characteristic_id]):
response = json.dumps({
"jsonrpc": "2.0",
"result": {"message": "Invalid params"},
"id": request_id
})
await log_message("下发", response)
await websocket.send(response)
continue
data = await self.client.read_gatt_char(characteristic_id)
response = json.dumps({
"jsonrpc": "2.0",
"result": {
"serviceId": service_id,
"characteristicId": characteristic_id,
"message": base64.b64encode(data).decode("utf-8")
},
"id": request_id
})
await log_message("下发", response)
await websocket.send(response)
elif method == "startNotifications":
service_id = params.get("serviceId")
characteristic_id = params.get("characteristicId")
if not all([service_id, characteristic_id]):
response = json.dumps({
"jsonrpc": "2.0",
"result": {"message": "Invalid params"},
"id": request_id
})
await log_message("下发", response)
await websocket.send(response)
continue
await self.client.start_notify(
characteristic_id,
self.notification_handler(websocket, service_id, characteristic_id)
)
response = json.dumps({
"jsonrpc": "2.0",
"result": None,
"id": request_id
})
await log_message("下发", response)
await websocket.send(response)
else:
response = json.dumps({
"jsonrpc": "2.0",
"result": {"message": "Method not found"},
"id": request_id
})
await log_message("下发", response)
await websocket.send(response)
except json.JSONDecodeError:
response = json.dumps({
"jsonrpc": "2.0",
"result": {"message": "Parse error"},
"id": None
})
await log_message("下发", response)
await websocket.send(response)
except Exception as e:
response = json.dumps({
"jsonrpc": "2.0",
"result": {"message": str(e)},
"id": request_id
})
await log_message("下发", response)
await websocket.send(response)
except websockets.exceptions.ConnectionClosedOK:
print("WebSocket客户端正常断开")
except websockets.exceptions.ConnectionClosedError as e:
print(f"WebSocket客户端异常断开: {e.code} - {e.reason}")
finally:
if self.client and self.client.is_connected:
await self.client.disconnect()
print("BLE设备已主动断开")
self.client = None
self.target_device = None
def notification_handler(self, websocket, service_id, characteristic_id):
async def callback(sender, data):
response = json.dumps({
"jsonrpc": "2.0",
"method": "characteristicDidChange",
"params": {
"serviceId": service_id,
"characteristicId": characteristic_id,
"message": base64.b64encode(data).decode("utf-8")
}
})
await log_message("下发", response)
await websocket.send(response)
return callback
async def main():
async with websockets.serve(
lambda websocket, path: BLEClient().handle_client(websocket, path),
"localhost", 20111
):
print("WebSocket服务已启动: ws://localhost:20111/scratch/ble")
print("日志文件路径: ./b.log")
await asyncio.Future() # 永久运行
if __name__ == "__main__":
asyncio.run(main())
\ No newline at end of file
Styling with Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!