CODESYS软PLC如何与Python程序‘对话’?一个共享内存通信的保姆级配置指南
CODESYS软PLC与Python的高效数据交换共享内存通信实战指南工业自动化领域正经历着前所未有的智能化变革传统PLC系统与高级算法如机器视觉、深度学习的协同工作成为刚需。本文将深入解析如何通过共享内存技术在CODESYS软PLC与Python程序之间建立毫秒级延迟的数据通道实现实时控制与智能分析的完美融合。1. 共享内存通信的技术原理与优势共享内存Shared Memory是进程间通信IPC中最快的方式之一它允许多个进程直接访问同一块物理内存区域。在工业控制场景中这种机制特别适合解决以下典型问题实时系统与非实时系统的数据桥接CODESYS提供的硬实时环境与Python运行的通用操作系统环境存在天然的时序隔离大数据量低延迟传输相比Socket等网络通信方式共享内存省去了数据序列化/反序列化开销确定性响应内存访问时间可预测不受网络波动影响关键技术指标对比通信方式延迟(μs)吞吐量(MB/s)适用场景共享内存0.5-25000同主机进程间高速交换Unix域套接字10-20800-1200同主机进程间可靠通信TCP/IP100-500100-500跨网络设备通信提示在x86架构的英特尔开发者套件上共享内存的实际传输速率可达PCIe总线带宽上限2. CODESYS环境配置2.1 系统组件安装确保CODESYS开发环境推荐V3.5 SP17以上版本已安装以下关键组件# 在CODESYS包管理器中搜索安装 1. CODESYS Control for Linux SL 2. CODESYS Edge Gateway for Linux 3. Shared Memory Communication Library2.2 工程初始化步骤新建工程时选择设备类型为CODESYS Control for Linux SL添加必要的系统库(* 在库管理器中添加 *) SysShm,3.5.8.0 (System) SysTypes2 Interfaces,3.5.4.0 (System)配置设备连接参数Device Address192.168.1.100/Address Port1217/Port Usernameadmin/Username Passwordcodesys/Password /Device2.3 运行时部署通过Update Linux工具将Runtime部署到目标设备# 在Linux终端验证安装 $ systemctl status codesyscontrol ● codesyscontrol.service - CODESYS Control Loaded: loaded (/etc/systemd/system/codesyscontrol.service) Active: active (running) since Thu 2023-08-17 14:20:18 CST3. 数据接口设计与实现3.1 数据结构定义创建两个结构体类型分别处理输入输出数据TYPE Str_ParaFromHMI : STRUCT fIn: LREAL; // 来自Python的输入 bValid: BOOL; // 数据有效标志 ulTimestamp: UDINT; // 时间戳 END_STRUCT END_TYPE TYPE Str_ParaToHMI : STRUCT fOut: LREAL; // 发送到Python的输出 bExecute: BOOL; // 执行命令 arrCoords: ARRAY[0..3] OF INT; // 坐标数据 END_STRUCT END_TYPE3.2 共享内存管理POU完整的共享内存管理程序单元实现PROGRAM SharedMemory VAR // 共享内存配置 hReadHandle: RTS_IEC_HANDLE : RTS_INVALID_HANDLE; hWriteHandle: RTS_IEC_HANDLE : RTS_INVALID_HANDLE; szReadName: STRING : PLC_PYTHON_READ; szWriteName: STRING : PLC_PYTHON_WRITE; // 错误处理 aReadErrors: ARRAY[0..2] OF RTS_IEC_RESULT; aWriteErrors: ARRAY[0..2] OF RTS_IEC_RESULT; // 状态监控 bInitialized: BOOL : FALSE; tLastUpdate: TIME; END_VAR // 初始化共享内存 IF NOT bInitialized THEN hReadHandle : SysSharedMemoryCreate( pszName : szReadName, ulPhysicalAddress : 0, pulSize : ADR(ulReadSize), pResult : ADR(aReadErrors[0])); hWriteHandle : SysSharedMemoryCreate( pszName : szWriteName, ulPhysicalAddress : 0, pulSize : ADR(ulWriteSize), pResult : ADR(aWriteErrors[0])); bInitialized : (hReadHandle RTS_INVALID_HANDLE) AND (hWriteHandle RTS_INVALID_HANDLE); END_IF // 数据读写循环 IF bInitialized THEN // 读取Python数据 SysSharedMemoryRead( hShm : hReadHandle, ulOffset : 0, pbyData : ADR(gvIn), ulSize : SIZEOF(gvIn), pResult : ADR(aReadErrors[1])); // 写入PLC数据 SysSharedMemoryWrite( hShm : hWriteHandle, ulOffset : 0, pbyData : ADR(gvOut), ulSize : SIZEOF(gvOut), pResult : ADR(aWriteErrors[2])); tLastUpdate : NOW(); END_IF4. Python端实现方案4.1 基础通信框架import mmap import struct import time from ctypes import sizeof, c_double, c_bool, c_uint32 class PLCCommunicator: def __init__(self): self.read_fd None self.write_fd None self.read_map None self.write_map None def connect(self, read_namePLC_PYTHON_READ, write_namePLC_PYTHON_WRITE): try: # 打开共享内存文件 self.read_fd open(f/dev/shm/{read_name}, rb) self.write_fd open(f/dev/shm/{write_name}, rb) # 创建内存映射 self.read_map mmap.mmap( self.read_fd.fileno(), length1024, accessmmap.ACCESS_READ) self.write_map mmap.mmap( self.write_fd.fileno(), length1024, accessmmap.ACCESS_WRITE) return True except Exception as e: print(fConnection failed: {str(e)}) return False def read_data(self): 读取PLC数据结构 self.read_map.seek(0) data self.read_map.read(sizeof(PLCData)) return struct.unpack(d?I, data) # 对应LREAL, BOOL, UDINT def write_data(self, value: float, execute: bool, coords: list): 写入数据到PLC packed struct.pack(d?4i, c_double(value), c_bool(execute), *[c_uint32(x) for x in coords]) self.write_map.seek(0) self.write_map.write(packed) self.write_map.flush() def close(self): for m in [self.read_map, self.write_map]: if m: m.close() for f in [self.read_fd, self.write_fd]: if f: f.close()4.2 典型应用场景实现机器视觉质检案例import cv2 import numpy as np from plc_comm import PLCCommunicator def detect_defects(image): 使用OpenCV进行缺陷检测 gray cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) _, thresh cv2.threshold(gray, 240, 255, cv2.THRESH_BINARY_INV) contours, _ cv2.findContours(thresh, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE) defects [] for cnt in contours: area cv2.contourArea(cnt) if area 100: # 过滤小噪点 x,y,w,h cv2.boundingRect(cnt) defects.append((x,y,xw,yh)) return defects def main(): plc PLCCommunicator() if not plc.connect(): return cap cv2.VideoCapture(0) while True: # 从PLC读取控制命令 value, execute, _ plc.read_data() if execute: ret, frame cap.read() if ret: defects detect_defects(frame) # 将检测结果写回PLC if defects: x1,y1,x2,y2 defects[0] plc.write_data(1.0, False, [x1,y1,x2,y2]) else: plc.write_data(0.0, False, [0,0,0,0]) time.sleep(0.01) # 10ms周期 plc.close() if __name__ __main__: main()5. 性能优化与故障排查5.1 关键性能参数调优在/etc/sysctl.conf中添加以下内核参数# 共享内存段最大数量 kernel.shmall 4294967296 # 单个共享内存段最大尺寸(字节) kernel.shmmax 68719476736 # 每个进程最大内存映射区域数 vm.max_map_count 65530通过以下命令立即生效sudo sysctl -p5.2 常见问题解决方案问题1Python端读取到乱码数据检查CODESYS和Python中的结构体定义是否完全一致验证字节序设置x86平台均为小端序在CODESYS中添加数据校验字段// 在结构体中添加校验码 TYPE Str_ParaToHMI : STRUCT fOut: LREAL; bExecute: BOOL; arrCoords: ARRAY[0..3] OF INT; ulChecksum: UDINT : 0x55AA55AA; // 固定魔数 END_STRUCT问题2通信延迟波动大使用chrt命令提升Python进程优先级sudo chrt -f 99 python3 vision.py在CODESYS中配置实时任务优先级Task NameSharedMemoryTask/Name Priority20/Priority !-- 数值越小优先级越高 -- Interval1000000/Interval !-- 1ms周期 -- /Task问题3共享内存残留导致无法创建添加清理脚本clean_shm.sh#!/bin/bash rm -f /dev/shm/PLC_PYTHON_* ipcs -m | awk {print $2} | xargs -I {} ipcrm -m {}在实际项目中我们通过这种架构成功实现了视觉检测系统与PLC的200μs级同步控制。一个实用建议是对于需要高频更新的数据可以采用内存环缓冲区的设计模式CODESYS和Python分别维护自己的读写指针这样可以避免读写冲突同时提高吞吐量。

相关新闻

最新新闻

日新闻

周新闻

月新闻