MUSE IP Comms Using asyncio with Python Code
Frequently Asked Questions
Brand:
- AMX
Models:
- MUSE
- Programming
- Python
Question:
Can asyncio be used with Python on a MUSE controller?
Answer:
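Yes. The following example uses Python's asyncio library on a MUSE controller to maintain connections to multiple devices at once. The asyncio event loop runs in a background thread, and the MUSE button callbacks hand work to that loop in a thread-safe way: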
from mojo import context
import asyncio
import threading
context.log.info("Sample Python program")
# Device whose port 1 buttons 1 and 2 are watched at the bottom of this script
# (presumably a touch panel, going by the name -- confirm against the system config).
dvTP = context.devices.get("AMX-10002")
# Device whose online event is used to start the background socket thread.
idevice = context.devices.get("idevice")
# Credentials sent by AsyncSocketClient.handshake() in response to the
# "Login :" and "Password :" prompts.
# NOTE(review): these are hard-coded in plain text; replace them with values
# appropriate for the deployment and avoid committing real credentials.
username = "administrator"
password = "password"
# Event loop created by asyncio_thread_main(); stays None until that thread runs.
ASYNC_LOOP = None
class AsyncSocketClient:
    """Auto-reconnecting asyncio TCP client for a single remote device.

    Each instance owns one connection. ``connect_loop()`` connects, logs in,
    then runs a listener, a sender and a keepalive task until one of them
    fails, after which everything is torn down and the connection is retried
    five seconds later. Commands are submitted with ``send()`` and written to
    the socket by ``socket_sender()``.

    All coroutines must run on the same event loop.
    """

    def __init__(self, host, port, name):
        """Store connection parameters; no I/O is performed here.

        Args:
            host: Address of the remote device.
            port: TCP port of the remote device.
            name: Label used as a prefix in log messages.
        """
        self.host = host
        self.port = port
        self.name = name
        self.reader = None
        self.writer = None
        # Created lazily by connect_loop() so the asyncio primitives are
        # built on the thread that actually runs the event loop.
        self.connected = None
        self.command_queue = None
        self.running = True
        self.keepalive_task = None

    # Main connection lifecycle loop
    async def connect_loop(self):
        """Connect, log in and service the socket, retrying while ``running``.

        Any exception raised while connecting, logging in, reading or
        writing is logged and followed by a reconnect attempt after a
        five-second pause.
        """
        if self.connected is None:
            self.connected = asyncio.Event()
            self.command_queue = asyncio.Queue()
        while self.running:
            workers = []
            try:
                context.log.info(f"{self.name} Attempting socket connection...")
                self.reader, self.writer = await asyncio.open_connection(
                    self.host, self.port
                )
                await self.handshake()
                self.connected.set()
                self.keepalive_task = asyncio.create_task(self.keepalive_loop())
                # Run listener and sender concurrently
                workers = [
                    asyncio.create_task(self.socket_listener()),
                    asyncio.create_task(self.socket_sender()),
                ]
                done, _ = await asyncio.wait(
                    workers, return_when=asyncio.FIRST_EXCEPTION
                )
                # Re-raise the failure of whichever worker stopped first.
                for task in done:
                    task.result()
            except Exception as e:
                context.log.error(f"{self.name} Socket error: {e}")
            finally:
                self.connected.clear()
                # Stop every helper task and wait for it to finish *before*
                # closing the socket, so nothing is still using the streams
                # and no task exception is left unretrieved.
                await self._cancel_tasks(workers + [self.keepalive_task])
                self.keepalive_task = None
                await self.close_socket()
            context.log.info(f"{self.name} Reconnecting in 5 seconds...")
            await asyncio.sleep(5)

    @staticmethod
    async def _cancel_tasks(tasks):
        """Cancel the given tasks (``None`` entries are skipped) and await them."""
        live = [task for task in tasks if task is not None]
        for task in live:
            task.cancel()
        if live:
            # return_exceptions=True collects cancellations and failures
            # instead of raising them here.
            await asyncio.gather(*live, return_exceptions=True)

    async def handshake(self):
        """Login handshake.

        Waits for the ``Login :`` and ``Password :`` prompts, answers them
        with the module-level ``username`` and ``password``, then waits for
        ``Login successful!``. Each wait is limited to 10 seconds.

        Raises:
            asyncio.TimeoutError: If an expected prompt does not arrive in time.
            asyncio.IncompleteReadError: If the stream ends before a prompt.
        """
        await asyncio.wait_for(self.reader.readuntil(b"Login :"), timeout=10)
        self.writer.write(username.encode() + b"\r\n")
        await self.writer.drain()
        await asyncio.wait_for(self.reader.readuntil(b"Password :"), timeout=10)
        self.writer.write(password.encode() + b"\r\n")
        await self.writer.drain()
        await asyncio.wait_for(
            self.reader.readuntil(b"Login successful!"), timeout=10
        )
        context.log.info("Login successful")

    async def socket_listener(self):
        """Continuously read incoming lines and log them.

        Raises:
            ConnectionError: If the remote host closes the socket.
            asyncio.TimeoutError: If no line arrives within 30 seconds.
        """
        try:
            while True:
                data = await asyncio.wait_for(self.reader.readline(), timeout=30)
                if not data:
                    raise ConnectionError(
                        f"{self.name} Socket closed by remote host"
                    )
                # Bytes that are not valid UTF-8 must not tear down an
                # otherwise healthy connection, so decode leniently.
                text = data.decode(errors="replace").strip()
                context.log.info(f"{self.name} Received: {text}")
        except Exception as e:
            context.log.warn(f"{self.name} Listener stopped: {e}")
            raise

    async def socket_sender(self):
        """Send commands from the queue.

        Raises:
            asyncio.TimeoutError: If the queue stays empty for 15 seconds.
                keepalive_loop() queues a message every 10 seconds, so this
                only fires when the keepalive has stopped as well.
        """
        try:
            while True:
                command = await asyncio.wait_for(
                    self.command_queue.get(), timeout=15
                )
                if not command:
                    continue
                self.writer.write(command)
                await self.writer.drain()
        except Exception as e:
            context.log.warn(f"{self.name} Sender stopped: {e}")
            raise

    async def send(self, data: bytes):
        """Public API for sending socket commands.

        The command is queued for socket_sender(). If the client is not
        connected -- including before connect_loop() has started -- the
        command is dropped and a warning is logged.
        """
        # self.connected is None until connect_loop() has run once.
        if self.connected is None or not self.connected.is_set():
            context.log.warn(f"{self.name} Socket not connected. Command dropped.")
            return
        await self.command_queue.put(data)

    async def close_socket(self):
        """Close the current connection, if any, and forget its streams.

        Errors raised while closing are logged rather than propagated: this
        runs during connect_loop() cleanup, where an escaping exception
        would end the reconnect loop for good.
        """
        writer = self.writer
        # Drop the references first so they are cleared even if closing fails.
        self.reader = None
        self.writer = None
        if writer is None:
            return
        try:
            writer.close()
            await writer.wait_closed()
        except Exception as e:
            context.log.warn(f"{self.name} Error while closing socket: {e}")

    async def keepalive_loop(self):
        """Queue a KEEPALIVE message every 10 seconds until cancelled."""
        context.log.info(f"{self.name}'s KEEPALIVE started")
        try:
            while True:
                await asyncio.sleep(10)
                await self.send(b"KEEPALIVE\r\n")
        except asyncio.CancelledError:
            # Cancellation is the normal way this loop is stopped.
            pass
# One AsyncSocketClient per remote device: (host, port, log name).
# Every device in this example is reached on TCP port 33.
# These module-level names are used by asyncio_thread_main() and
# sendSocketMessage() below.
brandon_client = AsyncSocketClient("10.35.88.75",33,"Brandon")
bryan_client = AsyncSocketClient("10.35.88.84",33,"Bryan")
darin_client = AsyncSocketClient("10.35.88.39",33,"Darin")
estefan_client = AsyncSocketClient("10.35.84.133",33,"Estefan")
jaime_client = AsyncSocketClient("10.35.88.76",33,"Jaime")
jeremy_client = AsyncSocketClient("10.35.88.70",33,"Jeremy")
shane_client = AsyncSocketClient("10.35.88.23",33,"Shane")
stephen_client = AsyncSocketClient("10.35.84.116",33,"Stephen")
tsmu2300_client = AsyncSocketClient("10.35.88.147",33,"TSMU2300")
tsmu3300_client = AsyncSocketClient("10.35.88.35",33,"TSMU3300")
# Start the asyncio task
def asyncio_thread_main():
    """Thread target: create the shared event loop and run every client on it.

    Publishes the new loop as the module-level ``ASYNC_LOOP`` so other
    threads can schedule work on it, starts one ``connect_loop()`` task per
    device client, and then blocks in ``run_forever()``.
    """
    global ASYNC_LOOP
    ASYNC_LOOP = asyncio.new_event_loop()
    asyncio.set_event_loop(ASYNC_LOOP)
    context.log.info("Asyncio loop started in background thread")
    clients = (
        brandon_client,
        bryan_client,
        darin_client,
        estefan_client,
        jaime_client,
        jeremy_client,
        shane_client,
        stephen_client,
        tsmu2300_client,
        tsmu3300_client,
    )
    # The event loop only keeps weak references to tasks, so hold strong
    # references here for as long as the loop runs; otherwise a task may be
    # garbage-collected before it finishes.
    connection_tasks = [
        ASYNC_LOOP.create_task(client.connect_loop()) for client in clients
    ]
    try:
        ASYNC_LOOP.run_forever()
    finally:
        # Only reached if the loop is stopped; release the task references.
        connection_tasks.clear()
def start_socket(event):
    """Start the background asyncio thread the first time this is called.

    Used both as an online-event callback and as a direct call, so the
    ``event`` argument is accepted but not used. Later calls only log and
    return; the "already started" flag is stored as an attribute on the
    function itself.
    """
    context.log.info("start_socket() CALLED")
    already_started = getattr(start_socket, "started", False)
    if not already_started:
        start_socket.started = True
        # Daemon thread: it will not keep the interpreter alive on shutdown.
        threading.Thread(
            target=asyncio_thread_main,
            name="AsyncSocketLoop",
            daemon=True,
        ).start()
# Start the socket process: register for the device's online event first,
# then start immediately if the device already reports online.
# start_socket() ignores repeat calls, so both paths firing is harmless.
idevice.online(start_socket)
if idevice.isOnline:
    start_socket(None)
#Buttons and callbacks
def sendSocketMessage(event):
    """Button callback: queue the sample command on every device client.

    Acts only while button 1 on port 1 of ``dvTP`` reports a truthy value,
    so the other edge of the button event is ignored. The callback does not
    run on the asyncio thread, so each ``send()`` coroutine is handed to the
    background loop in a thread-safe way.
    """
    if not dvTP.port[1].button[1].value:
        return
    context.log.info("In send command")
    loop = ASYNC_LOOP
    if loop is None:
        # The background thread has not created the loop yet; there is
        # nowhere to schedule the coroutines.
        context.log.warn("Asyncio loop not started. Command dropped.")
        return
    clients = (
        brandon_client,
        bryan_client,
        darin_client,
        estefan_client,
        jaime_client,
        jeremy_client,
        shane_client,
        stephen_client,
        tsmu2300_client,
        tsmu3300_client,
    )
    for client in clients:
        asyncio.run_coroutine_threadsafe(
            client.send(b"^^^^ SEND COMMAND TO DEVICE ^^^^\r\n"), loop
        )
def retryConnection(event):
    """Button callback for a manual reconnect request.

    NOTE: this is currently a stub -- it only logs the request and does not
    touch any client connection.
    """
    message = "Manual reconnect requested"
    context.log.info(message)
# Button 1 on port 1 sends the sample command to every device;
# button 2 on port 1 logs a manual reconnect request.
dvTP.port[1].button[1].watch(sendSocketMessage)
dvTP.port[1].button[2].watch(retryConnection)
# Leave this as the last line in the script
context.run(globals())