Files
sim-location-backend/server.py

989 lines
44 KiB
Python

import asyncio
from datetime import datetime, timezone, timedelta
import json
import uuid
import logging
import os
import signal
import traceback
import warnings
import random
from pydantic import BaseModel, RootModel
import socketio
from contextlib import asynccontextmanager, suppress
from typing import Optional
from pymobiledevice3.services.dvt.instruments.location_simulation_base import LocationSimulationBase
with warnings.catch_warnings():
# Ignore: "Core Pydantic V1 functionality isn't compatible with Python 3.14 or greater."
warnings.simplefilter("ignore", category=UserWarning)
import fastapi
import uvicorn
from fastapi import FastAPI
from pymobiledevice3.exceptions import (
ConnectionFailedError,
InvalidServiceError,
MuxException,
TunneldConnectionError,
)
from pymobiledevice3.lockdown import create_using_usbmux, get_mobdev2_lockdowns
from pymobiledevice3.osu.os_utils import get_os_utils
from pymobiledevice3.remote.common import TunnelProtocol
from pymobiledevice3.remote.remote_service_discovery import (
RemoteServiceDiscoveryService
)
from pymobiledevice3.remote.tunnel_service import (
CoreDeviceTunnelProxy,
TunnelResult,
create_core_device_tunnel_service_using_rsd,
get_remote_pairing_tunnel_services,
)
from pymobiledevice3.remote.utils import get_rsds
from pymobiledevice3.services.dvt.instruments.location_simulation import LocationSimulation
from pymobiledevice3.services.dvt.instruments.dvt_provider import DvtProvider
from pymobiledevice3.tunneld.server import TunneldCore, TunnelTask
class JsonFormatter(logging.Formatter):
    """Render every log record as one line of JSON.

    The object always carries ``ts``, ``level``, ``logger`` and ``message``;
    ``exc_info`` is added only when the record has exception information.
    """

    def format(self, record: logging.LogRecord) -> str:
        timestamp = self.formatTime(record, "%Y-%m-%dT%H:%M:%S%z")
        fields = dict(
            ts=timestamp,
            level=record.levelname,
            logger=record.name,
            message=record.getMessage(),
        )
        exc = record.exc_info
        if exc:
            fields["exc_info"] = self.formatException(exc)
        return json.dumps(fields, ensure_ascii=True)
# Route all logging through one stream handler that emits JSON lines.
handler = logging.StreamHandler()
handler.setFormatter(JsonFormatter())
root_logger = logging.getLogger()
# Replace (rather than extend) whatever handlers the root logger already has.
root_logger.handlers = [handler]
root_logger.setLevel(logging.INFO)
# Logger used by the rest of this module.
logger = logging.getLogger("ios-api")
# bugfix: after the device reboots, it might take some time for remoted to start answering the bonjour queries
REATTEMPT_INTERVAL = 5
REATTEMPT_COUNT = 5
# NOTE(review): the interval constants below are not referenced in the visible part of
# this file; presumably polling periods in seconds - confirm before relying on them.
REMOTEPAIRING_INTERVAL = 5
MOBDEV2_INTERVAL = 5
# USB monitor will periodically forget what interfaces it has seen
# and force a full rescan. The value is the number of iterations of the
# inner loop (which sleeps one second each) before blowing away the
# `previous_ips` cache.
USB_MONITOR_RESCAN_INTERVAL = 30
USBMUX_INTERVAL = 2
# Platform helper object obtained from pymobiledevice3 once at import time.
OSUTILS = get_os_utils()
# Reported in the simulation-timeout log line and error payload. The asyncio.wait_for
# call that would enforce it around get_tun() is currently commented out.
TUNNEL_ACQUIRE_TIMEOUT_SECONDS = 15
# Timeout passed to asyncio.wait_for while the simulation worker enters the DvtProvider.
DVT_CONNECT_TIMEOUT_SECONDS = 20
# Payload describing a simulated position and its timing.
# `end` and `next_move` have no default, so they must be supplied but may be None.
# NOTE(review): units of start/end/next_move are not shown in this file - confirm.
class SimulationStatusData(BaseModel):
    latitude: float
    longitude: float
    start: float
    end: Optional[float]
    next_move: Optional[float]
# Envelope pairing a boolean status flag with optional SimulationStatusData.
class SimulationStatus(BaseModel):
    status: bool
    data: Optional[SimulationStatusData]
# Request body accepted by POST /add-location: the target coordinates plus an
# optional delay (defaults to 0) and optional start/end strings.
class SimulationRequestData(BaseModel):
    latitude: float
    longitude: float
    delay: int = 0
    start: Optional[str] = None
    end: Optional[str] = None
# Envelope pairing a boolean status flag with optional SimulationRequestData.
class SimulationRequest(BaseModel):
    status: bool
    data: Optional[SimulationRequestData]
# Same fields as SimulationRequestData plus the `loc_id` identifying a queued location.
class SimulationRequestResponseData(BaseModel):
    loc_id: str
    latitude: float
    longitude: float
    delay: int = 0
    start: Optional[str] = None
    end: Optional[str] = None
# Wrapper around one optional SimulationRequestResponseData entry.
class SimulationQueueList(BaseModel):
    data: Optional[SimulationRequestResponseData]
# Envelope pairing a boolean status flag with optional SimulationRequestResponseData.
class SimulationRequestResponse(BaseModel):
    status: bool
    data: Optional[SimulationRequestResponseData]
class LocationSimulationState:
    """Mutable state shared by the HTTP/Socket.IO handlers and the simulation worker.

    Args:
        test_mode: When True the simulation worker plays the queue through
            ``LocationSimulationTestQueue``. Defaults to True, which is the value
            that used to be hard-coded, so existing callers are unaffected.
    """

    def __init__(self, *, test_mode: bool = True):
        # Coordinates most recently applied by the queue player.
        self.latitude: Optional[float] = None
        self.longitude: Optional[float] = None
        # Countdown value published while the player waits out an item's delay.
        self.next_move: Optional[float] = None
        # Device the handlers currently operate on.
        self.udid: Optional[str] = None
        self.simulation_active: bool = False
        self.loc_id: Optional[str] = None
        # The queue players only apply a location while this is True.
        self.set_location_enabled: bool = True
        # Items are (loc_id, latitude, longitude, delay) tuples.
        self.queue: asyncio.Queue = asyncio.Queue()
        # Record of every location accepted so far; exposed by /context-status.
        self.queue_list: list[SimulationRequestResponseData] = []
        # Set by the simulation worker when it starts.
        self.queue_status: Optional[asyncio.Event] = asyncio.Event()
        # One of "STOPPED", "RUNNING", "PAUSED" or "SHUTDOWN".
        self.queue_state: str = "STOPPED"
        self.test_mode: bool = test_mode
        self.simulation_task: Optional[asyncio.Task] = None
        self.sio: socketio.AsyncServer = socketio.AsyncServer(async_mode="asgi", cors_allowed_origins="*")
        # RSD connection cached by get_tun() after the first successful connect.
        self.tunnel: Optional[RemoteServiceDiscoveryService] = None
class TunneldRunnerSio:
    """Orchestrates between the web server and TunneldCore."""

    @classmethod
    def create(
        cls,
        host: str,
        port: int,
        context: Optional[LocationSimulationState] = None,
        protocol: TunnelProtocol = TunnelProtocol.QUIC,
        usb_monitor: bool = True,
        wifi_monitor: bool = True,
        usbmux_monitor: bool = True,
        mobdev2_monitor: bool = True,
    ) -> None:
        """Build a runner and serve it.

        Args:
            host: Interface to bind the web server to.
            port: Port to bind the web server to.
            context: Shared simulation state. A fresh ``LocationSimulationState``
                is created per call when omitted; the previous default was a
                single instance built at definition time and shared by every call.
            protocol: Tunnel protocol handed to ``TunneldCore``.
            usb_monitor, wifi_monitor, usbmux_monitor, mobdev2_monitor:
                Monitor switches forwarded to ``TunneldCore``.
        """
        if context is None:
            context = LocationSimulationState()
        runner = cls(
            host,
            port,
            protocol=protocol,
            usb_monitor=usb_monitor,
            wifi_monitor=wifi_monitor,
            usbmux_monitor=usbmux_monitor,
            mobdev2_monitor=mobdev2_monitor,
            context=context,
        )
        runner._run_app()
def __init__(
    self,
    host: str,
    port: int,
    context: Optional[LocationSimulationState] = None,
    protocol: TunnelProtocol = TunnelProtocol.QUIC,
    usb_monitor: bool = True,
    wifi_monitor: bool = True,
    usbmux_monitor: bool = True,
    mobdev2_monitor: bool = True,
):
    """Wire FastAPI, Socket.IO and TunneldCore together and register all handlers.

    Args:
        host: Interface to bind the web server to.
        port: Port to bind the web server to.
        context: Shared simulation state. A fresh ``LocationSimulationState`` is
            created when omitted; the previous default was one instance built at
            definition time and therefore shared by every runner.
        protocol: Tunnel protocol handed to ``TunneldCore``.
        usb_monitor, wifi_monitor, usbmux_monitor, mobdev2_monitor:
            Monitor switches forwarded to ``TunneldCore``.
    """

    @asynccontextmanager
    async def lifespan(app: FastAPI):
        # Startup: begin tunnel discovery. Shutdown (after the yield): drop queued
        # locations, close the tunnels, then stop the Socket.IO server.
        self._tunneld_core.start()
        yield
        logger.info("Closing tunneld tasks...")
        await empty_simulation_queue()
        await self._tunneld_core.close()
        await self.context.sio.shutdown()

    self.host = host
    self.port = port
    self.protocol = protocol
    self.context = context if context is not None else LocationSimulationState()
    # A wildcard bind address is not connectable, so the local API address uses loopback.
    self._tunneld_api_address = ("127.0.0.1" if host in ("0.0.0.0", "::") else host, port)
    # NOTE(review): cors_allowed_origins is a python-socketio option; confirm FastAPI
    # does anything with it here.
    self._app = FastAPI(title="iOS Device Management API", lifespan=lifespan, cors_allowed_origins="*")
    # Socket.IO wraps the FastAPI app so both are served from one ASGI application.
    self._asgi_app = socketio.ASGIApp(self.context.sio, self._app)
    self._tunneld_core = TunneldCore(
        protocol=protocol,
        wifi_monitor=wifi_monitor,
        usb_monitor=usb_monitor,
        usbmux_monitor=usbmux_monitor,
        mobdev2_monitor=mobdev2_monitor,
    )
async def get_tun(
    udid: Optional[str] = None, max_retries: int = 10, retry_delay: float = 0.5
) -> RemoteServiceDiscoveryService:
    """Resolve an active local tunnel and connect directly to its RSD endpoint.

    Args:
        udid: Restrict the search to tunnels of this device; None accepts any.
        max_retries: Number of attempts before giving up.
        retry_delay: Seconds to sleep between attempts.

    Returns:
        A connected ``RemoteServiceDiscoveryService``. It is cached on the context
        and reused by later calls for the same device.

    Raises:
        TunneldConnectionError: No tunnel was found, or none accepted a connection.
    """
    cached = self.context.tunnel
    # Reuse the cached connection only when it belongs to the requested device;
    # previously it was returned even when a different udid was asked for.
    if cached is not None and (udid is None or getattr(cached, "udid", udid) == udid):
        return cached
    for attempt in range(max_retries):
        is_last_attempt = attempt >= max_retries - 1
        candidates = [
            task.tunnel
            for task in self._tunneld_core.tunnel_tasks.values()
            if task.tunnel is not None
            and task.udid is not None
            and (udid is None or task.udid == udid)
        ]
        if not candidates:
            if is_last_attempt:
                logger.error("Failed to find an active local tunnel after max retries")
                raise TunneldConnectionError()
            logger.info("Waiting for local tunnel to be ready... (attempt %s/%s)", attempt + 1, max_retries)
            await asyncio.sleep(retry_delay)
            continue
        tunnel = candidates[0]
        if udid and len(candidates) > 1:
            logger.warning("Multiple local tunnels found for udid %s; using first available", udid)
        rsd = None
        try:
            rsd = RemoteServiceDiscoveryService((tunnel.address, tunnel.port))
            await rsd.connect()
        except Exception as exc:
            # Release the half-open connection before retrying so attempts do not leak.
            if rsd is not None:
                with suppress(Exception):
                    await rsd.close()
            if is_last_attempt:
                logger.error("Failed to connect to local tunnel endpoint after max retries", exc_info=True)
                raise TunneldConnectionError() from exc
            logger.info("Tunnel endpoint not ready yet... (attempt %s/%s)", attempt + 1, max_retries)
            await asyncio.sleep(retry_delay)
            continue
        logger.info(
            "Connected to local tunnel at %s:%s after %s retries",
            tunnel.address,
            tunnel.port,
            attempt,
        )
        self.context.tunnel = rsd
        return rsd
    # Only reached when max_retries is not positive.
    raise TunneldConnectionError()
def cleanup_device_data(d):
    """Return a copy of *d* in which every ``bytes`` value is replaced by 'BYTE DATA'.

    Nested dictionaries are cleaned recursively and kept under their original key.
    The previous version discarded the recursive result, so nested dictionaries
    were silently dropped from the output.
    """
    cleaned = {}
    for key, value in d.items():
        if isinstance(value, dict):
            cleaned[key] = cleanup_device_data(value)
        elif isinstance(value, bytes):
            cleaned[key] = 'BYTE DATA'
        else:
            cleaned[key] = value
    return cleaned
async def device_reboot(delay):
    """Wait *delay* seconds, then reboot the host with ``shutdown -r now``.

    NOTE(review): os.system blocks the event loop while the command runs.
    """
    # The delay was missing from the log call, so the literal "%s" was logged.
    logger.info("Reboot initiated with delay %s", delay)
    await asyncio.sleep(delay)
    os.system("shutdown -r now")
async def device_shutdown(delay):
    """Wait *delay* seconds, then power off the host with ``shutdown -h now``.

    NOTE(review): os.system blocks the event loop while the command runs.
    """
    # The delay was missing from the log call, so the literal "%s" was logged.
    logger.info("Shutdown initiated with delay %s", delay)
    await asyncio.sleep(delay)
    os.system("shutdown -h now")
""" Queue Functions"""
async def start_simulation_queue():
    """Run the simulation worker until queue playback ends.

    In test mode the queue is played through ``LocationSimulationTestQueue`` and no
    device is contacted. Otherwise a device is selected (explicit udid, or the only
    active tunnel), a tunnel and DVT provider are acquired, and the queue is played
    through ``LocationSimulationQueue``. Failures are logged and reported to clients
    as an ``error`` Socket.IO event; they are not raised to the caller.
    """
    logger.info("Starting location simulation worker...")
    self.context.simulation_active = True
    self.context.queue_state = "RUNNING"
    self.context.queue_status.set()
    try:
        if self.context.test_mode:
            logger.info("Simulation worker: test mode enabled")
            # Instantiated directly: the class's context-manager hooks add nothing,
            # and this does not depend on them being usable in a `with` statement.
            await LocationSimulationTestQueue(self.context).play_queue()
            # Test mode must stop here. Previously execution fell through and went on
            # to acquire a real tunnel and DVT connection.
            return
        if self.context.udid is None:
            active_udids = sorted(
                {
                    t.udid
                    for t in self._tunneld_core.tunnel_tasks.values()
                    if t.udid is not None and t.tunnel is not None
                }
            )
            if len(active_udids) == 1:
                self.context.udid = active_udids[0]
                logger.info("Simulation worker: auto-selected udid=%s from active tunnel", self.context.udid)
            elif len(active_udids) == 0:
                logger.error("Simulation worker: no active tunnel with udid available")
                await self.context.sio.emit(
                    "error",
                    {"type": "simulation_no_tunnel", "message": "No active tunnel found. Start tunnel first."},
                    namespace="/",
                )
                return
            else:
                logger.error(
                    "Simulation worker: multiple active tunnels; explicit udid required: %s",
                    active_udids,
                )
                await self.context.sio.emit(
                    "error",
                    {
                        "type": "simulation_udid_required",
                        "message": "Multiple active tunnels found; provide udid in start_simulate_location.",
                        "udids": active_udids,
                    },
                    namespace="/",
                )
                return
        logger.info("Simulation worker: acquiring tunnel (udid=%s)", self.context.udid)
        tun = await get_tun(self.context.udid)
        logger.info("Simulation worker: tunnel acquired, connecting DVT provider")
        dvt_provider = DvtProvider(tun)
        # Entered by hand so the connect step alone can be bounded by a timeout.
        dvt = await asyncio.wait_for(dvt_provider.__aenter__(), timeout=DVT_CONNECT_TIMEOUT_SECONDS)
        logger.info("Simulation worker: DVT provider connected, starting queue playback")
        try:
            async with LocationSimulationQueue(dvt, self.context) as locate_simulation:
                await locate_simulation.play_queue()
        finally:
            logger.info("Simulation worker: closing DVT provider")
            await dvt_provider.__aexit__(None, None, None)
            logger.info("Simulation worker: DVT provider closed")
    except (asyncio.TimeoutError, TimeoutError):
        # asyncio.TimeoutError only became an alias of TimeoutError in Python 3.11.
        logger.error(
            "Simulation worker timeout. tunnel_timeout=%ss dvt_timeout=%ss udid=%s",
            TUNNEL_ACQUIRE_TIMEOUT_SECONDS,
            DVT_CONNECT_TIMEOUT_SECONDS,
            self.context.udid,
        )
        await self.context.sio.emit(
            "error",
            {
                "type": "simulation_timeout",
                "udid": self.context.udid,
                "tunnel_timeout_seconds": TUNNEL_ACQUIRE_TIMEOUT_SECONDS,
                "dvt_timeout_seconds": DVT_CONNECT_TIMEOUT_SECONDS,
            },
            namespace="/",
        )
    except Exception:
        logger.exception("Simulation worker crashed")
        await self.context.sio.emit(
            "error",
            {"type": "simulation_crash", "udid": self.context.udid},
            namespace="/",
        )
    finally:
        self.context.simulation_active = False
        self.context.simulation_task = None
async def pause_simulation_queue():
    """Flag the queue as paused; the queue players idle while in this state."""
    state = self.context
    state.queue_state = "PAUSED"
async def resume_simulation_queue():
    """Flag the queue as running again so the queue players continue."""
    state = self.context
    state.queue_state = "RUNNING"
async def empty_simulation_queue():
    """Discard every item still waiting in the simulation queue.

    Each discarded item is marked done so ``Queue.join()`` bookkeeping stays balanced.
    An item the worker has already taken off the queue is not affected.
    """
    logger.info("Clearing location simulation queue...")
    q = self.context.queue
    # This function used to set `set_location_enabled` to False and nothing ever set
    # it back, so the worker consumed but never applied any location queued after a
    # clear. Draining the queue is enough, so the flag is left untouched.
    while True:
        try:
            item = q.get_nowait()
        except asyncio.QueueEmpty:
            break
        q.task_done()
        logger.info("Discarding item from queue: %s", item)
async def end_simulation_queue() -> str:
    """Stop queue playback and clear the simulated location on the device.

    Pending items are discarded and a running worker is cancelled. If a tunnel
    connection is cached, the simulated location is cleared through it.

    Returns:
        The string ``"ended"``.
    """
    # The old log line referenced `sid`, which is not defined in this scope.
    logger.info("Ending location simulation")
    worker = self.context.simulation_task
    if worker is not None and not worker.done():
        await empty_simulation_queue()
        self.context.queue_state = "SHUTDOWN"
        # The worker normally sits in `queue.get()`, so awaiting it without a wake-up
        # never returns. The old `while q.empty(): await q.join()` loop also spun
        # without yielding once the queue was drained. Cancelling stops the worker
        # promptly; its own `finally` blocks close the DVT provider.
        worker.cancel()
        with suppress(asyncio.CancelledError):
            await worker
    if self.context.tunnel is not None:
        async with DvtProvider(self.context.tunnel) as dvt, LocationSimulation(dvt) as locate_simulation:
            await locate_simulation.clear()
    self.context.simulation_active = False
    self.context.queue_state = "SHUTDOWN"
    # Re-arm location updates so a later start applies queued locations again.
    self.context.set_location_enabled = True
    return "ended"
""" FastAPI HTTP Functions"""
def generate_http_response(
    data: dict, status_code: int = 200, media_type: str = "application/json"
) -> fastapi.Response:
    """Serialize *data* with ``json.dumps`` and wrap it in a ``fastapi.Response``."""
    body = json.dumps(data)
    return fastapi.Response(content=body, status_code=status_code, media_type=media_type)
""" Tunnel Functions"""
@self._app.get("/start-tunnel")
async def start_tunnel(
    udid: Optional[str] = None, ip: Optional[str] = None, connection_type: Optional[str] = None
) -> fastapi.Response:
    """Return the tunnel for a device, creating one when none exists yet.

    Args:
        udid: Target device. Falls back to the udid remembered on the context at
            request time. The old default, ``self.context.udid``, was evaluated once
            at registration and was therefore always None.
        ip: Only consider endpoints with this address (usb and wifi paths).
        connection_type: "usbmux", "usb" or "wifi"; None tries them in that order.

    Returns:
        200 with interface/port/address, 501 when no tunnel task could be created
        or an exception occurred, 404 when the tunnel task reported failure.
    """
    if udid is None:
        udid = self.context.udid
    udid_tunnels = [
        t.tunnel for t in self._tunneld_core.tunnel_tasks.values() if t.udid == udid and t.tunnel is not None
    ]
    if udid_tunnels:
        self.context.udid = udid
        existing = udid_tunnels[0]
        return generate_http_response(
            {"interface": existing.interface, "port": existing.port, "address": existing.address}
        )
    # The tunnel task reports its result (or None) through this queue.
    queue = asyncio.Queue()
    created_task = False
    try:
        if not created_task and connection_type in ("usbmux", None):
            task_identifier = f"usbmux-{udid}"
            try:
                async with await create_using_usbmux(udid) as lockdown:
                    service = await CoreDeviceTunnelProxy.create(lockdown)
                    task = asyncio.create_task(
                        self._tunneld_core.start_tunnel_task(
                            task_identifier, service, protocol=TunnelProtocol.TCP, queue=queue
                        ),
                        name=f"start-tunnel-task-{task_identifier}",
                    )
                    self._tunneld_core.tunnel_tasks[task_identifier] = TunnelTask(task=task, udid=udid)
                    created_task = True
            except (ConnectionFailedError, InvalidServiceError, MuxException):
                # usbmux is best-effort; fall through to the other transports.
                pass
        if connection_type in ("usb", None):
            for rsd in await get_rsds(udid=udid):
                rsd_ip = rsd.service.address[0]
                if ip is not None and rsd_ip != ip:
                    await rsd.close()
                    continue
                task = asyncio.create_task(
                    self._tunneld_core.start_tunnel_task(
                        rsd_ip, await create_core_device_tunnel_service_using_rsd(rsd), queue=queue
                    ),
                    name=f"start-tunnel-usb-{rsd_ip}",
                )
                self._tunneld_core.tunnel_tasks[rsd_ip] = TunnelTask(task=task, udid=rsd.udid)
                created_task = True
        if not created_task and connection_type in ("wifi", None):
            for remotepairing in await get_remote_pairing_tunnel_services(udid=udid):
                remotepairing_ip = remotepairing.hostname
                if ip is not None and remotepairing_ip != ip:
                    await remotepairing.close()
                    continue
                task = asyncio.create_task(
                    self._tunneld_core.start_tunnel_task(remotepairing_ip, remotepairing, queue=queue),
                    name=f"start-tunnel-wifi-{remotepairing_ip}",
                )
                self._tunneld_core.tunnel_tasks[remotepairing_ip] = TunnelTask(
                    task=task, udid=remotepairing.remote_identifier
                )
                created_task = True
    except Exception as e:
        # Error bodies go through the shared helper so they carry the JSON media type.
        return generate_http_response(
            {
                "error": {
                    "exception": e.__class__.__name__,
                    "traceback": traceback.format_exc(),
                }
            },
            status_code=501,
        )
    if not created_task:
        return generate_http_response({"error": "task not created"}, status_code=501)
    tunnel: Optional[TunnelResult] = await queue.get()
    if tunnel is None:
        return generate_http_response(
            {"error": "something went wrong during tunnel creation"}, status_code=404
        )
    self.context.udid = udid
    return generate_http_response(
        {"interface": tunnel.interface, "port": tunnel.port, "address": tunnel.address}
    )
@self._app.get("/shutdown")
async def shutdown() -> fastapi.Response:
    """Send SIGINT to this process and acknowledge the request."""
    own_pid = os.getpid()
    os.kill(own_pid, signal.SIGINT)
    return generate_http_response(
        {"operation": "shutdown", "data": True, "message": "Server shutting down..."}
    )
@self._app.get("/clear-tunnels")
async def clear_tunnels() -> fastapi.Response:
    """Ask TunneldCore to clear its tunnels and acknowledge the request."""
    self._tunneld_core.clear()
    return generate_http_response(
        {"operation": "clear_tunnels", "data": True, "message": "Cleared tunnels..."}
    )
@self._app.get("/cancel")
async def cancel_tunnel(udid: str) -> fastapi.Response:
    """Ask TunneldCore to cancel the tunnel of *udid* and acknowledge the request."""
    self._tunneld_core.cancel(udid=udid)
    payload = dict(operation="cancel", udid=udid, data=True, message=f"tunnel {udid} Canceled ...")
    return generate_http_response(payload)
"""Simulation Functions"""
@self._app.get("/start-simulation")
async def app_start_simulation() -> fastapi.Response:
    """Start the simulation worker unless one is already running."""
    logger.info("Simulation Start Requested ")
    current = self.context.simulation_task
    if current is not None and not current.done():
        return generate_http_response({"status": "error", "message": "Simulation already running"})
    self.context.simulation_active = True
    self.context.simulation_task = asyncio.create_task(
        start_simulation_queue(),
        name="location-simulation-worker",
    )
    return generate_http_response({"status": "started", "message": "Simulation started"})
@self._app.post("/add-location")
async def app_add_location(data: SimulationRequestData) -> fastapi.Response:
    """Queue one location and report the entry that was recorded for it."""
    logger.info("Request to add new location to queue")

    def read(field, default=None):
        # Accept either a mapping or an object exposing the field as an attribute.
        if isinstance(data, dict):
            return data.get(field, default)
        return getattr(data, field, default)

    loc_id = str(uuid.uuid4())
    latitude = read("latitude")
    longitude = read("longitude")
    delay = read("delay", 0)
    if delay is None:
        delay = 0
    if latitude is None or longitude is None:
        return generate_http_response({"status": "error", "message": "Invalid location data"})
    logger.info(
        "Adding location %s (%s, %s) with %s delay to the queue", loc_id, latitude, longitude, delay
    )
    await self.context.queue.put((loc_id, latitude, longitude, delay))
    # Earliest moment the location can take effect: now plus the delay.
    # A zero delay leaves the timestamp at "now".
    start_time = (datetime.now(timezone.utc) + timedelta(seconds=delay)).isoformat()
    location_item = {
        loc_id: {
            "loc_id": loc_id,
            "latitude": latitude,
            "longitude": longitude,
            "delay": delay,
            "start": start_time,
        }
    }
    self.context.queue_list.append(location_item)
    return generate_http_response(
        {
            "status": "added",
            "message": f"Location {loc_id} added to the queue",
            "item": location_item,
        }
    )
@self._app.get("/clear-queue")
async def app_clear_queue() -> fastapi.Response:
    """Discard every location still waiting in the simulation queue."""
    # The log line used to read "Simulation Start Requested", copied from the start handler.
    logger.info("Simulation queue clear requested")
    await empty_simulation_queue()
    data = {"status": "cleared", "message": "Simulation cleared"}
    return generate_http_response(data)
@self._app.get("/pause-queue")
async def app_pause_queue() -> fastapi.Response:
    """Put queue playback into the paused state and acknowledge."""
    await pause_simulation_queue()
    return generate_http_response({"status": "paused", "message": "Simulation paused"})
@self._app.get("/resume-queue")
async def app_resume_queue() -> fastapi.Response:
    """Put queue playback back into the running state and acknowledge."""
    await resume_simulation_queue()
    return generate_http_response({"status": "resumed", "message": "Simulation resumed"})
@self._app.get("/end-simulation")
async def app_end_simulation() -> fastapi.Response:
    """Run the end-of-simulation routine in its own task and report its result."""
    logger.info("End location simulation request")
    outcome = await asyncio.create_task(end_simulation_queue(), name="end-simulation-worker")
    return generate_http_response({"status": outcome, "message": "Simulation ended"})
"""Status Functions"""
@self._app.get("/")
async def list_tunnels() -> dict[str, list[dict]]:
    """List the established tunnels, grouped by device udid."""
    by_udid: dict[str, list[dict]] = {}
    for ip, entry in self._tunneld_core.tunnel_tasks.items():
        # Entries without a udid or without an established tunnel are not listed.
        if entry.udid is not None and entry.tunnel is not None:
            by_udid.setdefault(entry.udid, []).append(
                {
                    "tunnel-address": entry.tunnel.address,
                    "tunnel-port": entry.tunnel.port,
                    "interface": ip,
                }
            )
    return by_udid
@self._app.get("/device-info")
async def device_info():
    """Return the lockdown values of every device that has an active tunnel.

    Returns:
        A dict keyed by udid. The value is the cleaned lockdown dictionary, or an
        empty dict when the device could not be queried.
    """
    tunnels = {}
    for active_tunnel in self._tunneld_core.tunnel_tasks.values():
        udid = active_tunnel.udid
        if udid is None or active_tunnel.tunnel is None:
            continue
        if tunnels.get(udid):
            # Already populated through another tunnel of the same device.
            continue
        tunnels.setdefault(udid, {})
        try:
            # The context manager closes the lockdown client; it used to be leaked.
            async with await create_using_usbmux(serial=udid, autopair=False) as lockdown:
                # The code called `iterate_multidim`, which is not defined in the
                # visible part of this file, so every lookup failed with a swallowed
                # NameError. `cleanup_device_data` is the helper defined here.
                # NOTE(review): confirm no `iterate_multidim` exists elsewhere.
                tunnels[udid] = cleanup_device_data(lockdown.all_values)
        except Exception:
            logger.exception("Failed to create lockdown session for device %s", udid)
    return tunnels
@self._app.get("/hello")
async def hello() -> fastapi.Response:
    """Liveness probe: always answers with a fixed greeting."""
    return generate_http_response({"message": "Hello, I'm alive"})
@self._app.get("/context-status")
async def app_context_status() -> fastapi.Response:
    """Report a snapshot of the shared simulation state."""
    ctx = self.context
    snapshot = dict(
        latitude=ctx.latitude,
        longitude=ctx.longitude,
        next_more=ctx.next_move,
        udid=ctx.udid,
        simulation_active=ctx.simulation_active,
        loc_id=ctx.loc_id,
        set_location_enable=ctx.set_location_enabled,
        queue_state=ctx.queue_state,
        queue_list=ctx.queue_list,
    )
    return generate_http_response(snapshot)
""" Socket.IO Functions"""
"""Socket.IO Connection Events"""
@self.context.sio.event
async def connect(sid, environ):
    """Log a new Socket.IO client and return a short confirmation string."""
    logger.info("Client connected: %s", sid)
    return f"{sid} connected"
@self.context.sio.event
async def disconnect(sid):
    """Log that a Socket.IO client went away."""
    logger.info("Client disconnected: %s", sid)
""" Socket.IO Request Events """
@self.context.sio.event
async def request_update(sid, data):
    """Log an update request; the payload is not used."""
    logger.info("Update request from %s", sid)
@self.context.sio.event
async def message(sid, data):
    """Log an incoming message and broadcast it on the default namespace."""
    # The arguments were swapped, so the payload was logged as the sender and vice versa.
    logger.info("Received message from %s: %s", sid, data)
    await self.context.sio.emit("message", f"Received message from {sid}: {data}", namespace="/")
""" Device Control"""
@self.context.sio.event
async def device_control(sid, data):
""" Device Control """
command = data.get("command") if isinstance(data, dict) else getattr(data, "command", None)
delay = data.get("delay") if isinstance(data, dict) else getattr(data, "delay", None)
if delay is None:
delay = 5
match command:
case "shutdown":
""" Shutdown the device"""
logger.info("Shutdown command received from %s with delay %s", sid, delay)
await device_shutdown(delay)
return { "command": "shutdown", "status": "success", "message": f"Device shutdown initiated with {delay} seconds delay" }
case "reboot":
""" Reboot the device"""
logger.info("Reboot command received from %s with delay %s", sid, delay)
await device_reboot(delay)
return { "command": "reboot", "status": "success", "message": f"Device reboot initiated with {delay} seconds delay" }
case _:
return { "command": command, "status": "error", "message": f"Invalid command: {command}" }
@self.context.sio.event
async def simulate_control(sid, data):
""" Simulation Control """
command = data.get("command") if isinstance(data, dict) else getattr(data, "command", None)
logger.info("Simulation Control command: %s requested from %s", command, sid)
match command:
case "add":
""" Add a location to the simulation queue"""
loc_id = str(uuid.uuid4())
latitude = data.get("latitude") if isinstance(data, dict) else getattr(data, "latitude", None)
longitude = data.get("longitude") if isinstance(data, dict) else getattr(data, "longitude", None)
delay = data.get("delay", 0) if isinstance(data, dict) else getattr(data, "delay", 0)
delay = 0 if delay is None else delay
if latitude is not None and longitude is not None:
logger.info("Adding location %s (%s, %s) with %s delay to the queue", loc_id, latitude, longitude,
delay)
await self.context.queue.put((loc_id, latitude, longitude, delay))
if delay == 0:
start_time = datetime.now(timezone.utc).isoformat()
else:
now_time = datetime.now(timezone.utc)
new_time = now_time + timedelta(seconds=delay)
start_time = new_time.isoformat()
location_item = {
loc_id: {
"loc_id": loc_id,
"latitude": latitude,
"longitude": longitude,
"delay": delay,
"start": start_time
}
}
ack = {
"command": command,
"status": "added",
"message": f"Location {loc_id} added to the queue",
"item": location_item
}
self.context.queue_list.append(location_item)
logger.info("Location %s added to the queue", loc_id)
return ack
else:
logger.warning("Invalid location data received from %s: %s", sid, data)
return {"command": command, "status": "error", "message": "Invalid location data"}
case "clear":
""" Clear the simulation queue"""
await empty_simulation_queue()
return {"command": command, "status": "cleared", "message": "Simulation cleared"}
case "pause":
""" Pause the simulation queue"""
await pause_simulation_queue()
return {"command": command, "status": "paused", "message": "Simulation paused"}
case "resume":
""" Resume the simulation queue"""
await resume_simulation_queue()
return {"command": command, "status": "resumed", "message": "Simulation resumed"}
case "end":
""" End the simulation queue"""
logger.info("End location simulation request from %s", sid)
end_task = asyncio.create_task(end_simulation_queue(), name="end-simulation-worker")
result = await end_task
return {"command": command, "status": result, "message": "Simulation ended"}
case "start":
""" Start the simulation queue"""
logger.info("Start location simulation request from %s", sid)
if self.context.simulation_task is None or self.context.simulation_task.done():
self.context.simulation_active = True
self.context.simulation_task = asyncio.create_task(
start_simulation_queue(),
name="location-simulation-worker",
)
return {"command": command, "status": "started", "message": "Simulation started"}
else:
return {"command": command, "status": "error", "message": "Simulation already running"}
case _:
logger.warning("Invalid command received from %s: %s", sid, command)
return {"status": "error", "message": "Invalid command"}
""" Tunnel Control """
@self.context.sio.event
async def tunneld_control(sid, data):
command = data.get("command") if isinstance(data, dict) else getattr(data, "command", None)
logger.info("Tunneld Control command: %s requested from %s", command, sid)
match command:
case "start":
"""Start Tunneld"""
logger.info("Start tunneld request from %s: %s", sid, data)
try:
self._tunneld_core.start()
logger.info("Tunneld started successfully")
return {"status": "started", "message": "Tunneld started successfully"}
except Exception as e:
logger.error("Error starting tunneld: %s", e)
return {"command": command, "status": "error", "message": f"Error starting tunneld: {e}"}
case "shutdown":
"""Shutdown Tunneld"""
logger.info("Shutdown tunneld request from %s: %s", sid, data)
try:
os.kill(os.getpid(), signal.SIGINT)
return {"command": command, "status": "Success", "message": "Server shutting down..."}
except Exception as e:
logger.error("Error shutting down tunneld: %s", e)
return {"command": command, "status": "error", "message": f"Error shutting down tunneld: {e}"}
case "clear":
"""Clear all tunnels"""
logger.info("Clearing tunnels...")
try:
self._tunneld_core.clear()
return {"command": command, "status": "Success", "message": "Cleared tunnels..."}
except Exception as e:
logger.error("Error clearing tunnels: %s", e)
return {"command": command, "status": "error", "message": f"Error clearing tunnels: {e}"}
case "cancel":
"""Cancel a tunnel"""
logger.info("Canceling tunnel request from %s: %s", sid, data)
try:
udid = data.get("udid") if isinstance(data, dict) else getattr(data, "udid", self.context.udid)
if udid is None:
udid = self.context.udid
self._tunneld_core.cancel(udid=udid)
return {"command": command, "status": "Success", "udid": udid, "message": f"tunnel {udid} Canceled ..."}
except Exception as e:
logger.error("Error canceling tunnel: %s", e)
return {"command": command, "status": "error", "message": f"Error canceling tunnel: {e}"}
case _:
return {"command": command, "status": "error", "message": f"Unknown operation: {command}"}
def _run_app(self) -> None:
    """Serve the combined Socket.IO/FastAPI ASGI application with uvicorn."""
    server_options = {"host": self.host, "port": self.port, "loop": "asyncio", "workers": 1}
    uvicorn.run(self._asgi_app, **server_options)
class LocationSimulationQueue(LocationSimulation):
    """Location simulation that plays (loc_id, latitude, longitude, delay) items from the shared queue."""

    def __init__(self, dvt, context: LocationSimulationState):
        super().__init__(dvt)
        self.context = context

    async def _emit_status(self, next_move) -> None:
        """Publish the current simulation state as a ``simulation_status`` event."""
        await self.context.sio.emit(
            "simulation_status",
            {
                "status": self.context.simulation_active,
                "loc_id": self.context.loc_id,
                "latitude": self.context.latitude,
                "longitude": self.context.longitude,
                "next_move": next_move,
            },
            namespace="/",
        )

    async def play_queue(self, disable_sleep: bool = False, timing_randomness_range: int = 0) -> None:
        """Consume the queue until it is shut down or a sentinel item arrives.

        Args:
            disable_sleep: Apply every location immediately, ignoring its delay.
            timing_randomness_range: When positive, each delay is shifted by a
                uniform random amount within +/- this many seconds.

        Playback idles while ``queue_state`` is "PAUSED" and stops on "SHUTDOWN" or
        on an item whose four fields are all None. Items are skipped while
        ``set_location_enabled`` is False.
        """
        while True:
            if self.context.queue_state == "PAUSED":
                await asyncio.sleep(0.1)
                continue
            if self.context.queue_state == "SHUTDOWN":
                break
            loc_id, latitude, longitude, delay = await self.context.queue.get()
            try:
                if (loc_id, latitude, longitude, delay) == (None, None, None, None):
                    break
                if not self.context.set_location_enabled:
                    continue
                if delay > 0 and not disable_sleep:
                    if timing_randomness_range > 0:
                        delay = delay + random.uniform(
                            -timing_randomness_range, timing_randomness_range
                        )
                    # The jittered delay is a float and range() needs an integer;
                    # the old code raised TypeError here whenever jitter was enabled.
                    for remaining in range(max(0, round(delay)), 0, -1):
                        self.context.next_move = remaining
                        await self._emit_status(remaining)
                        await asyncio.sleep(1)
                    self.context.next_move = None
                await self.set(latitude, longitude)
                self.context.latitude = latitude
                self.context.longitude = longitude
                # Previously never updated here, so status events always carried a stale loc_id.
                self.context.loc_id = loc_id
                await self._emit_status(None)
                logger.info(
                    "Set simulated location to %s, %s after %ss delay",
                    latitude,
                    longitude,
                    delay,
                )
            finally:
                # Every get() is balanced by exactly one task_done(), also for the
                # sentinel, for skipped items and on errors, so Queue.join() cannot stall.
                self.context.queue.task_done()
class LocationSimulationTestQueue(LocationSimulationBase):
    """Queue player for test mode: it logs each location instead of sending it to a device."""

    def __init__(self, context: LocationSimulationState):
        super().__init__()
        self.context = context

    def __enter__(self):
        return self

    def __exit__(self, exc_type=None, exc_value=None, tb=None):
        # The context-manager protocol passes three arguments; the previous
        # zero-argument signature raised TypeError on every `with` exit.
        # Returning False lets exceptions from the block propagate.
        return False

    async def set(self, latitude: float, longitude: float) -> None:
        """Pretend to apply a location: short pause, then a log line."""
        await asyncio.sleep(0.1)
        logger.info("Simulated location set to %s, %s", latitude, longitude)

    async def clear(self) -> None:
        """Discard pending items, stop a running worker and mark the queue shut down."""
        q = self.context.queue
        worker = self.context.simulation_task
        if worker is not None and not worker.done():
            while True:
                try:
                    item = q.get_nowait()
                except asyncio.QueueEmpty:
                    break
                q.task_done()
                logger.info("Discarding item from queue: %s", item)
            self.context.queue_state = "SHUTDOWN"
            # The old `while q.empty(): await q.join()` loop never yielded once the
            # queue was drained, and awaiting a worker parked in `queue.get()` never
            # returns. Cancel it instead, unless clear() runs inside that worker.
            if worker is not asyncio.current_task():
                worker.cancel()
                with suppress(asyncio.CancelledError):
                    await worker
        self.context.simulation_active = False
        self.context.queue_state = "SHUTDOWN"

    async def _emit_status(self, next_move) -> None:
        """Publish the current simulation state as a ``simulation_status`` event."""
        await self.context.sio.emit(
            "simulation_status",
            {
                "status": self.context.simulation_active,
                "loc_id": self.context.loc_id,
                "latitude": self.context.latitude,
                "longitude": self.context.longitude,
                "next_move": next_move,
            },
            namespace="/",
        )

    async def play_queue(self, disable_sleep: bool = False, timing_randomness_range: int = 0) -> None:
        """Consume the queue until it is shut down or a sentinel item arrives.

        Args:
            disable_sleep: Apply every location immediately, ignoring its delay.
            timing_randomness_range: When positive, each delay is shifted by a
                uniform random amount within +/- this many seconds.

        Playback idles while ``queue_state`` is "PAUSED" and stops on "SHUTDOWN" or
        on an item whose four fields are all None. Items are skipped while
        ``set_location_enabled`` is False.
        """
        while True:
            while self.context.queue_state == "PAUSED":
                await asyncio.sleep(0.1)
            if self.context.queue_state == "SHUTDOWN":
                break
            loc_id, latitude, longitude, delay = await self.context.queue.get()
            try:
                if (loc_id, latitude, longitude, delay) == (None, None, None, None):
                    break
                if not self.context.set_location_enabled:
                    continue
                if delay > 0 and not disable_sleep:
                    if timing_randomness_range > 0:
                        delay = delay + random.uniform(
                            -timing_randomness_range, timing_randomness_range
                        )
                    # The jittered delay is a float and range() needs an integer;
                    # the old code raised TypeError here whenever jitter was enabled.
                    for remaining in range(max(0, round(delay)), 0, -1):
                        self.context.next_move = remaining
                        await self._emit_status(remaining)
                        await asyncio.sleep(1)
                    self.context.next_move = None
                await self.set(latitude, longitude)
                self.context.latitude = latitude
                self.context.longitude = longitude
                self.context.loc_id = loc_id
                await self._emit_status(None)
                logger.info(
                    "Set simulated location to %s, %s after %ss delay",
                    latitude,
                    longitude,
                    delay,
                )
            finally:
                # Every get() is balanced by exactly one task_done(), also for the
                # sentinel, for skipped items and on errors, so Queue.join() cannot stall.
                self.context.queue.task_done()