Skip to content
Toggle navigation
Toggle navigation
This project
Loading...
Sign in
wenzhou
/
pythonserver
Go to a project
Toggle navigation
Toggle navigation pinning
Projects
Groups
Snippets
Help
Project
Activity
Repository
Pipelines
Graphs
Issues
0
Merge Requests
0
Wiki
Network
Create a new issue
Builds
Commits
Issue Boards
Files
Commits
Network
Compare
Branches
Tags
26c8727b
authored
2025-04-06 21:41:31 +0800
by
huangyf2
Browse Files
Options
Browse Files
Tag
Download
Email Patches
Plain Diff
s6.9.2
1 parent
6542d06b
Hide whitespace changes
Inline
Side-by-side
Showing
1 changed file
with
275 additions
and
0 deletions
s6.9.2.py
s6.9.2.py
0 → 100644
View file @
26c8727
# -*- coding: utf-8 -*-
import
sys
import
asyncio
import
websockets
from
bleak
import
BleakScanner
,
BleakClient
import
json
import
base64
import
threading
from
collections
import
defaultdict
import
platform
# 方法1:通过sys模块快速判断(推荐)
if
sys
.
platform
.
startswith
(
'win32'
):
import
winrt.windows.foundation.collections
# noqa
import
winrt.windows.devices.bluetooth
# noqa
import
winrt.windows.devices.bluetooth.advertisement
# noq
print
(
"当前运行在Windows系统"
)
elif
sys
.
platform
.
startswith
(
'linux'
):
print
(
"当前运行在Linux系统"
)
elif
sys
.
platform
.
startswith
(
'darwin'
):
print
(
"当前运行在macOS系统"
)
# 添加线程锁以确保日志写入的原子性
write_lock
=
threading
.
Lock
()
def
log_message_sync
(
direction
,
message
):
"""同步日志记录函数"""
log_entry
=
f
"{direction}: {message}
\n
"
print
(
log_entry
,
end
=
''
)
# 控制台仍然输出
with
write_lock
:
with
open
(
'b.log'
,
'a'
,
encoding
=
'utf-8'
)
as
f
:
f
.
write
(
log_entry
)
async
def
log_message
(
direction
,
message
):
"""异步封装日志记录"""
loop
=
asyncio
.
get_event_loop
()
await
loop
.
run_in_executor
(
None
,
log_message_sync
,
direction
,
message
)
class
BLEClient
:
def
__init__
(
self
):
self
.
target_device
=
None
self
.
client
=
None
self
.
services
=
[]
self
.
optional_services
=
[]
self
.
websocket
=
None
self
.
notification_records
=
defaultdict
(
lambda
:
(
None
,
0
))
# 特征ID: (最后消息, 时间戳)
def
on_disconnect
(
self
,
client
):
print
(
"BLE连接断开,关闭WebSocket"
)
if
self
.
websocket
and
not
self
.
websocket
.
closed
:
asyncio
.
create_task
(
self
.
close_websocket
())
async
def
close_websocket
(
self
):
await
self
.
websocket
.
close
()
self
.
websocket
=
None
def
detection_callback
(
self
,
device
,
advertisement_data
):
if
any
(
service_uuid
in
advertisement_data
.
service_uuids
for
service_uuid
in
self
.
services
):
self
.
target_device
=
(
device
,
advertisement_data
)
if
not
self
.
target_device
:
print
(
"未找到匹配设备"
)
return
else
:
device
,
adv_data
=
self
.
target_device
print
(
"
\n
找到目标设备:"
)
print
(
f
"设备名称: {device.name}"
)
print
(
f
"设备地址: {device.address}"
)
print
(
f
"信号强度: {device.rssi} dBm"
)
print
(
"
\n
广播信息:"
)
print
(
f
"服务UUID列表: {adv_data.service_uuids}"
)
print
(
f
"制造商数据: {adv_data.manufacturer_data}"
)
print
(
f
"服务数据: {adv_data.service_data}"
)
print
(
f
"本地名称: {adv_data.local_name}"
)
return
self
.
target_device
async
def
handle_client
(
self
,
websocket
,
path
):
self
.
websocket
=
websocket
if
path
!=
"/scratch/ble"
:
await
websocket
.
close
(
code
=
1003
,
reason
=
"Path not allowed"
)
return
try
:
async
for
message
in
websocket
:
try
:
await
log_message
(
"接收"
,
message
)
request
=
json
.
loads
(
message
)
if
request
[
"jsonrpc"
]
!=
"2.0"
:
continue
method
=
request
.
get
(
"method"
)
params
=
request
.
get
(
"params"
,
{})
request_id
=
request
.
get
(
"id"
)
if
method
==
"discover"
:
self
.
services
=
[]
for
filt
in
params
.
get
(
"filters"
,
[{}]):
self
.
services
.
extend
(
filt
.
get
(
"services"
,
[]))
self
.
optional_services
=
params
.
get
(
"optionalServices"
,
[])
scanner
=
BleakScanner
(
scanning_mode
=
"active"
)
scanner
.
register_detection_callback
(
self
.
detection_callback
)
max_retries
=
3
found
=
False
for
attempt
in
range
(
max_retries
):
self
.
target_device
=
None
await
scanner
.
start
()
await
asyncio
.
sleep
(
5
)
await
scanner
.
stop
()
if
self
.
target_device
:
found
=
True
break
if
attempt
<
max_retries
-
1
:
print
(
f
"未找到设备,第{attempt+1}次重试..."
)
await
asyncio
.
sleep
(
3
)
if
found
:
device
,
adv_data
=
self
.
target_device
discover_response
=
json
.
dumps
({
"jsonrpc"
:
"2.0"
,
"method"
:
"didDiscoverPeripheral"
,
"params"
:
{
"name"
:
device
.
name
,
"peripheralId"
:
device
.
address
,
"rssi"
:
device
.
rssi
}
})
await
log_message
(
"下发"
,
discover_response
)
await
websocket
.
send
(
discover_response
)
result_response
=
json
.
dumps
({
"jsonrpc"
:
"2.0"
,
"result"
:
None
,
"id"
:
request_id
})
await
log_message
(
"下发"
,
result_response
)
await
websocket
.
send
(
result_response
)
elif
method
==
"connect"
:
peripheral_id
=
params
.
get
(
"peripheralId"
)
if
peripheral_id
:
self
.
client
=
BleakClient
(
peripheral_id
)
self
.
client
.
set_disconnected_callback
(
self
.
on_disconnect
)
await
self
.
client
.
connect
()
if
self
.
client
.
is_connected
:
response
=
json
.
dumps
({
"jsonrpc"
:
"2.0"
,
"result"
:
None
,
"id"
:
request_id
})
await
log_message
(
"下发"
,
response
)
await
websocket
.
send
(
response
)
elif
method
==
"write"
:
service_id
=
params
.
get
(
"serviceId"
)
characteristic_id
=
params
.
get
(
"characteristicId"
)
message
=
params
.
get
(
"message"
)
encoding
=
params
.
get
(
"encoding"
,
"utf-8"
)
if
all
([
service_id
,
characteristic_id
,
message
]):
if
encoding
==
"base64"
:
message_bytes
=
base64
.
b64decode
(
message
)
else
:
message_bytes
=
message
.
encode
(
encoding
)
await
self
.
client
.
write_gatt_char
(
characteristic_id
,
message_bytes
)
response
=
json
.
dumps
({
"jsonrpc"
:
"2.0"
,
"result"
:
None
,
"id"
:
request_id
})
await
log_message
(
"下发"
,
response
)
await
websocket
.
send
(
response
)
elif
method
==
"read"
:
service_id
=
params
.
get
(
"serviceId"
)
characteristic_id
=
params
.
get
(
"characteristicId"
)
if
all
([
service_id
,
characteristic_id
]):
data
=
await
self
.
client
.
read_gatt_char
(
characteristic_id
)
response
=
json
.
dumps
({
"jsonrpc"
:
"2.0"
,
"result"
:
{
"serviceId"
:
service_id
,
"characteristicId"
:
characteristic_id
,
"message"
:
base64
.
b64encode
(
data
)
.
decode
(
"utf-8"
)
},
"id"
:
request_id
})
await
log_message
(
"下发"
,
response
)
await
websocket
.
send
(
response
)
elif
method
==
"startNotifications"
:
service_id
=
params
.
get
(
"serviceId"
)
characteristic_id
=
params
.
get
(
"characteristicId"
)
if
all
([
service_id
,
characteristic_id
]):
await
self
.
client
.
start_notify
(
characteristic_id
,
self
.
notification_handler
(
websocket
,
service_id
,
characteristic_id
)
)
response
=
json
.
dumps
({
"jsonrpc"
:
"2.0"
,
"result"
:
None
,
"id"
:
request_id
})
await
log_message
(
"下发"
,
response
)
await
websocket
.
send
(
response
)
except
json
.
JSONDecodeError
:
error_msg
=
json
.
dumps
({
"jsonrpc"
:
"2.0"
,
"result"
:
{
"message"
:
"Parse error"
},
"id"
:
None
})
await
log_message
(
"下发"
,
error_msg
)
except
Exception
as
e
:
error_msg
=
json
.
dumps
({
"jsonrpc"
:
"2.0"
,
"result"
:
{
"message"
:
str
(
e
)},
"id"
:
request
.
get
(
"id"
)
if
request
else
None
})
await
log_message
(
"下发"
,
error_msg
)
except
websockets
.
exceptions
.
ConnectionClosed
:
print
(
"WebSocket连接关闭"
)
finally
:
if
self
.
client
and
self
.
client
.
is_connected
:
await
self
.
client
.
disconnect
()
self
.
client
=
None
self
.
target_device
=
None
def
notification_handler
(
self
,
websocket
,
service_id
,
characteristic_id
):
async
def
callback
(
sender
,
data
):
current_time
=
asyncio
.
get_event_loop
()
.
time
()
last_message
,
last_time
=
self
.
notification_records
[
characteristic_id
]
# 解码当前数据用于比较
current_message
=
base64
.
b64encode
(
data
)
.
decode
(
'utf-8'
)
# 过滤逻辑
if
current_message
==
last_message
and
(
current_time
-
last_time
)
<
0.5
:
return
# 更新记录
self
.
notification_records
[
characteristic_id
]
=
(
current_message
,
current_time
)
response
=
json
.
dumps
({
"jsonrpc"
:
"2.0"
,
"method"
:
"characteristicDidChange"
,
"params"
:
{
"serviceId"
:
service_id
,
"characteristicId"
:
characteristic_id
,
"message"
:
current_message
}
})
await
log_message
(
"下发"
,
response
)
await
websocket
.
send
(
response
)
return
callback
async
def
main
():
async
with
websockets
.
serve
(
lambda
websocket
,
path
:
BLEClient
()
.
handle_client
(
websocket
,
path
),
"localhost"
,
20111
):
print
(
"WebSocket服务已启动: ws://localhost:20111/scratch/ble"
)
print
(
"日志文件路径: ./b.log"
)
await
asyncio
.
Future
()
if
__name__
==
"__main__"
:
asyncio
.
run
(
main
())
\ No newline at end of file
Write
Preview
Styling with
Markdown
is supported
Attach a file
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to post a comment