Compare commits

..

25 Commits

Author SHA1 Message Date
64ee9797ff Delete cookies/willbrunogmailcom.session
Cookies
2026-05-02 11:57:02 -04:00
b13606dcbf Delete cookies/willbrunogmailcom.cookiejar
Cookies
2026-05-02 11:56:45 -04:00
e48c22e0f7 Added Favorites to geocache.db (needs rework)
and more
2026-04-29 13:19:28 -04:00
c10d785bfa fixes 2026-04-22 11:31:45 -04:00
da8716cccc fixes 2026-04-18 08:09:38 -04:00
e667af9201 refractor 2026-04-14 09:53:52 -04:00
02d9e06077 updates 2026-04-05 15:20:28 -04:00
5dbbeb3394 bump pymd3 2026-04-04 22:30:27 -04:00
94cb2441e2 PathFix 2026-04-04 19:09:44 -04:00
502992f69d ENV VAR for VUE DIST 2026-04-04 19:02:16 -04:00
c31a9c01b6 osr proxy 2026-04-04 11:28:57 -04:00
5b04a6aac4 reverse geocode async 2026-04-04 10:15:00 -04:00
c395492583 add second FastAPI server for static site 2026-04-03 16:15:39 -04:00
10f2da14a1 reverse geocode, icloud monitor 2026-04-01 15:47:12 -04:00
a7af0faefc reverse geocode, icloud monitor 2026-04-01 10:32:35 -04:00
1eef99e3b4 extensive changes 2026-03-27 17:12:20 -04:00
c5a563f047 icloud 2026-03-21 08:22:24 -04:00
47aeebd86f lots of changes 2026-03-21 07:32:43 -04:00
21933cdef9 changed reserved socket.io event "error" to "appError" 2026-03-17 17:04:16 -04:00
34a8ce730a tests 2026-03-17 16:06:39 -04:00
3073c4e4b5 rewrite: working. todo: fix worker pause/play. move attributes out of asyncio-Queue in order to edit outside of queue. 2026-03-17 11:33:26 -04:00
b5ebedb2b9 merge
# Conflicts:
#	server.py
2026-03-13 14:21:09 -04:00
37c3bf0aeb debug 2026-03-12 21:33:18 -04:00
5b2d5655d8 clean 2026-03-12 11:48:22 -04:00
7dd45fff2c rewrite 2026-03-12 11:46:19 -04:00
19 changed files with 3849 additions and 1106 deletions

3
.gitignore vendored Normal file
View File

@@ -0,0 +1,3 @@
__pycache__/*
*.pyc
cookies/*

8
.idea/modules.xml generated Normal file
View File

@@ -0,0 +1,8 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectModuleManager">
<modules>
<module fileurl="file://$PROJECT_DIR$/.idea/pymd3_vue_location_sim.iml" filepath="$PROJECT_DIR$/.idea/pymd3_vue_location_sim.iml" />
</modules>
</component>
</project>

15
.idea/pymd3_vue_location_sim.iml generated Normal file
View File

@@ -0,0 +1,15 @@
<?xml version="1.0" encoding="UTF-8"?>
<module external.system.id="pyproject.toml" type="PYTHON_MODULE" version="4">
<component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$">
<sourceFolder url="file://$MODULE_DIR$/src" isTestSource="false" />
<excludeFolder url="file://$MODULE_DIR$/.venv" />
</content>
<orderEntry type="sourceFolder" forTests="false" />
<orderEntry type="jdk" jdkName="uv (pymd3_vue_location_sim)" jdkType="Python SDK" />
</component>
<component name="PyDocumentationSettings">
<option name="format" value="PLAIN" />
<option name="myDocStringFormat" value="Plain" />
</component>
</module>

1
.python-version Normal file
View File

@@ -0,0 +1 @@
3.14

0
README.md Normal file
View File

Binary file not shown.

Binary file not shown.

BIN
geocache.db Normal file

Binary file not shown.

50
main.bk
View File

@@ -1,50 +0,0 @@
from flask import Flask, request, jsonify
from flask_cors import CORS
app = Flask(__name__, static_folder="../front-end/dist/assets", template_folder="../front-end/dist")
CORS(app)
@app.route('/')
def index():
return app.send_static_file('index.html')
# Essential for handling client-side routing (e.g., Vue Router history mode)
@app.route('/<path:path>')
def static_files(path):
return app.send_static_file(path)
@app.route('/api/set', methods=['POST'])
def handle_json_data():
data = request.get_json()
if not data:
return jsonify({'error': 'Missing JSON data'}), 400
if 'lat' not in data:
return jsonify({'error': 'Missing lat in JSON data'}), 400
if 'lng' not in data:
return jsonify({'error': 'Missing lng in JSON data'}), 400
# Process the data (e.g., save to a database)
lat = data['lat']
lng = data['lng']
# Return a JSON response
return jsonify({
'message': f'lat: {lat}, lng: {lng} received successfully!',
'data': data
}), 200
@app.route('/api/status', methods=['GET'])
def get_data():
return jsonify(message="Hello from Flask API!")
@app.errorhandler(404)
def not_found(error):
return jsonify({'error': 'Not found'}), 404
if __name__ == '__main__':
app.run(debug=True, host='0.0.0.0', port=5000)

972
main.py
View File

@@ -1,930 +1,66 @@
import asyncio import tempfile
import json from functools import partial
import logging from typing import Annotated
import socket import typer
import time import os
from dataclasses import dataclass
from typing import Dict, Optional
import uvicorn
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from pymobiledevice3.lockdown import create_using_usbmux
from pymobiledevice3.remote.common import TunnelProtocol from pymobiledevice3.remote.common import TunnelProtocol
from pymobiledevice3.remote.remotexpc import RemoteXPCConnection from pymobiledevice3.remote.module_imports import verify_tunnel_imports
from pymobiledevice3.remote.tunnel_service import CoreDeviceTunnelProxy, start_tunnel, TunnelResult from pymobiledevice3.tunneld.api import TUNNELD_DEFAULT_ADDRESS
from pymobiledevice3.service_connection import ServiceConnection from src.pymd3_vue_location_sim.server import TunneldRunnerSio, LocationSimulationState
from pymobiledevice3.services.dvt.dvt_secure_socket_proxy import DvtSecureSocketProxyService from src.pymd3_vue_location_sim.json_formatter import logger
from pymobiledevice3.services.dvt.instruments.location_simulation import LocationSimulation
from pymobiledevice3.services.mobile_image_mounter import MobileImageMounterService
from pymobiledevice3.usbmux import list_devices
from pymobiledevice3.remote.remote_service_discovery import RemoteServiceDiscoveryService
def main():
port = int(os.getenv("PORT", 49151))
cli_tunneld(host="0.0.0.0", port=port)
class JsonFormatter(logging.Formatter): def cli_tunneld(
def format(self, record: logging.LogRecord) -> str: host: Annotated[str, typer.Option(help="Address to bind the tunneld server to.")] = TUNNELD_DEFAULT_ADDRESS[0],
payload = { port: Annotated[int, typer.Option(help="Port to bind the tunneld server to.")] = TUNNELD_DEFAULT_ADDRESS[1],
"ts": self.formatTime(record, "%Y-%m-%dT%H:%M:%S%z"), daemonize: Annotated[bool, typer.Option("--daemonize", "-d", help="Run tunneld in the background.")] = False,
"level": record.levelname, protocol: Annotated[
"logger": record.name, TunnelProtocol,
"message": record.getMessage(), typer.Option(
} "--protocol",
if record.exc_info: "-p",
payload["exc_info"] = self.formatException(record.exc_info) case_sensitive=False,
return json.dumps(payload, ensure_ascii=True) help="Transport protocol for tunneld (default: TCP on Python >=3.13, otherwise QUIC).",
),
] = TunnelProtocol.DEFAULT,
class LocationUpdate(BaseModel): usb: Annotated[bool, typer.Option(help="Enable USB monitoring")] = True,
latitude: float wifi: Annotated[bool, typer.Option(help="Enable WiFi monitoring")] = True,
longitude: float usbmux: Annotated[bool, typer.Option(help="Enable usbmux monitoring")] = True,
rsd_address: Optional[str] = None mobdev2: Annotated[bool, typer.Option(help="Enable mobdev2 monitoring")] = True,
rsd_port: Optional[int] = None context: Annotated[LocationSimulationState, typer.Option(
help="Location simulation context to use for the server.")] = LocationSimulationState(),
) -> None:
class DeviceShort(BaseModel): """Start Tunneld service for remote tunneling"""
udid: str if not verify_tunnel_imports():
connection_type: str return
tunneld_runner = partial(
TunneldRunnerSio.create,
class DeviceStatus(BaseModel): host,
device_connected: bool = False
device_count: int = 0
devices: Optional[list[DeviceShort]] = None
udid: Optional[str] = None
device_name: Optional[str] = None
product_version: Optional[str] = None
phone_number: Optional[str] = None
developer_mode_enabled: Optional[bool] = None
ddi_mounted: Optional[bool] = None
rsd_address: Optional[str] = None
rsd_port: Optional[int] = None
lockdown_trusted_port: Optional[int] = None
lockdown_untrusted_port: Optional[int] = None
lockdown_trusted_reachable: bool = False
lockdown_untrusted_reachable: bool = False
dtservicehub_reachable: bool = False
class TunnelStartRequest(BaseModel):
protocol: str = "tcp"
wait_for_device: bool = False
wait_timeout_seconds: int = 30
class PreflightResponse(BaseModel):
rsd_address: str
rsd_port: int
interface: Optional[str] = None
protocol: Optional[str] = None
dtservicehub_port: Optional[int] = None
dtservicehub_reachable: bool = False
lockdown_trusted_port: Optional[int] = None
lockdown_untrusted_port: Optional[int] = None
lockdown_trusted_reachable: bool = False
lockdown_untrusted_reachable: bool = False
@dataclass
class TunnelState:
task: Optional[asyncio.Task]
stop_event: asyncio.Event
ready_event: asyncio.Event
result: Optional[TunnelResult] = None
error: Optional[str] = None
udid: Optional[str] = None
_TUNNELS: Dict[str, TunnelState] = {}
_orig_rsd_connect = RemoteServiceDiscoveryService.connect
_orig_remotexpc_connect = RemoteXPCConnection.connect
_orig_create_using_tcp = ServiceConnection.create_using_tcp
# Global reference to keep the CLI process alive
_location_sim_process: Optional[asyncio.subprocess.Process] = None
# Global DVT session for reuse
_dvt_session: Optional[DvtSecureSocketProxyService] = None
_dvt_session_lock = asyncio.Lock()
_current_rsd: Optional[RemoteServiceDiscoveryService] = None
handler = logging.StreamHandler()
handler.setFormatter(JsonFormatter())
root_logger = logging.getLogger()
root_logger.handlers = [handler]
root_logger.setLevel(logging.INFO)
logger = logging.getLogger("ios-api")
async def _remotexpc_connect_with_timeout(self):
"""Wrapper to add timeout to TCP connection and handshake"""
logger.info(f"RemoteXPC attempting TCP connection to {self.address}")
tcp_start = time.time()
try:
self._reader, self._writer = await asyncio.wait_for(
asyncio.open_connection(self.address[0], self.address[1]),
timeout=5.0
)
tcp_elapsed = time.time() - tcp_start
logger.info(f"RemoteXPC TCP connected in {tcp_elapsed:.2f}s")
except asyncio.TimeoutError as exc:
tcp_elapsed = time.time() - tcp_start
logger.error(f"RemoteXPC TCP connection to {self.address} timed out after {tcp_elapsed:.2f}s")
raise asyncio.TimeoutError(f"TCP connection to {self.address} timed out after 5s") from exc
except Exception as exc:
tcp_elapsed = time.time() - tcp_start
logger.error(f"RemoteXPC TCP connection to {self.address} failed after {tcp_elapsed:.2f}s: {exc}")
raise
logger.info(f"RemoteXPC starting handshake")
handshake_start = time.time()
try:
await self._do_handshake()
handshake_elapsed = time.time() - handshake_start
logger.info(f"RemoteXPC handshake complete in {handshake_elapsed:.2f}s")
except Exception as exc:
handshake_elapsed = time.time() - handshake_start
logger.error(f"RemoteXPC handshake failed after {handshake_elapsed:.2f}s: {exc}")
await self.close()
raise
async def _rsd_connect_with_timeout(self):
"""Modified connect with faster timeout for lockdown connections"""
await self.service.connect()
try:
self.peer_info = await self.service.receive_response()
self.udid = self.peer_info["Properties"]["UniqueDeviceID"]
self.product_type = self.peer_info["Properties"]["ProductType"]
# Skip lockdown connection - not working over RSD tunnel and not needed for DVT
self.lockdown = None
logger.info("Skipping lockdown connection for RSD (not required for DVT)")
self.all_values = self.lockdown.all_values if self.lockdown is not None else {}
except Exception:
await self.close()
raise
RemoteXPCConnection.connect = _remotexpc_connect_with_timeout
RemoteServiceDiscoveryService.connect = _rsd_connect_with_timeout
def _create_using_tcp_with_ipv6(
hostname: str,
port: int,
keep_alive: bool = True,
create_connection_timeout: int = 5, # Reduced from 20 to 5 seconds
):
"""Force IPv6 connection with reduced timeout"""
import socket as socket_module
from pymobiledevice3.osu.os_utils import get_os_utils
# Force IPv6 socket creation for tunnel addresses
if ':' in hostname: # IPv6 address
logger.info(f"ServiceConnection connecting to {hostname}:{port} with {create_connection_timeout}s timeout")
sock = socket_module.socket(socket_module.AF_INET6, socket_module.SOCK_STREAM)
sock.settimeout(create_connection_timeout)
connect_start = time.time()
try:
sock.connect((hostname, port))
connect_elapsed = time.time() - connect_start
logger.info(f"ServiceConnection connected to {hostname}:{port} in {connect_elapsed:.2f}s")
# Keep a 5-second timeout for subsequent operations for faster failure detection
sock.settimeout(5.0)
if keep_alive:
get_os_utils().set_keepalive(sock)
return ServiceConnection(sock)
except Exception as exc:
connect_elapsed = time.time() - connect_start
logger.error(f"ServiceConnection failed to {hostname}:{port} after {connect_elapsed:.2f}s: {exc}")
sock.close()
raise
else:
# Fall back to original for non-IPv6
return _orig_create_using_tcp(
hostname,
port, port,
keep_alive=keep_alive,
create_connection_timeout=create_connection_timeout,
)
ServiceConnection.create_using_tcp = staticmethod(_create_using_tcp_with_ipv6)
def _get_developer_mode_status() -> bool:
device_serial = _get_single_device_udid()
lockdown = create_using_usbmux(serial=device_serial, autopair=False)
return True if MobileImageMounterService(lockdown).query_developer_mode_status() == 1 else False
def _get_developer_disk_image_status() -> bool:
device_serial = _get_single_device_udid()
lockdown = create_using_usbmux(serial=device_serial, autopair=False)
images = MobileImageMounterService(lockdown).copy_devices()
is_ddi = False
for image in images:
if image.get("DiskImageType") == "Personalized" and image.get("PersonalizedImageType") == "DeveloperDiskImage":
is_ddi = True
return is_ddi
return True if MobileImageMounterService(lockdown).is_image_mounted() == 1 else False
def _get_single_device_udid() -> str:
devices = list_devices()
if not devices:
raise HTTPException(status_code=404, detail="No devices connected")
if len(devices) > 1:
raise HTTPException(status_code=400, detail="Multiple devices connected")
return devices[0].serial
def _parse_protocol(value: str) -> TunnelProtocol:
try:
return TunnelProtocol[value.upper()]
except KeyError as exc:
raise HTTPException(status_code=400, detail=f"Unsupported protocol: {value}") from exc
async def _run_usbmux_tunnel(udid: str, protocol: TunnelProtocol, state: TunnelState) -> None:
lockdown = None
proxy = None
try:
logger.info("Starting usbmux tunnel for udid=%s protocol=%s", udid, protocol.name.lower())
lockdown = create_using_usbmux(serial=udid) if udid else create_using_usbmux()
proxy = await CoreDeviceTunnelProxy.create(lockdown)
async with start_tunnel(proxy, protocol=protocol) as tunnel:
state.result = tunnel
logger.info(
"Tunnel ready udid=%s address=%s port=%s interface=%s protocol=%s",
udid,
tunnel.address,
tunnel.port,
tunnel.interface,
tunnel.protocol.name.lower(),
)
state.ready_event.set()
await state.stop_event.wait()
except Exception as e:
logger.exception("Tunnel failed udid=%s", udid)
state.error = str(e)
state.ready_event.set()
finally:
logger.info("Shutting down tunnel udid=%s", udid)
if proxy is not None:
await proxy.close()
if lockdown is not None:
lockdown.close()
async def _start_tunnel_internal(
udid: str,
protocol: TunnelProtocol,
wait_for_device: bool,
wait_timeout_seconds: int,
) -> TunnelResult:
key = ""
existing = _TUNNELS.get(key)
if existing and existing.task is not None and not existing.task.done():
if existing.result is not None:
return existing.result
if wait_for_device:
timeout = max(1, wait_timeout_seconds)
start = asyncio.get_event_loop().time()
while True:
devices = list_devices()
if len(devices) == 1:
udid = devices[0].serial
break
if asyncio.get_event_loop().time() - start > timeout:
raise HTTPException(status_code=504, detail="Timed out waiting for device")
await asyncio.sleep(0.5)
stop_event = asyncio.Event()
ready_event = asyncio.Event()
state = TunnelState(
task=None,
stop_event=stop_event,
ready_event=ready_event,
udid=udid,
)
_TUNNELS[key] = state
state.task = asyncio.create_task(_run_usbmux_tunnel(udid, protocol, state))
try:
await asyncio.wait_for(ready_event.wait(), timeout=15)
except asyncio.TimeoutError as exc:
raise HTTPException(status_code=504, detail="Timed out waiting for tunnel to start") from exc
if state.error:
_TUNNELS.pop(key, None)
raise HTTPException(status_code=500, detail=f"Failed to start tunnel: {state.error}")
return state.result
def _wait_for_port(address: str, port: int, timeout_seconds: float = 10.0, interval_seconds: float = 0.5) -> bool:
deadline = time.time() + timeout_seconds
while time.time() < deadline:
try:
sock = socket.create_connection((address, port), timeout=2)
sock.close()
return True
except OSError:
if time.time() >= deadline:
break
time.sleep(interval_seconds)
return False
async def _wait_for_port_async(
address: str,
port: int,
timeout_seconds: float = 10.0,
interval_seconds: float = 0.5,
) -> bool:
return await asyncio.to_thread(_wait_for_port, address, port, timeout_seconds, interval_seconds)
def _simulate_location_with_dvt(service_provider, latitude: float, longitude: float) -> None:
logger.info("DVT opening session")
# For RSD connections, manually create the service connection to dtservicehub
# without going through lockdown (which doesn't work over RSD tunnel)
if isinstance(service_provider, RemoteServiceDiscoveryService):
from pymobiledevice3.services.remote_server import RemoteServer
logger.info("Using RSD dtservicehub connection")
# Get raw TCP connection to dtservicehub without RSDCheckin
service = service_provider.start_lockdown_service_without_checkin(
DvtSecureSocketProxyService.RSD_SERVICE_NAME
)
# Manually create RemoteServer instance bypassing LockdownService.__init__
dvt = RemoteServer.__new__(RemoteServer)
dvt.service_name = DvtSecureSocketProxyService.RSD_SERVICE_NAME
dvt.lockdown = service_provider
dvt.service = service
dvt.logger = logging.getLogger(DvtSecureSocketProxyService.__module__)
dvt.should_remove_ssl_context = False
dvt.channel_cache = {}
from pymobiledevice3.services.remote_server import ChannelFragmenter
dvt.channel_messages = {0: ChannelFragmenter()} # BROADCAST_CHANNEL = 0
from pymobiledevice3.services.remote_server import Channel
dvt.broadcast = Channel.create(0, dvt)
import threading
dvt.lock = threading.Lock()
dvt.supported_identifiers = []
else:
dvt = DvtSecureSocketProxyService(service_provider)
try:
handshake_start = time.monotonic()
logger.info("DVT handshake start")
dvt.perform_handshake()
handshake_seconds = time.monotonic() - handshake_start
logger.info("DVT handshake complete in %.2fs", handshake_seconds)
set_start = time.monotonic()
LocationSimulation(dvt).set(latitude, longitude)
set_seconds = time.monotonic() - set_start
logger.info("DVT location set sent in %.2fs", set_seconds)
finally:
if isinstance(service_provider, RemoteServiceDiscoveryService):
dvt.service.close()
async def _get_or_create_dvt_session(rsd: RemoteServiceDiscoveryService):
"""Get existing DVT session or create a new one (with proper initialization)"""
global _dvt_session, _current_rsd
async with _dvt_session_lock:
# If we have a session and it's for the same RSD connection, reuse it
if _dvt_session is not None and _current_rsd is rsd:
logger.info("Reusing existing DVT session")
return _dvt_session
# Close old session if it exists
if _dvt_session is not None:
logger.info("Closing old DVT session")
try:
_dvt_session.service.close()
except:
pass
_dvt_session = None
_current_rsd = None
# Create new DVT session
logger.info("Creating new DVT session")
def _create_dvt():
from pymobiledevice3.services.remote_server import RemoteServer
# Get raw TCP connection to dtservicehub without RSDCheckin
service = rsd.start_lockdown_service_without_checkin(
DvtSecureSocketProxyService.RSD_SERVICE_NAME
)
# Manually create RemoteServer instance
dvt = RemoteServer.__new__(RemoteServer)
dvt.service_name = DvtSecureSocketProxyService.RSD_SERVICE_NAME
dvt.lockdown = rsd
dvt.service = service
dvt.logger = logging.getLogger(DvtSecureSocketProxyService.__module__)
dvt.should_remove_ssl_context = False
dvt.channel_cache = {}
from pymobiledevice3.services.remote_server import ChannelFragmenter
dvt.channel_messages = {0: ChannelFragmenter()}
from pymobiledevice3.services.remote_server import Channel
dvt.broadcast = Channel.create(0, dvt)
import threading
dvt.lock = threading.Lock()
dvt.supported_identifiers = []
# Perform handshake
logger.info("DVT handshake start")
handshake_start = time.monotonic()
dvt.perform_handshake()
handshake_seconds = time.monotonic() - handshake_start
logger.info("DVT handshake complete in %.2fs", handshake_seconds)
return dvt
_dvt_session = await asyncio.wait_for(
asyncio.to_thread(_create_dvt),
timeout=10.0
)
_current_rsd = rsd
return _dvt_session
async def _simulate_location_via_library(rsd: RemoteServiceDiscoveryService, latitude: float, longitude: float) -> None:
"""Use library with persistent DVT session to avoid location bounce"""
logger.info("Using library for location simulation (persistent session)")
dvt = await _get_or_create_dvt_session(rsd)
# Set location using the persistent session
def _set_location():
set_start = time.monotonic()
LocationSimulation(dvt).set(latitude, longitude)
set_seconds = time.monotonic() - set_start
logger.info("DVT location set in %.2fs", set_seconds)
await asyncio.to_thread(_set_location)
logger.info("Location updated successfully (session maintained)")
async def _simulate_location_via_cli(address: str, port: int, latitude: float, longitude: float) -> None:
"""Use pymobiledevice3 CLI with GPX playback to avoid location bounce"""
global _location_sim_process
import tempfile
import xml.etree.ElementTree as ET
logger.info("Using pymobiledevice3 CLI for location simulation (GPX playback)")
# Kill any existing location simulation process
if _location_sim_process is not None:
logger.info("Stopping previous location simulation")
try:
_location_sim_process.terminate()
await asyncio.wait_for(_location_sim_process.wait(), timeout=2.0)
except:
try:
_location_sim_process.kill()
await _location_sim_process.wait()
except:
pass
_location_sim_process = None
# Create a GPX file with a single stationary point that loops
# This prevents the location from bouncing back when we restart the process
gpx_content = f'''<?xml version="1.0" encoding="UTF-8"?>
<gpx version="1.1" creator="map-sim-location">
<trk>
<name>Simulated Location</name>
<trkseg>
<trkpt lat="{latitude}" lon="{longitude}">
<time>2024-01-01T00:00:00Z</time>
</trkpt>
<trkpt lat="{latitude}" lon="{longitude}">
<time>2024-01-01T00:00:01Z</time>
</trkpt>
</trkseg>
</trk>
</gpx>'''
# Write GPX to temporary file
gpx_file = tempfile.NamedTemporaryFile(mode='w', suffix='.gpx', delete=False)
gpx_file.write(gpx_content)
gpx_file.flush()
gpx_path = gpx_file.name
gpx_file.close()
cmd = [
".venv/bin/pymobiledevice3",
"developer",
"dvt",
"simulate-location",
"play",
"--rsd",
address,
str(port),
"--disable-sleep", # Play immediately without delays
gpx_path,
]
logger.info(f"Running CLI command: {' '.join(cmd)}")
# Start the process and keep it alive
_location_sim_process = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
stdin=asyncio.subprocess.DEVNULL,
)
# Wait briefly to ensure location is set
await asyncio.sleep(1.5)
# Check if process is still running
if _location_sim_process.returncode is not None:
error_msg = f"CLI process exited with code {_location_sim_process.returncode}"
logger.error(error_msg)
_location_sim_process = None
# Clean up temp file
try:
os.unlink(gpx_path)
except:
pass
raise HTTPException(status_code=500, detail=error_msg)
logger.info("CLI location simulation started (GPX playback maintains location)")
# Note: We don't clean up the GPX file here since the process needs it
# It will be cleaned up when the process is stopped
async def _simulate_location_via_rsd_async(address: str, port: int, latitude: float, longitude: float) -> None:
logger.info("Connecting to RSD address=%s port=%s", address, port)
port_check_start = time.monotonic()
if not await _wait_for_port_async(address, port, timeout_seconds=3, interval_seconds=0.5):
port_check_seconds = time.monotonic() - port_check_start
raise HTTPException(
status_code=504,
detail=f"RSD port unreachable after {port_check_seconds:.2f}s",
)
port_check_seconds = time.monotonic() - port_check_start
logger.info("RSD port reachable in %.2fs", port_check_seconds)
rsd = RemoteServiceDiscoveryService((address, port))
connect_timeout = 60
max_attempts = 2
for attempt in range(1, max_attempts + 1):
connect_start = time.monotonic()
try:
await asyncio.wait_for(rsd.connect(), timeout=connect_timeout)
connect_seconds = time.monotonic() - connect_start
logger.info("RSD connect complete in %.2fs on attempt %s/%s", connect_seconds, attempt, max_attempts)
break
except asyncio.TimeoutError as exc:
connect_seconds = time.monotonic() - connect_start
if attempt >= max_attempts:
raise HTTPException(
status_code=504,
detail=f"Timed out connecting to RSD after {max_attempts} attempts",
) from exc
logger.warning(
"RSD connect timed out in %.2fs on attempt %s/%s; retrying",
connect_seconds,
attempt,
max_attempts,
)
await asyncio.sleep(2)
try:
services = rsd.peer_info.get("Services", {})
dtservicehub = services.get(DvtSecureSocketProxyService.RSD_SERVICE_NAME, {})
logger.info("RSD services keys=%s", list(services.keys()))
logger.info("RSD dtservicehub entry=%s", dtservicehub)
dt_port = int(dtservicehub.get("Port", 0)) if dtservicehub else 0
if dt_port:
logger.info("Waiting for RSD dtservicehub port address=%s port=%s", address, dt_port)
wait_start = time.monotonic()
if await _wait_for_port_async(address, dt_port, timeout_seconds=30, interval_seconds=0.5):
wait_seconds = time.monotonic() - wait_start
logger.info("RSD dtservicehub port reachable address=%s port=%s", address, dt_port)
else:
wait_seconds = time.monotonic() - wait_start
logger.warning("RSD dtservicehub port still unreachable address=%s port=%s", address, dt_port)
logger.info("RSD dtservicehub wait complete in %.2fs", wait_seconds)
else:
logger.warning("RSD dtservicehub missing or port=0")
logger.info("Starting DVT location set")
await asyncio.to_thread(_simulate_location_with_dvt, rsd, latitude, longitude)
finally:
await rsd.close()
def _simulate_location_via_rsd(address: str, port: int, latitude: float, longitude: float) -> None:
asyncio.run(_simulate_location_via_rsd_async(address, port, latitude, longitude))
async def _preflight_rsd_async(
address: str,
port: int,
interface: Optional[str],
protocol: Optional[str],
) -> PreflightResponse:
rsd = RemoteServiceDiscoveryService((address, port))
await rsd.connect()
try:
services = rsd.peer_info.get("Services", {})
dtservicehub = services.get(DvtSecureSocketProxyService.RSD_SERVICE_NAME, {})
dt_port = int(dtservicehub.get("Port", 0)) if dtservicehub else 0
lockdown_trusted = services.get("com.apple.mobile.lockdown.remote.trusted", {})
lockdown_untrusted = services.get("com.apple.mobile.lockdown.remote.untrusted", {})
trusted_port = int(lockdown_trusted.get("Port", 0)) if lockdown_trusted else 0
untrusted_port = int(lockdown_untrusted.get("Port", 0)) if lockdown_untrusted else 0
dt_reachable = (
await _wait_for_port_async(address, dt_port, timeout_seconds=5, interval_seconds=0.5) if dt_port else False
)
trusted_reachable = (
await _wait_for_port_async(address, trusted_port, timeout_seconds=3, interval_seconds=0.5)
if trusted_port
else False
)
untrusted_reachable = (
await _wait_for_port_async(address, untrusted_port, timeout_seconds=3, interval_seconds=0.5)
if untrusted_port
else False
)
return PreflightResponse(
rsd_address=address,
rsd_port=port,
interface=interface,
protocol=protocol, protocol=protocol,
dtservicehub_port=dt_port or None, usb_monitor=usb,
dtservicehub_reachable=dt_reachable, wifi_monitor=wifi,
lockdown_trusted_port=trusted_port or None, usbmux_monitor=usbmux,
lockdown_untrusted_port=untrusted_port or None, mobdev2_monitor=mobdev2,
lockdown_trusted_reachable=trusted_reachable, context=context,
lockdown_untrusted_reachable=untrusted_reachable,
) )
finally: if daemonize:
await rsd.close()
app = FastAPI(title="iOS Device Management API")
@app.get("/api/status")
async def get_status():
"""Lists all devices visible to USBMux."""
try: try:
devices = list_devices() from daemonize import Daemonize
if len(devices) > 1: except ImportError as e:
device_list = [] raise NotImplementedError("daemonizing is only supported on unix platforms") from e
for d in devices: with tempfile.NamedTemporaryFile("wt") as pid_file:
device_list.append(DeviceShort(udid=d.serial, connection_type=d.connection_type)) daemon = Daemonize(app=f"Tunneld {host}:{port}", pid=pid_file.name, action=tunneld_runner)
return DeviceStatus(device_connected=True, device_count=len(devices), devices=device_list) logger.info(f"starting Tunneld {host}:{port}")
try: daemon.start()
lockdown = create_using_usbmux(serial=devices[0].serial, autopair=False)
state = _TUNNELS.get("")
if state and state.result:
rsd_address = state.result.address
rsd_port = state.result.port
else: else:
udid = devices[0].serial tunneld_runner()
logger.info("Auto-starting tunnel")
result = await _start_tunnel_internal(
udid,
TunnelProtocol.TCP,
True,
30,
)
rsd_address = result.address
rsd_port = result.port
return DeviceStatus(device_connected=True, device_count=len(devices), udid=devices[0].serial,
device_name=lockdown.get_value(key='DeviceName'),
product_version=lockdown.product_version,
phone_number=lockdown.get_value(key='PhoneNumber'),
developer_mode_enabled=_get_developer_mode_status(),
ddi_mounted=_get_developer_disk_image_status(),
rsd_address=rsd_address, rsd_port=rsd_port, )
except Exception as e:
logger.info("Error establishing lockdown: %s", str(e))
return DeviceStatus(device_connected=False, device_count=0)
except Exception as e:
logger.info("No device connected: %s", str(e))
return DeviceStatus(device_connected=False, device_count=0)
@app.get("/api/lockdown/status")
async def get_lockdown_status():
"""Checks lockdown connectivity and basic device info."""
try:
device_serial = _get_single_device_udid()
lockdown = create_using_usbmux(serial=device_serial, autopair=False)
return {
"udid": device_serial,
"product_version": lockdown.product_version,
"device_name": lockdown.get_value(key='DeviceName'),
"phone_number": lockdown.get_value(key='PhoneNumber'),
"status": "Connected"
}
except Exception as e:
return {"status": "Disconnected", "error": str(e)}
@app.post("/api/tunnel/start")
async def start_usb_tunnel(data: TunnelStartRequest):
"""Starts a CoreDevice tunnel to a USB device and returns RSD connection details."""
udid = _get_single_device_udid()
key = ""
@app.post("/api/tunnel/stop")
async def stop_usb_tunnel():
"""Stops a previously started tunnel."""
key = ""
state = _TUNNELS.get(key)
if state is None:
raise HTTPException(status_code=404, detail="No tunnel found")
if state.task is None:
_TUNNELS.pop(key, None)
raise HTTPException(status_code=500, detail="Tunnel state is incomplete")
logger.info("Stopping tunnel")
state.stop_event.set()
await state.task
_TUNNELS.pop(key, None)
return {"status": "stopped"}
@app.post("/api/tunnel/stop-all")
async def stop_all_tunnels():
"""Stops all running tunnels."""
if not _TUNNELS:
return {"status": "stopped", "count": 0}
logger.info("Stopping all tunnels count=%s", len(_TUNNELS))
items = list(_TUNNELS.items())
for _, state in items:
if state.task is not None:
state.stop_event.set()
await asyncio.gather(
*[state.task for _, state in items if state.task is not None],
return_exceptions=True,
)
_TUNNELS.clear()
return {"status": "stopped", "count": len(items)}
@app.get("/api/tunnel/status")
async def get_tunnel_status():
"""Returns the status of all active tunnels."""
items = []
for key, state in _TUNNELS.items():
if state.result is None:
continue
if state.task is None:
continue
items.append(
{
"udid": state.udid,
"rsd_address": state.result.address,
"rsd_port": state.result.port,
"interface": state.result.interface,
"protocol": state.result.protocol.name.lower(),
"running": not state.task.done(),
}
)
return {"tunnels": items}
@app.get("/api/preflight", response_model=PreflightResponse)
async def preflight():
"""Checks RSD connectivity and service port reachability."""
state = _TUNNELS.get("")
if state is None or state.result is None:
udid = _get_single_device_udid()
logger.info("Auto-starting tunnel for preflight")
result = await _start_tunnel_internal(
udid,
TunnelProtocol.TCP,
True,
30,
)
address = result.address
port = result.port
return await _preflight_rsd_async(
address,
port,
result.interface,
result.protocol.name.lower(),
)
address = state.result.address
port = state.result.port
return await _preflight_rsd_async(
address,
port,
state.result.interface,
state.result.protocol.name.lower(),
)
@app.post("/api/simulate-location/clear")
async def clear_location():
"""Stops location simulation by closing the DVT session."""
global _location_sim_process, _dvt_session, _current_rsd
logger.info("Clearing location simulation")
# Close DVT session if it exists
async with _dvt_session_lock:
if _dvt_session is not None:
try:
# Clear the location first
def _clear():
LocationSimulation(_dvt_session).clear()
await asyncio.to_thread(_clear)
except:
pass
try:
_dvt_session.service.close()
except:
pass
_dvt_session = None
_current_rsd = None
# Also kill CLI process if running
if _location_sim_process is not None:
try:
_location_sim_process.terminate()
await asyncio.wait_for(_location_sim_process.wait(), timeout=2.0)
except:
try:
_location_sim_process.kill()
await _location_sim_process.wait()
except:
pass
_location_sim_process = None
logger.info("Location simulation cleared")
return {"status": "cleared"}
@app.post("/api/simulate-location")
async def set_location(data: LocationUpdate):
"""Sets a simulated GPS location on the device."""
try:
logger.info("Simulate location request lat=%s lon=%s", data.latitude, data.longitude)
rsd_address = data.rsd_address
rsd_port = data.rsd_port
if rsd_address is None and rsd_port is None:
state = _TUNNELS.get("")
if state and state.result:
rsd_address = state.result.address
rsd_port = state.result.port
if rsd_address is None and rsd_port is None:
udid = _get_single_device_udid()
logger.info("Auto-starting tunnel for simulate-location")
result = await _start_tunnel_internal(
udid,
TunnelProtocol.TCP,
True,
30,
)
rsd_address = result.address
rsd_port = result.port
if rsd_address is not None and rsd_port is not None:
# Use CLI approach - library DVT handshake has unresolved issues
# Note: Location will briefly bounce back to real location when changing
await _simulate_location_via_cli(rsd_address, rsd_port, data.latitude, data.longitude)
else:
raise HTTPException(status_code=400, detail="RSD address/port required")
logger.info("Simulate location success")
return {"status": "success", "location": {"lat": data.latitude, "lon": data.longitude}}
except HTTPException:
raise
except Exception as e:
logger.exception("Simulate location failed")
raise HTTPException(status_code=500, detail=f"Failed to set location: {str(e)}")
# 4. Entry point (always last)
if __name__ == "__main__": if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8000) main()

View File

@@ -1,13 +1,27 @@
[project] [project]
name = "back-end" name = "pymd3_vue_location_sim"
version = "0.1.0" version = "0.1.0"
requires-python = ">=3.14" requires-python = ">=3.14"
dependencies = [ dependencies = [
"click>=8.3.1",
"daemonize>=2.5.0",
"fastapi==0.135.1", "fastapi==0.135.1",
"geopy[aiohttp]==2.4.1",
"httpx>=0.28.1",
"numpy==2.4.3",
"pydantic==2.12.5", "pydantic==2.12.5",
"flask==3.1.3", "pyicloud>=2.4.1",
"flask-cors==6.0.2", "pymobiledevice3==9.8.1",
"pymobiledevice3==7.8.3", "python-dotenv>=1.2.2",
"python-socketio==5.16.1",
"sqlalchemy>=2.0.48",
"sqlalchemy-orm>=1.2.10",
"typer>=0.24.1",
"typing==3.10.0.0", "typing==3.10.0.0",
"uvicorn==0.41.0", "uvicorn==0.41.0",
] ]
[build-system]
requires = ["uv_build>=0.11.2,<0.12"]
build-backend = "uv_build"

0
src/__init__.py Normal file
View File

View File

View File

@@ -0,0 +1,138 @@
import logging
import sqlite3
import json
from geopy.geocoders import Nominatim
from geopy.adapters import AioHTTPAdapter
from geopy.extra.rate_limiter import AsyncRateLimiter
logger = logging.getLogger("ios-api")
CACHE_LOOKUP_SQL = "SELECT address, favorite FROM location_cache WHERE lat_lon = ?"
CACHE_UPSERT_SQL = """
INSERT INTO location_cache (lat_lon, address)
VALUES (?, ?)
ON CONFLICT(lat_lon) DO UPDATE SET address = excluded.address
"""
FAVORITE_LOOKUP_SQL = "SELECT favorite FROM location_cache WHERE favorite IS NOT NULL"
FAVORITE_UPSERT_SQL = """
INSERT INTO location_cache (lat_lon, favorite)
VALUES (?, ?)
ON CONFLICT(lat_lon) DO UPDATE SET favorite = excluded.favorite
"""
FAVORITE_CLEAR_SQL = "UPDATE location_cache SET favorite = NULL WHERE lat_lon = ?"
class AsyncReverseGeocoder:
def __init__(
self,
db_path: str = "geocache.db",
user_agent: str = "pymd3_vue_location_sim/0.1.0 (iam@williambr.uno)",
):
self.db_path = db_path
self.user_agent = user_agent
self._init_db()
def _init_db(self) -> None:
"""Initializes the SQLite database."""
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute('''
CREATE TABLE IF NOT EXISTS location_cache (
lat_lon TEXT PRIMARY KEY,
address TEXT,
favorite TEXT
)
''')
cursor.execute("PRAGMA table_info(location_cache)")
columns = {row[1] for row in cursor.fetchall()}
if "favorite" not in columns:
cursor.execute("ALTER TABLE location_cache ADD COLUMN favorite TEXT")
conn.commit()
@staticmethod
def _cache_key(lat: float, lon: float) -> str:
return f"{lat:.5f},{lon:.5f}"
def _read_cached_address(self, key: str) -> dict | None:
with sqlite3.connect(self.db_path) as conn:
cursor = conn.execute(CACHE_LOOKUP_SQL, (key,))
row = cursor.fetchone()
cached = {}
if row:
cached['address'] = json.loads(row[0]) if row[0] else {}
cached['favorite'] = json.loads(row[1]) if row[1] else {}
return cached if row else None
def _store_cached_address(self, key: str, address_data: dict) -> None:
with sqlite3.connect(self.db_path) as conn:
conn.execute(CACHE_UPSERT_SQL, (key, json.dumps(address_data)))
conn.commit()
def _read_favorites(self) -> list | None:
with sqlite3.connect(self.db_path) as conn:
cursor = conn.execute(FAVORITE_LOOKUP_SQL)
rows = cursor.fetchall()
favorites = [json.loads(row[0]) for row in rows if row[0]]
return favorites if favorites else None
async def clear_favorite(self, lat: float, lon: float) -> bool:
key = self._cache_key(lat, lon)
cached = self._read_cached_address(key)
if cached and cached.get("favorite"):
favorite = cached.get("favorite")
name = favorite.get("name") if isinstance(favorite, dict) else key
logger.info("Clearing favorite %s", name)
with sqlite3.connect(self.db_path) as conn:
cursor = conn.execute(FAVORITE_CLEAR_SQL, (key,))
conn.commit()
return cursor.rowcount > 0
else:
return False
async def set_favorite(self, lat: float, lon: float, favorite_data: dict) -> bool:
key = self._cache_key(lat, lon)
name = favorite_data.get("name")
if not name:
return False
logger.info("Setting favorite location %s to location_cache for %s", name, key)
try:
with sqlite3.connect(self.db_path) as conn:
conn.execute(FAVORITE_UPSERT_SQL, (key, json.dumps(favorite_data)))
conn.commit()
return True
except sqlite3.Error as e:
logger.exception("Failed to set favorite location %s to location_cache for %s: %s", name, key, e)
return False
async def get_favorites(self) -> list | None:
"""Reverse geocode with SQLite cache."""
logger.info("Checking location_cache for favorites")
favorites = self._read_favorites()
return favorites
async def get_address(self, lat: float, lon: float) -> dict | None:
"""Reverse geocode with SQLite cache."""
key = self._cache_key(lat, lon)
cached_address = self._read_cached_address(key)
if cached_address is not None:
logger.info("Found cached address for %s", key)
return cached_address
async with Nominatim(
user_agent=self.user_agent,
adapter_factory=AioHTTPAdapter
) as geolocator:
try:
reverse = AsyncRateLimiter(
geolocator.reverse, min_delay_seconds=1)
location = await reverse(key)
if location:
response = {}
logger.info("Nominatim response: %s", location)
response['address'] = location.raw.get("address", {})
self._store_cached_address(key, response['address'])
return response
return None
except Exception:
logger.exception("Reverse geocoding failed for key=%s", key)
return None

View File

@@ -0,0 +1,342 @@
import asyncio
import logging
import os
import sys
import textwrap
from collections.abc import Callable
import click
from dotenv import load_dotenv
from pyicloud import PyiCloudService
from pyicloud.exceptions import (
PyiCloud2FARequiredException,
PyiCloud2SARequiredException,
PyiCloudAuthRequiredException,
PyiCloudFailedLoginException,
PyiCloudNoStoredPasswordAvailableException,
PyiCloudPasswordException,
)
from .models import ICloudReturnData
load_dotenv()
logger = logging.getLogger("ios-api")
COOKIE_DIRECTORY = "./cookies"
ENV_APPLE_ID = "APPLE_ID"
ENV_APPLE_PW = "APPLE_PW"
ENV_SELECTED_DEVICE_ID = "SELECTED_DEVICE_ID"
ENV_SELECTED_DEVICE_NAME = "SELECTED_DEVICE_NAME"
ENV_AUTH_INIT_TIMEOUT = "ICLOUD_AUTH_INIT_TIMEOUT_SECONDS"
BACKOFF_SCHEDULE = (15, 30, 60, 120, 300)
AUTH_EXCEPTIONS = (
PyiCloudAuthRequiredException,
PyiCloudFailedLoginException,
PyiCloudPasswordException,
PyiCloud2FARequiredException,
PyiCloud2SARequiredException,
PyiCloudNoStoredPasswordAvailableException,
)
class FindMyMonitor:
def __init__(
self,
queue: asyncio.Queue,
token_file: str = "icloud_token.txt",
sio=None,
get_client_sids: Callable[[], list[str]] | None = None,
code_timeout_seconds: int = 180,
):
self.username = os.getenv(ENV_APPLE_ID)
self.password = os.getenv(ENV_APPLE_PW)
self.token_file = token_file
self.selected_device_id = os.getenv(ENV_SELECTED_DEVICE_ID)
self.selected_device_name = os.getenv(ENV_SELECTED_DEVICE_NAME)
self.selected_device = None
self.queue = queue
self.api = None
self.device = None
self.running = True
self.sio = sio
self.get_client_sids = get_client_sids
self.code_timeout_seconds = code_timeout_seconds
self.auth_init_timeout_seconds = int(
os.getenv(ENV_AUTH_INIT_TIMEOUT, "0")
)
self._logged_candidates = False
self._no_location_streak = 0
self._auth_error_streak = 0
self._fetch_lock = asyncio.Lock()
async def _create_api_session(self, has_token: bool) -> bool:
try:
init_args = [self.username]
if not has_token:
init_args.append(self.password)
init_task = asyncio.to_thread(
PyiCloudService, *init_args, cookie_directory=COOKIE_DIRECTORY
)
if self.auth_init_timeout_seconds > 0:
self.api = await asyncio.wait_for(
init_task, timeout=self.auth_init_timeout_seconds
)
else:
self.api = await init_task
return True
except Exception:
source = "cookies" if has_token else "credentials"
logger.exception("Failed to initialize iCloud session from %s", source)
return False
async def _request_code_from_vue(self, prompt: str) -> str | None:
if self.sio is None or self.get_client_sids is None:
logger.warning("2FA request skipped: Socket.IO context not configured")
return None
sids = self.get_client_sids()
if not sids:
logger.warning("2FA request skipped: no connected Socket.IO clients")
return None
payload = {"prompt": prompt, "digits": 6}
logger.info("Emitting icloud_2fa_request to %s connected client(s)", len(sids))
for sid in sids:
try:
# Ask one connected UI client for the code and wait for ACK response.
response = await self.sio.call(
"icloud_2fa_request",
payload,
to=sid,
namespace="/",
timeout=self.code_timeout_seconds,
)
except Exception:
logger.exception("Failed to retrieve 2FA code from sid=%s", sid)
continue
code = response.get("code") if isinstance(response, dict) else response
code_str = str(code).strip() if code is not None else ""
if len(code_str) == 6 and code_str.isdigit():
return code_str
logger.warning("Invalid 2FA code payload from sid=%s", sid)
return None
async def authenticate(self) -> bool:
"""Authenticates with iCloud, handling 2FA and token storage."""
if not self.username:
logger.warning("APPLE_ID is not configured; skipping iCloud monitor authentication")
return False
has_token = os.path.exists(self.token_file)
if not has_token and not self.password:
logger.warning(
"No stored iCloud session and APPLE_PW is not configured; skipping iCloud monitor authentication"
)
return False
logger.info(
"Initializing iCloud session from %s",
"stored cookies" if has_token else "credentials",
)
if not await self._create_api_session(has_token=has_token):
return False
if self.api.requires_2fa:
logger.info("Two-factor authentication required.")
code = await self._request_code_from_vue("Enter the 6-digit Apple verification code")
if code is None:
if sys.stdin and sys.stdin.isatty():
code = str(
await asyncio.to_thread(
click.prompt,
"Please enter the 6-digit code sent to your trusted device",
type=int,
)
)
else:
logger.warning(
"2FA required but no interactive terminal or Vue responder is available; deferring authentication"
)
return False
result = await asyncio.to_thread(self.api.validate_2fa_code, code)
logger.info("2FA validation result: %s", result)
if not result:
logger.warning("Failed to verify 2FA code")
return False
await asyncio.to_thread(self.api.trust_session)
if self.api.requires_2sa:
logger.info(textwrap.dedent("""
Two-step authentication required.
Please select a device to receive a SMS verification code:
"""))
for i, device in enumerate(self.api.trusted_devices):
logger.info(
" %s: %s (%s)",
i + 1,
device.get("deviceName", "Unknown device"),
device.get("phoneNumber", "Unknown number"),
)
if not (sys.stdin and sys.stdin.isatty()):
logger.warning(
"2SA required but no interactive terminal is available; deferring authentication"
)
return False
device_index = await asyncio.to_thread(click.prompt, "Please select a device number", type=int) - 1
device = self.api.trusted_devices[device_index]
if not await asyncio.to_thread(self.api.send_verification_code, device):
logger.warning("Failed to send verification code")
return False
code = await asyncio.to_thread(click.prompt, "Please enter verification code", type=int)
if not await asyncio.to_thread(self.api.validate_verification_code, device, code):
logger.warning("Failed to verify verification code")
return False
logger.info("Successfully authenticated.")
return True
async def get_location(self) -> ICloudReturnData | None:
"""Fetches the latest latitude and longitude."""
if not self.api:
await self.authenticate()
# pyicloud 2.4.1 refresh lives on the FindMy manager, not PyiCloudService.
devices_manager = await asyncio.to_thread(lambda: self.api.devices)
await asyncio.to_thread(devices_manager.refresh, True)
# One-time diagnostics to help verify exact device selection among duplicates.
if not self._logged_candidates:
for d in devices_manager:
status = d.status()
logger.debug(
"iCloud candidate id=%s name=%s model=%s deviceStatus=%s has_location=%s features=%s",
d.data.get("id"),
d.name,
d.model_name,
status.get("deviceStatus"),
d.data.get("location") is not None,
d.data.get("features"),
)
self._logged_candidates = True
# Select by ID first (exact), then by name, then first device if no selectors set.
self.selected_device = None
for device in devices_manager:
if self.selected_device_id and device.data.get("id") == self.selected_device_id:
self.selected_device = device
break
if self.selected_device is None:
for device in devices_manager:
if self.selected_device_name and device.name == self.selected_device_name:
self.selected_device = device
break
if self.selected_device is None and not self.selected_device_id and not self.selected_device_name:
for device in devices_manager:
self.selected_device = device
break
if self.selected_device:
location = self.selected_device.location
status = self.selected_device.status()
logger.info(
"iCloud device=%s location_available=%s status=%s",
self.selected_device.name,
self.selected_device.location_available,
status,
)
if location:
data = {
"latitude": location['latitude'],
"longitude": location['longitude'],
"timeStamp": location['timeStamp'],
"altitude": location['altitude'],
"horizontalAccuracy": location['horizontalAccuracy'],
"verticalAccuracy": location['verticalAccuracy'],
"batteryLevel": status['batteryLevel'],
"deviceDisplayName": status['deviceDisplayName'],
"deviceStatus": status['deviceStatus'],
"name": status['name']
}
response = ICloudReturnData(**data)
return response
logger.info("Location payload is None for device=%s", self.selected_device.name)
return None
logger.warning(
"No iCloud device matched SELECTED_DEVICE_ID='%s' or SELECTED_DEVICE_NAME='%s'.",
self.selected_device_id,
self.selected_device_name,
)
return None
def start(self):
self.running = True
def stop(self):
self.running = False
def _no_location_backoff_seconds(self, base_interval: int) -> int:
idx = min(self._no_location_streak - 1, len(BACKOFF_SCHEDULE) - 1)
return max(base_interval, BACKOFF_SCHEDULE[idx])
def _auth_backoff_seconds(self, base_interval: int) -> int:
idx = min(self._auth_error_streak - 1, len(BACKOFF_SCHEDULE) - 1)
return max(base_interval, BACKOFF_SCHEDULE[idx])
async def refresh_location(self):
"""Fetch one location update while serializing iCloud API access."""
async with self._fetch_lock:
return await self.get_location()
async def run_monitor(self, interval=60):
"""Runs the monitor loop."""
if not await self.authenticate():
return
if not self.running:
self.start()
while self.running:
sleep_seconds = interval
try:
device_data = await self.refresh_location()
if device_data is not None:
self._no_location_streak = 0
self._auth_error_streak = 0
logger.info(
"%s - Location: %s, %s",
device_data.timeStamp,
device_data.latitude,
device_data.longitude,
)
await self.queue.put(device_data)
else:
self._no_location_streak += 1
sleep_seconds = self._no_location_backoff_seconds(interval)
logger.info(
"No iCloud location found for device id='%s' name='%s' streak=%s next_retry=%ss",
self.selected_device_id,
self.selected_device_name,
self._no_location_streak,
sleep_seconds,
)
except AUTH_EXCEPTIONS as e:
self._auth_error_streak += 1
sleep_seconds = self._auth_backoff_seconds(interval)
logger.warning(
"iCloud auth error (%s). Re-authenticating; streak=%s next_retry=%ss",
type(e).__name__,
self._auth_error_streak,
sleep_seconds,
)
try:
await self.authenticate()
except Exception:
logger.exception("iCloud re-authentication failed")
except Exception as e:
logger.exception("iCloud monitor loop error: %s", e)
sleep_seconds = max(interval, 30)
await asyncio.sleep(sleep_seconds)

View File

@@ -0,0 +1,18 @@
import logging
import json
handler = logging.StreamHandler()
root_logger = logging.getLogger()
logger = logging.getLogger("ios-api")
class JsonFormatter(logging.Formatter):
def format(self, record: logging.LogRecord) -> str:
payload = {
"ts": self.formatTime(record, "%Y-%m-%dT%H:%M:%S%z"),
"level": record.levelname,
"logger": record.name,
"message": record.getMessage(),
}
if record.exc_info:
payload["exc_info"] = self.formatException(record.exc_info)
return json.dumps(payload, ensure_ascii=True)

View File

@@ -0,0 +1,82 @@
from __future__ import annotations
from typing import Optional
from pydantic import BaseModel
class Coordinate(BaseModel):
latitude: float
longitude: float
class ScheduledCoordinate(Coordinate):
delay: int = 0
start: Optional[str] = None
end: Optional[str] = None
class SimulationStatusData(Coordinate):
start: float
end: Optional[float] = None
next_move: Optional[float] = None
class SimulationStatus(BaseModel):
status: bool
data: Optional[SimulationStatusData] = None
class SimulationRequestData(ScheduledCoordinate):
pass
class SimulationRequest(BaseModel):
status: bool
data: Optional[SimulationRequestData] = None
class SimulationRequestResponseData(ScheduledCoordinate):
loc_id: str
class SimulationQueueList(BaseModel):
data: Optional[SimulationRequestResponseData] = None
class SimulationRequestResponse(BaseModel):
status: bool
data: Optional[SimulationRequestResponseData] = None
class SimulationQueueDict(BaseModel):
location_id: dict[str, SimulationRequestResponseData]
class ICloudLocationData(Coordinate):
timestamp: str
class ICloudReturnData(Coordinate):
timeStamp: int
altitude: float
horizontalAccuracy: float
verticalAccuracy: float
batteryLevel: float
deviceDisplayName: str
deviceStatus: int
name: str
class LatLng(Coordinate):
pass
class ORSRequest(BaseModel):
geometry_simplify: bool
coordinates: list[list[float]]
# Backward compatibility aliases for existing imports.
iCloudLocationData = ICloudLocationData
iCloudReturnData = ICloudReturnData

File diff suppressed because it is too large Load Diff

792
uv.lock generated

File diff suppressed because it is too large Load Diff