"""HTTP API for managing a USB-attached iOS device through pymobiledevice3.

The module exposes FastAPI endpoints to inspect usbmux/lockdown state, start
and stop a CoreDevice tunnel, run an RSD preflight check, and simulate a GPS
location.  At import time it also monkey-patches three pymobiledevice3 entry
points (``RemoteXPCConnection.connect``, ``RemoteServiceDiscoveryService.connect``
and ``ServiceConnection.create_using_tcp``) to add timeouts and logging.
"""

import asyncio
import json
import logging
import os
import socket
import tempfile
import threading
import time
import xml.etree.ElementTree as ET
from dataclasses import dataclass
from typing import Dict, Optional

import uvicorn
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from pymobiledevice3.lockdown import create_using_usbmux
from pymobiledevice3.remote.common import TunnelProtocol
from pymobiledevice3.remote.remote_service_discovery import RemoteServiceDiscoveryService
from pymobiledevice3.remote.remotexpc import RemoteXPCConnection
from pymobiledevice3.remote.tunnel_service import CoreDeviceTunnelProxy, start_tunnel, TunnelResult
from pymobiledevice3.service_connection import ServiceConnection
from pymobiledevice3.services.dvt.dvt_secure_socket_proxy import DvtSecureSocketProxyService
from pymobiledevice3.services.dvt.instruments.location_simulation import LocationSimulation
from pymobiledevice3.usbmux import list_devices


# ---------------------------------------------------------------------------
# Logging
# ---------------------------------------------------------------------------

class JsonFormatter(logging.Formatter):
    """Render each log record as a single-line JSON object."""

    def format(self, record: logging.LogRecord) -> str:
        payload = {
            "ts": self.formatTime(record, "%Y-%m-%dT%H:%M:%S%z"),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
        }
        if record.exc_info:
            payload["exc_info"] = self.formatException(record.exc_info)
        return json.dumps(payload, ensure_ascii=True)


handler = logging.StreamHandler()
handler.setFormatter(JsonFormatter())
root_logger = logging.getLogger()
# Replace (not extend) the root handlers so every record goes out as JSON.
root_logger.handlers = [handler]
root_logger.setLevel(logging.INFO)
logger = logging.getLogger("ios-api")


# ---------------------------------------------------------------------------
# pymobiledevice3 monkey-patches
# ---------------------------------------------------------------------------

# Seconds allowed for the RemoteXPC TCP connect before giving up.
_REMOTEXPC_TCP_TIMEOUT_SECONDS = 5.0

_orig_remotexpc_connect = RemoteXPCConnection.connect


async def _remotexpc_connect_with_timeout(self):
    """Replacement for ``RemoteXPCConnection.connect`` with a bounded TCP connect.

    Opens the TCP connection to ``self.address`` under a timeout, then runs
    ``self._do_handshake()``.  Both phases are timed and logged.

    Raises:
        asyncio.TimeoutError: the TCP connect did not finish in time.
        Exception: any error from the connect or the handshake is re-raised;
            on handshake failure the connection is closed first.
    """
    logger.info("RemoteXPC attempting TCP connection to %s", self.address)
    tcp_start = time.monotonic()
    try:
        self._reader, self._writer = await asyncio.wait_for(
            asyncio.open_connection(self.address[0], self.address[1]),
            timeout=_REMOTEXPC_TCP_TIMEOUT_SECONDS,
        )
    except asyncio.TimeoutError as exc:
        tcp_elapsed = time.monotonic() - tcp_start
        logger.error(
            "RemoteXPC TCP connection to %s timed out after %.2fs", self.address, tcp_elapsed
        )
        raise asyncio.TimeoutError(
            f"TCP connection to {self.address} timed out after {_REMOTEXPC_TCP_TIMEOUT_SECONDS:g}s"
        ) from exc
    except Exception as exc:
        tcp_elapsed = time.monotonic() - tcp_start
        logger.error(
            "RemoteXPC TCP connection to %s failed after %.2fs: %s", self.address, tcp_elapsed, exc
        )
        raise
    logger.info("RemoteXPC TCP connected in %.2fs", time.monotonic() - tcp_start)

    logger.info("RemoteXPC starting handshake")
    handshake_start = time.monotonic()
    try:
        await self._do_handshake()
    except Exception as exc:
        handshake_elapsed = time.monotonic() - handshake_start
        logger.error("RemoteXPC handshake failed after %.2fs: %s", handshake_elapsed, exc)
        await self.close()
        raise
    logger.info("RemoteXPC handshake complete in %.2fs", time.monotonic() - handshake_start)


RemoteXPCConnection.connect = _remotexpc_connect_with_timeout

_orig_rsd_connect = RemoteServiceDiscoveryService.connect


async def _rsd_connect_with_timeout(self):
    """Replacement for ``RemoteServiceDiscoveryService.connect`` that skips lockdown.

    Connects the underlying service, reads the peer info, and records the
    device UDID and product type.  No lockdown connection is made:
    ``self.lockdown`` is set to ``None`` and ``self.all_values`` to ``{}``.

    Raises:
        Exception: anything raised while reading the peer info; the RSD
            connection is closed before re-raising.
    """
    await self.service.connect()
    try:
        self.peer_info = await self.service.receive_response()
        properties = self.peer_info["Properties"]
        self.udid = properties["UniqueDeviceID"]
        self.product_type = properties["ProductType"]
        # Skip lockdown connection - not working over RSD tunnel and not needed for DVT
        self.lockdown = None
        logger.info("Skipping lockdown connection for RSD (not required for DVT)")
        self.all_values = {}
    except Exception:
        await self.close()
        raise


RemoteServiceDiscoveryService.connect = _rsd_connect_with_timeout

_orig_create_using_tcp = ServiceConnection.create_using_tcp


def _create_using_tcp_with_ipv6(
    hostname: str,
    port: int,
    keep_alive: bool = True,
    create_connection_timeout: int = 5,  # Reduced from 20 to 5 seconds
):
    """Replacement for ``ServiceConnection.create_using_tcp`` that forces IPv6.

    When ``hostname`` contains ``:`` it is treated as an IPv6 literal and
    connected through an explicit ``AF_INET6`` socket; any other hostname is
    delegated to the original implementation.

    Args:
        hostname: target host; a ``:`` selects the IPv6 path.
        port: target TCP port.
        keep_alive: enable TCP keep-alive on the connected socket.
        create_connection_timeout: connect timeout in seconds.

    Returns:
        A ``ServiceConnection`` wrapping the connected socket.

    Raises:
        Exception: any socket error; the socket is closed before re-raising.
    """
    if ":" not in hostname:
        # Fall back to original for non-IPv6
        return _orig_create_using_tcp(
            hostname,
            port,
            keep_alive=keep_alive,
            create_connection_timeout=create_connection_timeout,
        )

    from pymobiledevice3.osu.os_utils import get_os_utils

    logger.info(
        "ServiceConnection connecting to %s:%s with %ss timeout",
        hostname, port, create_connection_timeout,
    )
    sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
    sock.settimeout(create_connection_timeout)
    connect_start = time.monotonic()
    try:
        sock.connect((hostname, port))
        logger.info(
            "ServiceConnection connected to %s:%s in %.2fs",
            hostname, port, time.monotonic() - connect_start,
        )
        # Keep a 5-second timeout for subsequent operations for faster failure detection
        sock.settimeout(5.0)
        if keep_alive:
            get_os_utils().set_keepalive(sock)
        return ServiceConnection(sock)
    except Exception as exc:
        logger.error(
            "ServiceConnection failed to %s:%s after %.2fs: %s",
            hostname, port, time.monotonic() - connect_start, exc,
        )
        sock.close()
        raise


ServiceConnection.create_using_tcp = staticmethod(_create_using_tcp_with_ipv6)


# ---------------------------------------------------------------------------
# Application, request/response models and shared state
# ---------------------------------------------------------------------------

app = FastAPI(title="iOS Device Management API")


class LocationUpdate(BaseModel):
    """Request body for ``POST /api/simulate-location``."""

    latitude: float
    longitude: float
    # Optional explicit RSD endpoint; when both are omitted the running (or an
    # auto-started) tunnel is used.
    rsd_address: Optional[str] = None
    rsd_port: Optional[int] = None


class TunnelStartRequest(BaseModel):
    """Request body for ``POST /api/tunnel/start``."""

    protocol: str = "tcp"
    wait_for_device: bool = False
    wait_timeout_seconds: int = 30


class PreflightResponse(BaseModel):
    """Result of the RSD preflight check (ports are ``None`` when not advertised)."""

    rsd_address: str
    rsd_port: int
    interface: Optional[str] = None
    protocol: Optional[str] = None
    dtservicehub_port: Optional[int] = None
    dtservicehub_reachable: bool = False
    lockdown_trusted_port: Optional[int] = None
    lockdown_untrusted_port: Optional[int] = None
    lockdown_trusted_reachable: bool = False
    lockdown_untrusted_reachable: bool = False


@dataclass
class TunnelState:
    """Book-keeping for one tunnel task."""

    task: Optional[asyncio.Task]
    stop_event: asyncio.Event   # set to ask the tunnel task to shut down
    ready_event: asyncio.Event  # set once the tunnel is up OR has failed
    result: Optional[TunnelResult] = None
    error: Optional[str] = None
    udid: Optional[str] = None


# Only one tunnel is tracked at a time; it is stored under this key.
_DEFAULT_TUNNEL_KEY = ""
# Seconds to wait for a starting tunnel to report ready.
_TUNNEL_READY_TIMEOUT_SECONDS = 15

_TUNNELS: Dict[str, TunnelState] = {}


def _get_single_device_udid() -> str:
    """Return the serial of the only usbmux device.

    Raises:
        HTTPException: 404 when no device is connected, 400 when several are.
    """
    devices = list_devices()
    if not devices:
        raise HTTPException(status_code=404, detail="No devices connected")
    if len(devices) > 1:
        raise HTTPException(status_code=400, detail="Multiple devices connected")
    return devices[0].serial


def _parse_protocol(value: str) -> TunnelProtocol:
    """Map a case-insensitive protocol name to ``TunnelProtocol``.

    Raises:
        HTTPException: 400 when ``value`` is not a ``TunnelProtocol`` member name.
    """
    try:
        return TunnelProtocol[value.upper()]
    except KeyError as exc:
        raise HTTPException(status_code=400, detail=f"Unsupported protocol: {value}") from exc


def _active_tunnel_state() -> Optional[TunnelState]:
    """Return the tracked tunnel state if its task is running and it is ready.

    A state whose task has finished still carries the last ``result``; that
    address is stale, so such a state is treated as absent.
    """
    state = _TUNNELS.get(_DEFAULT_TUNNEL_KEY)
    if state is None or state.task is None or state.task.done():
        return None
    if state.result is None:
        return None
    return state


# ---------------------------------------------------------------------------
# Tunnel management
# ---------------------------------------------------------------------------

async def _run_usbmux_tunnel(udid: str, protocol: TunnelProtocol, state: TunnelState) -> None:
    """Run a CoreDevice tunnel until ``state.stop_event`` is set.

    On success ``state.result`` is populated and ``state.ready_event`` is set.
    On failure ``state.error`` is populated and ``state.ready_event`` is set
    as well, so waiters wake up either way.  The proxy and lockdown client
    are always closed on exit.
    """
    lockdown = None
    proxy = None
    try:
        logger.info("Starting usbmux tunnel for udid=%s protocol=%s", udid, protocol.name.lower())
        lockdown = create_using_usbmux(serial=udid) if udid else create_using_usbmux()
        proxy = await CoreDeviceTunnelProxy.create(lockdown)
        async with start_tunnel(proxy, protocol=protocol) as tunnel:
            state.result = tunnel
            logger.info(
                "Tunnel ready udid=%s address=%s port=%s interface=%s protocol=%s",
                udid,
                tunnel.address,
                tunnel.port,
                tunnel.interface,
                tunnel.protocol.name.lower(),
            )
            state.ready_event.set()
            await state.stop_event.wait()
    except Exception as e:
        logger.exception("Tunnel failed udid=%s", udid)
        state.error = str(e)
        state.ready_event.set()
    finally:
        logger.info("Shutting down tunnel udid=%s", udid)
        try:
            if proxy is not None:
                await proxy.close()
        finally:
            # Close lockdown even when closing the proxy raises.
            if lockdown is not None:
                lockdown.close()


async def _await_tunnel_ready(key: str, state: TunnelState) -> TunnelResult:
    """Wait until ``state`` reports ready and return its tunnel result.

    Raises:
        HTTPException: 504 when the tunnel does not become ready in time (the
            half-started task is cancelled and unregistered), 500 when the
            tunnel task reported an error.
    """
    if state.result is not None:
        return state.result
    try:
        await asyncio.wait_for(state.ready_event.wait(), timeout=_TUNNEL_READY_TIMEOUT_SECONDS)
    except asyncio.TimeoutError as exc:
        # Do not leave a hung start registered: later requests would either
        # wait on it again or race a second tunnel against it.
        state.stop_event.set()
        if state.task is not None:
            state.task.cancel()
        if _TUNNELS.get(key) is state:
            _TUNNELS.pop(key, None)
        raise HTTPException(status_code=504, detail="Timed out waiting for tunnel to start") from exc
    if state.error:
        if _TUNNELS.get(key) is state:
            _TUNNELS.pop(key, None)
        raise HTTPException(status_code=500, detail=f"Failed to start tunnel: {state.error}")
    return state.result


async def _start_tunnel_internal(
    udid: str,
    protocol: TunnelProtocol,
    wait_for_device: bool,
    wait_timeout_seconds: int,
) -> TunnelResult:
    """Start the tunnel, or join the one already running or starting.

    Args:
        udid: device serial to tunnel to.
        protocol: tunnel transport.
        wait_for_device: poll usbmux until exactly one device is attached and
            use that device's serial instead of ``udid``.
        wait_timeout_seconds: upper bound for that poll (minimum 1 second).

    Returns:
        The ``TunnelResult`` of the running tunnel.

    Raises:
        HTTPException: 504 on device-wait or tunnel-start timeout, 500 when
            the tunnel fails to start.
    """
    key = _DEFAULT_TUNNEL_KEY
    existing = _TUNNELS.get(key)
    if existing is not None and existing.task is not None and not existing.task.done():
        # Already running or still starting: share it rather than spawning a
        # second tunnel that would orphan the first task.
        return await _await_tunnel_ready(key, existing)

    if wait_for_device:
        timeout = max(1, wait_timeout_seconds)
        loop = asyncio.get_running_loop()
        start = loop.time()
        while True:
            devices = list_devices()
            if len(devices) == 1:
                udid = devices[0].serial
                break
            if loop.time() - start > timeout:
                raise HTTPException(status_code=504, detail="Timed out waiting for device")
            await asyncio.sleep(0.5)

    state = TunnelState(
        task=None,
        stop_event=asyncio.Event(),
        ready_event=asyncio.Event(),
        udid=udid,
    )
    _TUNNELS[key] = state
    state.task = asyncio.create_task(_run_usbmux_tunnel(udid, protocol, state))
    return await _await_tunnel_ready(key, state)


# ---------------------------------------------------------------------------
# Port probing
# ---------------------------------------------------------------------------

def _wait_for_port(address: str, port: int, timeout_seconds: float = 10.0, interval_seconds: float = 0.5) -> bool:
    """Poll until a TCP connect to ``address:port`` succeeds or time runs out.

    Returns:
        ``True`` as soon as one connect succeeds, ``False`` on timeout.
    """
    deadline = time.monotonic() + timeout_seconds
    while time.monotonic() < deadline:
        try:
            with socket.create_connection((address, port), timeout=2):
                return True
        except OSError:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                break
            # Never sleep past the deadline.
            time.sleep(min(interval_seconds, remaining))
    return False


async def _wait_for_port_async(
    address: str,
    port: int,
    timeout_seconds: float = 10.0,
    interval_seconds: float = 0.5,
) -> bool:
    """Run the blocking ``_wait_for_port`` in a worker thread."""
    return await asyncio.to_thread(_wait_for_port, address, port, timeout_seconds, interval_seconds)


def _service_port(services: dict, name: str) -> int:
    """Return the advertised port of RSD service ``name``, or 0 when absent."""
    entry = services.get(name, {})
    return int(entry.get("Port", 0)) if entry else 0


# ---------------------------------------------------------------------------
# DVT sessions
# ---------------------------------------------------------------------------

def _open_rsd_dvt(rsd: RemoteServiceDiscoveryService):
    """Build a ``RemoteServer`` bound to dtservicehub without going through lockdown.

    The instance is created with ``__new__`` and its attributes are filled in
    by hand, bypassing ``LockdownService.__init__``.  No handshake is
    performed; the caller owns ``dvt.service`` and must close it.
    """
    from pymobiledevice3.services.remote_server import Channel, ChannelFragmenter, RemoteServer

    # Get raw TCP connection to dtservicehub without RSDCheckin
    service = rsd.start_lockdown_service_without_checkin(
        DvtSecureSocketProxyService.RSD_SERVICE_NAME
    )
    dvt = RemoteServer.__new__(RemoteServer)
    dvt.service_name = DvtSecureSocketProxyService.RSD_SERVICE_NAME
    dvt.lockdown = rsd
    dvt.service = service
    dvt.logger = logging.getLogger(DvtSecureSocketProxyService.__module__)
    dvt.should_remove_ssl_context = False
    dvt.channel_cache = {}
    dvt.channel_messages = {0: ChannelFragmenter()}  # BROADCAST_CHANNEL = 0
    dvt.broadcast = Channel.create(0, dvt)
    dvt.lock = threading.Lock()
    dvt.supported_identifiers = []
    return dvt


def _simulate_location_with_dvt(service_provider, latitude: float, longitude: float) -> None:
    """Open a one-shot DVT session, set the location, and close the session.

    For an RSD provider the dtservicehub connection is built manually (see
    ``_open_rsd_dvt``) and closed afterwards; any other provider goes through
    ``DvtSecureSocketProxyService``.
    """
    logger.info("DVT opening session")
    via_rsd = isinstance(service_provider, RemoteServiceDiscoveryService)
    if via_rsd:
        logger.info("Using RSD dtservicehub connection")
        dvt = _open_rsd_dvt(service_provider)
    else:
        dvt = DvtSecureSocketProxyService(service_provider)

    try:
        handshake_start = time.monotonic()
        logger.info("DVT handshake start")
        dvt.perform_handshake()
        logger.info("DVT handshake complete in %.2fs", time.monotonic() - handshake_start)

        set_start = time.monotonic()
        LocationSimulation(dvt).set(latitude, longitude)
        logger.info("DVT location set sent in %.2fs", time.monotonic() - set_start)
    finally:
        if via_rsd:
            dvt.service.close()


# Global reference to keep the CLI process alive
_location_sim_process: Optional[asyncio.subprocess.Process] = None
# GPX file the CLI process is playing; removed when the process is stopped.
_location_sim_gpx_path: Optional[str] = None
# Global DVT session for reuse
_dvt_session: Optional[DvtSecureSocketProxyService] = None
_dvt_session_lock = asyncio.Lock()
_current_rsd: Optional[RemoteServiceDiscoveryService] = None


def _close_dvt_quietly(dvt) -> None:
    """Best-effort close of a DVT session's service connection."""
    try:
        dvt.service.close()
    except Exception:
        logger.debug("Ignoring error while closing DVT session", exc_info=True)


async def _get_or_create_dvt_session(rsd: RemoteServiceDiscoveryService):
    """Get existing DVT session or create a new one (with proper initialization).

    The cached session is reused only when it was created for this very
    ``rsd`` object; otherwise it is closed and replaced.

    Raises:
        asyncio.TimeoutError: session creation took longer than 10 seconds.
    """
    global _dvt_session, _current_rsd

    async with _dvt_session_lock:
        if _dvt_session is not None and _current_rsd is rsd:
            logger.info("Reusing existing DVT session")
            return _dvt_session

        if _dvt_session is not None:
            logger.info("Closing old DVT session")
            _close_dvt_quietly(_dvt_session)
            _dvt_session = None
            _current_rsd = None

        logger.info("Creating new DVT session")

        def _create_dvt():
            dvt = _open_rsd_dvt(rsd)
            logger.info("DVT handshake start")
            handshake_start = time.monotonic()
            try:
                dvt.perform_handshake()
            except Exception:
                # Do not leak the dtservicehub connection on a failed handshake.
                _close_dvt_quietly(dvt)
                raise
            logger.info("DVT handshake complete in %.2fs", time.monotonic() - handshake_start)
            return dvt

        _dvt_session = await asyncio.wait_for(asyncio.to_thread(_create_dvt), timeout=10.0)
        _current_rsd = rsd
        return _dvt_session


async def _simulate_location_via_library(rsd: RemoteServiceDiscoveryService, latitude: float, longitude: float) -> None:
    """Use library with persistent DVT session to avoid location bounce."""
    logger.info("Using library for location simulation (persistent session)")

    dvt = await _get_or_create_dvt_session(rsd)

    def _set_location():
        set_start = time.monotonic()
        LocationSimulation(dvt).set(latitude, longitude)
        logger.info("DVT location set in %.2fs", time.monotonic() - set_start)

    await asyncio.to_thread(_set_location)
    logger.info("Location updated successfully (session maintained)")


# ---------------------------------------------------------------------------
# CLI-based location simulation
# ---------------------------------------------------------------------------

def _build_gpx(latitude: float, longitude: float) -> str:
    """Return a GPX document holding one stationary track point.

    NOTE(review): the trk/trkseg/trkpt layout assumes the CLI's GPX player
    iterates track points - confirm against pymobiledevice3's GPX playback.
    """
    gpx = ET.Element(
        "gpx",
        {"version": "1.1", "creator": "ios-api", "xmlns": "http://www.topografix.com/GPX/1/1"},
    )
    track = ET.SubElement(gpx, "trk")
    ET.SubElement(track, "name").text = "Simulated Location"
    segment = ET.SubElement(track, "trkseg")
    ET.SubElement(segment, "trkpt", {"lat": str(latitude), "lon": str(longitude)})
    return '<?xml version="1.0" encoding="UTF-8"?>\n' + ET.tostring(gpx, encoding="unicode")


def _remove_file_quietly(path: Optional[str]) -> None:
    """Delete ``path`` if given, ignoring filesystem errors."""
    if path is None:
        return
    try:
        os.unlink(path)
    except OSError:
        logger.debug("Could not remove temp file %s", path, exc_info=True)


async def _stop_location_sim_process() -> None:
    """Terminate the CLI playback process (if any) and delete its GPX file.

    The process gets two seconds to exit after ``terminate()``; after that it
    is killed.  Errors during shutdown are logged and otherwise ignored.
    """
    global _location_sim_process, _location_sim_gpx_path

    process, _location_sim_process = _location_sim_process, None
    gpx_path, _location_sim_gpx_path = _location_sim_gpx_path, None

    if process is not None and process.returncode is None:
        try:
            process.terminate()
            await asyncio.wait_for(process.wait(), timeout=2.0)
        except Exception:
            try:
                process.kill()
                await process.wait()
            except Exception:
                logger.debug("Ignoring error while killing location simulation", exc_info=True)
    _remove_file_quietly(gpx_path)


async def _simulate_location_via_cli(address: str, port: int, latitude: float, longitude: float) -> None:
    """Use pymobiledevice3 CLI with GPX playback to avoid location bounce.

    Any previous playback process is stopped first.  A GPX file with the
    requested point is written to a temp file and handed to
    ``pymobiledevice3 developer dvt simulate-location play``; the process is
    kept in ``_location_sim_process`` so it stays alive.

    Raises:
        HTTPException: 500 when the CLI process has already exited 1.5 seconds
            after launch.
    """
    global _location_sim_process, _location_sim_gpx_path

    logger.info("Using pymobiledevice3 CLI for location simulation (GPX playback)")

    if _location_sim_process is not None:
        logger.info("Stopping previous location simulation")
    await _stop_location_sim_process()

    with tempfile.NamedTemporaryFile(
        mode="w", suffix=".gpx", delete=False, encoding="utf-8"
    ) as gpx_file:
        gpx_file.write(_build_gpx(latitude, longitude))
        gpx_path = gpx_file.name

    cmd = [
        ".venv/bin/pymobiledevice3",
        "developer",
        "dvt",
        "simulate-location",
        "play",
        "--rsd",
        address,
        str(port),
        "--disable-sleep",  # Play immediately without delays
        gpx_path,
    ]

    logger.info("Running CLI command: %s", " ".join(cmd))

    # NOTE: stdout/stderr are piped but only read if the process exits early.
    try:
        process = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            stdin=asyncio.subprocess.DEVNULL,
        )
    except Exception:
        _remove_file_quietly(gpx_path)
        raise
    _location_sim_process = process
    _location_sim_gpx_path = gpx_path

    # Wait briefly to ensure location is set
    await asyncio.sleep(1.5)

    if process.returncode is not None:
        error_msg = f"CLI process exited with code {process.returncode}"
        logger.error(error_msg)
        if process.stderr is not None:
            stderr_text = (await process.stderr.read()).decode("utf-8", errors="replace").strip()
            if stderr_text:
                logger.error("CLI stderr: %s", stderr_text)
        await _stop_location_sim_process()
        raise HTTPException(status_code=500, detail=error_msg)

    # The GPX file stays on disk while the process runs; it is removed by
    # _stop_location_sim_process().
    logger.info("CLI location simulation started (GPX playback maintains location)")


# ---------------------------------------------------------------------------
# RSD-based location simulation and preflight
# ---------------------------------------------------------------------------

async def _simulate_location_via_rsd_async(address: str, port: int, latitude: float, longitude: float) -> None:
    """Connect to RSD at ``address:port`` and set the location through DVT.

    Raises:
        HTTPException: 504 when the RSD port is unreachable or the RSD connect
            times out on every attempt.
    """
    logger.info("Connecting to RSD address=%s port=%s", address, port)
    port_check_start = time.monotonic()
    reachable = await _wait_for_port_async(address, port, timeout_seconds=3, interval_seconds=0.5)
    port_check_seconds = time.monotonic() - port_check_start
    if not reachable:
        raise HTTPException(
            status_code=504,
            detail=f"RSD port unreachable after {port_check_seconds:.2f}s",
        )
    logger.info("RSD port reachable in %.2fs", port_check_seconds)

    rsd = RemoteServiceDiscoveryService((address, port))
    connect_timeout = 60
    max_attempts = 2
    for attempt in range(1, max_attempts + 1):
        connect_start = time.monotonic()
        try:
            await asyncio.wait_for(rsd.connect(), timeout=connect_timeout)
        except asyncio.TimeoutError as exc:
            connect_seconds = time.monotonic() - connect_start
            if attempt >= max_attempts:
                raise HTTPException(
                    status_code=504,
                    detail=f"Timed out connecting to RSD after {max_attempts} attempts",
                ) from exc
            logger.warning(
                "RSD connect timed out in %.2fs on attempt %s/%s; retrying",
                connect_seconds,
                attempt,
                max_attempts,
            )
            await asyncio.sleep(2)
        else:
            logger.info(
                "RSD connect complete in %.2fs on attempt %s/%s",
                time.monotonic() - connect_start, attempt, max_attempts,
            )
            break

    try:
        services = rsd.peer_info.get("Services", {})
        dtservicehub = services.get(DvtSecureSocketProxyService.RSD_SERVICE_NAME, {})
        logger.info("RSD services keys=%s", list(services.keys()))
        logger.info("RSD dtservicehub entry=%s", dtservicehub)
        dt_port = int(dtservicehub.get("Port", 0)) if dtservicehub else 0
        if dt_port:
            logger.info("Waiting for RSD dtservicehub port address=%s port=%s", address, dt_port)
            wait_start = time.monotonic()
            dt_reachable = await _wait_for_port_async(
                address, dt_port, timeout_seconds=30, interval_seconds=0.5
            )
            wait_seconds = time.monotonic() - wait_start
            if dt_reachable:
                logger.info("RSD dtservicehub port reachable address=%s port=%s", address, dt_port)
            else:
                # Proceed anyway; the DVT connect below will surface the failure.
                logger.warning("RSD dtservicehub port still unreachable address=%s port=%s", address, dt_port)
            logger.info("RSD dtservicehub wait complete in %.2fs", wait_seconds)
        else:
            logger.warning("RSD dtservicehub missing or port=0")
        logger.info("Starting DVT location set")
        await asyncio.to_thread(_simulate_location_with_dvt, rsd, latitude, longitude)
    finally:
        await rsd.close()


def _simulate_location_via_rsd(address: str, port: int, latitude: float, longitude: float) -> None:
    """Synchronous wrapper around ``_simulate_location_via_rsd_async``."""
    asyncio.run(_simulate_location_via_rsd_async(address, port, latitude, longitude))


async def _preflight_rsd_async(
    address: str,
    port: int,
    interface: Optional[str],
    protocol: Optional[str],
) -> PreflightResponse:
    """Connect to RSD and probe the dtservicehub and lockdown service ports.

    ``interface`` and ``protocol`` are echoed back in the response unchanged.
    Services that are not advertised are reported with port ``None`` and
    reachable ``False``.
    """
    rsd = RemoteServiceDiscoveryService((address, port))
    await rsd.connect()
    try:
        services = rsd.peer_info.get("Services", {})
        dt_port = _service_port(services, DvtSecureSocketProxyService.RSD_SERVICE_NAME)
        trusted_port = _service_port(services, "com.apple.mobile.lockdown.remote.trusted")
        untrusted_port = _service_port(services, "com.apple.mobile.lockdown.remote.untrusted")

        async def _probe(target_port: int, timeout_seconds: float) -> bool:
            if not target_port:
                return False
            return await _wait_for_port_async(
                address, target_port, timeout_seconds=timeout_seconds, interval_seconds=0.5
            )

        dt_reachable = await _probe(dt_port, 5)
        trusted_reachable = await _probe(trusted_port, 3)
        untrusted_reachable = await _probe(untrusted_port, 3)

        return PreflightResponse(
            rsd_address=address,
            rsd_port=port,
            interface=interface,
            protocol=protocol,
            dtservicehub_port=dt_port or None,
            dtservicehub_reachable=dt_reachable,
            lockdown_trusted_port=trusted_port or None,
            lockdown_untrusted_port=untrusted_port or None,
            lockdown_trusted_reachable=trusted_reachable,
            lockdown_untrusted_reachable=untrusted_reachable,
        )
    finally:
        await rsd.close()


# ---------------------------------------------------------------------------
# HTTP endpoints
# ---------------------------------------------------------------------------

@app.get("/api/usbmux/status")
async def get_usbmux_status():
    """Lists all devices visible to USBMux."""
    try:
        devices = list_devices()
        return {
            "device_count": len(devices),
            "devices": [
                {"serial": d.serial, "connection_type": d.connection_type} for d in devices
            ],
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


@app.get("/api/lockdown/status")
async def get_lockdown_status():
    """Checks lockdown connectivity and basic device info.

    Any failure (including "no device") is reported in the response body as
    ``{"status": "Disconnected", ...}`` rather than as an HTTP error.
    """
    lockdown = None
    try:
        device_serial = _get_single_device_udid()
        lockdown = create_using_usbmux(serial=device_serial, autopair=False)
        return {
            "udid": device_serial,
            "product_version": lockdown.product_version,
            "device_name": lockdown.get_value(key='DeviceName'),
            "phone_number": lockdown.get_value(key='PhoneNumber'),
            "status": "Connected"
        }
    except Exception as e:
        return {"status": "Disconnected", "error": str(e)}
    finally:
        if lockdown is not None:
            try:
                lockdown.close()
            except Exception:
                logger.debug("Ignoring error while closing lockdown client", exc_info=True)


@app.post("/api/tunnel/start")
async def start_usb_tunnel(data: TunnelStartRequest):
    """Starts a CoreDevice tunnel to a USB device and returns RSD connection details."""
    udid = _get_single_device_udid()
    existing = _active_tunnel_state()
    if existing is not None:
        logger.info("Tunnel already running udid=%s", existing.udid)
        return {
            "status": "already_running",
            "udid": existing.udid or udid,
            "rsd_address": existing.result.address,
            "rsd_port": existing.result.port,
            "interface": existing.result.interface,
            "protocol": existing.result.protocol.name.lower(),
        }

    protocol = _parse_protocol(data.protocol)
    result = await _start_tunnel_internal(
        udid,
        protocol,
        data.wait_for_device,
        data.wait_timeout_seconds,
    )
    logger.info("Tunnel start completed udid=%s", udid)
    return {
        "status": "started",
        "udid": udid,
        "rsd_address": result.address,
        "rsd_port": result.port,
        "interface": result.interface,
        "protocol": result.protocol.name.lower(),
    }


@app.post("/api/tunnel/stop")
async def stop_usb_tunnel():
    """Stops a previously started tunnel."""
    key = _DEFAULT_TUNNEL_KEY
    state = _TUNNELS.get(key)
    if state is None:
        raise HTTPException(status_code=404, detail="No tunnel found")
    if state.task is None:
        _TUNNELS.pop(key, None)
        raise HTTPException(status_code=500, detail="Tunnel state is incomplete")
    logger.info("Stopping tunnel")
    state.stop_event.set()
    try:
        await state.task
    finally:
        # Unregister even when the tunnel task's shutdown raised.
        _TUNNELS.pop(key, None)
    return {"status": "stopped"}


@app.post("/api/tunnel/stop-all")
async def stop_all_tunnels():
    """Stops all running tunnels."""
    if not _TUNNELS:
        return {"status": "stopped", "count": 0}
    logger.info("Stopping all tunnels count=%s", len(_TUNNELS))
    states = list(_TUNNELS.values())
    running = [state for state in states if state.task is not None]
    for state in running:
        state.stop_event.set()
    await asyncio.gather(*(state.task for state in running), return_exceptions=True)
    _TUNNELS.clear()
    return {"status": "stopped", "count": len(states)}


@app.get("/api/tunnel/status")
async def get_tunnel_status():
    """Returns the status of all active tunnels."""
    items = [
        {
            "udid": state.udid,
            "rsd_address": state.result.address,
            "rsd_port": state.result.port,
            "interface": state.result.interface,
            "protocol": state.result.protocol.name.lower(),
            "running": not state.task.done(),
        }
        for state in _TUNNELS.values()
        if state.result is not None and state.task is not None
    ]
    return {"tunnels": items}


@app.get("/api/preflight", response_model=PreflightResponse)
async def preflight():
    """Checks RSD connectivity and service port reachability.

    Uses the running tunnel, or auto-starts a TCP tunnel when none is active.
    """
    state = _active_tunnel_state()
    if state is not None:
        result = state.result
    else:
        udid = _get_single_device_udid()
        logger.info("Auto-starting tunnel for preflight")
        result = await _start_tunnel_internal(
            udid,
            TunnelProtocol.TCP,
            True,
            30,
        )
    return await _preflight_rsd_async(
        result.address,
        result.port,
        result.interface,
        result.protocol.name.lower(),
    )


@app.post("/api/simulate-location/clear")
async def clear_location():
    """Stops location simulation by closing the DVT session."""
    global _dvt_session, _current_rsd

    logger.info("Clearing location simulation")

    async with _dvt_session_lock:
        session = _dvt_session
        if session is not None:
            try:
                # Clear the location first
                await asyncio.to_thread(lambda: LocationSimulation(session).clear())
            except Exception:
                logger.debug("Ignoring error while clearing simulated location", exc_info=True)
            _close_dvt_quietly(session)
            _dvt_session = None
            _current_rsd = None

    # Also kill CLI process if running
    await _stop_location_sim_process()

    logger.info("Location simulation cleared")
    return {"status": "cleared"}


@app.post("/api/simulate-location")
async def set_location(data: LocationUpdate):
    """Sets a simulated GPS location on the device.

    The RSD endpoint comes from the request, else from the running tunnel,
    else from an auto-started TCP tunnel.

    Raises:
        HTTPException: 400 when only one of address/port is available; other
            ``HTTPException``s pass through; anything else becomes a 500.
    """
    try:
        logger.info("Simulate location request lat=%s lon=%s", data.latitude, data.longitude)
        rsd_address = data.rsd_address
        rsd_port = data.rsd_port

        if rsd_address is None and rsd_port is None:
            state = _active_tunnel_state()
            if state is not None:
                rsd_address = state.result.address
                rsd_port = state.result.port

        if rsd_address is None and rsd_port is None:
            udid = _get_single_device_udid()
            logger.info("Auto-starting tunnel for simulate-location")
            result = await _start_tunnel_internal(
                udid,
                TunnelProtocol.TCP,
                True,
                30,
            )
            rsd_address = result.address
            rsd_port = result.port

        if rsd_address is None or rsd_port is None:
            raise HTTPException(status_code=400, detail="RSD address/port required")

        # Use CLI approach - library DVT handshake has unresolved issues
        # Note: Location will briefly bounce back to real location when changing
        await _simulate_location_via_cli(rsd_address, rsd_port, data.latitude, data.longitude)

        logger.info("Simulate location success")
        return {"status": "success", "location": {"lat": data.latitude, "lon": data.longitude}}
    except HTTPException:
        raise
    except Exception as e:
        logger.exception("Simulate location failed")
        raise HTTPException(status_code=500, detail=f"Failed to set location: {str(e)}")


if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)