Ask Your Question

Revision history [back]

click to hide/show revision 1
initial version

To receive websocket messages using the Textual Python TUI module, you can create a subclass of the textual.widgets.ScrollView class and override the will_draw method to listen for and process the messages.

Here is an example implementation:

import json
import websocket
import threading
import textual
from textual.app import App
from textual.widget import ScrollView

class MyScrollView(ScrollView):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.websocket_thread = threading.Thread(target=self.connect_websocket)
        self.websocket_thread.start()

    def connect_websocket(self):
        ws = websocket.WebSocketApp("ws://localhost:8000/ws/")
        ws.on_message = self.on_message
        ws.run_forever()

    def on_message(self, message):
        data = json.loads(message)
        # handle the incoming message here
        self.buffer.puts(data['text'])

    async def will_draw(self):
        await super().will_draw()
        # redraw the buffer to display any new messages
        self.buffer.force_display()


class MyApp(App):
    async def on_load(self, event):
        self.set_root_widget(MyScrollView())

MyApp.run(log_level="INFO")

In this example, we create a MyScrollView subclass of ScrollView and override the will_draw method to redraw the buffer whenever new messages arrive. We also add a connect_websocket method that creates a WebSocketApp instance and sets its on_message handler to call the on_message method, which processes each incoming message and adds it to the buffer.

To start the websocket connection, we create a websocket_thread and call connect_websocket in its run method. Finally, we set the root widget of the MyApp instance to an instance of MyScrollView.

Of course, this is just an example, and you will need to modify the code to fit your specific use case.