FREE Reverse Engineering Self-Study Course HERE
Serial Interactive Meshtastic Chat Client which chats on the Primary Channel over serial.
#!/usr/bin/env python3
"""
Serial Interactive Meshtastic Chat Client 0.1.0
Usage:
python simcc.py <SERIAL_PORT>
This script sends and receives text messages over the serial interface.
It uses PyPubSub to subscribe to text messages.
"""
import sys
import time
import asyncio
from pubsub import pub
from meshtastic.serial_interface import SerialInterface
def onReceive(packet=None, interface=None):
"""
Callback function to process an incoming text message when the topic
"meshtastic.receive.text" is published.
This function accepts both 'packet' and 'interface' as optional parameters
to avoid errors from extra keyword arguments. When the Meshtastic library
publishes on the "meshtastic.receive.text" topic, it may include an
'interface' argument along with 'packet'; by default, PyPubSub can raise
errors if it encounters unexpected keywords.
Parameters:
packet (dict, optional): A dictionary containing data about the received
packet. Defaults to None if no packet info is supplied.
If present, it may include:
- 'decoded': A dictionary that may contain a 'text' key if the
packet is a text message.
- 'fromId': (Optional) The identifier of the sender. If missing,
defaults to "unknown".
interface (optional): The Meshtastic interface instance that received
the packet (provided by the publisher). Unused here, but accepted
to avoid pubsub errors.
Side Effects:
- If the packet contains a text message (under 'decoded.text'), it
prints the sender ID and the message to standard output.
- Prints a prompt ("Ch0> ") to indicate that further user
input can be entered.
Returns:
None
This function does not return anything. It processes the packet
for display and updates the console prompt.
"""
if not packet:
return # no packet data, do nothing
decoded = packet.get("decoded", {})
if "text" in decoded:
sender = packet.get("fromId", "unknown")
message = decoded["text"]
print(f"\n{sender}: {message}")
print("Ch0> ", end="", flush=True)
async def main():
"""
Main asynchronous entry point for the serial chat client.
Steps:
1. Reads the serial port (e.g., /dev/cu.usbserial-0001) from the command line.
2. Creates a SerialInterface for that port (auto-connect).
3. Waits briefly to stabilize the connection.
4. Subscribes to incoming text messages (via onReceive).
5. Enters an interactive loop reading user input and sending messages on
channelIndex=0, labeled as "Ch0> ".
6. Closes the interface upon Ctrl+C (KeyboardInterrupt).
Usage:
python simcc.py <SERIAL_PORT>
Raises:
Exception: If the serial connection fails to initialize.
Side Effects:
- Prints debug/info messages about the connection process.
- Prompts for user input with "Ch0>" to send messages.
- Exits gracefully on Ctrl+C.
Returns:
None
"""
if len(sys.argv) < 2:
print("Usage: python simcc.py <SERIAL_PORT>")
sys.exit(1)
port = sys.argv[1].strip()
print(f"Attempting to connect to Meshtastic device at serial port: {port}")
try:
iface = SerialInterface(devPath=port)
print("Connected to Meshtastic device over serial!")
# give a brief pause for the serial link to stabilize
time.sleep(2)
except Exception as e:
print("Error initializing SerialInterface:", e)
sys.exit(1)
print("Serial Interactive Meshtastic Chat Client 0.1.0")
print("-----------------------------------------------")
print("Type your message and press Enter to send.")
print("Press Ctrl+C to exit...\n")
loop = asyncio.get_running_loop()
try:
while True:
# non-blocking input for user messages
msg = await loop.run_in_executor(None, input, "Ch0> ")
if msg:
# send text on channel index 0
iface.sendText(msg, channelIndex=0)
await asyncio.sleep(0.1)
except KeyboardInterrupt:
print("\nExiting...")
finally:
iface.close()
sys.exit(0)
if __name__ == "__main__":
# subscribe to text messages topic, using our onReceive callback
pub.subscribe(onReceive, "meshtastic.receive.text")
asyncio.run(main())