import asyncio import dataclasses from http.cookies import BaseCookie import json import logging import os import signal from sys import float_repr_style import traceback import warnings import fastapi import random from fastapi import FastAPI from typing import Optional import socketio from contextlib import asynccontextmanager, suppress from ssl import SSLEOFError from typing import Optional, Union import construct from pymobiledevice3.bonjour import browse_remoted from pymobiledevice3.cli.cli_common import print_json with warnings.catch_warnings(): # Ignore: "Core Pydantic V1 functionality isn't compatible with Python 3.14 or greater." warnings.simplefilter("ignore", category=UserWarning) import fastapi import uvicorn from construct import StreamError from fastapi import FastAPI from packaging.version import Version from pymobiledevice3 import usbmux from pymobiledevice3.exceptions import ( ConnectionFailedError, ConnectionFailedToUsbmuxdError, ConnectionTerminatedError, DeviceNotFoundError, GetProhibitedError, IncorrectModeError, InvalidServiceError, LockdownError, MuxException, PairingError, QuicProtocolNotSupportedError, StreamClosedError, TunneldConnectionError, ) from pymobiledevice3.lockdown import create_using_usbmux, get_mobdev2_lockdowns from pymobiledevice3.osu.os_utils import get_os_utils from pymobiledevice3.remote.common import TunnelProtocol from pymobiledevice3.remote.module_imports import start_tunnel from pymobiledevice3.remote.remote_service_discovery import ( RSD_PORT, RemoteServiceDiscoveryService, ) from pymobiledevice3.remote.tunnel_service import ( CoreDeviceTunnelProxy, RemotePairingProtocol, TunnelResult, create_core_device_tunnel_service_using_rsd, get_remote_pairing_tunnel_services, ) from pymobiledevice3.remote.utils import get_rsds, stop_remoted from pymobiledevice3.utils import asyncio_print_traceback from pymobiledevice3.services.dvt.instruments.location_simulation import LocationSimulation from pymobiledevice3.services.dvt.instruments.dvt_provider import DvtProvider from pymobiledevice3.tunneld.server import TunneldCore, TunnelTask from pymobiledevice3.tunneld.api import ( TUNNELD_DEFAULT_ADDRESS, get_tunneld_device_by_udid, get_tunneld_devices, ) class JsonFormatter(logging.Formatter): def format(self, record: logging.LogRecord) -> str: payload = { "ts": self.formatTime(record, "%Y-%m-%dT%H:%M:%S%z"), "level": record.levelname, "logger": record.name, "message": record.getMessage(), } if record.exc_info: payload["exc_info"] = self.formatException(record.exc_info) return json.dumps(payload, ensure_ascii=True) handler = logging.StreamHandler() handler.setFormatter(JsonFormatter()) root_logger = logging.getLogger() root_logger.handlers = [handler] root_logger.setLevel(logging.INFO) logger = logging.getLogger("ios-api") # bugfix: after the device reboots, it might take some time for remoted to start answering the bonjour queries REATTEMPT_INTERVAL = 5 REATTEMPT_COUNT = 5 REMOTEPAIRING_INTERVAL = 5 MOBDEV2_INTERVAL = 5 # USB monitor will periodically forget what interfaces it has seen # and force a full rescan. The value is number of iterations of the # inner loop (which sleeps one second each) before blowing away the # `previous_ips` cache. USB_MONITOR_RESCAN_INTERVAL = 30 USBMUX_INTERVAL = 2 OSUTILS = get_os_utils() TUNNEL_ACQUIRE_TIMEOUT_SECONDS = 15 DVT_CONNECT_TIMEOUT_SECONDS = 20 class SimulationStatusData: latitude: float longitude: float start: float end: Optional[float] next_move: Optional[float] class SimulationStatus: status: bool data: Optional[SimulationStatusData] class SimulationRequestData: latitude: float longitude: float delay: int start: str end: Optional[str] class SimulationRequest: status: bool data: Optional[SimulationRequestData] class LocationSimulationState: def __init__(self): self.latitude: Optional[float] = None self.longitude: Optional[float] = None self.next_move: Optional[float] = None self.udid: Optional[str] = None self.simulation_active: bool = False self.queue: asyncio.Queue = asyncio.Queue() self.simulation_task: Optional[asyncio.Task] = None self.sio: socketio.AsyncServer = socketio.AsyncServer( async_mode="asgi", cors_allowed_origins="*" ) class TunneldRunnerSio: """TunneldRunner orchestrate between the webserver and TunneldCore""" @classmethod def create( cls, host: str, port: int, context: LocationSimulationState = LocationSimulationState(), protocol: TunnelProtocol = TunnelProtocol.QUIC, usb_monitor: bool = True, wifi_monitor: bool = True, usbmux_monitor: bool = True, mobdev2_monitor: bool = True, ) -> None: cls( host, port, protocol=protocol, usb_monitor=usb_monitor, wifi_monitor=wifi_monitor, usbmux_monitor=usbmux_monitor, mobdev2_monitor=mobdev2_monitor, context=context, )._run_app() def __init__( self, host: str, port: int, context: LocationSimulationState = LocationSimulationState(), protocol: TunnelProtocol = TunnelProtocol.QUIC, usb_monitor: bool = True, wifi_monitor: bool = True, usbmux_monitor: bool = True, mobdev2_monitor: bool = True, ): @asynccontextmanager async def lifespan(app: FastAPI): self._tunneld_core.start() yield logger.info("Closing tunneld tasks...") await empty_queue() await self._tunneld_core.close() await self.context.sio.shutdown() self.host = host self.port = port self.protocol = protocol self.context = context self._tunneld_api_address = ("127.0.0.1" if host in ("0.0.0.0", "::") else host, port) self._app = FastAPI(title="iOS Device Management API", lifespan=lifespan, cors_allowed_origins="*") self._asgi_app = socketio.ASGIApp(self.context.sio, self._app) self._tunneld_core = TunneldCore( protocol=protocol, wifi_monitor=wifi_monitor, usb_monitor=usb_monitor, usbmux_monitor=usbmux_monitor, mobdev2_monitor=mobdev2_monitor, ) async def get_tun( udid: Optional[str] = None, max_retries: int = 10, retry_delay: float = 0.5 ) -> RemoteServiceDiscoveryService: """Try to connect to tunneld with retries to handle startup delay.""" for attempt in range(max_retries): try: if udid: rsd = await get_tunneld_device_by_udid(udid, self._tunneld_api_address) if rsd is not None: logger.info("Connected to tunnel for udid %s after %s retries", udid, attempt) return rsd rsds = await get_tunneld_devices(self._tunneld_api_address) if rsds: if udid: logger.warning("Tunnel for udid %s not found; using first available tunnel", udid) logger.info("Connected to tunneld after %s retries", attempt) return rsds[0] except TunneldConnectionError: if attempt < max_retries - 1: logger.info("Waiting for tunneld to be ready... (attempt %s/%s)", attempt + 1, max_retries) await asyncio.sleep(retry_delay) else: logger.error("Failed to connect to tunneld after max retries") raise raise TunneldConnectionError() async def empty_queue(): """Empties all items from an asyncio.Queue.""" logger.info("Clearing location simulation queue... resetting ios location") q = self.context.queue while not q.empty(): try: q.get_nowait() q.task_done() await q.join() except asyncio.QueueEmpty: break tun = await get_tun(self.context.udid) if tun is not None: async with DvtProvider(tun) as dvt, LocationSimulationQueue(dvt, self.context) as locate_simulation: await locate_simulation.clear() self.context.simulation_active = False async def start_queue(): logger.info("Starting location simulation worker...") self.context.simulation_active = True try: if self.context.udid is None: active_udids = sorted( {t.udid for t in self._tunneld_core.tunnel_tasks.values() if t.udid is not None and t.tunnel is not None} ) if len(active_udids) == 1: self.context.udid = active_udids[0] logger.info("Simulation worker: auto-selected udid=%s from active tunnel", self.context.udid) elif len(active_udids) == 0: logger.error("Simulation worker: no active tunnel with udid available") await self.context.sio.emit( "error", {"type": "simulation_no_tunnel", "message": "No active tunnel found. Start tunnel first."}, namespace="/", ) return else: logger.error("Simulation worker: multiple active tunnels; explicit udid required: %s", active_udids) await self.context.sio.emit( "error", { "type": "simulation_udid_required", "message": "Multiple active tunnels found; provide udid in start_simulate_location.", "udids": active_udids, }, namespace="/", ) return logger.info("Simulation worker: acquiring tunnel (udid=%s)", self.context.udid) tun = await asyncio.wait_for( get_tun(self.context.udid), timeout=TUNNEL_ACQUIRE_TIMEOUT_SECONDS, ) logger.info("Simulation worker: tunnel acquired, connecting DVT provider") dvt_provider = DvtProvider(tun) dvt = await asyncio.wait_for(dvt_provider.__aenter__(), timeout=DVT_CONNECT_TIMEOUT_SECONDS) logger.info("Simulation worker: DVT provider connected, starting queue playback") try: async with LocationSimulationQueue(dvt, self.context) as locate_simulation: await locate_simulation.play_queue() finally: logger.info("Simulation worker: closing DVT provider") await dvt_provider.__aexit__(None, None, None) logger.info("Simulation worker: DVT provider closed") except TimeoutError: logger.error( "Simulation worker timeout. tunnel_timeout=%ss dvt_timeout=%ss udid=%s", TUNNEL_ACQUIRE_TIMEOUT_SECONDS, DVT_CONNECT_TIMEOUT_SECONDS, self.context.udid, ) await self.context.sio.emit( "error", { "type": "simulation_timeout", "udid": self.context.udid, "tunnel_timeout_seconds": TUNNEL_ACQUIRE_TIMEOUT_SECONDS, "dvt_timeout_seconds": DVT_CONNECT_TIMEOUT_SECONDS, }, namespace="/", ) except Exception: logger.exception("Simulation worker crashed") await self.context.sio.emit( "error", {"type": "simulation_crash", "udid": self.context.udid}, namespace="/", ) finally: self.context.simulation_active = False self.context.simulation_task = None def iterate_multidim(d): mydict = {} for key, value in d.items(): if isinstance(value, dict): iterate_multidim(value) elif isinstance(value, bytes): mydict[key] = 'BYTE DATA' else: mydict[key] = value # elif isinstance(value, str) or isinstance(value, int) or isinstance(value, float) or isinstance(value, bool) or isinstance(value, list): # mydict[key] = type(value).__name__ return mydict @self._app.get("/") @self.context.sio.event async def list_tunnels() -> dict[str, list[dict]]: """Retrieve the available tunnels and format them as {UUID: TUNNEL_ADDRESS}""" tunnels = {} for ip, active_tunnel in self._tunneld_core.tunnel_tasks.items(): if (active_tunnel.udid is None) or (active_tunnel.tunnel is None): continue if active_tunnel.udid not in tunnels: tunnels[active_tunnel.udid] = [] tunnels[active_tunnel.udid].append( { "tunnel-address": active_tunnel.tunnel.address, "tunnel-port": active_tunnel.tunnel.port, "interface": ip, } ) return tunnels @self._app.get("/device_info") async def device_info(): """Get device information""" tunnels = {} for ip, active_tunnel in self._tunneld_core.tunnel_tasks.items(): if (active_tunnel.udid is None) or (active_tunnel.tunnel is None): continue if active_tunnel.udid not in tunnels: tunnels[active_tunnel.udid] = {} try: lockdown = await create_using_usbmux(serial=active_tunnel.udid, autopair=False) tunnels[active_tunnel.udid] = iterate_multidim(lockdown.all_values) except Exception as e: logger.error(f"Failed to create lockdown session for device {active_tunnel.udid}: {e}") continue return tunnels @self._app.get("/shutdown") async def shutdown() -> fastapi.Response: """Shutdown Tunneld""" os.kill(os.getpid(), signal.SIGINT) data = {"operation": "shutdown", "data": True, "message": "Server shutting down..."} return generate_http_response(data) @self._app.get("/clear_tunnels") async def clear_tunnels() -> fastapi.Response: self._tunneld_core.clear() data = {"operation": "clear_tunnels", "data": True, "message": "Cleared tunnels..."} return generate_http_response(data) @self._app.get("/cancel") async def cancel_tunnel(udid: str) -> fastapi.Response: self._tunneld_core.cancel(udid=udid) data = {"operation": "cancel", "udid": udid, "data": True, "message": f"tunnel {udid} Canceled ..."} return generate_http_response(data) @self._app.get("/hello") async def hello() -> fastapi.Response: data = {"message": "Hello, I'm alive"} return generate_http_response(data) def generate_http_response( data: dict, status_code: int = 200, media_type: str = "application/json" ) -> fastapi.Response: return fastapi.Response(status_code=status_code, media_type=media_type, content=json.dumps(data)) @self._app.get("/start-tunnel") @self.context.sio.event async def start_tunnel( udid: str, ip: Optional[str] = None, connection_type: Optional[str] = None ) -> fastapi.Response: udid_tunnels = [ t.tunnel for t in self._tunneld_core.tunnel_tasks.values() if t.udid == udid and t.tunnel is not None ] if len(udid_tunnels) > 0: self.context.udid = udid data = { "interface": udid_tunnels[0].interface, "port": udid_tunnels[0].port, "address": udid_tunnels[0].address, } return generate_http_response(data) queue = asyncio.Queue() created_task = False try: if not created_task and connection_type in ("usbmux", None): task_identifier = f"usbmux-{udid}" try: async with await create_using_usbmux(udid) as lockdown: service = await CoreDeviceTunnelProxy.create(lockdown) task = asyncio.create_task( self._tunneld_core.start_tunnel_task( task_identifier, service, protocol=TunnelProtocol.TCP, queue=queue ), name=f"start-tunnel-task-{task_identifier}", ) self._tunneld_core.tunnel_tasks[task_identifier] = TunnelTask(task=task, udid=udid) created_task = True except (ConnectionFailedError, InvalidServiceError, MuxException): pass if connection_type in ("usb", None): for rsd in await get_rsds(udid=udid): rsd_ip = rsd.service.address[0] if ip is not None and rsd_ip != ip: await rsd.close() continue task = asyncio.create_task( self._tunneld_core.start_tunnel_task( rsd_ip, await create_core_device_tunnel_service_using_rsd(rsd), queue=queue ), name=f"start-tunnel-usb-{rsd_ip}", ) self._tunneld_core.tunnel_tasks[rsd_ip] = TunnelTask(task=task, udid=rsd.udid) created_task = True if not created_task and connection_type in ("wifi", None): for remotepairing in await get_remote_pairing_tunnel_services(udid=udid): remotepairing_ip = remotepairing.hostname if ip is not None and remotepairing_ip != ip: await remotepairing.close() continue task = asyncio.create_task( self._tunneld_core.start_tunnel_task(remotepairing_ip, remotepairing, queue=queue), name=f"start-tunnel-wifi-{remotepairing_ip}", ) self._tunneld_core.tunnel_tasks[remotepairing_ip] = TunnelTask( task=task, udid=remotepairing.remote_identifier ) created_task = True except Exception as e: return fastapi.Response( status_code=501, content=json.dumps({ "error": { "exception": e.__class__.__name__, "traceback": traceback.format_exc(), } }), ) if not created_task: return fastapi.Response(status_code=501, content=json.dumps({"error": "task not created"})) tunnel: Optional[TunnelResult] = await queue.get() if tunnel is not None: self.context.udid = udid data = {"interface": tunnel.interface, "port": tunnel.port, "address": tunnel.address} return generate_http_response(data) else: return fastapi.Response( status_code=404, content=json.dumps({"error": "something went wrong during tunnel creation"}) ) @self.context.sio.event async def connect(sid, environ): logger.info("Client connected: %s", sid) await self.context.sio.emit("connect", sid, namespace="/") @self.context.sio.event async def request_update(sid, data): logger.info("Update request from %s", sid) # await self.context.sio.emit("status_update", await get_status()) @self.context.sio.event async def message(sid, data): logger.info("Received message from %s: %s", data, sid) await self.context.sio.emit("message", f"Received message from {sid}: {data}", namespace="/") @self.context.sio.event async def simulate_location(sid, data): logger.info("Simulate location request from %s: %s", sid, data) latitude = data.get("latitude") if isinstance(data, dict) else getattr(data, "latitude", None) longitude = data.get("longitude") if isinstance(data, dict) else getattr(data, "longitude", None) delay = data.get("delay", 0) if isinstance(data, dict) else getattr(data, "delay", 0) if latitude is not None and longitude is not None: logger.info("Adding location %s, %s with %ss delay to the queue", latitude, longitude, delay) await self.context.queue.put((latitude, longitude, delay)) await self.context.sio.emit( "simulation", { "status": self.context.simulation_active, "data": { "latitude": self.context.latitude, "cur_longitude": longitude, "next_move": delay, }, }, namespace="/", ) else: logger.warning("Invalid location data received from %s: %s", sid, data) await self.context.sio.emit("error", "Invalid location data", namespace="/") @self.context.sio.event async def start_simulate_location(sid, data): logger.info("Start location simulation request from %s", sid) if isinstance(data, dict) and data.get("udid"): self.context.udid = data["udid"] if self.context.simulation_task is None or self.context.simulation_task.done(): self.context.simulation_active = True self.context.simulation_task = asyncio.create_task( start_queue(), name="location-simulation-worker", ) await self.context.sio.emit("simulation", {"status": self.context.simulation_active, "data": None}, namespace="/") @self.context.sio.event async def end_simulate_location(sid, data): logger.info("End location simulation request from %s", sid) if self.context.simulation_task is not None and not self.context.simulation_task.done(): await self.context.queue.put((None, None, None)) with suppress(asyncio.CancelledError): await self.context.simulation_task await empty_queue() await self.context.sio.emit("simulation", {"status": self.context.simulation_active, "data": None}, namespace="/") @self.context.sio.event async def disconnect(sid): logger.info("Client disconnected: %s", sid) @self.context.sio.event async def start_tunneld(sid, data): logger.info("Start tunneld request from %s: %s", sid, data) try: self._tunneld_core.start() logger.info("Tunneld started successfully") except Exception as e: logger.error("Error starting tunneld: %s", e) def _run_app(self) -> None: uvicorn.run(self._asgi_app, host=self.host, port=self.port, loop="asyncio", workers=1) class LocationSimulationQueue(LocationSimulation): def __init__(self, dvt, context: LocationSimulationState): super().__init__(dvt) self.context = context async def play_queue(self, disable_sleep: bool = False, timing_randomness_range: int = 0) -> None: while True: latitude, longitude, delay = await self.context.queue.get() if (latitude, longitude, delay) == (None, None, None): break if delay > 0 and not disable_sleep: if timing_randomness_range > 0: delay = delay + random.uniform( -timing_randomness_range, timing_randomness_range ) for i in range(delay, 0, -1): self.context.next_move = i await self.context.sio.emit( "simulation", { "status": self.context.simulation_active, "data": { "latitude": self.context.latitude, "longitude": self.context.longitude, "next_move": i, }, }, namespace="/", ) await asyncio.sleep(1) await self.set(latitude, longitude) self.context.latitude = latitude self.context.longitude = longitude await self.context.sio.emit( "simulation", { "status": self.context.simulation_active, "data": { "latitude": self.context.latitude, "longitude": self.context.longitude, "next_move": None, }, }, namespace="/", ) logger.info( "Set simulated location to %s, %s after %ss delay", latitude, longitude, delay, ) self.context.queue.task_done()