原理:类似echo , 不用额外安装串口库等
sudo cat /dev/ttyUSB2 & sudo echo -ne 'AT+CUSBPIDSWITCH=9011,1,1\r\n' | sudo tee /dev/ttyUSB2 >/dev/null
方便一次发送多个指令的程序
#!/usr/bin/env python3 import os,termios,time,select,sys ports=["/dev/ttyUSB2","/dev/ttyUSB3","/dev/ttyUSB4"] baud=termios.B115200 def openp(p): fd=os.open(p,os.O_RDWR|os.O_NOCTTY|os.O_NONBLOCK) old=termios.tcgetattr(fd) a=termios.tcgetattr(fd) a[0]=a[1]=a[3]=0 a[2]=baud|termios.CS8|termios.CREAD|termios.CLOCAL a[4]=a[5]=baud a[6][termios.VMIN]=0 a[6][termios.VTIME]=0 termios.tcsetattr(fd,termios.TCSANOW,a) termios.tcflush(fd,termios.TCIOFLUSH) return fd,old def rd(fd,t=1.5,show=1): end=time.time()+t r=b"" while time.time()<end: a,_,_=select.select([fd],[],[],0.2) if a: d=os.read(fd,4096) if d: r+=d if show: print(d.decode(errors="replace"),end="") end=time.time()+0.3 return r def wr(fd,s): os.write(fd,(s+"\r\n").encode()) fd=old=None port=None for p in ports: if not os.path.exists(p): continue try: fd,old=openp(p) wr(fd,"AT") r=rd(fd,2,0).decode(errors="ignore") if "OK" in r: port=p break termios.tcsetattr(fd,termios.TCSANOW,old) os.close(fd) except: pass if not port: print("ttyUSB2 ttyUSB3 ttyUSB4 都无响应") sys.exit(1) print("使用",port,"115200") print("多条用 ; 分隔 退出输入 q") try: while True: s=input("AT> ").strip() if s.lower() in ("q","quit","exit"): break for c in s.replace(";","\n").splitlines(): c=c.strip() if not c: continue print(">>>",c) wr(fd,c) if not rd(fd,1.5,1): print("超时") print("---") finally: termios.tcsetattr(fd,termios.TCSANOW,old) os.close(fd)方便选择拨号上网指令的程序
#!/usr/bin/env python3 import os,termios,time,select,sys PORTS=["/dev/ttyUSB2","/dev/ttyUSB3","/dev/ttyUSB4"] BAUD=termios.B115200 COMMANDS = """ AT ATE1 AT+CSQ AT+CUSBCFG=USBID,1E0E,9001 AT+CUSBCFG=USBID,1E0E,9011 AT+CUSBCFG=USBID,1E0E,9018 AT+QCFG="usbnet",0 AT+QCFG="usbnet",1 AT+QCFG="usbnet",2 AT+QCFG="usbnet",3 AT+CFUN=1,1 """ # ======================= def openp(p): fd=os.open(p,os.O_RDWR|os.O_NOCTTY|os.O_NONBLOCK) old=termios.tcgetattr(fd) a=termios.tcgetattr(fd) a[0]=a[1]=a[3]=0 a[2]=BAUD|termios.CS8|termios.CREAD|termios.CLOCAL a[4]=a[5]=BAUD a[6][termios.VMIN]=0 a[6][termios.VTIME]=0 termios.tcsetattr(fd,termios.TCSANOW,a) termios.tcflush(fd,termios.TCIOFLUSH) return fd,old def wr(fd,s): os.write(fd,(s+"\r\n").encode()) def rd(fd,t=1.8,show=1): end=time.time()+t r=b"" while time.time()<end: a,_,_=select.select([fd],[],[],0.2) if a: d=os.read(fd,4096) if d: r+=d if show: print(d.decode(errors="replace"),end="") end=time.time()+0.3 return r def closep(fd,old): termios.tcsetattr(fd,termios.TCSANOW,old) os.close(fd) fd=old=None PORT=None for p in PORTS: if not os.path.exists(p): continue try: fd,old=openp(p) wr(fd,"AT") if "OK" in rd(fd,2,0).decode(errors="ignore"): PORT=p break closep(fd,old) fd=old=None except: pass if not PORT: print("ttyUSB2 ttyUSB3 ttyUSB4 都无响应") sys.exit(1) cmds=[] for x in COMMANDS.splitlines(): x=x.strip() if x and not x.startswith("#"): cmds.append(x) print("使用",PORT,"115200") print("选择编号发送 a 全部发送 m 手动输入 q 退出") try: while True: print("\n指令列表") for i,c in enumerate(cmds,1): print(f"{i}. {c}") s=input("\n选择: ").strip().lower() if s in ("q","quit","exit"): break if s=="m": s=input("AT> ").strip() todo=[x.strip() for x in s.replace(";","\n").splitlines() if x.strip()] elif s=="a": todo=cmds else: try: todo=[cmds[int(s)-1]] except: print("选择无效") continue for c in todo: print("\n>>>",c) wr(fd,c) if not rd(fd,2,1): print("超时") print("---") time.sleep(1) finally: if fd: closep(fd,old) print("\n串口已关闭")自定义指令
方案1:
#!/usr/bin/env python3 import subprocess, time TTY = "ttyUSB2" COMMANDS = """ AT ATE1 AT+CSQ """ PORT = "/dev/" + TTY cat = subprocess.Popen(["sudo", "timeout", "6", "cat", PORT]) time.sleep(0.2) for cmd in COMMANDS.strip().splitlines(): subprocess.run( ["sudo", "tee", PORT], input=(cmd.strip() + "\r\n").encode(), stdout=subprocess.DEVNULL ) time.sleep(0.5) cat.wait()方案2
#!/usr/bin/env python3 import os import termios import time import select # ======== 配置区 ======== PORT = "/dev/ttyUSB3" BAUD = termios.B115200 COMMANDS = """ AT ATE1 AT+CSQ """ # ======================= cmds = [x.strip() for x in COMMANDS.strip().splitlines() if x.strip()] fd = os.open(PORT, os.O_RDWR | os.O_NOCTTY | os.O_NONBLOCK) old = termios.tcgetattr(fd) try: attr = termios.tcgetattr(fd) # 输入 输出 本地模式 关闭特殊处理 attr[0] = 0 attr[1] = 0 attr[3] = 0 # 8N1 本地连接 允许接收 attr[2] = BAUD | termios.CS8 | termios.CREAD | termios.CLOCAL # 关键 这里必须设置输入输出波特率 attr[4] = BAUD attr[5] = BAUD # 非阻塞读 attr[6][termios.VMIN] = 0 attr[6][termios.VTIME] = 0 termios.tcsetattr(fd, termios.TCSANOW, attr) termios.tcflush(fd, termios.TCIOFLUSH) print(f"{PORT} 已打开 115200 8N1") for i, cmd in enumerate(cmds, 1): data = (cmd + "\r\n").encode() os.write(fd, data) print(f"\n[{i}] >>> {cmd}") resp = b"" end = time.time() + 3 while time.time() < end: r, _, _ = select.select([fd], [], [], 0.2) if not r: continue chunk = os.read(fd, 4096) if chunk: resp += chunk print(chunk.decode(errors="replace"), end="") if b"\r\nOK\r\n" in resp or b"\r\nERROR\r\n" in resp: break if not resp: print("超时未收到响应") print("\n---") finally: termios.tcsetattr(fd, termios.TCSANOW, old) os.close(fd) print("串口已关闭")拨号指令
import os, termios, time, select fd = os.open('/dev/ttyUSB2', os.O_RDWR) attr = termios.tcgetattr(fd) attr[2] = termios.B115200 | termios.CS8 | termios.CREAD | termios.CLOCAL attr[4] = attr[5] = 0 attr[6][termios.VMIN] = 0 attr[6][termios.VTIME] = 20 # 2秒超时 termios.tcsetattr(fd, termios.TCSANOW, attr) termios.tcflush(fd, termios.TCIOFLUSH) #cmd = 'AT+CUSBPIDSWITCH=9011,1,1\r' cmd = 'AT+CUSBPIDSWITCH=9003,1,1\r' os.write(fd, cmd.encode()) time.sleep(0.1) resp = b'' deadline = time.monotonic() + 3 while time.monotonic() < deadline: r, _, _ = select.select([fd], [], [], 0.2) if r: data = os.read(fd, 1024) if not data: break resp += data if b'OK' in resp or b'ERROR' in resp: break print(resp.decode(errors='replace')) termios.tcsetattr(fd, termios.TCSANOW, termios.tcgetattr(fd)) os.close(fd)选择查询网络状态
#!/usr/bin/env python3 # -*- coding: utf-8 -*- import os import re import sys import glob import time import locale import curses import queue import threading import serial import serial.tools.list_ports as list_ports 默认波特率 = 115200 串口对象 = None 接收队列 = queue.Queue() 运行中 = True 串口写锁 = threading.Lock() 批量发送锁 = threading.Lock() 自动测试完成事件 = threading.Event() 厂家别名 = { "SIMCOM": "SIMCOM", "SIM": "SIMCOM", "SIMTECH": "SIMCOM", "QUECTEL": "QUECTEL", "QC": "QUECTEL", "移远": "QUECTEL", "FIBOCOM": "FIBOCOM", "FBCOM": "FIBOCOM", "FIBO": "FIBOCOM", "广和通": "FIBOCOM", } 测试指令表 = { "SIMCOM": [ ("检查SIM卡是否接触良好", "AT+CPIN?"), ("检查APN配置与否", "AT+CGDCONT?"), ("检查运营商接入情况", "AT+COPS?"), ("检查联网情况", "AT+CPSI?"), ("检查是否成功注册到网络", "AT+CGREG?"), ("检查当前环境的信号质量", "AT+CSQ"), ("检查网络模式设置是否正确", "AT+CNMP?"), ("检查是否开射频 关闭飞行模式", "AT+CFUN?"), ("检查固件版本", "AT+SIMCOMATI"), ], "QUECTEL": [ ("开启回显", "ATE1"), ("开启详细错误提示", "AT+CMEE=2"), ("检查USB拨号模式", 'AT+QCFG="usbnet"'), ("检查SIM卡是否接触良好", "AT+CPIN?"), ("检查运营商接入情况", "AT+COPS?"), ("检查当前驻网小区", 'AT+QENG="servingcell"'), ("检查APN配置与否", "AT+CGDCONT?"), ("检查网络模式偏好", 'AT+QNWPREFCFG="mode_pref"'), ("检查5G注册状态", "AT+C5GREG?"), ("检查固件版本", "AT+QGMR"), ("检查模块信息", "ATI"), ], "FIBOCOM": [ ("开启回显", "ATE1"), ("检查模块信息", "ATI"), ("检查SIM卡是否接触良好", "AT+CPIN?"), ("检查运营商接入情况", "AT+COPS?"), ("检查APN配置与否", "AT+CGDCONT?"), ("开启自动连接", "AT+GTAUTOCONNECT=1"), ("检查USB模式", "AT+GTUSBMODE?"), ("检查RNDIS状态", "AT+GTRNDIS?"), ], } def 安全显示(stdscr, y, x, text, attr=0): try: height, width = stdscr.getmaxyx() if y < 0 or y >= height: return if x < 0 or x >= width: return text = str(text) text = text[:max(1, width - x - 1)] if attr: stdscr.addstr(y, x, text, attr) else: stdscr.addstr(y, x, text) except Exception: pass def 获取串口列表(): ports = set() try: for p in list_ports.comports(): if p.device: ports.add(p.device) except Exception: pass patterns = [ "/dev/serial/by-id/*", "/dev/ttyUSB*", "/dev/ttyACM*", "/dev/ttyAMA*", "/dev/ttyS*", ] for pattern in patterns: for dev in glob.glob(pattern): if os.path.exists(dev): ports.add(dev) def 排序规则(dev): name = os.path.basename(dev) if dev.startswith("/dev/serial/by-id/"): group = 0 elif name.startswith("ttyS"): group = 1 elif name.startswith("ttyUSB"): group = 2 elif name.startswith("ttyACM"): group = 3 elif name.startswith("ttyAMA"): group = 4 else: group = 9 nums = re.findall(r"\d+", name) num = int(nums[-1]) if nums else 9999 return group, num, dev return sorted(ports, key=排序规则) def 补全串口名称(name): name = name.strip() if not name: return "" if name.startswith("/dev/"): return name if name.startswith("serial/by-id/"): return "/dev/" + name return "/dev/" + name def 识别厂家(name): if not name: return "" key = name.strip().upper() return 厂家别名.get(key, "") def 接收线程(): global 串口对象 global 运行中 while 运行中: try: if 串口对象 is None or not 串口对象.is_open: time.sleep(0.05) continue data = 串口对象.read(4096) if data: text = data.decode("utf-8", errors="replace") 接收队列.put(text) except Exception as e: 接收队列.put("\n[接收异常] " + str(e) + "\n") break def 发送一条AT指令(指令, 来源="手动发送"): global 串口对象 指令 = 指令.strip() if not 指令: return try: with 串口写锁: if 串口对象 is None or not 串口对象.is_open: 接收队列.put("\n[发送失败] 串口未打开\n") return 接收队列.put(f"\n[{来源}] {指令}\n") 串口对象.write((指令 + "\r\n").encode("utf-8")) except Exception as e: 接收队列.put("\n[发送失败] " + str(e) + "\n") def 自动发送测试指令(厂家): global 运行中 if not 批量发送锁.acquire(blocking=False): 接收队列.put("\n[提示] 上一轮测试指令还没有发送完成\n") return try: 自动测试完成事件.clear() time.sleep(0.5) cmds = 测试指令表.get(厂家, []) 接收队列.put(f"\n========== 开始自动发送 {厂家} 测试指令 ==========\n") for 说明, 指令 in cmds: if not 运行中: break 接收队列.put(f"\n[说明] {说明}\n") 发送一条AT指令(指令, "自动发送") time.sleep(1) 接收队列.put(f"\n========== {厂家} 测试指令发送完成 ==========\n") 接收队列.put("\n现在可以在底部输入自定义 AT 指令 回车发送\n") 自动测试完成事件.set() finally: 批量发送锁.release() def 选择项目界面(stdscr, 标题, 项目列表, 空提示="没有可选项目"): curses.curs_set(0) stdscr.keypad(True) stdscr.timeout(-1) index = 0 top = 0 while True: stdscr.erase() height, width = stdscr.getmaxyx() 安全显示(stdscr, 0, 0, 标题 + " ↑↓移动 Enter选中 q退出 r刷新") if not 项目列表: 安全显示(stdscr, 2, 0, 空提示) else: 可显示行数 = max(1, height - 3) if index < top: top = index if index >= top + 可显示行数: top = index - 可显示行数 + 1 显示列表 = 项目列表[top:top + 可显示行数] for i, item in enumerate(显示列表): real_index = top + i y = i + 2 if real_index == index: 安全显示(stdscr, y, 0, item, curses.A_REVERSE) else: 安全显示(stdscr, y, 0, item) stdscr.noutrefresh() curses.doupdate() key = stdscr.getch() if key in (ord("q"), ord("Q")): return None if key in (ord("r"), ord("R")) and "串口" in 标题: 项目列表[:] = 获取串口列表() index = 0 top = 0 continue if not 项目列表: continue if key == curses.KEY_UP: index = max(0, index - 1) elif key == curses.KEY_DOWN: index = min(len(项目列表) - 1, index + 1) elif key in (10, 13, curses.KEY_ENTER): return 项目列表[index] def 打开串口(port, baud): return serial.Serial( port=port, baudrate=baud, bytesize=serial.EIGHTBITS, parity=serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE, timeout=0.05, write_timeout=1 ) def 串口调试界面(stdscr, 厂家, port, baud): global 串口对象 global 运行中 curses.curs_set(1) stdscr.keypad(True) stdscr.timeout(100) 接收显示 = [] 输入内容 = "" AT历史 = [] 历史位置 = None 需要刷新 = True 上次刷新时间 = 0 try: 串口对象 = 打开串口(port, baud) 串口对象.reset_input_buffer() 串口对象.reset_output_buffer() except Exception as e: stdscr.erase() 安全显示(stdscr, 0, 0, "打开串口失败") 安全显示(stdscr, 2, 0, str(e)) 安全显示(stdscr, 4, 0, "按任意键退出") stdscr.noutrefresh() curses.doupdate() stdscr.timeout(-1) stdscr.getch() return t1 = threading.Thread(target=接收线程, daemon=True) t1.start() t2 = threading.Thread(target=自动发送测试指令, args=(厂家,), daemon=True) t2.start() def 重画界面(): stdscr.erase() height, width = stdscr.getmaxyx() 状态行 = f"{厂家} {port} {baud}bps Ctrl+X退出 Ctrl+L清屏 Ctrl+T重发测试 Ctrl+U清输入 Enter发送" 安全显示(stdscr, 0, 0, 状态行, curses.A_REVERSE) 接收文本 = "".join(接收显示) 接收行 = 接收文本.splitlines(True) 可显示行数 = max(1, height - 5) 显示行 = 接收行[-可显示行数:] for i, line in enumerate(显示行): line = line.replace("\r", "") 安全显示(stdscr, i + 1, 0, line) if 自动测试完成事件.is_set(): 提示文字 = "自定义AT模式 输入 AT 指令后回车发送 ↑↓调历史" else: 提示文字 = "正在自动发送测试指令 也可以提前输入 AT 指令回车发送" 安全显示(stdscr, height - 3, 0, "-" * (width - 1)) 安全显示(stdscr, height - 2, 0, 提示文字) 输入提示 = "> " + 输入内容 安全显示(stdscr, height - 1, 0, 输入提示) try: stdscr.move(height - 1, min(len(输入提示), width - 1)) except Exception: pass stdscr.noutrefresh() curses.doupdate() while True: 有新数据 = False try: while True: data = 接收队列.get_nowait() 接收显示.append(data) 有新数据 = True except queue.Empty: pass if len(接收显示) > 1000: 接收显示 = 接收显示[-500:] 有新数据 = True if 有新数据: 需要刷新 = True key = stdscr.getch() if key != -1: 需要刷新 = True if key == 24: break elif key == 12: 接收显示.clear() elif key == 21: 输入内容 = "" 历史位置 = None elif key == 20: t2 = threading.Thread(target=自动发送测试指令, args=(厂家,), daemon=True) t2.start() elif key == curses.KEY_UP: if AT历史: if 历史位置 is None: 历史位置 = len(AT历史) - 1 else: 历史位置 = max(0, 历史位置 - 1) 输入内容 = AT历史[历史位置] elif key == curses.KEY_DOWN: if AT历史 and 历史位置 is not None: 历史位置 += 1 if 历史位置 >= len(AT历史): 历史位置 = None 输入内容 = "" else: 输入内容 = AT历史[历史位置] elif key in (10, 13, curses.KEY_ENTER): 指令 = 输入内容.strip() if 指令: 发送一条AT指令(指令, "自定义发送") if not AT历史 or AT历史[-1] != 指令: AT历史.append(指令) if len(AT历史) > 50: AT历史 = AT历史[-50:] 输入内容 = "" 历史位置 = None elif key in (curses.KEY_BACKSPACE, 127, 8): 输入内容 = 输入内容[:-1] 历史位置 = None elif 32 <= key <= 126: 输入内容 += chr(key) 历史位置 = None 当前时间 = time.time() if 需要刷新 and 当前时间 - 上次刷新时间 >= 0.15: 重画界面() 上次刷新时间 = 当前时间 需要刷新 = False 运行中 = False try: if 串口对象 and 串口对象.is_open: 串口对象.close() except Exception: pass def 解析命令参数(): args = sys.argv[1:] 厂家 = "" port = "" baud = 默认波特率 for arg in args: maybe_vendor = 识别厂家(arg) if maybe_vendor: 厂家 = maybe_vendor continue if arg.isdigit(): baud = int(arg) continue port = 补全串口名称(arg) return 厂家, port, baud def 主程序(stdscr): global 运行中 厂家, port, baud = 解析命令参数() if not 厂家: 厂家 = 选择项目界面( stdscr, "请选择模组厂家", ["SIMCOM", "QUECTEL", "FIBOCOM"], "没有可选厂家" ) if not 厂家: return if not port: ports = 获取串口列表() port = 选择项目界面( stdscr, "请选择串口", ports, "没有识别到串口 请检查设备连接 或尝试 sudo 运行" ) if not port: return 运行中 = True 串口调试界面(stdscr, 厂家, port, baud) if __name__ == "__main__": locale.setlocale(locale.LC_ALL, "") try: curses.wrapper(主程序) except KeyboardInterrupt: pass