# Files
# sim-location-backend/src/pymd3_vue_location_sim/server.py
#
# 2505 lines
# 107 KiB
# Python

import asyncio
from datetime import datetime, timezone, timedelta
import json
import uuid
import logging
import os
import signal
import traceback
import warnings
import random
import math
import socketio
import httpx
from contextlib import suppress
from typing import Optional, Dict
from dotenv import load_dotenv
with warnings.catch_warnings():
# Ignore: "Core Pydantic V1 functionality isn't compatible with Python 3.14 or greater."
warnings.simplefilter("ignore", category=UserWarning)
import fastapi
import uvicorn
from fastapi import FastAPI, APIRouter, Request, Response
from fastapi.encoders import jsonable_encoder
from fastapi.staticfiles import StaticFiles
from fastapi.responses import FileResponse
from pymobiledevice3.services.dvt.instruments.location_simulation_base import (
LocationSimulationBase,
)
from pymobiledevice3.exceptions import (
ConnectionFailedError,
InvalidServiceError,
MuxException,
TunneldConnectionError,
)
from pymobiledevice3.lockdown import create_using_usbmux, get_mobdev2_lockdowns
from pymobiledevice3.osu.os_utils import get_os_utils
from pymobiledevice3.remote.common import TunnelProtocol
from pymobiledevice3.remote.remote_service_discovery import (
RemoteServiceDiscoveryService,
)
from pymobiledevice3.remote.tunnel_service import (
CoreDeviceTunnelProxy,
TunnelResult,
create_core_device_tunnel_service_using_rsd,
get_remote_pairing_tunnel_services,
)
from pymobiledevice3.remote.utils import get_rsds
from pymobiledevice3.services.dvt.instruments.location_simulation import (
LocationSimulation,
)
from pymobiledevice3.services.dvt.instruments.dvt_provider import DvtProvider
try:
from pymobiledevice3.services.simulate_location import DtSimulateLocation
except Exception:
DtSimulateLocation = None
from pymobiledevice3.tunneld.server import TunneldCore, TunnelTask
from .icloud_monitor import FindMyMonitor
from .models import (
SimulationRequestData,
SimulationRequestResponseData,
iCloudLocationData,
LatLng,
)
from .json_formatter import JsonFormatter, handler, root_logger, logger
from .geo_cache import AsyncReverseGeocoder
# Load .env values before any os.getenv() lookup further down in this module.
load_dotenv()
# Send everything logged through the root logger via the JSON formatter, at INFO level.
handler.setFormatter(JsonFormatter())
root_logger.handlers = [handler]
root_logger.setLevel(logging.INFO)
# bugfix: after the device reboots, it might take some time for remoted to start answering the bonjour queries
REATTEMPT_INTERVAL = 5
REATTEMPT_COUNT = 5
REMOTEPAIRING_INTERVAL = 5
MOBDEV2_INTERVAL = 5
# USB monitor will periodically forget what interfaces it has seen
# and force a full rescan. The value is the number of iterations of the
# inner loop (which sleeps one second each) before blowing away the
# `previous_ips` cache.
USB_MONITOR_RESCAN_INTERVAL = 30
USBMUX_INTERVAL = 2
OSUTILS = get_os_utils()
# Reported in the simulation worker's timeout log/event.
# NOTE(review): no call site visible in this part of the file applies it as an
# actual timeout around tunnel acquisition — confirm against the rest of the file.
TUNNEL_ACQUIRE_TIMEOUT_SECONDS = 15
# Timeout (seconds) passed to asyncio.wait_for() when entering the DVT provider.
DVT_CONNECT_TIMEOUT_SECONDS = 20
def env_flag(name: str, default: str = "True") -> bool:
    """Return True when environment variable *name* (or *default*) reads "true".

    The comparison ignores case; every other value yields False.
    """
    raw_value = os.environ.get(name, default)
    return raw_value.lower() == "true"
class LocationSimulationState:
    """Mutable state shared across the server.

    Holds the connected Socket.IO clients, the selected device and tunnel,
    the iCloud monitor handles, and the location simulation queue with its
    bookkeeping collections.
    """

    def __init__(self):
        # Socket.IO session ids of currently connected clients.
        self.connected_clients: set[str] = set()
        # Either None or a dict of the form {"loc_id": <str>}.
        self.current_location: Optional[Dict[str, str]] = None
        self.device_name: Optional[str] = os.getenv("SELECTED_DEVICE_NAME")

        # iCloud "Find My" monitor: the monitor is given fmf_queue at construction.
        self.fmf_location: Optional[iCloudLocationData] = None
        self.fmf_queue: asyncio.Queue = asyncio.Queue()
        self.icloud_consumer_task = None
        self.icloud_monitor = FindMyMonitor(self.fmf_queue)
        self.icloud_monitor_task = None
        self.icloud_monitor_enabled: bool = False

        self.tunnel_watcher_task = None
        self.next_move: Optional[float] = None

        # Simulation queue: payloads keyed by loc_id plus an explicit ordering.
        self.simulation_queue: asyncio.Queue = asyncio.Queue()
        self.simulation_queue_data: Dict = {}
        self.simulation_queue_order: list[str] = []
        self.simulation_queue_deleted_items: list[str] = []
        self.simulation_queue_pending_ids: set[str] = set()
        self.simulation_queue_state: str = "STOPPED"
        self.simulation_noise: bool = False
        self.simulation_active: bool = False
        self.simulation_task: Optional[asyncio.Task] = None

        self.sio: socketio.AsyncServer = socketio.AsyncServer(
            async_mode="asgi", cors_allowed_origins="*"
        )
        self.test_mode: bool = env_flag("TEST_MODE", "True")
        self.tunnel: Optional[RemoteServiceDiscoveryService] = None
        self.udid: Optional[str] = None
        self.reverse_geocode = AsyncReverseGeocoder()

    def get_current_loc_id(self) -> Optional[str]:
        """Return the current location id, or None if it is unset or not a non-empty str."""
        location = self.current_location
        if isinstance(location, dict):
            candidate = location.get("loc_id")
            if isinstance(candidate, str) and candidate:
                return candidate
        return None

    def set_current_loc_id(self, loc_id: Optional[str]) -> None:
        """Store *loc_id* as the current location; anything but a non-empty str clears it."""
        usable = isinstance(loc_id, str) and loc_id
        self.current_location = {"loc_id": loc_id} if usable else None

    def get_current_item(self) -> Optional[dict]:
        """Return the queue payload of the current location when it is a dict, else None."""
        active_id = self.get_current_loc_id()
        if active_id is None:
            return None
        payload = self.simulation_queue_data.get(active_id)
        return payload if isinstance(payload, dict) else None
class TunneldRunnerSio:
"""TunneldRunner orchestrate between the webserver and TunneldCore"""
@classmethod
def create(
    cls,
    host: str,
    port: int,
    context: Optional[LocationSimulationState] = None,
    protocol: TunnelProtocol = TunnelProtocol.QUIC,
    usb_monitor: bool = True,
    wifi_monitor: bool = True,
    usbmux_monitor: bool = True,
    mobdev2_monitor: bool = True,
) -> None:
    """Construct a runner with the given settings and call its `_run_app()`."""
    monitor_switches = {
        "usb_monitor": usb_monitor,
        "wifi_monitor": wifi_monitor,
        "usbmux_monitor": usbmux_monitor,
        "mobdev2_monitor": mobdev2_monitor,
    }
    runner = cls(host, port, protocol=protocol, context=context, **monitor_switches)
    runner._run_app()
def __init__(
self,
host: str,
port: int,
context: Optional[LocationSimulationState] = None,
protocol: TunnelProtocol = TunnelProtocol.QUIC,
usb_monitor: bool = True,
wifi_monitor: bool = True,
usbmux_monitor: bool = True,
mobdev2_monitor: bool = True,
):
async def app_startup() -> None:
    """Startup hook: start the tunneld core, then the tunnel watcher task."""
    logger.info("Application startup: starting tunneld core and watcher")
    core = self._tunneld_core
    core.start()
    await start_tunnel_watcher()
async def app_shutdown() -> None:
    """Shutdown hook: stop the background work, then close tunneld and Socket.IO."""
    logger.info("Closing tunneld tasks...")
    # Order matters: watcher and monitor first, then the queue, then the
    # tunneld core and finally the Socket.IO server.
    shutdown_steps = (
        end_tunnel_watcher,
        end_icloud_monitor,
        empty_simulation_queue,
        self._tunneld_core.close,
        self.context.sio.shutdown,
    )
    for step in shutdown_steps:
        await step()
# Keep the constructor arguments on the instance.
self.host = host
self.port = port
self.protocol = protocol
# Use the caller's shared state when given, otherwise create a fresh one.
self.context = context or LocationSimulationState()
# A wildcard bind address ("0.0.0.0" / "::") is replaced by loopback for the stored API address.
self._tunneld_api_address = (
"127.0.0.1" if host in ("0.0.0.0", "::") else host,
port,
)
self._app = APIRouter()
self._vue_dist = os.getenv("VUE_DIST")
# NOTE(review): `cors_allowed_origins` is the option name used for the
# socketio.AsyncServer in LocationSimulationState; confirm FastAPI acts on it here.
self._vue_app = FastAPI(
title="iOS Device Management API",
cors_allowed_origins="*",
)
# Wrap the FastAPI app with the Socket.IO ASGI app and register the
# startup/shutdown hooks defined above.
self._asgi_app = socketio.ASGIApp(
self.context.sio,
self._vue_app,
on_startup=app_startup,
on_shutdown=app_shutdown,
)
# Hand the iCloud monitor the Socket.IO server and a callable returning the
# currently connected client ids.
self.context.icloud_monitor.sio = self.context.sio
self.context.icloud_monitor.get_client_sids = lambda: list(
self.context.connected_clients
)
# Created here, started later by app_startup().
self._tunneld_core = TunneldCore(
protocol=protocol,
wifi_monitor=wifi_monitor,
usb_monitor=usb_monitor,
usbmux_monitor=usbmux_monitor,
mobdev2_monitor=mobdev2_monitor,
)
"""Tunnel Functions"""
async def get_tun(
    udid: Optional[str] = None, max_retries: int = 10, retry_delay: float = 0.5
) -> RemoteServiceDiscoveryService:
    """Resolve an active local tunnel and connect directly to its RSD endpoint.

    Returns the cached ``self.context.tunnel`` when one exists. Otherwise the
    tunneld core's tunnel tasks are polled up to *max_retries* times, sleeping
    *retry_delay* seconds between attempts, for a tunnel matching *udid*
    (any tunnel when *udid* is None). The first successful connection is
    cached on the context and returned.

    Raises:
        TunneldConnectionError: no tunnel appeared, or none could be connected
            to, within the retry budget. When a connection attempt failed the
            underlying exception is chained as ``__cause__``.
    """
    if self.context.tunnel is not None:
        return self.context.tunnel
    for attempt in range(max_retries):
        is_last_attempt = attempt >= max_retries - 1
        candidates = [
            task.tunnel
            for task in self._tunneld_core.tunnel_tasks.values()
            if task is not None
            and task.tunnel is not None
            and task.udid is not None
            and (udid is None or task.udid == udid)
        ]
        if not candidates:
            if is_last_attempt:
                logger.error(
                    "Failed to find an active local tunnel after max retries"
                )
                raise TunneldConnectionError()
            logger.info(
                "Waiting for local tunnel to be ready... (attempt %s/%s)",
                attempt + 1,
                max_retries,
            )
            await asyncio.sleep(retry_delay)
            continue
        tunnel = candidates[0]
        if udid and len(candidates) > 1:
            logger.warning(
                "Multiple local tunnels found for udid %s; using first available",
                udid,
            )
        rsd = None
        try:
            rsd = RemoteServiceDiscoveryService((tunnel.address, tunnel.port))
            await rsd.connect()
        except Exception as exc:
            # Release the half-open connection so failed attempts do not
            # accumulate open sockets across retries.
            if rsd is not None:
                with suppress(Exception):
                    await rsd.close()
            if is_last_attempt:
                logger.error(
                    "Failed to connect to local tunnel endpoint after max retries"
                )
                raise TunneldConnectionError() from exc
            logger.info(
                "Tunnel endpoint not ready yet... (attempt %s/%s)",
                attempt + 1,
                max_retries,
            )
            await asyncio.sleep(retry_delay)
            continue
        logger.info(
            "Connected to local tunnel at %s:%s after %s retries",
            tunnel.address,
            tunnel.port,
            attempt,
        )
        self.context.tunnel = rsd
        return rsd
    # Only reached when max_retries <= 0.
    raise TunneldConnectionError()
def collect_active_tunnels() -> Dict[str, dict]:
    """Snapshot the tunneld core's tunnel tasks that have both a tunnel and a udid.

    Returns a dict keyed by ``str(interface)``; each value carries the
    interface, udid, address, port and transport of that tunnel.
    """
    snapshot: Dict[str, dict] = {}
    for iface, task in self._tunneld_core.tunnel_tasks.items():
        if task is None:
            continue
        tun = task.tunnel
        if tun is None or task.udid is None:
            continue
        key = str(iface)
        snapshot[key] = {
            "interface": key,
            "udid": task.udid,
            "address": getattr(tun, "address", None),
            "port": getattr(tun, "port", None),
            "transport": getattr(tun, "protocol", None),
        }
    return snapshot
"""Tunnel Watcher"""
async def handle_tunnel_drop(disconnected: list[dict]) -> None:
# React to tunnels that vanished since the previous watcher poll.
# `disconnected` holds snapshot entries shaped like the values returned by
# collect_active_tunnels().
# NOTE(review): indentation is lost in this copy of the file, so the exact
# nesting of the trailing statements (the "appError" emit and
# end_icloud_monitor) cannot be confirmed from here — verify against the
# formatted source before relying on it.
disconnected_udids: set[str] = set()
for item in disconnected:
udid = item.get("udid")
if isinstance(udid, str) and udid:
disconnected_udids.add(udid)
# Entries without a usable udid give us nothing to act on.
if not disconnected_udids:
return
current_active = collect_active_tunnels()
active_udids: set[str] = set()
for item in current_active.values():
udid = item.get("udid")
if isinstance(udid, str) and udid:
active_udids.add(udid)
primary_disconnected = self.context.udid in disconnected_udids if self.context.udid else False
no_tunnels_left = len(current_active) == 0
# Only act when the selected device dropped or no tunnel remains at all.
if not primary_disconnected and not no_tunnels_left:
return
logger.warning(
"Tunnel drop detected. primary_udid=%s disconnected_udids=%s active_udids=%s",
self.context.udid,
sorted(disconnected_udids),
sorted(active_udids),
)
# Best-effort close of the cached RSD connection; errors are suppressed.
if self.context.tunnel is not None:
with suppress(Exception):
await self.context.tunnel.close()
self.context.tunnel = None
if no_tunnels_left:
self.context.udid = None
if self.context.simulation_active:
await pause_simulation_queue()
await self.context.sio.emit(
"appError",
{
"type": "tunnel_disconnected",
"message": "Tunnel dropped. Simulation paused until reconnection.",
"disconnected_udids": sorted(disconnected_udids),
},
namespace="/",
)
await end_icloud_monitor()
async def tunnel_watcher_loop() -> None:
# Poll collect_active_tunnels() once per second and diff the snapshot against
# the previous one (keyed by interface) to detect added and removed tunnels.
# Runs until cancelled; any other exception is logged and the loop continues.
# NOTE(review): indentation is lost in this copy of the file, so the nesting
# of the statements following `if self.context.udid is None:` and
# `if removed:` cannot be confirmed from here — verify against the formatted
# source.
previous = collect_active_tunnels()
while True:
try:
await asyncio.sleep(1)
current = collect_active_tunnels()
added_keys = set(current.keys()) - set(previous.keys())
removed_keys = set(previous.keys()) - set(current.keys())
# Sorted so events are emitted in a stable order.
added = [current[k] for k in sorted(added_keys)]
removed = [previous[k] for k in sorted(removed_keys)]
for item in added:
try:
logger.info(
"Tunnel discovered interface=%s udid=%s address=%s port=%s transport=%s",
item.get("interface"),
item.get("udid"),
item.get("address"),
item.get("port"),
item.get("transport"),
)
await safe_sio_emit("tunnel_device_connected", item)
# NOTE(review): the f-prefix below has no placeholders; the %s values are
# filled in by the logger.
logger.info(f"Current udid: %s, new udid: %s",
self.context.udid, item.get("udid"))
if self.context.udid is None:
self.context.udid = item.get("udid")
# NOTE(review): get_device_name() resolves its tunnel via get_tun() without a
# udid, so the name may come from a different tunnel than `item` — confirm.
device_name = await get_device_name()
selected_device_name = self.context.device_name
# A configured device name that does not match drops the tunnel again.
if selected_device_name and device_name != selected_device_name:
wrong_udid = self.context.udid
self._tunneld_core.cancel(
udid=item.get("udid"))
logger.warning(
"Tunnel established to wrong device. Dropping tunnel. wrong_udid=%s for device: %s",
wrong_udid, selected_device_name)
self.context.udid = None
await safe_sio_emit(
"appError",
{
"type": "tunnel_wrong_device",
"message": "Tunnel established to wrong device. Dropping tunnel.",
"wrong_udid": wrong_udid,
"device_name": device_name,
},
)
await get_tun(item.get("udid"), max_retries=10, retry_delay=1.0)
except Exception:
# One failing tunnel must not stop processing of the others.
logger.exception(
"Tunnel watcher failed while handling added tunnel for udid=%s",
item.get("udid"),
)
if removed:
for item in removed:
try:
logger.warning(
"Tunnel disconnected interface=%s udid=%s address=%s port=%s",
item.get("interface"),
item.get("udid"),
item.get("address"),
item.get("port"),
)
await safe_sio_emit("tunnel_device_disconnected", item)
except Exception:
logger.exception(
"Tunnel watcher failed while emitting disconnection for udid=%s",
item.get("udid"),
)
await handle_tunnel_drop(removed)
previous = current
except asyncio.CancelledError:
# Cancellation is the normal way to stop the watcher; let it propagate.
raise
except Exception:
logger.exception("Tunnel watcher loop crashed; continuing")
async def start_tunnel_watcher() -> None:
    """Start the tunnel watcher task unless one is already running.

    If a previous watcher task finished with an error, that error is logged
    before the replacement task is created.
    """
    previous = self.context.tunnel_watcher_task
    if previous is not None and not previous.done():
        return
    # Task.exception() raises CancelledError for a cancelled task, and
    # CancelledError is a BaseException that suppress(Exception) would not
    # catch — so only query tasks that were not cancelled.
    if previous is not None and not previous.cancelled():
        exc = previous.exception()
        if exc is not None:
            logger.error(
                "Previous tunnel watcher task exited with error: %s", exc)
    self.context.tunnel_watcher_task = asyncio.create_task(
        tunnel_watcher_loop(),
        name="tunnel-task-watcher",
    )
async def end_tunnel_watcher() -> None:
    """Cancel the tunnel watcher task, wait for it to finish, and forget it."""
    watcher = self.context.tunnel_watcher_task
    if watcher is None:
        return
    watcher.cancel()
    try:
        await watcher
    except asyncio.CancelledError:
        pass
    self.context.tunnel_watcher_task = None
"""Helper Functions"""
def parse_data(d):
    """Return a copy of mapping *d* with values reduced to plain, JSON-friendly data.

    - nested dicts are converted recursively (the original discarded the
      recursive result);
    - str, int, float, bool, list and None values are kept as they are
      (lists are not inspected);
    - bytes values become the placeholder ``'BYTE DATA'``;
    - any other value becomes ``""`` (the original assigned the placeholder
      and then immediately overwrote it with the raw value).
    """
    cleaned = {}
    for key, value in d.items():
        if isinstance(value, dict):
            cleaned[key] = parse_data(value)
        elif value is None or isinstance(value, (str, int, float, bool, list)):
            cleaned[key] = value
        elif isinstance(value, bytes):
            cleaned[key] = 'BYTE DATA'
        else:
            cleaned[key] = ""
    return cleaned
async def get_device_name():
    """Return the ``DeviceName`` value read over the current tunnel.

    When no tunnel is cached, one is resolved through ``get_tun`` for the
    currently selected udid (``self.context.udid``; any tunnel when that is
    None) — the same selection the simulation worker uses — so the name is
    read from the selected device rather than from an arbitrary tunnel.

    Raises:
        TunneldConnectionError: propagated from ``get_tun`` when no tunnel
            could be resolved.
    """
    tunnel = self.context.tunnel
    if tunnel is None:
        # Use the returned connection directly instead of re-reading the
        # context attribute afterwards.
        tunnel = await get_tun(self.context.udid)
    return await tunnel.get_value(key='DeviceName')
async def safe_sio_emit(event: str, payload: dict, timeout_seconds: float = 2.0) -> None:
    """Emit *event* with *payload* on namespace "/" without ever raising.

    The emit is bounded by *timeout_seconds*; a timeout is logged as a
    warning and any other failure is logged with its traceback.
    """
    try:
        await asyncio.wait_for(
            self.context.sio.emit(event, payload, namespace="/"),
            timeout=timeout_seconds,
        )
    # asyncio.wait_for raises asyncio.TimeoutError, which is only an alias of
    # the builtin TimeoutError from Python 3.11 on; catch both so the timeout
    # branch is taken on every supported interpreter.
    except (asyncio.TimeoutError, TimeoutError):
        logger.warning("Socket.IO emit timed out for event=%s", event)
    except Exception:
        logger.exception("Socket.IO emit failed for event=%s", event)
def parse_delay_seconds(raw_delay) -> int:
    """Convert *raw_delay* to a whole, non-negative number of seconds.

    ``None`` means "no delay" and yields 0. Fractional values are truncated
    toward zero.

    Raises:
        ValueError: *raw_delay* is a bool, cannot be converted to a float,
            or is negative, NaN or infinite. Unsupported types and integers
            too large for a float are reported as ValueError as well, so
            callers need to catch only this one exception type.
    """
    if raw_delay is None:
        return 0
    # bool is a subclass of int; reject it explicitly so True is not read as 1.
    if isinstance(raw_delay, bool):
        raise ValueError("delay must be a non-negative number")
    try:
        parsed = float(raw_delay)
    except (TypeError, OverflowError) as exc:
        raise ValueError("delay must be a non-negative finite number") from exc
    if math.isnan(parsed) or math.isinf(parsed) or parsed < 0:
        raise ValueError("delay must be a non-negative finite number")
    return int(parsed)
"""Host power down functions"""
async def device_reboot(delay):
    """Wait *delay* seconds, then reboot the host by running ``shutdown -r now``."""
    # The original format string had a %s placeholder but no argument.
    logger.info("Reboot initiated with delay %s", delay)
    await asyncio.sleep(delay)
    exit_status = os.system("shutdown -r now")
    if exit_status != 0:
        logger.error("Reboot command exited with status %s", exit_status)
async def device_shutdown(delay):
    """Wait *delay* seconds, then power off the host by running ``shutdown -h now``."""
    # The original format string had a %s placeholder but no argument.
    logger.info("Shutdown initiated with delay %s", delay)
    await asyncio.sleep(delay)
    exit_status = os.system("shutdown -h now")
    if exit_status != 0:
        logger.error("Shutdown command exited with status %s", exit_status)
"""iCloud Find My Monitor"""
async def start_icloud_monitor():
    """Enable the iCloud Find My monitor and make sure both of its tasks run.

    A producer task runs the monitor with a 30 second interval and a consumer
    task drains its updates; each is created only when missing or finished.
    """
    logger.info("iCloud monitor start requested")
    ctx = self.context
    ctx.icloud_monitor_enabled = True

    producer = ctx.icloud_monitor_task
    if producer is None or producer.done():
        ctx.icloud_monitor_task = asyncio.create_task(
            ctx.icloud_monitor.run_monitor(interval=30),
            name="icloud-monitor-producer",
        )
        logger.info("iCloud monitor producer task started")

    consumer = ctx.icloud_consumer_task
    if consumer is None or consumer.done():
        ctx.icloud_consumer_task = asyncio.create_task(
            consume_icloud_updates(),
            name="icloud-monitor-consumer",
        )
        logger.info("iCloud monitor consumer task started")
async def consume_icloud_updates():
# Consumer side of the iCloud monitor: take location updates off
# self.context.fmf_queue forever, store them on the context and emit them to
# clients as "fmf_update".
# NOTE(review): indentation is lost in this copy of the file, so it cannot be
# confirmed from here whether the emit happens only when the location changed
# or for every update — verify against the formatted source.
while True:
updated_location = await self.context.fmf_queue.get()
try:
if self.context.fmf_location != updated_location:
self.context.fmf_location = updated_location
await self.context.sio.emit(
"fmf_update", updated_location.model_dump(), namespace="/"
)
finally:
# Always balance the get() above, even when the emit fails.
self.context.fmf_queue.task_done()
async def refresh_icloud_location() -> dict:
    """Fetch a single iCloud location update and report the outcome.

    Works whether or not the monitor loop is running. Returns a command
    response dict: ERROR when the refresh raised, OK with
    ``location_found=False`` when nothing came back, otherwise OK with the
    new location, which is also stored on the context and emitted as
    "fmf_update".
    """

    def reply(status: str, **fields) -> dict:
        # Common response header followed by the call-specific fields, in call order.
        return {
            "command_status": status,
            "command_class": "icloud_monitor",
            "command": "refresh",
            **fields,
        }

    def monitor_state() -> dict:
        return {
            "icloud_monitor_enabled": self.context.icloud_monitor_enabled,
            "icloud_monitor_running": is_icloud_monitor_running(),
        }

    logger.info("iCloud monitor refresh requested")
    try:
        updated_location = await self.context.icloud_monitor.refresh_location()
    except Exception as e:
        logger.exception("Failed to refresh iCloud location: %s", e)
        return reply(
            "ERROR",
            message="Failed to refresh iCloud location",
            error=type(e).__name__,
            **monitor_state(),
        )

    if updated_location is None:
        return reply(
            "OK",
            location_found=False,
            location_updated=False,
            **monitor_state(),
        )

    changed = self.context.fmf_location != updated_location
    self.context.fmf_location = updated_location
    await self.context.sio.emit(
        "fmf_update", updated_location.model_dump(), namespace="/"
    )
    return reply(
        "OK",
        location_found=True,
        location_updated=changed,
        **monitor_state(),
        fmf_location=updated_location,
    )
async def end_icloud_monitor():
    """Disable the iCloud monitor and cancel its producer and consumer tasks."""
    logger.info("iCloud monitor stop requested")
    ctx = self.context
    ctx.icloud_monitor_enabled = False
    ctx.icloud_monitor.stop()
    # Producer first, then consumer; each is awaited so it has fully unwound
    # before its handle is cleared.
    for task_attr in ("icloud_monitor_task", "icloud_consumer_task"):
        task = getattr(ctx, task_attr)
        if task is None:
            continue
        task.cancel()
        with suppress(asyncio.CancelledError):
            await task
        setattr(ctx, task_attr, None)
def is_icloud_monitor_running() -> bool:
    """Return True only while both the producer and the consumer task are alive."""
    tasks = (
        self.context.icloud_monitor_task,
        self.context.icloud_consumer_task,
    )
    for task in tasks:
        if task is None or task.done():
            return False
    return True
""" Queue Functions"""
async def start_simulation_queue():
    """Start the simulation worker task unless one is already running.

    Starting also starts the iCloud monitor. Returns a command response dict
    whose status is OK when the worker was started and ERROR when a worker
    was already running.
    """
    worker = self.context.simulation_task
    if worker is not None and not worker.done():
        status, message = "ERROR", "Simulation already running"
    else:
        await start_icloud_monitor()
        self.context.simulation_active = True
        self.context.simulation_task = asyncio.create_task(
            start_queue_worker(),
            name="location-simulation-worker",
        )
        status, message = "OK", "Simulation started"
    return {
        "command_status": status,
        "command_class": "simulation_control",
        "command": "start",
        "message": message,
        "data": {
            "simulation_queue": get_simulation_status(),
        },
    }
async def start_queue_worker():
    """Run the location simulation worker until queue playback ends.

    In test mode the queue is played without a device. Otherwise a udid is
    selected (automatically when exactly one active tunnel exists), the
    tunnel is acquired, a DVT provider is entered with a timeout of
    ``DVT_CONNECT_TIMEOUT_SECONDS`` and the queue is played through it.

    Failures are not raised to the caller: they are logged and reported to
    clients as "appError" events. On exit the simulation is marked inactive,
    the task handle is cleared and the iCloud monitor is stopped.
    """
    logger.info("Starting location simulation worker...")
    self.context.simulation_active = True
    self.context.simulation_queue_state = "RUNNING"
    try:
        if self.context.test_mode:
            logger.info("Simulation worker: test mode enabled")
            locate_simulation = LocationSimulationQueue(None, self.context)
            await locate_simulation.play_queue()
            return
        if self.context.udid is None:
            active_udids = sorted(
                {
                    t.udid
                    for t in self._tunneld_core.tunnel_tasks.values()
                    if t.udid is not None and t.tunnel is not None
                }
            )
            if len(active_udids) == 1:
                self.context.udid = active_udids[0]
                logger.info(
                    "Simulation worker: auto-selected udid=%s from active tunnel",
                    self.context.udid,
                )
            elif len(active_udids) == 0:
                logger.error(
                    "Simulation worker: no active tunnel with udid available"
                )
                await self.context.sio.emit(
                    "appError",
                    {
                        "type": "simulation_no_tunnel",
                        "message": "No active tunnel found. Start tunnel first.",
                    },
                    namespace="/",
                )
                return
            else:
                logger.error(
                    "Simulation worker: multiple active tunnels; explicit udid required: %s",
                    active_udids,
                )
                await self.context.sio.emit(
                    "appError",
                    {
                        "type": "simulation_udid_required",
                        "message": "Multiple active tunnels found; provide udid in start_simulate_location.",
                        "udids": active_udids,
                    },
                    namespace="/",
                )
                return
        logger.info(
            "Simulation worker: acquiring tunnel (udid=%s)", self.context.udid
        )
        tun = await get_tun(self.context.udid)
        logger.info(
            "Simulation worker: tunnel acquired, connecting DVT provider"
        )
        # The provider is entered by hand so that only the connect step is
        # bounded by the timeout; it is exited in the finally below.
        dvt_provider = DvtProvider(tun)
        dvt = await asyncio.wait_for(
            dvt_provider.__aenter__(), timeout=DVT_CONNECT_TIMEOUT_SECONDS
        )
        logger.info(
            "Simulation worker: DVT provider connected, starting queue playback"
        )
        try:
            async with LocationSimulationQueue(
                dvt, self.context
            ) as locate_simulation:
                await locate_simulation.play_queue()
        finally:
            logger.info("Simulation worker: closing DVT provider")
            await dvt_provider.__aexit__(None, None, None)
            logger.info("Simulation worker: DVT provider closed")
    # asyncio.wait_for raises asyncio.TimeoutError, which is only an alias of
    # the builtin TimeoutError from Python 3.11 on; catch both so a timeout is
    # reported as a timeout (not as a crash) on every supported interpreter.
    except (asyncio.TimeoutError, TimeoutError):
        logger.error(
            "Simulation worker timeout. tunnel_timeout=%ss dvt_timeout=%ss udid=%s",
            TUNNEL_ACQUIRE_TIMEOUT_SECONDS,
            DVT_CONNECT_TIMEOUT_SECONDS,
            self.context.udid,
        )
        await self.context.sio.emit(
            "appError",
            {
                "type": "simulation_timeout",
                "udid": self.context.udid,
                "tunnel_timeout_seconds": TUNNEL_ACQUIRE_TIMEOUT_SECONDS,
                "dvt_timeout_seconds": DVT_CONNECT_TIMEOUT_SECONDS,
            },
            namespace="/",
        )
    except Exception:
        logger.exception("Simulation worker crashed")
        await self.context.sio.emit(
            "appError",
            {
                "type": "simulation_crash",
                "udid": self.context.udid,
                "error": traceback.format_exc(),
            },
            namespace="/",
        )
    finally:
        self.context.simulation_active = False
        self.context.simulation_task = None
        await end_icloud_monitor()
async def add_location_to_simulation_queue(data):
# Append a new location to the simulation queue and return a command response.
# `data` may be a dict or an object; latitude, longitude, delay and address
# are read by key or by attribute accordingly. A fresh uuid4 string becomes
# the item's loc_id.
# NOTE(review): indentation is lost in this copy of the file, so it cannot be
# confirmed from here whether the `next_move` addition below belongs to the
# preceding `if` or runs unconditionally — verify against the formatted source.
loc_id = str(uuid.uuid4())
latitude = (
data.get("latitude")
if isinstance(data, dict)
else getattr(data, "latitude", None)
)
longitude = (
data.get("longitude")
if isinstance(data, dict)
else getattr(data, "longitude", None)
)
delay = (
data.get("delay", 0)
if isinstance(data, dict)
else getattr(data, "delay", 0)
)
address = (
data.get("address")
if isinstance(data, dict)
else getattr(data, "address", None)
)
# An invalid delay is reported as an ERROR response, not raised.
try:
delay = parse_delay_seconds(delay)
except ValueError as e:
return {"command_status": "ERROR", "command_class": "simulation_control", "command": "add", "message": str(e)}
if latitude is not None and longitude is not None:
logger.info(
"Adding location %s (%s, %s) with %s delay to the queue",
loc_id,
latitude,
longitude,
delay,
)
# accrued_delay: summed delays of the items ordered after the current one.
accrued_delay = 0
current_loc_id = self.context.get_current_loc_id()
if self.context.simulation_queue_data and len(self.context.simulation_queue_order) > 1:
# NOTE(review): get_item_index() uses list.index(), which raises ValueError
# when the current id is no longer in the order list — confirm that cannot
# happen here.
current_index = get_item_index(
current_loc_id) if current_loc_id else 0
remaining_items = self.context.simulation_queue_order[current_index + 1:]
# The generator's `loc_id` is local to the generator; it does not rebind the
# new item's loc_id above.
accrued_delay = sum(parse_delay_seconds(
self.context.simulation_queue_data[loc_id]['delay']) for loc_id in remaining_items if loc_id in self.context.simulation_queue_data)
accrued_delay = accrued_delay + \
parse_delay_seconds(self.context.next_move)
# Projected start time: now + accrued delay + this item's own delay (UTC, ISO 8601).
now_time = datetime.now(timezone.utc)
new_time = (
now_time
+ timedelta(seconds=accrued_delay)
+ timedelta(seconds=delay)
)
start_time = new_time.isoformat()
# rev_geocode = await self.context.reverse_geocode.get_address(latitude, longitude)
# if rev_geocode:
# address = rev_geocode
# else:
# address = f"{latitude}, {longitude}"
location_item = {
"loc_id": loc_id,
"latitude": latitude,
"longitude": longitude,
"delay": delay,
"status": "queued",
"start": start_time,
"address": address,
}
# The status snapshot is taken before add_item(), but it references the live
# queue containers, so it reflects the new item once that has been added.
resp = {
"command_status": "OK",
"command_class": "simulation_control",
"command": "add",
"message": f"Location {loc_id} added to the queue",
"data": {
"simulation_queue": get_simulation_status(),
"location_item": location_item,
},
}
add_item(loc_id, location_item)
await enqueue_next_simulation_item()
logger.info("Location %s added to the queue", loc_id)
else:
logger.error("Invalid location data: %s", data)
resp = {"command_status": "ERROR",
"command_class": "simulation_control",
"command": "add",
"message": "Invalid location data", "data": data}
return resp
async def delete_location_from_simulation_queue(data):
    """Mark a queued location as deleted and return a command response dict.

    ``data`` may be a dict or an object exposing ``loc_id``. The response is
    an ERROR when ``loc_id`` is missing or not a non-empty string, or when it
    is not in the active queue order; otherwise the item is deleted and OK is
    returned. Responses for a known id carry the item under
    ``data["location_item"]`` (``None`` when the id is unknown).
    """
    loc_id = (
        data.get("loc_id")
        if isinstance(data, dict)
        else getattr(data, "loc_id", None)
    )

    def reply(status, message, **extra):
        return {
            "command_status": status,
            "command_class": "simulation_control",
            "command": "delete",
            "message": message,
            "data": {
                # The original error response used the key "simulation_queue:"
                # (stray colon); every other response uses "simulation_queue".
                "simulation_queue": get_simulation_status(),
                **extra,
            },
        }

    # Queue ids are uuid4 strings, so anything else can never match an item.
    if not isinstance(loc_id, str) or not loc_id:
        return reply("ERROR", "Invalid location data")

    if loc_id not in self.context.simulation_queue_order:
        # Return here: the original built this response with a direct
        # dictionary lookup (KeyError for unknown ids) and then fell through
        # to the delete path, overwriting the error.
        return reply(
            "ERROR",
            f"Location {loc_id} not found in the queue",
            location_item=self.context.simulation_queue_data.get(loc_id),
        )

    logger.info("Deleting location %s from the queue", loc_id)
    await delete_item(loc_id)
    return reply(
        "OK",
        f"Location {loc_id} deleted from the queue",
        location_item=self.context.simulation_queue_data.get(loc_id),
    )
async def pause_simulation_queue():
    """Flag queue playback as paused by setting the queue state to "PAUSED"."""
    ctx = self.context
    ctx.simulation_queue_state = "PAUSED"
async def resume_simulation_queue():
    """Set the queue state back to "RUNNING" and recompute queued start times."""
    ctx = self.context
    ctx.simulation_queue_state = "RUNNING"
    await update_queue_times()
async def advance_simulation_queue():
    """Zero the delay of the current queue item and refresh queued start times.

    When there is no current item (nothing is playing, or its payload is no
    longer in the queue data) only the refresh is performed; the original
    raised KeyError in that case.
    """
    current_item = self.context.get_current_item()
    if current_item is not None:
        current_item["delay"] = 0
    await update_queue_times()
async def update_queue_times():
# Recompute the "start" timestamp of every item ordered after the current one
# as now (UTC) plus the running sum of delays, then publish the queue state.
# NOTE(review): indentation is lost in this copy of the file, so it cannot be
# confirmed from here whether update_queue_data() runs once after the loop or
# once per item — verify against the formatted source.
current_loc_id = self.context.get_current_loc_id()
# NOTE(review): get_item_index() uses list.index(), which raises ValueError
# when the current id is not in the order list (delete_item() removes ids from
# that list before calling this function) — confirm the current item cannot be
# deleted that way.
current_index = get_item_index(
current_loc_id) if current_loc_id else 0
remaining_items = self.context.simulation_queue_order[current_index + 1:]
new_delay = 0
now_time = datetime.now(timezone.utc)
for item in remaining_items:
item_delay = parse_delay_seconds(
self.context.simulation_queue_data[item].get("delay")) or 0
new_delay += item_delay
new_time = (
now_time + timedelta(seconds=new_delay)).isoformat()
logger.info("Updating queue time for %s to %s", item, new_time)
self.context.simulation_queue_data[item]["start"] = new_time
await update_queue_data()
async def update_queue_data():
    """Broadcast the current simulation status as a "queue_data_update" event."""
    snapshot = get_simulation_status()
    await self.context.sio.emit("queue_data_update", snapshot, namespace="/")
async def empty_simulation_queue():
    """Drain the simulation asyncio.Queue, then reset all queue bookkeeping."""
    logger.info("Clearing location simulation queue...")
    pending = self.context.simulation_queue_pending_ids
    queue = self.context.simulation_queue
    while True:
        try:
            discarded = queue.get_nowait()
        except asyncio.QueueEmpty:
            break
        if isinstance(discarded, str):
            pending.discard(discarded)
        # Balance the get so the queue's unfinished-task count stays correct.
        queue.task_done()
        logger.info("Discarding item from queue: %s", discarded)
    pending.clear()
    reset_queue()
def add_item(item_id, payload):
    """Store *payload* under *item_id* and append the id to the queue order."""
    ctx = self.context
    ctx.simulation_queue_data[item_id] = payload
    ctx.simulation_queue_order.append(item_id)
def remove_item(item_id):
# Drop *item_id* from the queue order and delete its payload from the queue data.
# NOTE(review): indentation is lost in this copy of the file, so it cannot be
# confirmed from here whether the `del` is guarded by the `if`; unguarded, it
# raises KeyError for an unknown id — verify against the formatted source.
if item_id in self.context.simulation_queue_order:
self.context.simulation_queue_order.remove(item_id)
del self.context.simulation_queue_data[item_id]
def reset_queue():
    """Replace the queue data, order and deleted-items list with empty ones and clear the current location."""
    ctx = self.context
    (
        ctx.simulation_queue_data,
        ctx.simulation_queue_order,
        ctx.simulation_queue_deleted_items,
    ) = ({}, [], [])
    ctx.set_current_loc_id(None)
def remove_future_items():
    """Trim the queue down to the current item, or empty it entirely.

    The current item is kept only while a simulation is active and its
    payload is still present; otherwise the data, order and deleted-items
    list are emptied and the current location is cleared.
    """
    ctx = self.context
    active_id = ctx.get_current_loc_id()
    if not (
        active_id
        and ctx.simulation_active
        and active_id in ctx.simulation_queue_data
    ):
        ctx.simulation_queue_data = {}
        ctx.simulation_queue_order = []
        ctx.simulation_queue_deleted_items = []
        ctx.set_current_loc_id(None)
        return
    ctx.simulation_queue_order = [active_id]
    ctx.simulation_queue_data = {
        active_id: ctx.simulation_queue_data[active_id]
    }
async def delete_item(item_id):
# Soft-delete: mark the payload's status as "deleted", move the id from the
# queue order to the deleted-items list (the payload stays in the queue data),
# then refresh the queued start times.
# NOTE(review): indentation is lost in this copy of the file, so it cannot be
# confirmed from here whether update_queue_times() runs only when the id was
# found or on every call — verify against the formatted source.
if item_id in self.context.simulation_queue_order:
self.context.simulation_queue_data[item_id]["status"] = "deleted"
self.context.simulation_queue_order.remove(item_id)
self.context.simulation_queue_deleted_items.append(item_id)
await update_queue_times()
def clear_future_items():
    """Soft-delete every item ordered after the current one.

    Each affected payload gets status "deleted" and its id moves from the
    queue order to the deleted-items list.
    """
    ctx = self.context
    active_id = ctx.get_current_loc_id()
    first_future = (get_item_index(active_id) if active_id else 0) + 1
    # The slice is a copy, so removing from the order list while looping is safe.
    for future_id in ctx.simulation_queue_order[first_future:]:
        ctx.simulation_queue_data[future_id]["status"] = "deleted"
        ctx.simulation_queue_order.remove(future_id)
        ctx.simulation_queue_deleted_items.append(future_id)
def get_item(item_id):
    """Return the payload stored for *item_id*; raises KeyError when it is unknown."""
    payloads = self.context.simulation_queue_data
    return payloads[item_id]
def update_item(item_id, **updates):
    """Merge *updates* into the payload of *item_id*; unknown ids are ignored."""
    try:
        payload = self.context.simulation_queue_data[item_id]
    except KeyError:
        return
    payload.update(updates)
def get_item_index(item_id):
    """Return the position of *item_id* in the queue order; raises ValueError when absent."""
    order = self.context.simulation_queue_order
    return order.index(item_id)
def get_item_id_by_index(index):
    """Return the item id at position *index* of the queue order."""
    order = self.context.simulation_queue_order
    return order[index]
def get_items_in_order():
    """Return the queue payloads in queue order, skipping those with status "deleted"."""
    payloads = self.context.simulation_queue_data
    ordered = []
    for item_id in self.context.simulation_queue_order:
        payload = payloads[item_id]
        if payload.get("status") != "deleted":
            ordered.append(payload)
    return ordered
async def enqueue_next_simulation_item() -> Optional[str]:
    """Put the next item with status "queued" onto the simulation queue.

    Nothing is scheduled while the queue state is "SHUTDOWN" or while another
    id is still pending. Pending ids whose payload is missing or no longer
    "queued" are dropped first. Returns the scheduled item id, or None when
    nothing was scheduled.
    """
    ctx = self.context
    if ctx.simulation_queue_state == "SHUTDOWN":
        return None

    def still_queued(candidate_id) -> bool:
        payload = ctx.simulation_queue_data.get(candidate_id)
        return isinstance(payload, dict) and payload.get("status") == "queued"

    stale_ids = {
        pending_id
        for pending_id in ctx.simulation_queue_pending_ids
        if not still_queued(pending_id)
    }
    if stale_ids:
        ctx.simulation_queue_pending_ids.difference_update(stale_ids)
        logger.info(
            "Cleared stale pending queue ids before enqueue: %s",
            sorted(stale_ids),
        )

    # At most one item is in flight at a time.
    if ctx.simulation_queue_pending_ids:
        return None

    for item_id in ctx.simulation_queue_order:
        if item_id in ctx.simulation_queue_pending_ids:
            continue
        if not still_queued(item_id):
            continue
        ctx.simulation_queue_pending_ids.add(item_id)
        await ctx.simulation_queue.put(item_id)
        logger.info("Scheduled queue item %s", item_id)
        return item_id
    return None
async def end_simulation_queue():
    """Run end_simulation_worker() in its own task and return its result."""
    logger.info("End location simulation request")
    stopper = asyncio.create_task(
        end_simulation_worker(), name="end-simulation-worker"
    )
    return await stopper
async def end_simulation_worker() -> bool:
    """End queue playback, clear the simulated location and reset queue state.

    Steps: mark the queue "SHUTDOWN", drain it, wake the worker with a
    ``None`` sentinel, wait up to 5 seconds for the worker (cancelling it
    when it does not stop), clear the location on the device when not in
    test mode and a tunnel exists, reset the simulation state on the context
    and stop the iCloud monitor.

    Returns:
        True on success; False when any step raised (the error is logged
        with its traceback).
    """
    logger.info("End location simulation request")
    try:
        q = self.context.simulation_queue
        self.context.simulation_queue_state = "SHUTDOWN"
        # Drain pending queue entries.
        while not q.empty():
            try:
                item = q.get_nowait()
                if isinstance(item, str):
                    self.context.simulation_queue_pending_ids.discard(item)
                q.task_done()
                logger.info("Discarding item from queue: %s", item)
            except asyncio.QueueEmpty:
                break
        # Wake queue consumers blocked on queue.get().
        await q.put(None)
        reset_queue()
        # Hold a local reference: start_queue_worker() resets
        # context.simulation_task to None when it exits, so re-reading the
        # attribute after waiting could hit None.
        worker = self.context.simulation_task
        if worker is not None and not worker.done():
            try:
                await asyncio.wait_for(worker, timeout=5)
            # asyncio.TimeoutError is only an alias of the builtin
            # TimeoutError from Python 3.11 on; catch both.
            except (asyncio.TimeoutError, TimeoutError):
                logger.warning(
                    "Simulation worker did not stop in time; canceling task")
                worker.cancel()
                with suppress(asyncio.CancelledError):
                    await worker
        if not self.context.test_mode and self.context.tunnel is not None:
            async with (
                DvtProvider(self.context.tunnel) as dvt,
                LocationSimulationQueue(dvt, self.context) as locate_simulation,
            ):
                await locate_simulation.clear()
        self.context.simulation_active = False
        self.context.simulation_task = None
        self.context.next_move = None
        self.context.set_current_loc_id(None)
        self.context.simulation_queue_state = "STOPPED"
        # Recreate queue to discard sentinel wakeup items and unblock clean restarts.
        self.context.simulation_queue = asyncio.Queue()
        self.context.simulation_queue_pending_ids = set()
        await end_icloud_monitor()
        return True
    except Exception as e:
        logger.exception("Error ending simulation queue: %s", e)
        return False
"""Switches"""
def toggle_test_mode() -> dict:
    """Flip the test-mode flag and return its new value under "test_mode"."""
    flipped = not self.context.test_mode
    self.context.test_mode = flipped
    return {"test_mode": flipped}
def toggle_gps_noise() -> dict:
    """Invert the context's GPS-noise flag and report the resulting value."""
    flipped = not self.context.simulation_noise
    self.context.simulation_noise = flipped
    return {"simulation_noise": self.context.simulation_noise}
def get_simulation_status() -> dict:
    """Snapshot the simulation-queue fields of the shared context as a dict."""
    ctx = self.context
    worker = ctx.simulation_task
    return {
        "active": ctx.simulation_active,
        "data": ctx.simulation_queue_data,
        "order": ctx.simulation_queue_order,
        "deleted_items": ctx.simulation_queue_deleted_items,
        "state": ctx.simulation_queue_state,
        "gps_noise": ctx.simulation_noise,
        # Only the task's name is exposed, never the task object itself.
        "worker_task": worker.get_name() if worker else None,
        "test_mode": ctx.test_mode,
    }
def get_status() -> dict:
    """Assemble the full status payload: clients, location, iCloud, queue, tunnel."""
    ctx = self.context
    loc_id = ctx.get_current_loc_id()
    item = ctx.simulation_queue_data.get(loc_id) if loc_id else None

    # Dict items expose all three fields; any other object only "start".
    if isinstance(item, dict):
        start = item.get("start")
        latitude = item.get("latitude")
        longitude = item.get("longitude")
    else:
        start = getattr(item, "start", None)
        latitude = None
        longitude = None

    consumer = ctx.icloud_consumer_task
    monitor = ctx.icloud_monitor_task
    watcher = ctx.tunnel_watcher_task

    return {
        "connected_clients": list(ctx.connected_clients),
        "current_location": {
            "loc_id": loc_id,
            "longitude": longitude,
            "latitude": latitude,
            "start": start,
        },
        "device_name": ctx.device_name,
        "fmf_location": ctx.fmf_location,
        "icloud": {
            "consumer_queue": ctx.fmf_queue.qsize() if ctx.fmf_queue else 0,
            # These two report Task.done(), i.e. True once the task finished.
            "consumer_task": consumer.done() if consumer else False,
            "monitor_task": monitor.done() if monitor else False,
            "monitor_enabled": ctx.icloud_monitor_enabled,
            "monitor_running": is_icloud_monitor_running(),
        },
        "tunnel_watcher_running": bool(watcher and not watcher.done()),
        "next_move": ctx.next_move,
        "simulation_queue": get_simulation_status(),
        "tunnel": ctx.tunnel.service.address[0] if ctx.tunnel else None,
        "udid": ctx.udid,
    }
async def get_reverse_geocode(data):
    """Reverse-geocode the coordinates carried by ``data``.

    Args:
        data: A dict or an object exposing ``latitude`` and ``longitude``.

    Returns:
        The result of ``reverse_geocode.get_address``, or ``None`` when
        either coordinate is missing, null or not numeric.
    """
    missing = 999  # sentinel standing in for an absent coordinate

    def _coord(name):
        raw = (
            data.get(name, missing)
            if isinstance(data, dict)
            else getattr(data, name, missing)
        )
        try:
            return float(raw)
        except (TypeError, ValueError):
            # A present-but-null or non-numeric value used to raise out of
            # the handler; treat it the same as a missing coordinate.
            return float(missing)

    latitude = _coord("latitude")
    longitude = _coord("longitude")
    if latitude == missing or longitude == missing:
        return None
    return await self.context.reverse_geocode.get_address(latitude, longitude)
async def get_favorites():
    """Return whatever ``reverse_geocode.get_favorites`` yields."""
    return await self.context.reverse_geocode.get_favorites()
async def clear_favorite(data):
    """Remove the favorite stored at the coordinates carried by ``data``.

    Args:
        data: A dict or an object exposing ``latitude`` and ``longitude``.

    Returns:
        A ``(status, message)`` tuple where status is ``"OK"`` or ``"ERROR"``.
        Missing, null or non-numeric coordinates yield the ERROR tuple
        instead of raising.
    """
    missing = 999  # sentinel standing in for an absent coordinate

    def _coord(name):
        raw = (
            data.get(name, missing)
            if isinstance(data, dict)
            else getattr(data, name, missing)
        )
        try:
            return float(raw)
        except (TypeError, ValueError):
            # Null / non-numeric input is reported as invalid, not raised.
            return float(missing)

    latitude = _coord("latitude")
    longitude = _coord("longitude")
    if latitude == missing or longitude == missing:
        return "ERROR", "Invalid latitude and longitude"

    cleared = await self.context.reverse_geocode.clear_favorite(latitude, longitude)
    if cleared:
        return "OK", "Favorite cleared"
    return "ERROR", "Failed to clear favorite"
async def set_favorite(data):
    """Store the favorite described by ``data["favorite"]``.

    Args:
        data: A dict or an object exposing a ``favorite`` entry, itself a
            dict or object with ``name``, ``latitude`` and ``longitude``.

    Returns:
        A ``(status, message)`` tuple where status is ``"OK"`` or ``"ERROR"``.
        Missing, null or non-numeric coordinates yield an ERROR tuple
        instead of raising.
    """
    missing = 999  # sentinel standing in for an absent coordinate

    def _field(source, name, default=None):
        if isinstance(source, dict):
            return source.get(name, default)
        return getattr(source, name, default)

    def _coord(source, name):
        try:
            return float(_field(source, name, missing))
        except (TypeError, ValueError):
            # Null / non-numeric input is reported as invalid, not raised.
            return float(missing)

    favorite = _field(data, "favorite")
    if favorite is None:
        return "ERROR", "Invalid favorite data"

    logger.info("Adding favorite: %s", _field(favorite, "name"))
    latitude = _coord(favorite, "latitude")
    longitude = _coord(favorite, "longitude")
    # An empty favorite payload is rejected along with bad coordinates.
    if latitude == missing or longitude == missing or not favorite:
        return "ERROR", "Invalid latitude and longitude"

    stored = await self.context.reverse_geocode.set_favorite(latitude, longitude, favorite)
    if stored:
        return "OK", "Favorite added"
    return "ERROR", "Failed to add favorite"
""" FastAPI HTTP Functions"""
def generate_http_response(
    data: dict, status_code: int = 200, media_type: str = "application/json"
) -> fastapi.Response:
    """Encode ``data`` as JSON and wrap it in a ``fastapi.Response``.

    ``jsonable_encoder`` runs first so non-primitive values are converted
    before ``json.dumps`` sees them.
    """
    payload = json.dumps(jsonable_encoder(data))
    return fastapi.Response(
        content=payload,
        status_code=status_code,
        media_type=media_type,
    )
""" Device Functions """
""" name """
@self._app.get("/device/name")
async def rsd_info():
    """Return the result of ``get_device_name`` as a JSON response."""
    return generate_http_response(await get_device_name())
@self._app.get("/device/info")
async def device_info():
    """Return lockdown values for every device that has an active tunnel.

    Returns:
        Mapping of UDID to ``parse_data(lockdown.all_values)``. A device
        whose lockdown session could never be created maps to ``{}``.
    """
    tunnels = {}
    for active_tunnel in self._tunneld_core.tunnel_tasks.values():
        if (active_tunnel.udid is None) or (active_tunnel.tunnel is None):
            continue
        udid = active_tunnel.udid
        tunnels.setdefault(udid, {})
        try:
            # Open the session as a context manager so it is closed after
            # the read instead of leaking one connection per request.
            async with await create_using_usbmux(
                serial=udid, autopair=False
            ) as lockdown:
                tunnels[udid] = parse_data(lockdown.all_values)
        except Exception as e:
            # Best effort: one unreachable device must not fail the request.
            logger.error(
                "Failed to create lockdown session for device %s: %s", udid, e
            )
            continue
    return tunnels
""" Tunnel Functions"""
@self._app.get("/")
async def list_tunnels() -> dict[str, list[dict]]:
    """Group the established tunnels by device UDID.

    Each entry lists the tunnel address, port and the key it is stored
    under in ``tunnel_tasks`` (reported as ``interface``).
    """
    by_udid = {}
    for interface, entry in self._tunneld_core.tunnel_tasks.items():
        # Skip tasks that have no device or no established tunnel.
        if entry.udid is None or entry.tunnel is None:
            continue
        described = {
            "tunnel-address": entry.tunnel.address,
            "tunnel-port": entry.tunnel.port,
            "interface": interface,
        }
        by_udid.setdefault(entry.udid, []).append(described)
    return by_udid
@self._app.get("/shutdown")
async def shutdown() -> fastapi.Response:
    """Send SIGINT to this process, then acknowledge the request."""
    own_pid = os.getpid()
    os.kill(own_pid, signal.SIGINT)
    return generate_http_response(
        {
            "operation": "shutdown",
            "data": True,
            "message": "Server shutting down...",
        }
    )
@self._app.get("/clear-tunnels")
async def clear_tunnels() -> fastapi.Response:
    """Call ``clear()`` on the tunneld core and acknowledge."""
    self._tunneld_core.clear()
    return generate_http_response(
        {
            "operation": "clear_tunnels",
            "data": True,
            "message": "Cleared tunnels...",
        }
    )
@self._app.get("/cancel")
async def cancel_tunnel(udid: str) -> fastapi.Response:
    """Ask the tunneld core to cancel the tunnel of ``udid`` and acknowledge."""
    self._tunneld_core.cancel(udid=udid)
    return generate_http_response(
        {
            "operation": "cancel",
            "udid": udid,
            "data": True,
            "message": f"tunnel {udid} Canceled ...",
        }
    )
@self._app.get("/hello")
async def hello() -> fastapi.Response:
    """Liveness probe: always answers with a fixed greeting."""
    return generate_http_response({"message": "Hello, I'm alive"})
@self._app.get("/start-tunnel")
async def start_tunnel(
    # NOTE: this default expression is evaluated once, when the route is
    # defined, so it holds whatever self.context.udid was at that moment.
    udid: Optional[str] = self.context.udid,
    ip: Optional[str] = None,
    connection_type: Optional[str] = None,
) -> fastapi.Response:
    """Return an existing tunnel for ``udid`` or try to start a new one.

    Args:
        udid: Device identifier to look up or connect to.
        ip: When given, only RSD / remote-pairing services at this address
            are used; the others are closed.
        connection_type: ``"usbmux"``, ``"usb"``, ``"wifi"`` or ``None`` to
            try the transports in that order.

    Returns:
        JSON with ``interface``, ``port`` and ``address`` on success; a 501
        response when no task could be created or an exception occurred;
        a 404 response when the queue delivered ``None``.
    """
    # Reuse an already-established tunnel for this device if there is one.
    udid_tunnels = [
        t.tunnel
        for t in self._tunneld_core.tunnel_tasks.values()
        if t.udid == udid and t.tunnel is not None
    ]
    if len(udid_tunnels) > 0:
        self.context.udid = udid
        data = {
            "interface": udid_tunnels[0].interface,
            "port": udid_tunnels[0].port,
            "address": udid_tunnels[0].address,
        }
        return generate_http_response(data)
    # The started tunnel task reports its result through this queue.
    queue = asyncio.Queue()
    created_task = False
    try:
        if not created_task and connection_type in ("usbmux", None):
            task_identifier = f"usbmux-{udid}"
            try:
                async with await create_using_usbmux(udid) as lockdown:
                    service = await CoreDeviceTunnelProxy.create(lockdown)
                    task = asyncio.create_task(
                        self._tunneld_core.start_tunnel_task(
                            task_identifier,
                            service,
                            protocol=TunnelProtocol.TCP,
                            queue=queue,
                        ),
                        name=f"start-tunnel-task-{task_identifier}",
                    )
                    self._tunneld_core.tunnel_tasks[task_identifier] = TunnelTask(
                        task=task, udid=udid
                    )
                    created_task = True
            except (ConnectionFailedError, InvalidServiceError, MuxException):
                # usbmux is unavailable for this device; fall through to
                # the other transports.
                pass
        # Unlike the other two branches, this one is not gated on
        # created_task.
        if connection_type in ("usb", None):
            for rsd in await get_rsds(udid=udid):
                rsd_ip = rsd.service.address[0]
                if ip is not None and rsd_ip != ip:
                    await rsd.close()
                    continue
                task = asyncio.create_task(
                    self._tunneld_core.start_tunnel_task(
                        rsd_ip,
                        await create_core_device_tunnel_service_using_rsd(rsd),
                        queue=queue,
                    ),
                    name=f"start-tunnel-usb-{rsd_ip}",
                )
                self._tunneld_core.tunnel_tasks[rsd_ip] = TunnelTask(
                    task=task, udid=rsd.udid
                )
                created_task = True
        if not created_task and connection_type in ("wifi", None):
            for remotepairing in await get_remote_pairing_tunnel_services(
                udid=udid
            ):
                remotepairing_ip = remotepairing.hostname
                if ip is not None and remotepairing_ip != ip:
                    await remotepairing.close()
                    continue
                task = asyncio.create_task(
                    self._tunneld_core.start_tunnel_task(
                        remotepairing_ip, remotepairing, queue=queue
                    ),
                    name=f"start-tunnel-wifi-{remotepairing_ip}",
                )
                self._tunneld_core.tunnel_tasks[remotepairing_ip] = TunnelTask(
                    task=task, udid=remotepairing.remote_identifier
                )
                created_task = True
    except Exception as e:
        # Report the failure, including the traceback, to the caller.
        return fastapi.Response(
            status_code=501,
            content=json.dumps(
                {
                    "error": {
                        "exception": e.__class__.__name__,
                        "traceback": traceback.format_exc(),
                    }
                }
            ),
        )
    if not created_task:
        return fastapi.Response(
            status_code=501, content=json.dumps({"error": "task not created"})
        )
    # Wait for the first result posted by a started tunnel task.
    tunnel: Optional[TunnelResult] = await queue.get()
    if tunnel is not None:
        self.context.udid = udid
        data = {
            "interface": tunnel.interface,
            "port": tunnel.port,
            "address": tunnel.address,
        }
        return generate_http_response(data)
    else:
        return fastapi.Response(
            status_code=404,
            content=json.dumps(
                {"error": "something went wrong during tunnel creation"}
            ),
        )
@self._app.get("/restart")
async def restart() -> fastapi.Response:
    """Clear the tunneld core, wait two seconds, start it again and acknowledge."""
    core = self._tunneld_core
    core.clear()
    await asyncio.sleep(2)
    core.start()
    return generate_http_response(
        {
            "operation": "restart-tunneld",
            "data": True,
            "message": "Restarting tunneld...",
        }
    )
""" iCloud Monitor"""
@self._app.get("/icloud-monitor/start")
async def app_start_icloud_monitor() -> fastapi.Response:
    """Start the iCloud monitor and report its enabled/running flags."""
    await start_icloud_monitor()
    return generate_http_response(
        {
            "command_status": "OK",
            "command_class": "icloud_monitor",
            "command": "start",
            "icloud_monitor_enabled": self.context.icloud_monitor_enabled,
            "icloud_monitor_running": is_icloud_monitor_running(),
        }
    )
@self._app.get("/icloud-monitor/stop")
async def app_stop_icloud_monitor() -> fastapi.Response:
    """Stop the iCloud monitor and report its enabled/running flags."""
    await end_icloud_monitor()
    return generate_http_response(
        {
            "command_status": "OK",
            "command_class": "icloud_monitor",
            "command": "stop",
            "icloud_monitor_enabled": self.context.icloud_monitor_enabled,
            "icloud_monitor_running": is_icloud_monitor_running(),
        }
    )
@self._app.get("/icloud-monitor/status")
async def app_icloud_monitor_status() -> fastapi.Response:
    """Report the iCloud monitor's enabled/running flags without changing them."""
    return generate_http_response(
        {
            "command_status": "OK",
            "command_class": "icloud_monitor",
            "command": "status",
            "icloud_monitor_enabled": self.context.icloud_monitor_enabled,
            "icloud_monitor_running": is_icloud_monitor_running(),
        }
    )
@self._app.get("/icloud-monitor/refresh")
async def app_refresh_icloud_monitor() -> fastapi.Response:
    """Return the result of ``refresh_icloud_location`` as a JSON response."""
    return generate_http_response(await refresh_icloud_location())
"""Simulation Functions"""
""" start, add, clear, pause, resume, end, status """
@self._app.get("/simulation/start")
async def app_start_simulation() -> fastapi.Response:
    """Start the simulation queue and return the helper's reply as JSON."""
    return generate_http_response(await start_simulation_queue())
@self._app.post("/simulation/add")
async def app_add_location(data: SimulationRequestData) -> fastapi.Response:
    """Queue the posted location and return the queue helper's reply as JSON."""
    logger.info("Request to add new location to queue")
    added = await add_location_to_simulation_queue(data)
    return generate_http_response(added)
@self._app.get("/simulation/clear")
async def app_clear_queue() -> fastapi.Response:
    """Clear the not-yet-played items of the simulation queue.

    Returns:
        A JSON acknowledgement with ``command`` set to ``"clear"``.
    """
    # The log line used to read "Simulation Start Requested", which
    # misreported clear requests as start requests.
    logger.info("Simulation clear requested")
    clear_future_items()
    data = {
        "command_status": "OK",
        "command_class": "simulation_control",
        "command": "clear",
        "message": "Simulation cleared",
    }
    return generate_http_response(data)
@self._app.get("/simulation/pause")
async def app_pause_queue() -> fastapi.Response:
    """Pause queue playback and acknowledge."""
    await pause_simulation_queue()
    return generate_http_response(
        {
            "command_status": "OK",
            "command_class": "simulation_control",
            "command": "pause",
            "message": "Simulation paused",
        }
    )
@self._app.get("/simulation/resume")
async def app_resume_queue() -> fastapi.Response:
    """Resume queue playback and acknowledge."""
    await resume_simulation_queue()
    return generate_http_response(
        {
            "command_status": "OK",
            "command_class": "simulation_control",
            "command": "resume",
            "message": "Simulation resumed",
        }
    )
@self._app.get("/simulation/end")
async def app_end_simulation() -> fastapi.Response:
    """End the simulation queue and report whether shutdown succeeded.

    Returns:
        A JSON body in the same shape as the other simulation endpoints;
        ``command_status`` is ``"ERROR"`` when the worker reported failure
        and ``data`` carries the worker's boolean result.
    """
    # end_simulation_queue is a coroutine function. Without the await the
    # worker never ran and a coroutine object reached the JSON encoder.
    ended = await end_simulation_queue()
    data = {
        "command_status": "OK" if ended else "ERROR",
        "command_class": "simulation_control",
        "command": "end",
        "message": "Simulation ended" if ended else "Failed to end simulation",
        "data": ended,
    }
    return generate_http_response(data)
@self._app.get("/simulation/test")
async def app_simulation_test_mode() -> fastapi.Response:
    """Toggle test mode and describe the transition in the reply."""
    previous = self.context.test_mode
    toggled = toggle_test_mode()
    return generate_http_response(
        {
            "data": toggled,
            "command_status": "OK",
            "command_class": "simulation_control",
            "command": "test_mode",
            "message": f"Test mode toggled from {previous} to {self.context.test_mode}",
        }
    )
@self._app.get("/simulation/noise")
async def app_simulation_noise() -> fastapi.Response:
    """Toggle GPS noise and describe the transition in the reply."""
    previous = self.context.simulation_noise
    toggled = toggle_gps_noise()
    return generate_http_response(
        {
            "data": toggled,
            "command_status": "OK",
            "command_class": "simulation_control",
            "command": "noise",
            "message": f"GPS noise toggled from {previous} to {self.context.simulation_noise}",
        }
    )
"""Status Functions"""
@self._app.get("/status/rsd")
async def app_rsd_info():
    """Return the tunnel's ``peer_info``, or ``{}`` when no tunnel is available.

    ``get_tun`` is awaited first when the context holds no tunnel yet.
    """
    if self.context.tunnel is None:
        await get_tun()
    tunnel = self.context.tunnel
    peer_info = tunnel.peer_info if tunnel is not None else {}
    return generate_http_response(peer_info)
@self._app.get("/status/context")
async def app_context_status() -> fastapi.Response:
    """Return the ``get_status`` payload as a JSON response."""
    return generate_http_response(get_status())
@self._app.post("/rev_geocode")
async def app_proxy_osm(data: LatLng):
    """Reverse-geocode the posted coordinates and return the result as JSON."""
    logger.info("OSM Proxy Request, data: %s", data)
    address = await get_reverse_geocode(data)
    return generate_http_response(address)
@self._app.get("/ors/status")
async def app_ors_status(request: Request) -> fastapi.Response:
    """Forward a GET to the ORS status endpoint and relay its reply.

    Args:
        request: Incoming request; its query parameters are forwarded.

    Returns:
        A response carrying the upstream body, status code and headers,
        minus the headers that describe the upstream wire framing.
    """
    logger.info("request: %s", request)
    target_url = "https://ors.intrepidnet.org/ors/v2/status"
    # httpx returns an already-decoded body, so the upstream framing headers
    # no longer describe it. Relaying e.g. "Content-Encoding: gzip" or the
    # compressed Content-Length would corrupt the reply for the client.
    dropped = {
        "content-encoding",
        "content-length",
        "transfer-encoding",
        "connection",
        "keep-alive",
    }
    async with httpx.AsyncClient() as client:
        # Forward the request to the external service
        response = await client.get(target_url, params=request.query_params)
    relayed_headers = {
        name: value
        for name, value in response.headers.items()
        if name.lower() not in dropped
    }
    # Return the response content and status code back to the original client
    return Response(
        content=response.content,
        status_code=response.status_code,
        headers=relayed_headers,
    )
@self._app.get("/ors/health")
async def app_ors_status(request: Request) -> fastapi.Response:
    """Forward a GET to the ORS health endpoint and relay its reply.

    Args:
        request: Incoming request; its query parameters are forwarded.

    Returns:
        A response carrying the upstream body, status code and headers,
        minus the headers that describe the upstream wire framing.
    """
    # NOTE(review): this function shares its name with the /ors/status
    # handler; both routes are still registered by their decorators.
    logger.info("request: %s", request)
    target_url = "https://ors.intrepidnet.org/ors/v2/health"
    # httpx returns an already-decoded body, so the upstream framing headers
    # no longer describe it. Relaying e.g. "Content-Encoding: gzip" or the
    # compressed Content-Length would corrupt the reply for the client.
    dropped = {
        "content-encoding",
        "content-length",
        "transfer-encoding",
        "connection",
        "keep-alive",
    }
    async with httpx.AsyncClient() as client:
        # Forward the request to the external service
        response = await client.get(target_url, params=request.query_params)
    relayed_headers = {
        name: value
        for name, value in response.headers.items()
        if name.lower() not in dropped
    }
    # Return the response content and status code back to the original client
    return Response(
        content=response.content,
        status_code=response.status_code,
        headers=relayed_headers,
    )
@self._app.post("/ors/proxy/{full_path:path}")
async def app_proxy_ors(full_path: str, request: Request):
    """Forward a JSON body to the ORS host under ``full_path`` and return its JSON reply."""
    logger.info("request: %s", request)
    body = await request.json()
    headers = dict(request.headers)
    method = request.method
    url = "https://ors.intrepidnet.org/" + full_path
    # Log each part of the outgoing call on its own line.
    for label, value in (
        ("body", body),
        ("headers", headers),
        ("method", method),
        ("url", url),
    ):
        logger.info(label + ": %s", value)
    async with httpx.AsyncClient() as client:
        upstream = await client.request(method, url, json=body)
        return upstream.json()
""" Socket.IO Functions"""
async def sio_send_status(sid):
    """Emit the current status payload to the client ``sid`` on namespace "/"."""
    payload = jsonable_encoder(get_status())
    await self.context.sio.emit("status", payload, namespace="/", to=sid)
"""Socket.IO Connection Events"""
@self.context.sio.event
async def connect(sid, environ):
    """Register a newly connected client and push it the current status."""
    clients = self.context.connected_clients
    clients.add(sid)
    logger.info("Client connected: %s", sid)
    await sio_send_status(sid)
    return "%s connected" % sid
@self.context.sio.event
async def disconnect(sid):
    """Forget a client that has disconnected."""
    clients = self.context.connected_clients
    clients.discard(sid)
    logger.info("Client disconnected: %s", sid)
""" Socket.IO Request Events """
@self.context.sio.event
async def request_update(sid):
    """Return the JSON-encoded status payload to the requesting client."""
    snapshot = jsonable_encoder(get_status())
    logger.info("Update request from %s sending %s", sid, snapshot)
    return snapshot
@self.context.sio.event
async def message(sid, data):
    """Log an incoming message and acknowledge it."""
    logger.info("Received message from %s: %s", sid, data)
    acknowledgement = (True, "Message received")
    return acknowledgement
    # await self.context.sio.emit("message", f"Received message from {sid}: {data}", namespace="/")
""" Device Control"""
@self.context.sio.event
async def device_control(sid, data):
    """Handle a device ``shutdown`` or ``reboot`` request.

    ``data`` may be a dict or an object; ``delay`` defaults to 5 when it is
    absent or ``None``. Unknown commands get an ERROR reply.
    """
    if isinstance(data, dict):
        command = data.get("command")
        delay = data.get("delay")
    else:
        command = getattr(data, "command", None)
        delay = getattr(data, "delay", None)
    if delay is None:
        delay = 5

    if command == "shutdown":
        logger.info(
            "Shutdown command received from %s with delay %s", sid, delay
        )
        await device_shutdown(delay)
        return {
            "command_status": "OK",
            "command_class": "device_control",
            "command": "shutdown",
            "message": f"Device shutdown initiated with {delay} seconds delay",
        }

    if command == "reboot":
        logger.info(
            "Reboot command received from %s with delay %s", sid, delay
        )
        await device_reboot(delay)
        return {
            "command_status": "OK",
            "command_class": "device_control",
            "command": "reboot",
            "message": f"Device reboot initiated with {delay} seconds delay",
        }

    return {
        "command_status": "ERROR",
        "command_class": "device_control",
        "command": command,
        "message": f"Invalid command: {command}",
    }
@self.context.sio.event
async def simulation_control(sid, data):
    """Dispatch a simulation-queue command received over Socket.IO.

    ``data`` may be a dict or an object exposing ``command``. Whatever the
    outcome, the current status is pushed to ``sid`` afterwards by the
    ``finally`` clause.

    Returns:
        A reply dict for the command; for ``add``, ``delete`` and ``start``
        it is whatever the corresponding queue helper returns.
    """
    command = (
        data.get("command")
        if isinstance(data, dict)
        else getattr(data, "command", None)
    )
    logger.info(
        "Simulation Control command: %s requested from %s", command, sid
    )
    try:
        match command:
            case "next":
                await advance_simulation_queue()
                resp = {
                    "command": command,
                    "command_class": "simulation_control",
                    "command_status": "OK",
                    "message": "Advanced simulation queue",
                    "data": {
                        "simulation_queue": get_simulation_status(),
                    }
                }
                return resp
            case "test-mode":
                toggle_test_mode()
                resp = {
                    "command": command,
                    "command_class": "simulation_control",
                    "command_status": "OK",
                    "message": "test-mode toggled",
                    "data": {
                        "simulation_queue": get_simulation_status(),
                    }
                }
                return resp
            case "add":
                """ Add a location to the simulation queue"""
                # The whole request payload is handed to the queue helper.
                resp = await add_location_to_simulation_queue(data)
                return resp
            case "delete":
                """ Delete a location to the simulation queue"""
                resp = await delete_location_from_simulation_queue(data)
                return resp
            case "clear":
                """ Clear the simulation queue"""
                clear_future_items()
                resp = {
                    "command": command,
                    "command_class": "simulation_control",
                    "command_status": "OK",
                    "message": "Simulation cleared",
                    "data": {
                        "simulation_queue": get_simulation_status(),
                    }
                }
                return resp
            case "reset":
                """ Reset the simulation queue"""
                reset_queue()
                resp = {
                    "command": command,
                    "command_class": "simulation_control",
                    "command_status": "OK",
                    "message": "Simulation queue reset",
                    "data": {
                        "simulation_queue": get_simulation_status(),
                    }
                }
                return resp
            case "pause":
                """ Pause the simulation queue"""
                await pause_simulation_queue()
                resp = {
                    "command": command,
                    "command_class": "simulation_control",
                    "command_status": "OK",
                    "message": "Simulation paused",
                    "data": {
                        "simulation_queue": get_simulation_status(),
                    }
                }
                return resp
            case "resume":
                """ Resume the simulation queue"""
                await resume_simulation_queue()
                resp = {
                    "command": command,
                    "command_status": "OK",
                    "command_class": "simulation_control",
                    "message": "Simulation resumed",
                    "data": {
                        "simulation_queue": get_simulation_status(),
                    }
                }
                return resp
            case "end":
                """ End the simulation queue"""
                logger.info(
                    "End location simulation request from %s", sid)
                # end_simulation_queue yields a truthy value on success.
                cstat = "OK" if await end_simulation_queue() else "ERROR"
                resp = {
                    "command_status": cstat,
                    "command_class": "simulation_control",
                    "command": command,
                    "message": "Simulation ended",
                    "data": {
                        "simulation_queue": get_simulation_status(),
                    }
                }
                return resp
            case "start":
                """ Start the simulation queue"""
                logger.info(
                    "Start location simulation request from %s", sid)
                # Rebinds ``data`` to the helper's reply, which is returned as-is.
                data = await start_simulation_queue()
                return data
            case "gps-noise":
                """ Toggle GPS noise"""
                before_toggle = self.context.simulation_noise
                toggle_gps_noise()
                resp = {
                    "command_status": "OK",
                    "command_class": "simulation_control",
                    "command": command,
                    "message": f"GPS noise toggled from {before_toggle} to {self.context.simulation_noise}",
                    "data": {
                        "simulation_queue": get_simulation_status(),
                    }
                }
                return resp
            case _:
                logger.warning(
                    "Invalid command received from %s: %s", sid, command
                )
                return {"command_status": "ERROR", "message": "Invalid command"}
    finally:
        # Runs on every path, including the early returns above and errors.
        logger.info("Simulation Control command: %s completed, sending status update...", command)
        await sio_send_status(sid)
@self.context.sio.event
async def location_item_update(sid, data):
    """Update one field of a queued location item.

    Args:
        sid: Socket.IO session id of the requesting client.
        data: A dict or an object exposing ``loc_id``, ``key`` and ``value``.

    Returns:
        A reply dict. ``command_status`` is ``"ERROR"`` (with ``data`` set to
        the unchanged item, or ``None`` when the id is unknown) if the
        request is incomplete, the item does not exist, or the item has no
        such key.
    """

    def _field(name):
        return data.get(name) if isinstance(data, dict) else getattr(data, name, None)

    loc_id = _field("loc_id")
    key = _field("key")
    value = _field("value")
    # Look the item up defensively: indexing with a missing or unknown id
    # used to raise KeyError, including from inside the error reply.
    item = self.context.simulation_queue_data.get(loc_id) if loc_id else None

    def _rejected():
        return {
            "command_status": "ERROR",
            "command": "update",
            "command_class": "location_item",
            "message": "Invalid request, Location Item Unchanged",
            "data": item,
        }

    # Compare against None rather than truthiness so legitimate falsy
    # values (for example a delay of 0) are accepted.
    if item is None or not key or value is None:
        return _rejected()
    try:
        old_val = item[key]
    except KeyError:
        # Only existing fields may be updated.
        return _rejected()

    item[key] = value
    if key == "delay":
        # The queue's scheduled times depend on the delay.
        await update_queue_times()
    logger.info(
        "Location Item Update: %s: %s changed from %s to %s", loc_id, key, old_val, value
    )
    return {
        "command_status": "OK",
        "command": "update",
        "command_class": "location_item",
        "message": "Location Item Updated",
        "data": item,
    }
@self.context.sio.event
async def queue_order_update(sid, data):
    """Replace the queue order with the known ids listed in ``newOrder``.

    Ids that are not present in the queue data are dropped. An empty or
    missing ``newOrder`` leaves the order untouched and yields an ERROR reply.
    """
    logger.info("Queue order update received: %s", data)
    if isinstance(data, dict):
        requested = data.get("newOrder")
    else:
        requested = getattr(data, "newOrder", None)

    if requested:
        self.context.simulation_queue_order = [
            loc_id
            for loc_id in requested
            if loc_id in self.context.simulation_queue_data
        ]
        status, note = "OK", "Queue Order Updated"
    else:
        status, note = "ERROR", "Invalid request, Queue Order Unchanged"

    return {
        "command_status": status,
        "command": "queue_order_update",
        "command_class": "queue_order",
        "message": note,
        "data": self.context.simulation_queue_order,
    }
@self.context.sio.event
async def icloud_monitor_control(sid, data):
    """Handle ``start``/``stop``/``status``/``refresh`` for the iCloud monitor.

    The current status is pushed to ``sid`` after every command.
    """
    if isinstance(data, dict):
        command = data.get("command")
    else:
        command = getattr(data, "command", None)
    logger.info(
        "iCloud Monitor control command: %s requested from %s", command, sid
    )

    def _monitor_state() -> dict:
        # Shared reply shape for start/stop/status, built at call time.
        return {
            "command": command,
            "command_class": "icloud_monitor_control",
            "command_status": "OK",
            "icloud_monitor_enabled": self.context.icloud_monitor_enabled,
            "icloud_monitor_running": is_icloud_monitor_running(),
        }

    try:
        if command == "start":
            await start_icloud_monitor()
            return _monitor_state()
        if command == "stop":
            await end_icloud_monitor()
            return _monitor_state()
        if command == "status":
            return _monitor_state()
        if command == "refresh":
            return await refresh_icloud_location()
        return {
            "command": command,
            "command_class": "icloud_monitor_control",
            "command_status": "ERROR",
            "message": "Invalid command",
        }
    finally:
        await sio_send_status(sid)
""" Tunnel Control """
@self.context.sio.event
async def tunneld_control(sid, data):
    """Dispatch a tunneld command received over Socket.IO.

    ``data`` may be a dict or an object exposing ``command`` (and ``udid``
    for ``cancel``). Every branch returns a reply dict whose
    ``command_status`` is ``"OK"`` or ``"ERROR"``.
    """
    command = (
        data.get("command")
        if isinstance(data, dict)
        else getattr(data, "command", None)
    )
    logger.info(
        "Tunneld Control command: %s requested from %s", command, sid)
    match command:
        case "start":
            """Start Tunneld"""
            logger.info("Start tunneld request from %s: %s", sid, data)
            try:
                self._tunneld_core.start()
                logger.info("Tunneld started successfully")
                return {
                    "command_status": "OK",
                    "command_class": "tunneld_control",
                    "command": command,
                    "message": "Tunneld started successfully",
                }
            except Exception as e:
                logger.error("Error starting tunneld: %s", e)
                return {
                    "command": command,
                    "command_class": "tunneld_control",
                    "command_status": "ERROR",
                    "message": f"Error starting tunneld: {e}",
                }
        case "restart":
            """Restart Tunneld"""
            logger.info("Restart tunneld request from %s: %s", sid, data)
            try:
                # Clear, pause two seconds, then start again.
                self._tunneld_core.clear()
                await asyncio.sleep(2)
                self._tunneld_core.start()
                return {
                    "command_status": "OK",
                    "command_class": "tunneld_control",
                    "command": command,
                    "message": "Tunneld started successfully",
                }
            except Exception as e:
                logger.error("Error restarting tunneld: %s", e)
                return {
                    "command_status": "ERROR",
                    "command_class": "tunneld_control",
                    "command": command,
                    "message": f"Error starting tunneld: {e}",
                }
        case "start-watcher":
            """ Start Tunneld Watcher """
            logger.info(
                "Start tunneld watcher request from %s: %s", sid, data)
            await start_tunnel_watcher()
            return {
                "command_status": "OK",
                "command_class": "tunneld_control",
                "command": command,
                "message": "Tunneld watcher started successfully",
            }
        case "end-watcher":
            """ End Tunneld Watcher """
            logger.info(
                "End tunneld watcher request from %s: %s", sid, data)
            await end_tunnel_watcher()
            return {
                "command_status": "OK",
                "command_class": "tunneld_control",
                "command": command,
                "message": "Tunneld watcher stopped successfully",
            }
        case "shutdown":
            """Shutdown Tunneld"""
            logger.info(
                "Shutdown tunneld request from %s: %s", sid, data)
            try:
                # Sends SIGINT to this very process.
                os.kill(os.getpid(), signal.SIGINT)
                return {
                    "command_status": "OK",
                    "command_class": "tunneld_control",
                    "command": command,
                    "message": "Server shutting down...",
                }
            except Exception as e:
                logger.error("Error shutting down tunneld: %s", e)
                return {
                    "command": command,
                    "command_class": "tunneld_control",
                    "command_status": "ERROR",
                    "message": f"Error shutting down tunneld: {e}",
                }
        case "clear":
            """Clear all tunnels"""
            logger.info("Clearing tunnels...")
            try:
                self._tunneld_core.clear()
                return {
                    "command": command,
                    "command_class": "tunneld_control",
                    "command_status": "OK",
                    "message": "Cleared tunnels...",
                }
            except Exception as e:
                logger.error("Error clearing tunnels: %s", e)
                return {
                    "command": command,
                    "command_class": "tunneld_control",
                    "command_status": "ERROR",
                    "message": f"Error clearing tunnels: {e}",
                }
        case "cancel":
            """Cancel a tunnel"""
            logger.info(
                "Canceling tunnel request from %s: %s", sid, data)
            try:
                udid = (
                    data.get("udid")
                    if isinstance(data, dict)
                    else getattr(data, "udid", self.context.udid)
                )
                # Fall back to the context's device when none was supplied.
                if udid is None:
                    udid = self.context.udid
                self._tunneld_core.cancel(udid=udid)
                return {
                    "command": command,
                    "command_class": "tunneld_control",
                    "command_status": "OK",
                    "udid": udid,
                    "message": f"tunnel {udid} Canceled ...",
                }
            except Exception as e:
                logger.error("Error canceling tunnel: %s", e)
                return {
                    "command": command,
                    "command_class": "tunneld_control",
                    "command_status": "ERROR",
                    "message": f"Error canceling tunnel: {e}",
                }
        case _:
            return {
                "command": command,
                "command_class": "tunneld_control",
                "command_status": "ERROR",
                "message": f"Unknown operation: {command}",
            }
@self.context.sio.event
async def reverse_geocode(sid, data: LatLng):
    """Reverse-geocode the coordinates in ``data`` for a Socket.IO client."""
    logger.info("OSM Proxy Request from %s, data: %s", sid, data)
    return await get_reverse_geocode(data)
@self.context.sio.event
async def favorite_control(sid, data: dict):
    """Handle ``set``/``clear``/``get`` commands for stored favorites.

    Every valid command replies with the resulting favorites list; an
    unknown command gets an ERROR reply without one.
    """
    if isinstance(data, dict):
        command = data.get("command")
    else:
        command = getattr(data, "command", None)
    logger.info(
        "Favorite Control command: %s requested from %s", command, sid
    )

    if command == "set":
        # Add a favorite location to the database.
        cstat, cmessage = await set_favorite(data)
        if cstat == "ERROR":
            logger.error('Favorite set failed: %s', cmessage)
        favorites = await get_favorites()
    elif command == "clear":
        # Clear a favorite location from the database.
        cstat, cmessage = await clear_favorite(data)
        favorites = await get_favorites()
    elif command == "get":
        # An empty result is reported as an error.
        favorites = await get_favorites()
        if favorites:
            cstat, cmessage = "OK", "Favorites retrieved successfully"
        else:
            cstat, cmessage = "ERROR", "No favorites found"
    else:
        return {
            "command": command,
            "command_class": "favorite_control",
            "command_status": "ERROR",
            "message": f"Invalid command: {command}",
        }

    return {
        "command": command,
        "command_class": "favorite_control",
        "command_status": cstat,
        "message": cmessage,
        "data": {
            "favorites": favorites,
        }
    }
# Expose every route registered on self._app under the "/api" prefix.
self._vue_app.include_router(self._app, prefix="/api")
# Serve static files at "/assets" from the "assets" directory under
# self._vue_dist. The path is built by plain string concatenation, so
# self._vue_dist must already end with a path separator.
self._vue_app.mount(
    "/assets", StaticFiles(directory=self._vue_dist + "assets", html=True), name="vue")
@self._vue_app.get("/{full_path:path}")
async def serve_vue_app(full_path: str):
    """Answer any path matched by this catch-all route with ``index.html``."""
    index_file = self._vue_dist + "index.html"
    return FileResponse(index_file)
def _run_app(self) -> None:
    """Serve the ASGI application with uvicorn (asyncio loop, one worker)."""
    server_options = {
        "host": self.host,
        "port": self.port,
        "loop": "asyncio",
        "workers": 1,
    }
    uvicorn.run(self._asgi_app, **server_options)
class LocationSimulationQueue(LocationSimulation):
def __init__(self, dvt, context: LocationSimulationState):
    """Bind the simulation to ``dvt`` (when given) and to the shared context.

    With ``dvt`` set to ``None`` only ``LocationSimulationBase`` is
    initialized.
    """
    if dvt is not None:
        super().__init__(dvt)
    else:
        LocationSimulationBase.__init__(self)
    self.context = context
    # Fallback client, created on first use.
    self._dt_simulate_location = None
    self._prefer_dt_simulate_location = False
    # Background jitter task and the location id it belongs to.
    self._noise_task: Optional[asyncio.Task] = None
    self._noise_loc_id: Optional[str] = None
async def _get_dt_simulate_location(self):
    """Return the cached ``DtSimulateLocation`` client, creating it on first use.

    Raises:
        RuntimeError: If ``DtSimulateLocation`` could not be imported, or
            the provider exposes no ``lockdown`` attribute.
    """
    if DtSimulateLocation is None:
        raise RuntimeError(
            "DtSimulateLocation is not available in this pymobiledevice3 build")
    if self._dt_simulate_location is not None:
        return self._dt_simulate_location

    lockdown = getattr(self.provider, "lockdown", None)
    if lockdown is None:
        raise RuntimeError(
            "DVT provider does not expose lockdown provider for fallback simulation")
    self._dt_simulate_location = DtSimulateLocation(lockdown)
    return self._dt_simulate_location
async def set(self, latitude: float, longitude: float) -> None:
    """Set the simulated location, falling back to ``DtSimulateLocation``.

    In test mode nothing is sent. Once the DVT path fails, the fallback is
    used for this call and preferred for all later ones.
    """
    if self.context.test_mode:
        await asyncio.sleep(0.1)
        logger.info("Test mode enabled, simulated location set to %s, %s", latitude, longitude)
        return

    if not self._prefer_dt_simulate_location:
        try:
            await super().set(latitude, longitude)
        except Exception:
            logger.exception(
                "DVT location set failed; switching to DtSimulateLocation fallback"
            )
            self._prefer_dt_simulate_location = True
        else:
            return

    fallback_client = await self._get_dt_simulate_location()
    await fallback_client.set(latitude, longitude)
async def clear(self) -> None:
    """Clear the simulated location through whichever backend is in use.

    In test mode nothing is sent. A DVT failure is logged and re-raised
    unless the fallback backend is preferred by then.
    """
    if self.context.test_mode:
        logger.info("Test mode enabled, skipping location clear")
        return

    dvt_failure = None
    if not self._prefer_dt_simulate_location:
        try:
            await super().clear()
        except Exception as exc:
            logger.warning("DVT location clear failed: %s", exc)
            dvt_failure = exc

    # Re-read the flag after the await above before choosing a path.
    if self._prefer_dt_simulate_location:
        fallback_client = await self._get_dt_simulate_location()
        await fallback_client.clear()
        return
    if dvt_failure is not None:
        raise dvt_failure
@staticmethod
def _add_gps_noise(lat: float, lon: float, std_dev_meters: float = 5.0) -> tuple[float, float]:
"""Apply Gaussian jitter in meters and convert to lat/lon deltas."""
earth_radius = 6378137.0
lat_sigma_deg = (std_dev_meters / earth_radius) * (180.0 / math.pi)
cos_lat = math.cos(math.radians(lat))
if abs(cos_lat) < 1e-6:
cos_lat = 1e-6
lon_sigma_deg = (std_dev_meters / (earth_radius *
cos_lat)) * (180.0 / math.pi)
noised_lat = lat + random.gauss(0.0, lat_sigma_deg)
noised_lon = lon + random.gauss(0.0, lon_sigma_deg)
return noised_lat, noised_lon
async def _stop_noise_task(self) -> None:
if self._noise_task is not None:
self._noise_task.cancel()
with suppress(asyncio.CancelledError):
await self._noise_task
self._noise_task = None
self._noise_loc_id = None
async def _noise_loop(self, loc_id: str, base_latitude: float, base_longitude: float) -> None:
    """Periodically re-send the base location with random GPS jitter.

    Each round sleeps for a random 45-180 seconds, then:
    - stops if the simulation is inactive or the queue state is "SHUTDOWN";
    - skips the round (and keeps looping) if noise is disabled;
    - stops if ``loc_id`` is no longer the current location;
    - otherwise sends a jittered coordinate via ``self.set`` and logs it.

    Args:
        loc_id: Id of the location this loop belongs to.
        base_latitude: Latitude the jitter is applied around.
        base_longitude: Longitude the jitter is applied around.
    """
    while True:
        await asyncio.sleep(random.randint(45, 180))

        if (
            not self.context.simulation_active
            or self.context.simulation_queue_state == "SHUTDOWN"
        ):
            return
        if not self.context.simulation_noise:
            continue
        if self.context.get_current_loc_id() != loc_id:
            return

        jittered = self._add_gps_noise(base_latitude, base_longitude)
        noised_latitude, noised_longitude = jittered
        await self.set(noised_latitude, noised_longitude)
        logger.info(
            "Applied simulation noise to active location loc_id=%s sent=%s,%s base=%s,%s",
            loc_id,
            noised_latitude,
            noised_longitude,
            base_latitude,
            base_longitude,
        )
def _start_noise_task(self, loc_id: str, base_latitude: float, base_longitude: float) -> None:
self._noise_loc_id = loc_id
self._noise_task = asyncio.create_task(
self._noise_loop(loc_id, base_latitude, base_longitude),
name=f"simulation-noise-{loc_id}",
)
async def _update_queue_data(self):
    """Emit a ``queue_data_update`` event carrying a snapshot of the queue state.

    The payload holds the active flag, queue data, order, deleted items,
    queue state, the worker task's name (or ``None`` when there is no
    task), and the noise / test-mode flags.
    """
    ctx = self.context
    worker = ctx.simulation_task
    snapshot = {
        "active": ctx.simulation_active,
        "data": ctx.simulation_queue_data,
        "order": ctx.simulation_queue_order,
        "deleted_items": ctx.simulation_queue_deleted_items,
        "state": ctx.simulation_queue_state,
        "worker_task": worker.get_name() if worker else None,
        "gps_noise": ctx.simulation_noise,
        "test_mode": ctx.test_mode,
    }
    await ctx.sio.emit("queue_data_update", snapshot, namespace="/")
async def _update_location_item(self, loc_id: str) -> None:
    """Emit a ``location_item_update`` event with the queue entry for ``loc_id``.

    Raises:
        KeyError: If ``loc_id`` is not present in the queue data.
    """
    payload = {
        "loc_id": loc_id,
        "data": self.context.simulation_queue_data[loc_id],
    }
    await self.context.sio.emit("location_item_update", payload, namespace="/")
async def _update_current_location(self, active_loc_id, new_latitude, new_longitude) -> None:
    """Emit a ``simulation_status`` event describing the active location.

    Args:
        active_loc_id: Id reported as the current location.
        new_latitude: Latitude reported to clients.
        new_longitude: Longitude reported to clients.
    """
    status_payload = {
        "status": self.context.simulation_active,
        "loc_id": active_loc_id,
        "latitude": new_latitude,
        "longitude": new_longitude,
    }
    await self.context.sio.emit("simulation_status", status_payload, namespace="/")
async def _enqueue_next_queue_item(self) -> Optional[str]:
if self.context.simulation_queue_state == "SHUTDOWN":
return None
for item_id in self.context.simulation_queue_order:
if item_id in self.context.simulation_queue_pending_ids:
continue
item = self.context.simulation_queue_data.get(item_id)
if not isinstance(item, dict):
continue
if item.get("status") != "queued":
continue
self.context.simulation_queue_pending_ids.add(item_id)
await self.context.simulation_queue.put(item_id)
logger.info("Worker scheduled queue item %s", item_id)
return item_id
return None
async def play_queue(
self, disable_sleep: bool = False, timing_randomness_range: int = 0
) -> None:
# Queue worker: repeatedly takes a loc_id from self.context.simulation_queue,
# counts down that item's "delay" one second at a time, then applies its
# coordinates with self.set() and publishes the resulting state over socket.io.
# The worker stops on a None queue entry or when the queue state is "SHUTDOWN";
# the noise task is stopped in the finally clause on every exit path.
# NOTE(review): disable_sleep and timing_randomness_range are not referenced
# anywhere in this body.
try:
while True:
# While paused, poll the state every 0.1 s instead of reading the queue.
if self.context.simulation_queue_state == "PAUSED":
await asyncio.sleep(0.1)
continue
if self.context.simulation_queue_state == "SHUTDOWN":
break
# Make sure something is scheduled before blocking on get().
await self._enqueue_next_queue_item()
loc_id = await self.context.simulation_queue.get()
# A None entry is treated as a stop signal for the worker.
if loc_id is None:
self.context.simulation_queue.task_done()
break
location_item = self.context.simulation_queue_data.get(loc_id)
# The id was queued but its data is gone: drop it and schedule the next item.
if location_item is None:
logger.warning(
"Simulation queue item missing for loc_id=%s; skipping stale entry",
loc_id,
)
self.context.simulation_queue_pending_ids.discard(loc_id)
self.context.simulation_queue.task_done()
await self._enqueue_next_queue_item()
continue
new_status = location_item.get("status")
# Items marked "deleted" after being queued are skipped the same way.
if new_status == "deleted":
self.context.simulation_queue_pending_ids.discard(loc_id)
self.context.simulation_queue.task_done()
await self._enqueue_next_queue_item()
continue
new_latitude = location_item.get("latitude")
new_longitude = location_item.get("longitude")
new_delay = location_item.get("delay")
# A missing delay is treated as 0 for the bookkeeping/logging below.
new_delay = 0 if new_delay is None else new_delay
# NOTE(review): new_start, current_latitude, current_longitude and
# current_start below are assigned but never read later in this method.
new_start = location_item.get("start")
current_loc_id = self.context.get_current_loc_id()
current_location_item = self.context.simulation_queue_data.get(
current_loc_id) if current_loc_id else None
current_latitude = (
current_location_item.get("latitude")
if isinstance(current_location_item, dict)
else None
)
current_longitude = (
current_location_item.get("longitude")
if isinstance(current_location_item, dict)
else None
)
current_start = (
current_location_item.get("start")
if isinstance(current_location_item, dict)
else None
)
# Countdown: the stored "delay" is decremented once per 1 s sleep and each
# tick is published; the countdown waits while "PAUSED" and is abandoned
# on "SHUTDOWN".
# NOTE(review): this reads the raw stored "delay"; if it can be None (the
# new_delay normalization above suggests so) the ">" comparison would raise
# TypeError - confirm against the code that creates queue items.
while self.context.simulation_queue_data[loc_id]["delay"] > 0:
if self.context.simulation_queue_state == "SHUTDOWN":
break
while self.context.simulation_queue_state == "PAUSED":
await asyncio.sleep(0.1)
if self.context.simulation_queue_state == "SHUTDOWN":
break
if self.context.simulation_queue_state == "SHUTDOWN":
break
self.context.next_move = location_item.get("delay") - 1
self.context.simulation_queue_data[loc_id]["delay"] -= 1
await self._update_location_item(loc_id)
await asyncio.sleep(1)
# Shutdown requested: release the item without applying it and stop.
if self.context.simulation_queue_state == "SHUTDOWN":
self.context.simulation_queue_pending_ids.discard(loc_id)
self.context.simulation_queue.task_done()
break
# Record when this item becomes the active location (UTC, ISO 8601).
self.context.simulation_queue_data[loc_id]["start"] = datetime.now(
timezone.utc).isoformat()
# Close out the previously active item.
# NOTE(review): this indexes simulation_queue_data[current_loc_id] directly,
# although the .get() above allows for the entry being absent - a missing
# entry would raise KeyError here; confirm it cannot be removed.
if current_loc_id is not None:
self.context.simulation_queue_data[current_loc_id]["status"] = "done"
self.context.simulation_queue_data[current_loc_id]["end"] = datetime.now(
timezone.utc).isoformat()
# NOTE(review): the previous item's "delay" is overwritten with the new
# item's original delay - confirm this is intended.
self.context.simulation_queue_data[current_loc_id]["delay"] = new_delay
await self._update_location_item(current_loc_id)
await self._update_queue_data()
# Stop jitter for the previous location before switching coordinates.
await self._stop_noise_task()
await self.set(new_latitude, new_longitude)
self.context.simulation_queue_data[loc_id]["status"] = "set"
await self._update_location_item(loc_id)
self.context.set_current_loc_id(loc_id)
active_loc_id = self.context.get_current_loc_id()
await self._update_current_location(active_loc_id, new_latitude, new_longitude)
# Restart jitter around the new coordinates when noise is enabled.
if self.context.simulation_noise:
self._start_noise_task(
loc_id, new_latitude, new_longitude)
logger.info(
"Set simulated location to %s, %s after %ss delay",
new_latitude,
new_longitude,
new_delay,
)
await self._update_queue_data()
# Item fully processed: release it and schedule the next one.
self.context.simulation_queue_pending_ids.discard(loc_id)
self.context.simulation_queue.task_done()
await self._enqueue_next_queue_item()
finally:
# Always stop any running noise task when the worker exits.
await self._stop_noise_task()