"""Minimal FastAPI application combined with a Socket.IO (ASGI) server.

``socket_app`` is the ASGI application that wraps both the Socket.IO server
and the FastAPI app; ``app`` on its own only knows about the HTTP routes.
"""

import socketio
from fastapi import FastAPI
from fastapi.responses import HTMLResponse

# 1. Create the FastAPI app
app = FastAPI()

# 2. Create the Socket.IO server (Async mode for FastAPI)
# NOTE: cors_allowed_origins='*' accepts connections from any origin;
# restrict it to known origins before exposing this beyond local testing.
sio = socketio.AsyncServer(async_mode='asgi', cors_allowed_origins='*')

# 3. Wrap FastAPI with Socket.IO
# This allows 'socket_app' to handle socket traffic and pass
# everything else to 'app'
socket_app = socketio.ASGIApp(sio, app)


# --- Socket.IO Events ---

@sio.event
async def connect(sid: str, environ: dict) -> None:
    """Log a new connection and greet the client that just connected.

    Args:
        sid: Session id of the connecting client.
        environ: Request environment passed by the Socket.IO server (unused).
    """
    print(f"Client connected: {sid}")
    # Address the greeting to the new client only; without a target the
    # emit would be delivered to every connected client on each new connect.
    await sio.emit('response', {'data': 'Connected to Server!'}, to=sid)


@sio.event
async def message(sid: str, data) -> None:
    """Log an incoming ``message`` event and echo its payload to all clients.

    Args:
        sid: Session id of the sending client.
        data: Payload received with the event.
    """
    print(f"Received from {sid}: {data}")
    # Broadcast to everyone (including sender)
    await sio.emit('response', {'data': f"Echo: {data}"})


@sio.event
async def disconnect(sid: str) -> None:
    """Log that the client identified by ``sid`` has disconnected."""
    print(f"Client disconnected: {sid}")


# --- FastAPI Routes ---

# NOTE(review): this page body contains only bare text (no tags, no client
# script) -- the markup looks like it was stripped somewhere upstream.
# Restore the original HTML/JS from its source; confirm before shipping.
html_client = """ FastAPI Socket.IO Test

Socket.IO Test

Disconnected

"""


@app.get("/")
async def index() -> HTMLResponse:
    """Serve the test client page stored in ``html_client``."""
    return HTMLResponse(html_client)