Anytime Help Center

Contact Us

If you still have questions or prefer to get help directly from an agent, please submit a request.
We’ll get back to you as soon as possible.

Please fill out the contact form below and we will reply as soon as possible.

  • Support
  • Guest
  • Log In
  • AKG
    Microphones Wireless Integrated Systems Automatic Mixers Headphones Discontinued Products (AKG) General AKG Inquiries Certifications (AKG) Video Manual Series (AKG)
  • AMX
    Networked A/V Distribution (AVoIP) Traditional A/V Distribution Video Signal Processing Architectural Connectivity User Interfaces Control Processing Power (AMX) Programming (AMX) Software (AMX) Discontinued Products (AMX) General AMX Inquiries Certifications (AMX)
  • BSS
    Soundweb™ Omni Soundweb™ London Soundweb™ Contrio™ Software (BSS) Discontinued Products (BSS) General BSS Inquiries Certifications (BSS)
  • Crown
    CDi DriveCore Series CDi Series Commercial Series ComTech Series DCi DriveCore Series I-Tech HD Series XLC Series XLi Series XLS DriveCore 2 Series XTi 2 Series Discontinued Products (Crown) Software (Crown) General Crown Inquiries Certifications (Crown) Video Manual Series (Crown)
  • dbx
    CX Series 500 Series DriveRack Personal Monitor Control ZonePRO Zone Controllers FeedBack Suppression Microphone Preamps Dynamics Processors Crossovers Equalizers Software (dbx) Discontinued Products (dbx) General dbx Inquiries Certifications (dbx)
  • Flux::
    Immersive Processing Analysis Subscriptions General FLUX:: Inquiries
  • JBL
    Cinema Sound Installed Live Portable Tour Sound Recording & Broadcast Software (JBL) Discontinued Products (JBL) Video Manual Series (JBL) General JBL Inquiries Certifications (JBL)
  • Lexicon
    Plugins Effects Processors Cinema Discontinued Products (Lexicon) Video Manual Series (Lexicon) General Lexicon Inquiries Certifications (Lexicon)
  • Martin
    Atomic ELP ERA Exterior MAC P3 VC VDO Tools Discontinued Products (Martin) General Martin Inquiries Certifications (Martin)
  • Soundcraft
    Digital Analog Connected Analog Only Discontinued Products (Soundcraft) Video Manual Series (Soundcraft) General Soundcraft Inquiries Certifications (Soundcraft)
  • General HARMAN Inquiries
    Dante
+ More
  • Home
  • AMX
  • Programming (AMX)
  • Programming

MUSE IP Comms Using asyncio with Python Code

Frequently Asked Questions

Written by Wesley Moore

Updated on June 1st, 2026

Table of Contents

Brand: Models: Question:   Answer:

Brand:

  • AMX

Models:

  • MUSE
  • Programming
    • Python

Question:  

Can asyncio be used with Python on a MUSE controller?


Answer:

This is an example of utilizing the asyncio library in Python to connect to multiple devices with a MUSE controller:

from mojo import context
import asyncio
import threading

# Startup marker written to the controller log.
context.log.info("Sample Python program")

# Device handles, looked up by the identifiers below.
# dvTP: its port[1] buttons 1 and 2 are watched further down in this script.
dvTP = context.devices.get("AMX-10002")
# idevice: its online state decides when the socket thread is started.
idevice = context.devices.get("idevice")


# Credentials sent during the login handshake of every socket connection.
# NOTE(review): hard-coded sample values -- replace before deploying.
username = "administrator"
password = "password"
# Event loop owned by the background thread; stays None until that thread
# has created it.
ASYNC_LOOP = None


class AsyncSocketClient:
    """Auto-reconnecting asyncio TCP client for a single remote device.

    ``connect_loop()`` owns the connection: it opens the socket, performs the
    login handshake, then runs a listener, a sender and a keepalive task until
    one of them fails, after which it cleans up and reconnects.  Outgoing data
    is handed over through ``send()``, which only enqueues it.

    All coroutines of one instance must run on the same event loop.
    """

    def __init__(self, host, port, name):
        """Store the connection parameters; no I/O happens here.

        Args:
            host: Address of the remote device.
            port: TCP port of the remote device.
            name: Label prefixed to this client's log messages.
        """
        self.host = host
        self.port = port
        self.name = name
        self.reader = None
        self.writer = None
        # The Event and Queue are created in connect_loop() rather than here
        # (presumably so they are created on the thread that runs the event
        # loop -- older asyncio versions bind them to a loop at construction).
        self.connected = None
        self.command_queue = None
        # Set to False to make connect_loop() exit after the current session.
        self.running = True
        self.keepalive_task = None

    async def connect_loop(self):
        """Connect, log in and service the socket, reconnecting until stopped.

        Each iteration is one session.  Any error (connect failure, handshake
        timeout, listener/sender failure) is logged, the session is torn down
        and, while ``self.running`` is true, a new attempt starts 5 seconds
        later.
        """
        # Main connection lifecycle loop
        if self.connected is None:
            self.connected = asyncio.Event()
            self.command_queue = asyncio.Queue()

        while self.running:
            workers = []
            try:
                context.log.info(f"{self.name} Attempting socket connection...")
                self.reader, self.writer = await asyncio.open_connection(
                    self.host, self.port
                )
                await self.handshake()
                self.connected.set()

                self.keepalive_task = asyncio.create_task(self.keepalive_loop())

                # Run listener and sender concurrently; both loop forever, so
                # the wait only returns once one of them has raised.
                workers = [
                    asyncio.create_task(self.socket_listener()),
                    asyncio.create_task(self.socket_sender()),
                ]
                done, _pending = await asyncio.wait(
                    workers,
                    return_when=asyncio.FIRST_EXCEPTION
                )

                # Re-raise the failure so it is reported by the handler below.
                for task in done:
                    task.result()

            except Exception as e:
                context.log.error(f"{self.name} Socket error: {e}")
            finally:
                self.connected.clear()
                # Stop every helper task and wait for it to finish *before*
                # the socket is discarded, so nothing touches a dead writer
                # and no task exception is left unretrieved.
                await self._cancel_tasks(workers + [self.keepalive_task])
                self.keepalive_task = None
                await self.close_socket()

                # Skip the log and the delay when the loop is about to exit.
                if self.running:
                    context.log.info(f"{self.name} Reconnecting in 5 seconds...")
                    await asyncio.sleep(5)

    @staticmethod
    async def _cancel_tasks(tasks):
        """Cancel the given tasks (``None`` entries are ignored) and await them."""
        live = [task for task in tasks if task is not None]
        for task in live:
            task.cancel()
        if live:
            # return_exceptions=True collects failures and cancellations
            # instead of raising them here.
            await asyncio.gather(*live, return_exceptions=True)

    async def handshake(self):
        """Login handshake.

        Waits for the ``Login :`` and ``Password :`` prompts, answers them with
        the module-level ``username`` / ``password``, then waits for
        ``Login successful!``.  Each wait is limited to 10 seconds.

        Raises:
            asyncio.TimeoutError: An expected prompt did not arrive in time.
            asyncio.IncompleteReadError: The peer closed the connection first.
        """
        await asyncio.wait_for(self.reader.readuntil(b"Login :"), timeout=10)
        self.writer.write(username.encode() + b"\r\n")
        await self.writer.drain()

        await asyncio.wait_for(self.reader.readuntil(b"Password :"), timeout=10)
        self.writer.write(password.encode() + b"\r\n")
        await self.writer.drain()

        await asyncio.wait_for(self.reader.readuntil(b"Login successful!"), timeout=10)
        context.log.info("Login successful")

    async def socket_listener(self):
        """Continuously read incoming lines and log them.

        Raises:
            asyncio.TimeoutError: No line arrived within 30 seconds.
            ConnectionError: The remote host closed the socket.
        """
        try:
            while True:
                data = await asyncio.wait_for(self.reader.readline(), timeout=30)
                if not data:
                    raise ConnectionError(f"{self.name} Socket closed by remote host")
                # errors="replace": a stray non-UTF-8 byte must not tear down
                # an otherwise healthy connection.
                text = data.decode(errors="replace").strip()
                context.log.info(f"{self.name} Received: {text}")
        except Exception as e:
            context.log.warn(f"{self.name} Listener stopped: {e}")
            raise

    async def socket_sender(self):
        """Send commands from the queue to the socket.

        Raises:
            asyncio.TimeoutError: Nothing was queued for 15 seconds (the
                keepalive loop normally queues something every 10 seconds).
        """
        try:
            while True:
                command = await asyncio.wait_for(self.command_queue.get(), timeout=15)
                if not command:
                    continue
                self.writer.write(command)
                await self.writer.drain()
        except Exception as e:
            context.log.warn(f"{self.name} Sender stopped: {e}")
            raise

    async def send(self, data: bytes):
        """Public API for sending socket commands.

        Queues ``data`` for the sender task.  If the client is not connected
        (including before ``connect_loop()`` has started) the command is
        dropped and a warning is logged.
        """
        if self.connected is None or not self.connected.is_set():
            context.log.warn(f"{self.name} Socket not connected. Command dropped.")
            return
        await self.command_queue.put(data)

    async def close_socket(self):
        """Close the current connection, if any, and forget reader/writer.

        Never raises for a connection that is already broken, so it is safe to
        call from cleanup code.
        """
        writer = self.writer
        self.reader = None
        self.writer = None
        if writer is None:
            return

        writer.close()
        try:
            await writer.wait_closed()
        except OSError:
            # wait_closed() re-raises the error that broke the connection
            # (e.g. ConnectionResetError).  The socket is being discarded
            # anyway and the failure was already reported by the task that
            # hit it, so there is nothing left to do.
            pass

    async def keepalive_loop(self):
        """Queue a KEEPALIVE command every 10 seconds until cancelled."""
        context.log.info(f"{self.name}'s KEEPALIVE started")
        try:
            while True:
                await asyncio.sleep(10)
                await self.send(b"KEEPALIVE\r\n")
        except asyncio.CancelledError:
            # Cancellation is the normal way this loop is stopped.
            pass



# One client per remote device: (host, TCP port, name used in log messages).
# They are module-level because both the asyncio thread and the button
# callbacks below refer to them by name.
# NOTE(review): addresses and port 33 are sample values -- adjust to your devices.

brandon_client = AsyncSocketClient("10.35.88.75",33,"Brandon")
bryan_client = AsyncSocketClient("10.35.88.84",33,"Bryan")
darin_client = AsyncSocketClient("10.35.88.39",33,"Darin")
estefan_client = AsyncSocketClient("10.35.84.133",33,"Estefan")
jaime_client = AsyncSocketClient("10.35.88.76",33,"Jaime")
jeremy_client = AsyncSocketClient("10.35.88.70",33,"Jeremy")
shane_client = AsyncSocketClient("10.35.88.23",33,"Shane")
stephen_client = AsyncSocketClient("10.35.84.116",33,"Stephen")
tsmu2300_client = AsyncSocketClient("10.35.88.147",33,"TSMU2300")
tsmu3300_client = AsyncSocketClient("10.35.88.35",33,"TSMU3300")

# Start the asyncio task
def asyncio_thread_main():
    """Thread target: create the shared event loop and run every client on it.

    Publishes the new loop as the module-level ``ASYNC_LOOP`` so other threads
    can schedule work on it, starts one ``connect_loop()`` task per client and
    then blocks in ``run_forever()``.  If the loop is ever stopped, the client
    tasks are cancelled and the loop is closed before the thread exits.
    """
    global ASYNC_LOOP

    ASYNC_LOOP = asyncio.new_event_loop()
    asyncio.set_event_loop(ASYNC_LOOP)

    context.log.info("Asyncio loop started in background thread")

    clients = (
        brandon_client,
        bryan_client,
        darin_client,
        estefan_client,
        jaime_client,
        jeremy_client,
        shane_client,
        stephen_client,
        tsmu2300_client,
        tsmu3300_client,
    )
    # Keep strong references for as long as the loop runs: the event loop
    # itself only holds weak references to tasks.
    connect_tasks = [
        ASYNC_LOOP.create_task(client.connect_loop()) for client in clients
    ]

    try:
        ASYNC_LOOP.run_forever()
    finally:
        # Only reached when the loop is stopped: let every client unwind its
        # connection, then release the loop's resources.
        for task in connect_tasks:
            task.cancel()
        ASYNC_LOOP.run_until_complete(
            asyncio.gather(*connect_tasks, return_exceptions=True)
        )
        ASYNC_LOOP.close()


def start_socket(event):
    """Launch the background asyncio thread, at most once.

    Every call is logged; only the first one starts the daemon thread that
    runs ``asyncio_thread_main``.  ``event`` is accepted so the function can
    be used as a callback, but it is not inspected.
    """
    context.log.info("start_socket() CALLED")

    # The "started" attribute on the function object is the run-once latch.
    already_started = getattr(start_socket, "started", False)
    if not already_started:
        start_socket.started = True
        threading.Thread(
            name="AsyncSocketLoop",
            target=asyncio_thread_main,
            daemon=True,
        ).start()

# Start the socket process.
# Presumably registers start_socket to be called when idevice comes online
# (start_socket takes the event argument) -- confirm against the mojo API.
idevice.online(start_socket)

# If idevice is already online when the script loads, start right away;
# start_socket() itself guards against being run twice.
if idevice.isOnline:
    start_socket(None)

#Buttons and callbacks

def sendSocketMessage(event):
    """Button callback: queue the sample command on every client.

    Acts only while button 1 on port 1 of the touch panel reports a truthy
    value.  The command is handed to the asyncio thread; each client drops it
    by itself if it is not currently connected.  If the asyncio loop has not
    been created yet (or has been closed) the request is logged and dropped
    instead of raising.

    Args:
        event: Button event supplied by the caller; not inspected.
    """
    if not dvTP.port[1].button[1].value:
        return

    context.log.info("In send command")

    # Snapshot the global once: it is assigned from the asyncio thread.
    loop = ASYNC_LOOP
    if loop is None or loop.is_closed():
        context.log.warn("Asyncio loop is not available. Command dropped.")
        return

    def log_failure(future):
        # Surface errors raised inside client.send(); otherwise they would
        # stay hidden in the discarded future.
        if future.cancelled():
            return
        error = future.exception()
        if error is not None:
            context.log.error(f"Send command failed: {error}")

    message = b"^^^^ SEND COMMAND TO DEVICE ^^^^\r\n"
    clients = (
        brandon_client,
        bryan_client,
        darin_client,
        estefan_client,
        jaime_client,
        jeremy_client,
        shane_client,
        stephen_client,
        tsmu2300_client,
        tsmu3300_client,
    )
    for client in clients:
        # run_coroutine_threadsafe is the supported way to submit a coroutine
        # to a loop that is running in another thread.
        pending = asyncio.run_coroutine_threadsafe(client.send(message), loop)
        pending.add_done_callback(log_failure)


def retryConnection(event):
    """Button callback: record that a manual reconnect was requested.

    Currently a placeholder -- it only writes a log entry and does not touch
    any connection.  ``event`` is not inspected.
    """
    # TODO: actually trigger a reconnect. An earlier attempt cleared the
    # `connected` event of a `socket_client`, but no such object is defined in
    # the visible script.
    log = context.log
    log.info("Manual reconnect requested")


# Button 1 on port 1 -> send the sample command to every client.
dvTP.port[1].button[1].watch(sendSocketMessage)
# Button 2 on port 1 -> manual reconnect request (currently only logged).
dvTP.port[1].button[2].watch(retryConnection)




# Leave this as the last line in the script
# (everything defined above is handed over via globals()).
context.run(globals())

Related Videos

guide questions

Was this article helpful?

Yes
No
Give feedback about this article

Table of Contents

Brand: Models: Question:   Answer:

Related Articles

  • Muse Reset

Related Articles

  • Muse Reset
Copyright © HARMAN Professional. All rights reserved. Privacy Policy | Terms of Use
Expand