新建PLC类 PLC.py
python">import json
import time
from threading import Thread
from HslCommunication import SiemensS7Net, SiemensPLCS
from PySide6.QtCore import QThread, Signal, QObject
from tdm.MsgType import MSG_TYPE_LOG, MSG_TYPE_MSGBOX
# 自定义信号类,用于与主线程进行交互
class WorkSingle(QObject):
msg = Signal(str)
class PLC(QThread):
def __init__(self, address, port):
super().__init__()
self.siemens = None # 西门子对象
self.connected_plc = None # 是否连接上了PLC
self.signal = WorkSingle() # 信道,与主线程通信
self.address = address # PLC IP地址
self.port = port # PLC 连接端口
# 发送日志消息
def send_log_msg(self, msg):
data = {
"type": MSG_TYPE_LOG,
"msg": msg,
}
self.signal.msg.emit(json.dumps(data))
# 发送弹窗类消息
def send_dialog_msg(self, msg):
data = {
"type": MSG_TYPE_MSGBOX,
"msg": msg,
}
self.signal.msg.emit(json.dumps(data))
def init_plc_connect(self):
self.siemens = SiemensS7Net(SiemensPLCS.S1200, self.address)
self.siemens.port = int(self.port)
connect = self.siemens.ConnectServer()
if not connect.IsSuccess:
self.connected_plc = False
# print('初始化 PLC 连接失败: ' + connect.ToMessageShowString())
self.send_log_msg('初始化 PLC 连接失败: ' + connect.ToMessageShowString())
else:
self.connected_plc = True
print("初始化 PLC 连接成功!")
self.send_log_msg("初始化 PLC 连接成功!")
def run(self):
self.init_plc_connect() # 初始化PLC连接
if not self.connected_plc:
print("PLC连接失败,不能连续读取")
self.send_log_msg("PLC连接失败,不能连续读取")
return
print("TODO 开始连续读取PLC值")
self.send_dialog_msg("AAAAA")
Thread(target=self.read_plc_value).start()
print("即将执行耗时操作,读取完毕1")
self.send_log_msg("即将执行耗时操作,开启新的线程去执行")
self.send_dialog_msg("CCCCC")
def read_plc_value(self):
time.sleep(10)
print(self.port)
self.send_dialog_msg("BBBBBBBB")
print("耗时操作,读取完毕")
self.send_log_msg("耗时操作,读取完毕")
创建 TDMSocketClient.py
python">import asyncio
import json
import time
import traceback
from threading import Thread
import websockets
from PyQt6.QtCore import QThread
from PySide6.QtCore import QObject, Signal
from tdm.MsgType import MSG_TYPE_LOG
# 自定义信号类,用于与主线程进行交互
class WorkSingle(QObject):
msg = Signal(str)
class TDMSocketClient(QThread):
def __init__(self, ip, port):
QThread.__init__(self)
self.ip = ip
self.port = port
self.signal = WorkSingle()
def run(self):
# 启动 WebSocket 客户端连接服务器,用于接受服务器发来的试验通知
Thread(target=self.runWSClient).start()
def send_log_msg(self, msg):
data = {
"type": MSG_TYPE_LOG,
"msg": msg,
}
self.signal.msg.emit(json.dumps(data))
def runWSClient(self):
# 下面的语句替换上面这一句,就不会报错啦,成功率高
new_loop = asyncio.new_event_loop()
asyncio.set_event_loop(new_loop)
loop = asyncio.get_event_loop()
task = asyncio.ensure_future(self.ws_client())
loop.run_until_complete(asyncio.wait([task]))
st = task.result()
async def ws_client(self):
try:
uri = 'ws://' + self.ip + ':' + self.port + '/tdm/websocket/JZ076'
async with websockets.connect(uri) as websocket:
# localClient.setSuccessStatus("websocket 连接成功!")
print('Connected to ' + self.ip + ':' + self.port + "success")
self.send_log_msg('Connected to ' + self.ip + ':' + self.port + "success")
while True:
result = await websocket.recv()
msg = json.loads(result)
try:
if msg["cmd"] == "startTest":
print("开始试验")
# 开始试验
# localClient.startTest(msg['testInsId'])
elif msg["cmd"] == "pauseTest":
# 暂停试验
# localClient.testPause()
print("暂停试验")
elif msg["cmd"] == "stopTest":
print("停止试验")
# 停止试验
# localClient.testStop()
except:
# 报错的话自动重连
traceback.print_exc()
except:
# 报错的话自动重连
traceback.print_exc()
time.sleep(1)
await self.ws_client()
主类 TDMClient.py
python">'''
TDM 客户端
'''
import json
from typing import Final
from PyQt6.QtWidgets import QMessageBox
from PySide6.QtCore import QFile
from PySide6.QtUiTools import QUiLoader
from PySide6.QtWidgets import QApplication
from tdm.MsgType import MSG_TYPE_LOG, MSG_TYPE_MSGBOX
from tdm.PLC import PLC
from tdm.TDMSocketClient import TDMSocketClient
PLC_ADDRESS: Final = "192.168.1.83"
PLC_PORT: Final = "102"
WS_SERVICE_ADDRESS: Final = "localhost"
WS_SERVICE_PORT: Final = "8080"
# 显示弹窗
def show_msg_box(msg):
mb = QMessageBox()
mb.setText(msg)
mb.exec()
class TDMClient:
def __init__(self):
# 获取设计文件
qf = QFile('ui/TDMLocalClient.ui')
qf.open(QFile.ReadOnly)
qf.close()
# 将设计文件加载为窗口
self.ui = QUiLoader().load(qf)
# PLC 读取相关
self.plc = PLC(PLC_ADDRESS, PLC_PORT)
self.plc.signal.msg.connect(self.receive_signal)
self.ws = TDMSocketClient(WS_SERVICE_ADDRESS, WS_SERVICE_PORT)
self.ws.signal.msg.connect(self.receive_signal)
def run(self):
self.ui.show() # 显示客户端
self.plc.run() # 获取试验器的点位列表,并连续读取
self.ws.run() # 启动 WebSocket 客户端服务,用于接收服务器发来的 试验开始、试验暂停、试验停止等信息
# 收到子线程信号是触发(唯一方法入口)
def receive_signal(self, msg):
msg_obj = json.loads(msg)
if msg_obj['type'] == MSG_TYPE_LOG:
self.ui.retView.appendPlainText(str(msg_obj['msg']))
elif msg_obj['type'] == MSG_TYPE_MSGBOX:
show_msg_box(str(msg_obj['msg']))
app = QApplication([])
tdmClient = TDMClient()
tdmClient.run()
app.exec()