diff --git a/server.py b/server.py index 42c905b..4cd9aa3 100644 --- a/server.py +++ b/server.py @@ -1,26 +1,21 @@ import asyncio -import dataclasses -from http.cookies import BaseCookie + +from datetime import datetime, timezone, timedelta import json +import uuid import logging import os import signal -from sys import float_repr_style import traceback import warnings -import fastapi import random -from fastapi import FastAPI -from typing import Optional +from pydantic import BaseModel, RootModel import socketio from contextlib import asynccontextmanager, suppress -from ssl import SSLEOFError -from typing import Optional, Union -import construct +from typing import Optional -from pymobiledevice3.bonjour import browse_remoted -from pymobiledevice3.cli.cli_common import print_json +from pymobiledevice3.services.dvt.instruments.location_simulation_base import LocationSimulationBase with warnings.catch_warnings(): # Ignore: "Core Pydantic V1 functionality isn't compatible with Python 3.14 or greater." @@ -28,52 +23,30 @@ with warnings.catch_warnings(): import fastapi import uvicorn -from construct import StreamError from fastapi import FastAPI -from packaging.version import Version -from pymobiledevice3 import usbmux from pymobiledevice3.exceptions import ( ConnectionFailedError, - ConnectionFailedToUsbmuxdError, - ConnectionTerminatedError, - DeviceNotFoundError, - GetProhibitedError, - IncorrectModeError, InvalidServiceError, - LockdownError, MuxException, - PairingError, - QuicProtocolNotSupportedError, - StreamClosedError, TunneldConnectionError, ) from pymobiledevice3.lockdown import create_using_usbmux, get_mobdev2_lockdowns from pymobiledevice3.osu.os_utils import get_os_utils from pymobiledevice3.remote.common import TunnelProtocol -from pymobiledevice3.remote.module_imports import start_tunnel from pymobiledevice3.remote.remote_service_discovery import ( - RSD_PORT, - RemoteServiceDiscoveryService, + RemoteServiceDiscoveryService ) from pymobiledevice3.remote.tunnel_service import ( CoreDeviceTunnelProxy, - RemotePairingProtocol, TunnelResult, create_core_device_tunnel_service_using_rsd, get_remote_pairing_tunnel_services, ) -from pymobiledevice3.remote.utils import get_rsds, stop_remoted -from pymobiledevice3.utils import asyncio_print_traceback +from pymobiledevice3.remote.utils import get_rsds from pymobiledevice3.services.dvt.instruments.location_simulation import LocationSimulation from pymobiledevice3.services.dvt.instruments.dvt_provider import DvtProvider from pymobiledevice3.tunneld.server import TunneldCore, TunnelTask -from pymobiledevice3.tunneld.api import ( - TUNNELD_DEFAULT_ADDRESS, - get_tunneld_device_by_udid, - get_tunneld_devices, -) - class JsonFormatter(logging.Formatter): def format(self, record: logging.LogRecord) -> str: @@ -103,7 +76,7 @@ REMOTEPAIRING_INTERVAL = 5 MOBDEV2_INTERVAL = 5 # USB monitor will periodically forget what interfaces it has seen -# and force a full rescan. The value is number of iterations of the +# and force a full rescan. The value is the number of iterations of the # inner loop (which sleeps one second each) before blowing away the # `previous_ips` cache. USB_MONITOR_RESCAN_INTERVAL = 30 @@ -114,7 +87,7 @@ TUNNEL_ACQUIRE_TIMEOUT_SECONDS = 15 DVT_CONNECT_TIMEOUT_SECONDS = 20 -class SimulationStatusData: +class SimulationStatusData(BaseModel): latitude: float longitude: float start: float @@ -122,24 +95,41 @@ class SimulationStatusData: next_move: Optional[float] -class SimulationStatus: +class SimulationStatus(BaseModel): status: bool data: Optional[SimulationStatusData] -class SimulationRequestData: +class SimulationRequestData(BaseModel): latitude: float longitude: float - delay: int - start: str - end: Optional[str] + delay: int = 0 + start: Optional[str] = None + end: Optional[str] = None -class SimulationRequest: +class SimulationRequest(BaseModel): status: bool data: Optional[SimulationRequestData] +class SimulationRequestResponseData(BaseModel): + loc_id: str + latitude: float + longitude: float + delay: int = 0 + start: Optional[str] = None + end: Optional[str] = None + +class SimulationQueueList(BaseModel): + data: Optional[SimulationRequestResponseData] + + +class SimulationRequestResponse(BaseModel): + status: bool + data: Optional[SimulationRequestResponseData] + + class LocationSimulationState: def __init__(self): self.latitude: Optional[float] = None @@ -147,11 +137,16 @@ class LocationSimulationState: self.next_move: Optional[float] = None self.udid: Optional[str] = None self.simulation_active: bool = False + self.loc_id: Optional[str] = None + self.set_location_enabled: bool = True self.queue: asyncio.Queue = asyncio.Queue() + self.queue_list: list[SimulationRequestResponseData] = [] + self.queue_status: Optional[asyncio.Event] = asyncio.Event() + self.queue_state: str = "STOPPED" + self.test_mode: bool = True self.simulation_task: Optional[asyncio.Task] = None - self.sio: socketio.AsyncServer = socketio.AsyncServer( - async_mode="asgi", cors_allowed_origins="*" - ) + self.sio: socketio.AsyncServer = socketio.AsyncServer(async_mode="asgi", cors_allowed_origins="*") + self.tunnel: Optional[RemoteServiceDiscoveryService] = None class TunneldRunnerSio: @@ -196,7 +191,7 @@ class TunneldRunnerSio: self._tunneld_core.start() yield logger.info("Closing tunneld tasks...") - await empty_queue() + await empty_simulation_queue() await self._tunneld_core.close() await self.context.sio.shutdown() @@ -218,54 +213,91 @@ class TunneldRunnerSio: async def get_tun( udid: Optional[str] = None, max_retries: int = 10, retry_delay: float = 0.5 ) -> RemoteServiceDiscoveryService: - """Try to connect to tunneld with retries to handle startup delay.""" - for attempt in range(max_retries): - try: - if udid: - rsd = await get_tunneld_device_by_udid(udid, self._tunneld_api_address) - if rsd is not None: - logger.info("Connected to tunnel for udid %s after %s retries", udid, attempt) - return rsd + """Resolve an active local tunnel and connect directly to its RSD endpoint.""" + if self.context.tunnel is not None: + return self.context.tunnel - rsds = await get_tunneld_devices(self._tunneld_api_address) - if rsds: - if udid: - logger.warning("Tunnel for udid %s not found; using first available tunnel", udid) - logger.info("Connected to tunneld after %s retries", attempt) - return rsds[0] - except TunneldConnectionError: + for attempt in range(max_retries): + candidates = [] + for active_tunnel in self._tunneld_core.tunnel_tasks.values(): + if active_tunnel.tunnel is None or active_tunnel.udid is None: + continue + if udid is None or active_tunnel.udid == udid: + candidates.append(active_tunnel.tunnel) + + if not candidates: if attempt < max_retries - 1: - logger.info("Waiting for tunneld to be ready... (attempt %s/%s)", attempt + 1, max_retries) + logger.info("Waiting for local tunnel to be ready... (attempt %s/%s)", attempt + 1, max_retries) + await asyncio.sleep(retry_delay) + continue + logger.error("Failed to find an active local tunnel after max retries") + raise TunneldConnectionError() + + tunnel = candidates[0] + if udid and len(candidates) > 1: + logger.warning("Multiple local tunnels found for udid %s; using first available", udid) + + try: + rsd = RemoteServiceDiscoveryService((tunnel.address, tunnel.port)) + await rsd.connect() + logger.info( + "Connected to local tunnel at %s:%s after %s retries", + tunnel.address, + tunnel.port, + attempt, + ) + self.context.tunnel = rsd + return rsd + except Exception: + if attempt < max_retries - 1: + logger.info("Tunnel endpoint not ready yet... (attempt %s/%s)", attempt + 1, max_retries) await asyncio.sleep(retry_delay) else: - logger.error("Failed to connect to tunneld after max retries") - raise + logger.error("Failed to connect to local tunnel endpoint after max retries") + raise TunneldConnectionError() + raise TunneldConnectionError() - async def empty_queue(): - """Empties all items from an asyncio.Queue.""" - logger.info("Clearing location simulation queue... resetting ios location") - q = self.context.queue - while not q.empty(): - try: - q.get_nowait() - q.task_done() - await q.join() - except asyncio.QueueEmpty: - break - tun = await get_tun(self.context.udid) - if tun is not None: - async with DvtProvider(tun) as dvt, LocationSimulationQueue(dvt, self.context) as locate_simulation: - await locate_simulation.clear() - self.context.simulation_active = False + def cleanup_device_data(d): + mydict = {} + for key, value in d.items(): + if isinstance(value, dict): + cleanup_device_data(value) + elif isinstance(value, bytes): + mydict[key] = 'BYTE DATA' + else: + mydict[key] = value + return mydict - async def start_queue(): + async def device_reboot(delay): + """ Reboot the device""" + logger.info("Reboot iniated with delay %s") + await asyncio.sleep(delay) + os.system("shutdown -r now") + + async def device_shutdown(delay): + """ Shutdown the device""" + logger.info("Shutdown iniated with delay %s") + await asyncio.sleep(delay) + os.system("shutdown -h now") + + """ Queue Functions""" + async def start_simulation_queue(): + """ Start Simulation Queue Worker""" logger.info("Starting location simulation worker...") self.context.simulation_active = True + self.context.queue_state = "RUNNING" + self.context.queue_status.set() try: - if self.context.udid is None: + if self.context.test_mode: + logger.info("Simulation worker: test mode enabled") + with LocationSimulationTestQueue(self.context) as locate_simulation: + await locate_simulation.play_queue() + + if self.context.udid is None and not self.context.test_mode: active_udids = sorted( - {t.udid for t in self._tunneld_core.tunnel_tasks.values() if t.udid is not None and t.tunnel is not None} + {t.udid for t in self._tunneld_core.tunnel_tasks.values() if + t.udid is not None and t.tunnel is not None} ) if len(active_udids) == 1: self.context.udid = active_udids[0] @@ -279,7 +311,8 @@ class TunneldRunnerSio: ) return else: - logger.error("Simulation worker: multiple active tunnels; explicit udid required: %s", active_udids) + logger.error("Simulation worker: multiple active tunnels; explicit udid required: %s", + active_udids) await self.context.sio.emit( "error", { @@ -292,10 +325,11 @@ class TunneldRunnerSio: return logger.info("Simulation worker: acquiring tunnel (udid=%s)", self.context.udid) - tun = await asyncio.wait_for( - get_tun(self.context.udid), - timeout=TUNNEL_ACQUIRE_TIMEOUT_SECONDS, - ) + # tun = await asyncio.wait_for( + # get_tun(self.context.udid), + # timeout=TUNNEL_ACQUIRE_TIMEOUT_SECONDS, + # ) + tun = await get_tun(self.context.udid) logger.info("Simulation worker: tunnel acquired, connecting DVT provider") dvt_provider = DvtProvider(tun) dvt = await asyncio.wait_for(dvt_provider.__aenter__(), timeout=DVT_CONNECT_TIMEOUT_SECONDS) @@ -335,88 +369,55 @@ class TunneldRunnerSio: self.context.simulation_active = False self.context.simulation_task = None - def iterate_multidim(d): - mydict = {} - for key, value in d.items(): - if isinstance(value, dict): - iterate_multidim(value) - elif isinstance(value, bytes): - mydict[key] = 'BYTE DATA' - else: - mydict[key] = value -# elif isinstance(value, str) or isinstance(value, int) or isinstance(value, float) or isinstance(value, bool) or isinstance(value, list): -# mydict[key] = type(value).__name__ - return mydict + async def pause_simulation_queue(): + """Pauses asyncio.Queue playback""" + self.context.queue_state = "PAUSED" - @self._app.get("/") - @self.context.sio.event - async def list_tunnels() -> dict[str, list[dict]]: - """Retrieve the available tunnels and format them as {UUID: TUNNEL_ADDRESS}""" - tunnels = {} - for ip, active_tunnel in self._tunneld_core.tunnel_tasks.items(): - if (active_tunnel.udid is None) or (active_tunnel.tunnel is None): - continue - if active_tunnel.udid not in tunnels: - tunnels[active_tunnel.udid] = [] - tunnels[active_tunnel.udid].append( - { - "tunnel-address": active_tunnel.tunnel.address, - "tunnel-port": active_tunnel.tunnel.port, - "interface": ip, - } - ) - return tunnels + async def resume_simulation_queue(): + """Resumes asyncio.Queue playback""" + self.context.queue_state = "RUNNING" - @self._app.get("/device_info") - async def device_info(): - """Get device information""" - tunnels = {} - for ip, active_tunnel in self._tunneld_core.tunnel_tasks.items(): - if (active_tunnel.udid is None) or (active_tunnel.tunnel is None): - continue - if active_tunnel.udid not in tunnels: - tunnels[active_tunnel.udid] = {} + async def empty_simulation_queue(): + """Empties all items from an asyncio.Queue.""" + logger.info("Clearing location simulation queue...") + q = self.context.queue + self.context.set_location_enabled = False + while not q.empty(): try: - lockdown = await create_using_usbmux(serial=active_tunnel.udid, autopair=False) - tunnels[active_tunnel.udid] = iterate_multidim(lockdown.all_values) - except Exception as e: - logger.error(f"Failed to create lockdown session for device {active_tunnel.udid}: {e}") - continue - return tunnels + item = q.get_nowait() + q.task_done() + logger.info("Discarding item from queue: %s", item) + except asyncio.QueueEmpty: + break - @self._app.get("/shutdown") - async def shutdown() -> fastapi.Response: - """Shutdown Tunneld""" - os.kill(os.getpid(), signal.SIGINT) - data = {"operation": "shutdown", "data": True, "message": "Server shutting down..."} - return generate_http_response(data) - - @self._app.get("/clear_tunnels") - async def clear_tunnels() -> fastapi.Response: - self._tunneld_core.clear() - data = {"operation": "clear_tunnels", "data": True, "message": "Cleared tunnels..."} - return generate_http_response(data) - - @self._app.get("/cancel") - async def cancel_tunnel(udid: str) -> fastapi.Response: - self._tunneld_core.cancel(udid=udid) - data = {"operation": "cancel", "udid": udid, "data": True, "message": f"tunnel {udid} Canceled ..."} - return generate_http_response(data) - - @self._app.get("/hello") - async def hello() -> fastapi.Response: - data = {"message": "Hello, I'm alive"} - return generate_http_response(data) + async def end_simulation_queue() -> str: + """Ends asyncio.Queue playback and closes tunnel""" + logger.info("End location simulation request from %s", sid) + if self.context.simulation_task is not None and not self.context.simulation_task.done(): + q = self.context.queue + if q.qsize() > 0: + await empty_simulation_queue() + while q.empty() and q.qsize() == 0: + await q.join() + with suppress(asyncio.CancelledError): + await self.context.simulation_task + if self.context.tunnel is not None: + async with DvtProvider(self.context.tunnel) as dvt, LocationSimulation(dvt) as locate_simulation: + await locate_simulation.clear() + self.context.simulation_active = False + self.context.queue_state = "SHUTDOWN" + return "ended" + """ FastAPI HTTP Functions""" def generate_http_response( data: dict, status_code: int = 200, media_type: str = "application/json" ) -> fastapi.Response: return fastapi.Response(status_code=status_code, media_type=media_type, content=json.dumps(data)) + """ Tunnel Functions""" @self._app.get("/start-tunnel") - @self.context.sio.event async def start_tunnel( - udid: str, ip: Optional[str] = None, connection_type: Optional[str] = None + udid: Optional[str] = self.context.udid, ip: Optional[str] = None, connection_type: Optional[str] = None ) -> fastapi.Response: udid_tunnels = [ t.tunnel for t in self._tunneld_core.tunnel_tasks.values() if t.udid == udid and t.tunnel is not None @@ -429,10 +430,8 @@ class TunneldRunnerSio: "address": udid_tunnels[0].address, } return generate_http_response(data) - queue = asyncio.Queue() created_task = False - try: if not created_task and connection_type in ("usbmux", None): task_identifier = f"usbmux-{udid}" @@ -487,10 +486,8 @@ class TunneldRunnerSio: } }), ) - if not created_task: return fastapi.Response(status_code=501, content=json.dumps({"error": "task not created"})) - tunnel: Optional[TunnelResult] = await queue.get() if tunnel is not None: self.context.udid = udid @@ -501,89 +498,351 @@ class TunneldRunnerSio: status_code=404, content=json.dumps({"error": "something went wrong during tunnel creation"}) ) + @self._app.get("/shutdown") + async def shutdown() -> fastapi.Response: + """Shutdown Tunneld""" + os.kill(os.getpid(), signal.SIGINT) + data = {"operation": "shutdown", "data": True, "message": "Server shutting down..."} + return generate_http_response(data) + + @self._app.get("/clear-tunnels") + async def clear_tunnels() -> fastapi.Response: + """Clear all tunnels""" + self._tunneld_core.clear() + data = {"operation": "clear_tunnels", "data": True, "message": "Cleared tunnels..."} + return generate_http_response(data) + + @self._app.get("/cancel") + async def cancel_tunnel(udid: str) -> fastapi.Response: + """Cancel a tunnel""" + self._tunneld_core.cancel(udid=udid) + data = {"operation": "cancel", "udid": udid, "data": True, "message": f"tunnel {udid} Canceled ..."} + return generate_http_response(data) + + """Simulation Functions""" + @self._app.get("/start-simulation") + async def app_start_simulation() -> fastapi.Response: + logger.info("Simulation Start Requested ") + if self.context.simulation_task is None or self.context.simulation_task.done(): + self.context.simulation_active = True + self.context.simulation_task = asyncio.create_task( + start_simulation_queue(), + name="location-simulation-worker", + ) + data = {"status": "started", "message": "Simulation started"} + else: + data = {"status": "error", "message": "Simulation already running"} + return generate_http_response(data) + + @self._app.post("/add-location") + async def app_add_location(data: SimulationRequestData) -> fastapi.Response: + """ Add a location to the simulation queue""" + logger.info("Request to add new location to queue") + loc_id = str(uuid.uuid4()) + latitude = data.get("latitude") if isinstance(data, dict) else getattr(data, "latitude", None) + longitude = data.get("longitude") if isinstance(data, dict) else getattr(data, "longitude", None) + delay = data.get("delay", 0) if isinstance(data, dict) else getattr(data, "delay", 0) + delay = 0 if delay is None else delay + if latitude is not None and longitude is not None: + logger.info("Adding location %s (%s, %s) with %s delay to the queue", loc_id, latitude, longitude, + delay) + await self.context.queue.put((loc_id, latitude, longitude, delay)) + if delay == 0: + start_time = datetime.now(timezone.utc).isoformat() + else: + now_time = datetime.now(timezone.utc) + new_time = now_time + timedelta(seconds=delay) + start_time = new_time.isoformat() + location_item = { + loc_id: { + "loc_id": loc_id, + "latitude": latitude, + "longitude": longitude, + "delay": delay, + "start": start_time + } + } + self.context.queue_list.append(location_item) + resp = { + "status": "added", + "message": f"Location {loc_id} added to the queue", + "item": location_item + } + else: + resp = {"status": "error", "message": "Invalid location data"} + return generate_http_response(resp) + + @self._app.get("/clear-queue") + async def app_clear_queue() -> fastapi.Response: + """ Clear the simulation queue""" + logger.info("Simulation Start Requested ") + await empty_simulation_queue() + data = {"status": "cleared", "message": "Simulation cleared"} + return generate_http_response(data) + + @self._app.get("/pause-queue") + async def app_pause_queue() -> fastapi.Response: + """ Pause the simulation queue""" + await pause_simulation_queue() + data = {"status": "paused", "message": "Simulation paused"} + return generate_http_response(data) + + @self._app.get("/resume-queue") + async def app_resume_queue() -> fastapi.Response: + """ Resume the simulation queue""" + await resume_simulation_queue() + data = {"status": "resumed", "message": "Simulation resumed"} + return generate_http_response(data) + + @self._app.get("/end-simulation") + async def app_end_simulation() -> fastapi.Response: + """ End the simulation queue""" + logger.info("End location simulation request") + end_task = asyncio.create_task(end_simulation_queue(), name="end-simulation-worker") + result = await end_task + data = {"status": result, "message": "Simulation ended"} + return generate_http_response(data) + + """Status Functions""" + @self._app.get("/") + async def list_tunnels() -> dict[str, list[dict]]: + """Retrieve the available tunnels and format them as {UUID: TUNNEL_ADDRESS}""" + tunnels = {} + for ip, active_tunnel in self._tunneld_core.tunnel_tasks.items(): + if (active_tunnel.udid is None) or (active_tunnel.tunnel is None): + continue + if active_tunnel.udid not in tunnels: + tunnels[active_tunnel.udid] = [] + tunnels[active_tunnel.udid].append( + { + "tunnel-address": active_tunnel.tunnel.address, + "tunnel-port": active_tunnel.tunnel.port, + "interface": ip, + } + ) + return tunnels + + @self._app.get("/device-info") + async def device_info(): + """Get device information""" + tunnels = {} + for ip, active_tunnel in self._tunneld_core.tunnel_tasks.items(): + if (active_tunnel.udid is None) or (active_tunnel.tunnel is None): + continue + if active_tunnel.udid not in tunnels: + tunnels[active_tunnel.udid] = {} + try: + lockdown = await create_using_usbmux(serial=active_tunnel.udid, autopair=False) + tunnels[active_tunnel.udid] = iterate_multidim(lockdown.all_values) + except Exception as e: + logger.error(f"Failed to create lockdown session for device {active_tunnel.udid}: {e}") + continue + return tunnels + + @self._app.get("/hello") + async def hello() -> fastapi.Response: + data = {"message": "Hello, I'm alive"} + return generate_http_response(data) + + @self._app.get("/context-status") + async def app_context_status() -> fastapi.Response: + data = { + "latitude": self.context.latitude, + "longitude": self.context.longitude, + "next_more": self.context.next_move, + "udid": self.context.udid, + "simulation_active": self.context.simulation_active, + "loc_id": self.context.loc_id, + "set_location_enable": self.context.set_location_enabled, + "queue_state": self.context.queue_state, + "queue_list": self.context.queue_list + } + return generate_http_response(data) + + """ Socket.IO Functions""" + + """Socket.IO Connection Events""" @self.context.sio.event async def connect(sid, environ): + """Client connection event handler.""" logger.info("Client connected: %s", sid) - await self.context.sio.emit("connect", sid, namespace="/") + return('%s connected' % sid) + @self.context.sio.event + async def disconnect(sid): + """Client disconnection event handler.""" + logger.info("Client disconnected: %s", sid) + + """ Socket.IO Request Events """ @self.context.sio.event async def request_update(sid, data): logger.info("Update request from %s", sid) - # await self.context.sio.emit("status_update", await get_status()) - @self.context.sio.event async def message(sid, data): logger.info("Received message from %s: %s", data, sid) await self.context.sio.emit("message", f"Received message from {sid}: {data}", namespace="/") + """ Device Control""" @self.context.sio.event - async def simulate_location(sid, data): - logger.info("Simulate location request from %s: %s", sid, data) - latitude = data.get("latitude") if isinstance(data, dict) else getattr(data, "latitude", None) - longitude = data.get("longitude") if isinstance(data, dict) else getattr(data, "longitude", None) - delay = data.get("delay", 0) if isinstance(data, dict) else getattr(data, "delay", 0) - if latitude is not None and longitude is not None: - logger.info("Adding location %s, %s with %ss delay to the queue", latitude, longitude, delay) - await self.context.queue.put((latitude, longitude, delay)) - await self.context.sio.emit( - "simulation", - { - "status": self.context.simulation_active, - "data": { - "latitude": self.context.latitude, - "cur_longitude": longitude, - "next_move": delay, - }, - }, - namespace="/", - ) - else: - logger.warning("Invalid location data received from %s: %s", sid, data) - await self.context.sio.emit("error", "Invalid location data", namespace="/") + async def device_control(sid, data): + """ Device Control """ + command = data.get("command") if isinstance(data, dict) else getattr(data, "command", None) + delay = data.get("delay") if isinstance(data, dict) else getattr(data, "delay", None) + if delay is None: + delay = 5 + match command: + case "shutdown": + """ Shutdown the device""" + logger.info("Shutdown command received from %s with delay %s", sid, delay) + await device_shutdown(delay) + return { "command": "shutdown", "status": "success", "message": f"Device shutdown initiated with {delay} seconds delay" } + + case "reboot": + """ Reboot the device""" + logger.info("Reboot command received from %s with delay %s", sid, delay) + await device_reboot(delay) + return { "command": "reboot", "status": "success", "message": f"Device reboot initiated with {delay} seconds delay" } + + case _: + return { "command": command, "status": "error", "message": f"Invalid command: {command}" } @self.context.sio.event - async def start_simulate_location(sid, data): - logger.info("Start location simulation request from %s", sid) - if isinstance(data, dict) and data.get("udid"): - self.context.udid = data["udid"] - if self.context.simulation_task is None or self.context.simulation_task.done(): - self.context.simulation_active = True - self.context.simulation_task = asyncio.create_task( - start_queue(), - name="location-simulation-worker", - ) - await self.context.sio.emit("simulation", {"status": self.context.simulation_active, "data": None}, - namespace="/") + async def simulate_control(sid, data): + """ Simulation Control """ + command = data.get("command") if isinstance(data, dict) else getattr(data, "command", None) + logger.info("Simulation Control command: %s requested from %s", command, sid) + match command: + case "add": + """ Add a location to the simulation queue""" + loc_id = str(uuid.uuid4()) + latitude = data.get("latitude") if isinstance(data, dict) else getattr(data, "latitude", None) + longitude = data.get("longitude") if isinstance(data, dict) else getattr(data, "longitude", None) + delay = data.get("delay", 0) if isinstance(data, dict) else getattr(data, "delay", 0) + delay = 0 if delay is None else delay + if latitude is not None and longitude is not None: + logger.info("Adding location %s (%s, %s) with %s delay to the queue", loc_id, latitude, longitude, + delay) + await self.context.queue.put((loc_id, latitude, longitude, delay)) + if delay == 0: + start_time = datetime.now(timezone.utc).isoformat() + else: + now_time = datetime.now(timezone.utc) + new_time = now_time + timedelta(seconds=delay) + start_time = new_time.isoformat() + + location_item = { + loc_id: { + "loc_id": loc_id, + "latitude": latitude, + "longitude": longitude, + "delay": delay, + "start": start_time + } + } + ack = { + "command": command, + "status": "added", + "message": f"Location {loc_id} added to the queue", + "item": location_item + } + self.context.queue_list.append(location_item) + logger.info("Location %s added to the queue", loc_id) + return ack + else: + logger.warning("Invalid location data received from %s: %s", sid, data) + return {"command": command, "status": "error", "message": "Invalid location data"} + case "clear": + """ Clear the simulation queue""" + await empty_simulation_queue() + return {"command": command, "status": "cleared", "message": "Simulation cleared"} + case "pause": + """ Pause the simulation queue""" + await pause_simulation_queue() + return {"command": command, "status": "paused", "message": "Simulation paused"} + case "resume": + """ Resume the simulation queue""" + await resume_simulation_queue() + return {"command": command, "status": "resumed", "message": "Simulation resumed"} + case "end": + """ End the simulation queue""" + logger.info("End location simulation request from %s", sid) + end_task = asyncio.create_task(end_simulation_queue(), name="end-simulation-worker") + result = await end_task + return {"command": command, "status": result, "message": "Simulation ended"} + case "start": + """ Start the simulation queue""" + logger.info("Start location simulation request from %s", sid) + if self.context.simulation_task is None or self.context.simulation_task.done(): + self.context.simulation_active = True + self.context.simulation_task = asyncio.create_task( + start_simulation_queue(), + name="location-simulation-worker", + ) + return {"command": command, "status": "started", "message": "Simulation started"} + else: + return {"command": command, "status": "error", "message": "Simulation already running"} + case _: + logger.warning("Invalid command received from %s: %s", sid, command) + return {"status": "error", "message": "Invalid command"} + """ Tunnel Control """ @self.context.sio.event - async def end_simulate_location(sid, data): - logger.info("End location simulation request from %s", sid) - if self.context.simulation_task is not None and not self.context.simulation_task.done(): - await self.context.queue.put((None, None, None)) - with suppress(asyncio.CancelledError): - await self.context.simulation_task - await empty_queue() - await self.context.sio.emit("simulation", {"status": self.context.simulation_active, "data": None}, - namespace="/") + async def tunneld_control(sid, data): + command = data.get("command") if isinstance(data, dict) else getattr(data, "command", None) + logger.info("Tunneld Control command: %s requested from %s", command, sid) + match command: + case "start": + """Start Tunneld""" + logger.info("Start tunneld request from %s: %s", sid, data) + try: + self._tunneld_core.start() + logger.info("Tunneld started successfully") + return {"status": "started", "message": "Tunneld started successfully"} + except Exception as e: + logger.error("Error starting tunneld: %s", e) + return {"command": command, "status": "error", "message": f"Error starting tunneld: {e}"} - @self.context.sio.event - async def disconnect(sid): - logger.info("Client disconnected: %s", sid) + case "shutdown": + """Shutdown Tunneld""" + logger.info("Shutdown tunneld request from %s: %s", sid, data) + try: + os.kill(os.getpid(), signal.SIGINT) + return {"command": command, "status": "Success", "message": "Server shutting down..."} + except Exception as e: + logger.error("Error shutting down tunneld: %s", e) + return {"command": command, "status": "error", "message": f"Error shutting down tunneld: {e}"} - @self.context.sio.event - async def start_tunneld(sid, data): - logger.info("Start tunneld request from %s: %s", sid, data) - try: - self._tunneld_core.start() - logger.info("Tunneld started successfully") - except Exception as e: - logger.error("Error starting tunneld: %s", e) + case "clear": + """Clear all tunnels""" + logger.info("Clearing tunnels...") + try: + self._tunneld_core.clear() + return {"command": command, "status": "Success", "message": "Cleared tunnels..."} + except Exception as e: + logger.error("Error clearing tunnels: %s", e) + return {"command": command, "status": "error", "message": f"Error clearing tunnels: {e}"} + + case "cancel": + """Cancel a tunnel""" + logger.info("Canceling tunnel request from %s: %s", sid, data) + try: + udid = data.get("udid") if isinstance(data, dict) else getattr(data, "udid", self.context.udid) + if udid is None: + udid = self.context.udid + self._tunneld_core.cancel(udid=udid) + return {"command": command, "status": "Success", "udid": udid, "message": f"tunnel {udid} Canceled ..."} + except Exception as e: + logger.error("Error canceling tunnel: %s", e) + return {"command": command, "status": "error", "message": f"Error canceling tunnel: {e}"} + + case _: + return {"command": command, "status": "error", "message": f"Unknown operation: {command}"} + def _run_app(self) -> None: uvicorn.run(self._asgi_app, host=self.host, port=self.port, loop="asyncio", workers=1) - class LocationSimulationQueue(LocationSimulation): def __init__(self, dvt, context: LocationSimulationState): super().__init__(dvt) @@ -591,48 +850,139 @@ class LocationSimulationQueue(LocationSimulation): async def play_queue(self, disable_sleep: bool = False, timing_randomness_range: int = 0) -> None: while True: - latitude, longitude, delay = await self.context.queue.get() - if (latitude, longitude, delay) == (None, None, None): + if self.context.queue_state == "PAUSED": + await asyncio.sleep(0.1) + continue + if self.context.queue_state == "SHUTDOWN": break - if delay > 0 and not disable_sleep: - if timing_randomness_range > 0: - delay = delay + random.uniform( - -timing_randomness_range, timing_randomness_range - ) - for i in range(delay, 0, -1): - self.context.next_move = i - await self.context.sio.emit( - "simulation", - { - "status": self.context.simulation_active, - "data": { + loc_id, latitude, longitude, delay = await self.context.queue.get() + if (loc_id, latitude, longitude, delay) == (None, None, None, None): + break + if self.context.set_location_enabled: + if delay > 0 and not disable_sleep: + if timing_randomness_range > 0: + delay = delay + random.uniform( + -timing_randomness_range, timing_randomness_range + ) + for i in range(delay, 0, -1): + self.context.next_move = i + await self.context.sio.emit( + "simulation_status", + { + "status": self.context.simulation_active, + "loc_id": self.context.loc_id, "latitude": self.context.latitude, "longitude": self.context.longitude, - "next_move": i, + "next_move": i }, - }, - namespace="/", - ) - await asyncio.sleep(1) - await self.set(latitude, longitude) - self.context.latitude = latitude - self.context.longitude = longitude - await self.context.sio.emit( - "simulation", - { - "status": self.context.simulation_active, - "data": { + namespace="/", + ) + await asyncio.sleep(1) + await self.set(latitude, longitude) + self.context.latitude = latitude + self.context.longitude = longitude + await self.context.sio.emit( + "simulation_status", + { + "status": self.context.simulation_active, + "loc_id": self.context.loc_id, "latitude": self.context.latitude, "longitude": self.context.longitude, "next_move": None, }, - }, - namespace="/", - ) - logger.info( - "Set simulated location to %s, %s after %ss delay", - latitude, - longitude, - delay, - ) + namespace="/", + ) + logger.info( + "Set simulated location to %s, %s after %ss delay", + latitude, + longitude, + delay, + ) + self.context.queue.task_done() + + +class LocationSimulationTestQueue(LocationSimulationBase): + def __init__(self, context: LocationSimulationState): + super().__init__() + self.context = context + + def __enter__(self): + return self + + def __exit__(self): + return self + + async def set(self, latitude: float, longitude: float) -> None: + await asyncio.sleep(0.1) + logger.info("Simulated location set to %s, %s", latitude, longitude) + + async def clear(self) -> None: + q = self.context.queue + if self.context.simulation_task is not None and not self.context.simulation_task.done(): + if q.qsize() > 0: + self.context.set_location_enabled = False + while not q.empty(): + try: + item = q.get_nowait() + q.task_done() + logger.info("Discarding item from queue: %s", item) + except asyncio.QueueEmpty: + break + while q.empty() and q.qsize() == 0: + await q.join() + with suppress(asyncio.CancelledError): + await self.context.simulation_task + self.context.simulation_active = False + self.context.queue_state = "SHUTDOWN" + + async def play_queue(self, disable_sleep: bool = False, timing_randomness_range: int = 0) -> None: + while True: + while self.context.queue_state == "PAUSED": + await asyncio.sleep(0.1) + if self.context.queue_state == "SHUTDOWN": + break + loc_id, latitude, longitude, delay = await self.context.queue.get() + if (loc_id, latitude, longitude, delay) == (None, None, None, None): + break + if self.context.set_location_enabled: + if delay > 0 and not disable_sleep: + if timing_randomness_range > 0: + delay = delay + random.uniform( + -timing_randomness_range, timing_randomness_range + ) + for i in range(delay, 0, -1): + self.context.next_move = i + await self.context.sio.emit( + "simulation_status", + { + "status": self.context.simulation_active, + "loc_id": self.context.loc_id, + "latitude": self.context.latitude, + "longitude": self.context.longitude, + "next_move": i + }, + namespace="/", + ) + await asyncio.sleep(1) + await self.set(latitude, longitude) + self.context.latitude = latitude + self.context.longitude = longitude + self.context.loc_id = loc_id + await self.context.sio.emit( + "simulation_status", + { + "status": self.context.simulation_active, + "loc_id": self.context.loc_id, + "latitude": self.context.latitude, + "longitude": self.context.longitude, + "next_move": None, + }, + namespace="/", + ) + logger.info( + "Set simulated location to %s, %s after %ss delay", + latitude, + longitude, + delay, + ) self.context.queue.task_done() diff --git a/socktest.py b/socktest.py deleted file mode 100644 index d344f39..0000000 --- a/socktest.py +++ /dev/null @@ -1,75 +0,0 @@ -import socketio -from fastapi import FastAPI -from fastapi.responses import HTMLResponse - -# 1. Create the FastAPI app -app = FastAPI() - -# 2. Create the Socket.IO server (Async mode for FastAPI) -sio = socketio.AsyncServer(async_mode='asgi', cors_allowed_origins='*') - -# 3. Wrap FastAPI with Socket.IO -# This allows 'socket_app' to handle socket traffic and pass -# everything else to 'app' -socket_app = socketio.ASGIApp(sio, app) - -# --- Socket.IO Events --- - -@sio.event -async def connect(sid, environ): - print(f"Client connected: {sid}") - await sio.emit('response', {'data': 'Connected to Server!'}) - -@sio.event -async def message(sid, data): - print(f"Received from {sid}: {data}") - # Broadcast to everyone (including sender) - await sio.emit('response', {'data': f"Echo: {data}"}) - -@sio.event -async def disconnect(sid): - print(f"Client disconnected: {sid}") - -# --- FastAPI Routes --- - -html_client = """ - - -
-