Skip to content
Toggle navigation
Toggle navigation
This project
Loading...
Sign in
wenzhou
/
pythonserver
Go to a project
Toggle navigation
Toggle navigation pinning
Projects
Groups
Snippets
Help
Project
Activity
Repository
Pipelines
Graphs
Issues
0
Merge Requests
0
Wiki
Network
Create a new issue
Builds
Commits
Issue Boards
Files
Commits
Network
Compare
Branches
Tags
54ac76a2
authored
2025-04-10 20:52:01 +0800
by
huangyf2
Browse Files
Options
Browse Files
Tag
Download
Email Patches
Plain Diff
new
1 parent
4696f3d6
Expand all
Hide whitespace changes
Inline
Side-by-side
Showing
4 changed files
with
86 additions
and
11 deletions
b.log
ble_data.log
s6.9.2.py
s6.9.2.spec
b.log
View file @
54ac76a
This diff is collapsed.
Click to expand it.
ble_data.log
0 → 100644
View file @
54ac76a
2025-04-09 21:55:22,617 - websockets.server - INFO - server listening on [::1]:20111
2025-04-09 21:55:22,617 - websockets.server - INFO - server listening on 127.0.0.1:20111
2025-04-09 21:55:30,670 - websockets.server - INFO - connection open
2025-04-09 21:56:04,105 - websockets.server - INFO - server listening on [::1]:20111
2025-04-09 21:56:04,105 - websockets.server - INFO - server listening on 127.0.0.1:20111
2025-04-09 21:56:21,751 - websockets.server - INFO - connection open
s6.9.2.py
View file @
54ac76a
...
...
@@ -7,6 +7,8 @@ import json
import
base64
import
threading
from
collections
import
defaultdict
import
socket
import
time
import
platform
...
...
@@ -36,6 +38,38 @@ async def log_message(direction, message):
loop
=
asyncio
.
get_event_loop
()
await
loop
.
run_in_executor
(
None
,
log_message_sync
,
direction
,
message
)
def
is_port_in_use
(
port
,
host
=
'localhost'
):
"""检查端口是否被占用"""
with
socket
.
socket
(
socket
.
AF_INET
,
socket
.
SOCK_STREAM
)
as
s
:
try
:
s
.
bind
((
host
,
port
))
return
False
except
socket
.
error
:
return
True
async
def
self_test
(
websocket
):
"""发送自检测试消息"""
test_message
=
json
.
dumps
({
"jsonrpc"
:
"2.0"
,
"method"
:
"ping"
,
"params"
:
{
"timestamp"
:
int
(
time
.
time
())},
"id"
:
"test"
})
await
log_message
(
"自测"
,
test_message
)
await
websocket
.
send
(
test_message
)
try
:
# 等待响应,设置超时
response
=
await
asyncio
.
wait_for
(
websocket
.
recv
(),
timeout
=
5.0
)
await
log_message
(
"自测响应"
,
response
)
return
True
except
asyncio
.
TimeoutError
:
await
log_message
(
"自测"
,
"自测超时,未收到响应"
)
return
False
except
Exception
as
e
:
await
log_message
(
"自测"
,
f
"自测异常: {str(e)}"
)
return
False
class
BLEClient
:
def
__init__
(
self
):
self
.
target_device
=
None
...
...
@@ -210,6 +244,16 @@ class BLEClient:
})
await
log_message
(
"下发"
,
response
)
await
websocket
.
send
(
response
)
elif
method
==
"ping"
:
# 处理ping请求,返回pong响应
response
=
json
.
dumps
({
"jsonrpc"
:
"2.0"
,
"result"
:
{
"pong"
:
True
,
"timestamp"
:
int
(
time
.
time
())},
"id"
:
request_id
})
await
log_message
(
"下发"
,
response
)
await
websocket
.
send
(
response
)
except
json
.
JSONDecodeError
:
error_msg
=
json
.
dumps
({
...
...
@@ -262,14 +306,39 @@ class BLEClient:
await
websocket
.
send
(
response
)
return
callback
async
def
main
():
async
with
websockets
.
serve
(
async
def
check_port_and_start_server
(
port
=
20111
,
host
=
'localhost'
):
"""检查端口并启动服务器"""
if
is_port_in_use
(
port
,
host
):
print
(
f
"错误: 端口 {port} 已被占用,无法启动服务"
)
return
False
print
(
f
"端口 {port} 可用,正在启动服务..."
)
server
=
await
websockets
.
serve
(
lambda
websocket
,
path
:
BLEClient
()
.
handle_client
(
websocket
,
path
),
"localhost"
,
20111
):
print
(
"WebSocket服务已启动: ws://localhost:20111/scratch/ble"
)
print
(
"日志文件路径: ./b.log"
)
await
asyncio
.
Future
()
host
,
port
)
print
(
f
"WebSocket服务已启动: ws://{host}:{port}/scratch/ble"
)
print
(
"日志文件路径: ./b.log"
)
# 执行自检测试
try
:
async
with
websockets
.
connect
(
f
"ws://{host}:{port}/scratch/ble"
)
as
websocket
:
print
(
"正在执行自检测试..."
)
test_result
=
await
self_test
(
websocket
)
if
test_result
:
print
(
"自检测试成功: 服务正常运行"
)
else
:
print
(
"自检测试失败: 服务可能存在问题"
)
except
Exception
as
e
:
print
(
f
"自检测试异常: {str(e)}"
)
return
server
async
def
main
():
server
=
await
check_port_and_start_server
()
if
server
:
await
asyncio
.
Future
()
# 保持服务器运行
if
__name__
==
"__main__"
:
asyncio
.
run
(
main
())
\ No newline at end of file
...
...
s6.9.2.spec
View file @
54ac76a
...
...
@@ -3,14 +3,14 @@
a
=
Analysis
(
[
's6.9.2.py'
],
pathex
=
[],
binaries
=
[],
pathex
=
[
'C:
\\
Python
\\
Lib
\\
site-packages
\\
bleak
\\
backends'
],
binaries
=
[
(
'C:
\\
Windows
\\
System32
\\
BluetoothApis.dll'
,
'.'
)
],
datas
=
[(
'data'
,
'data'
)],
hiddenimports
=
[
'bleak.backends'
,
'asyncio'
],
hiddenimports
=
[
'bleak.backends'
,
'
bleak.backends.winrt'
,
'
asyncio'
],
hookspath
=
[],
hooksconfig
=
{},
runtime_hooks
=
[],
excludes
=
[],
excludes
=
[
'unnecessary_module'
],
noarchive
=
False
,
optimize
=
0
,
)
...
...
Write
Preview
Styling with
Markdown
is supported
Attach a file
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to post a comment