""" Tunnel Functions""" @self._app.get("/start-tunnel") async def start_tunnel( udid: Optional[str] = self.context.udid, ip: Optional[str] = None, connection_type: Optional[str] = None, ) -> fastapi.Response: udid_tunnels = [ t.tunnel for t in self._tunneld_core.tunnel_tasks.values() if t.udid == udid and t.tunnel is not None ] if len(udid_tunnels) > 0: self.context.udid = udid data = { "interface": udid_tunnels[0].interface, "port": udid_tunnels[0].port, "address": udid_tunnels[0].address, } return generate_http_response(data) queue = asyncio.Queue() created_task = False try: if not created_task and connection_type in ("usbmux", None): task_identifier = f"usbmux-{udid}" try: async with await create_using_usbmux(udid) as lockdown: service = await CoreDeviceTunnelProxy.create(lockdown) task = asyncio.create_task( self._tunneld_core.start_tunnel_task( task_identifier, service, protocol=TunnelProtocol.TCP, queue=queue, ), name=f"start-tunnel-task-{task_identifier}", ) self._tunneld_core.tunnel_tasks[task_identifier] = TunnelTask( task=task, udid=udid ) created_task = True except ConnectionFailedError, InvalidServiceError, MuxException: pass if connection_type in ("usb", None): for rsd in await get_rsds(udid=udid): rsd_ip = rsd.service.address[0] if ip is not None and rsd_ip != ip: await rsd.close() continue task = asyncio.create_task( self._tunneld_core.start_tunnel_task( rsd_ip, await create_core_device_tunnel_service_using_rsd(rsd), queue=queue, ), name=f"start-tunnel-usb-{rsd_ip}", ) self._tunneld_core.tunnel_tasks[rsd_ip] = TunnelTask( task=task, udid=rsd.udid ) created_task = True if not created_task and connection_type in ("wifi", None): for remotepairing in await get_remote_pairing_tunnel_services( udid=udid ): remotepairing_ip = remotepairing.hostname if ip is not None and remotepairing_ip != ip: await remotepairing.close() continue task = asyncio.create_task( self._tunneld_core.start_tunnel_task( remotepairing_ip, remotepairing, queue=queue ), name=f"start-tunnel-wifi-{remotepairing_ip}", ) self._tunneld_core.tunnel_tasks[remotepairing_ip] = TunnelTask( task=task, udid=remotepairing.remote_identifier ) created_task = True except Exception as e: return fastapi.Response( status_code=501, content=json.dumps( { "error": { "exception": e.__class__.__name__, "traceback": traceback.format_exc(), } } ), ) if not created_task: return fastapi.Response( status_code=501, content=json.dumps({"error": "task not created"}) ) tunnel: Optional[TunnelResult] = await queue.get() if tunnel is not None: self.context.udid = udid data = { "interface": tunnel.interface, "port": tunnel.port, "address": tunnel.address, } return generate_http_response(data) else: return fastapi.Response( status_code=404, content=json.dumps( {"error": "something went wrong during tunnel creation"} ), ) @self._app.get("/restart-tunneld") async def restart() -> fastapi.Response: """Restart Tunneld""" self._tunneld_core.clear() await asyncio.sleep(2) self._tunneld_core.start() data = { "operation": "restart-tunneld", "data": True, "message": "Restarting tunneld...", } return generate_http_response(data) @self._app.get("/shutdown") async def shutdown() -> fastapi.Response: """Shutdown Tunneld""" os.kill(os.getpid(), signal.SIGINT) data = { "operation": "shutdown", "data": True, "message": "Server shutting down...", } return generate_http_response(data) @self._app.get("/clear-tunnels") async def clear_tunnels() -> fastapi.Response: """Clear all tunnels""" self._tunneld_core.clear() data = { "operation": "clear_tunnels", "data": True, "message": "Cleared tunnels...", } return generate_http_response(data) @self._app.get("/cancel") async def cancel_tunnel(udid: str) -> fastapi.Response: """Cancel a tunnel""" self._tunneld_core.cancel(udid=udid) data = { "operation": "cancel", "udid": udid, "data": True, "message": f"tunnel {udid} Canceled ...", } return generate_http_response(data) """Simulation Functions""" @self._app.get("/start-simulation") async def app_start_simulation() -> fastapi.Response: logger.info("Simulation Start Requested ") if ( self.context.simulation_task is None or self.context.simulation_task.done() ): await start_icloud_monitor() self.context.simulation_active = True self.context.simulation_task = asyncio.create_task( start_simulation_queue(), name="location-simulation-worker", ) data = {"status": "started", "message": "Simulation started"} else: data = {"status": "error", "message": "Simulation already running"} return generate_http_response(data) @self._app.get("/start-icloud-monitor") async def app_start_icloud_monitor() -> fastapi.Response: await start_icloud_monitor() data = { "status": "started", "icloud_monitor_enabled": self.context.icloud_monitor_enabled, "icloud_monitor_running": is_icloud_monitor_running(), } return generate_http_response(data) @self._app.get("/stop-icloud-monitor") async def app_stop_icloud_monitor() -> fastapi.Response: await end_icloud_monitor() data = { "status": "stopped", "icloud_monitor_enabled": self.context.icloud_monitor_enabled, "icloud_monitor_running": is_icloud_monitor_running(), } return generate_http_response(data) @self._app.get("/icloud-monitor-status") async def app_icloud_monitor_status() -> fastapi.Response: data = { "status": "ok", "icloud_monitor_enabled": self.context.icloud_monitor_enabled, "icloud_monitor_running": is_icloud_monitor_running(), } return generate_http_response(data) @self._app.post("/add-location") async def app_add_location(data: SimulationRequestData) -> fastapi.Response: """Add a location to the simulation queue""" logger.info("Request to add new location to queue") loc_id = str(uuid.uuid4()) latitude = ( data.get("latitude") if isinstance(data, dict) else getattr(data, "latitude", None) ) longitude = ( data.get("longitude") if isinstance(data, dict) else getattr(data, "longitude", None) ) delay = ( data.get("delay", 0) if isinstance(data, dict) else getattr(data, "delay", 0) ) try: delay = parse_delay_seconds(delay) except ValueError as e: return generate_http_response( {"status": "error", "message": str(e)}, status_code=400, ) if latitude is not None and longitude is not None: logger.info( "Adding location %s (%s, %s) with %s delay to the queue", loc_id, latitude, longitude, delay, ) accrued_delay = 0 if self.context.queue_data: accrued_delay = sum( parse_delay_seconds(item.get("delay", 0)) for item in self.context.queue_data.values() ) now_time = datetime.now(timezone.utc) new_time = ( now_time + timedelta(seconds=accrued_delay) + timedelta(seconds=delay) ) start_time = new_time.isoformat() location_item = { "loc_id": loc_id, "latitude": latitude, "longitude": longitude, "delay": delay, "start": start_time, "status": "queued", } resp = { "status": "added", "message": f"Location {loc_id} added to the queue", "item": location_item, } await self.context.queue.put(loc_id) add_item(loc_id, location_item) logger.info("Location %s added to the queue", loc_id) else: resp = {"status": "error", "message": "Invalid location data"} return generate_http_response(resp) @self._app.get("/clear-queue") async def app_clear_queue() -> fastapi.Response: """Clear the simulation queue""" logger.info("Simulation Start Requested ") await empty_simulation_queue() data = {"status": "cleared", "message": "Simulation cleared"} return generate_http_response(data) @self._app.get("/pause-queue") async def app_pause_queue() -> fastapi.Response: """Pause the simulation queue""" await pause_simulation_queue() data = {"status": "paused", "message": "Simulation paused"} return generate_http_response(data) @self._app.get("/resume-queue") async def app_resume_queue() -> fastapi.Response: """Resume the simulation queue""" await resume_simulation_queue() data = {"status": "resumed", "message": "Simulation resumed"} return generate_http_response(data) @self._app.get("/end-simulation") async def app_end_simulation() -> fastapi.Response: """End the simulation queue""" logger.info("End location simulation request") end_task = asyncio.create_task( end_simulation_queue(), name="end-simulation-worker" ) result = await end_task data = {"status": result, "message": "Simulation ended"} return generate_http_response(data) """Status Functions""" @self._app.get("/") async def list_tunnels() -> dict[str, list[dict]]: """Retrieve the available tunnels and format them as {UUID: TUNNEL_ADDRESS}""" tunnels = {} for ip, active_tunnel in self._tunneld_core.tunnel_tasks.items(): if (active_tunnel.udid is None) or (active_tunnel.tunnel is None): continue if active_tunnel.udid not in tunnels: tunnels[active_tunnel.udid] = [] tunnels[active_tunnel.udid].append( { "tunnel-address": active_tunnel.tunnel.address, "tunnel-port": active_tunnel.tunnel.port, "interface": ip, } ) return tunnels @self._app.get("/device-info") async def device_info(): """Get device information""" tunnels = {} for ip, active_tunnel in self._tunneld_core.tunnel_tasks.items(): if (active_tunnel.udid is None) or (active_tunnel.tunnel is None): continue if active_tunnel.udid not in tunnels: tunnels[active_tunnel.udid] = {} try: lockdown = await create_using_usbmux( serial=active_tunnel.udid, autopair=False ) tunnels[active_tunnel.udid] = iterate_multidim(lockdown.all_values) except Exception as e: logger.error( f"Failed to create lockdown session for device {active_tunnel.udid}: {e}" ) continue return tunnels @self._app.get("/device-name") async def rsd_info(): """Get rsd information""" device_name = await get_device_name() return generate_http_response(device_name) @self._app.get("/rsd-info") async def rsd_info(): """Get rsd information""" rsd_info = {} if self.context.tunnel is None: await get_tun() if self.context.tunnel is not None: rsd_info = self.context.tunnel.peer_info return generate_http_response(rsd_info) @self._app.get("/hello") async def hello() -> fastapi.Response: data = {"message": "Hello, I'm alive"} return generate_http_response(data) @self._app.get("/context-status") async def app_context_status() -> fastapi.Response: data = get_status() return generate_http_response(data)