reverse geocode, icloud monitor

This commit is contained in:
2026-04-01 10:32:35 -04:00
parent 1eef99e3b4
commit a7af0faefc
10 changed files with 515 additions and 382 deletions

View File

@@ -98,10 +98,11 @@ class LocationSimulationState:
self.loc_id: Optional[str] = None
self.longitude: Optional[float] = None
self.next_move: Optional[float] = None
self.queue: asyncio.Queue = asyncio.Queue()
self.queue_data: Dict = {}
self.queue_order: list[str] = []
self.queue_state: str = "STOPPED"
self.simulation_queue: asyncio.Queue = asyncio.Queue()
self.simulation_queue_data: Dict = {}
self.simulation_queue_order: list[str] = []
self.simulation_queue_state: str = "STOPPED"
self.simulation_noise: bool = False
self.set_location_enabled: bool = True
self.simulation_active: bool = False
self.simulation_task: Optional[asyncio.Task] = None
@@ -441,7 +442,7 @@ class TunneldRunnerSio:
"""Start Simulation Queue Worker"""
logger.info("Starting location simulation worker...")
self.context.simulation_active = True
self.context.queue_state = "RUNNING"
self.context.simulation_queue_state = "RUNNING"
try:
if self.context.test_mode:
logger.info("Simulation worker: test mode enabled")
@@ -544,6 +545,77 @@ class TunneldRunnerSio:
self.context.simulation_task = None
await end_icloud_monitor()
async def add_location_to_simulation_queue(data):
loc_id = str(uuid.uuid4())
latitude = (
data.get("latitude")
if isinstance(data, dict)
else getattr(data, "latitude", None)
)
longitude = (
data.get("longitude")
if isinstance(data, dict)
else getattr(data, "longitude", None)
)
delay = (
data.get("delay", 0)
if isinstance(data, dict)
else getattr(data, "delay", 0)
)
try:
delay = parse_delay_seconds(delay)
except ValueError as e:
return {"status": "error","command": "add", "message": str(e) }
if latitude is not None and longitude is not None:
logger.info(
"Adding location %s (%s, %s) with %s delay to the queue",
loc_id,
latitude,
longitude,
delay,
)
accrued_delay = 0
if self.context.simulation_queue_data and len(self.context.simulation_queue_order) > 1:
current_index = get_item_index(self.context.loc_id) if self.context.loc_id else 0
remaining_items = self.context.simulation_queue_order[current_index + 1:]
accrued_delay = sum(parse_delay_seconds(self.context.simulation_queue_data[loc_id]['delay']) for loc_id in remaining_items if loc_id in self.context.simulation_queue_data)
accrued_delay = accrued_delay + parse_delay_seconds(self.context.next_move)
now_time = datetime.now(timezone.utc)
new_time = (
now_time
+ timedelta(seconds=accrued_delay)
+ timedelta(seconds=delay)
)
start_time = new_time.isoformat()
coords = f"{latitude}, {longitude}"
rev_geocode = self.context.reverse_geocode(coords)
if rev_geocode:
address = rev_geocode.address
else:
address = f"{latitude}, {longitude}"
location_item = {
"loc_id": loc_id,
"latitude": latitude,
"longitude": longitude,
"delay": delay,
"start": start_time,
"address": address,
}
resp = {
"status": "OK",
"command": "add",
"message": f"Location {loc_id} added to the queue",
"item": location_item,
}
await self.context.simulation_queue.put(loc_id)
add_item(loc_id, location_item)
logger.info("Location %s added to the queue", loc_id)
else:
logger.error("Invalid location data: %s", data)
resp = {"status": "error", "message": "Invalid location data", "data": data}
return resp
async def start_icloud_monitor():
"""Start Apple iCloud Find My Monitor to retreive actual reported device location"""
logger.info("iCloud monitor start requested")
@@ -579,6 +651,44 @@ class TunneldRunnerSio:
finally:
self.context.fmf_queue.task_done()
async def refresh_icloud_location() -> dict:
"""Fetch one iCloud location update, with or without monitor loop running."""
logger.info("iCloud monitor refresh requested")
try:
updated_location = await self.context.icloud_monitor.refresh_location()
except Exception as e:
logger.exception("Failed to refresh iCloud location: %s", e)
return {
"status": "error",
"message": "Failed to refresh iCloud location",
"error": type(e).__name__,
"icloud_monitor_enabled": self.context.icloud_monitor_enabled,
"icloud_monitor_running": is_icloud_monitor_running(),
}
if updated_location is None:
return {
"status": "ok",
"location_found": False,
"location_updated": False,
"icloud_monitor_enabled": self.context.icloud_monitor_enabled,
"icloud_monitor_running": is_icloud_monitor_running(),
}
location_updated = self.context.fmf_location != updated_location
self.context.fmf_location = updated_location
await self.context.sio.emit(
"fmf_update", updated_location.model_dump(), namespace="/"
)
return {
"status": "ok",
"location_found": True,
"location_updated": location_updated,
"icloud_monitor_enabled": self.context.icloud_monitor_enabled,
"icloud_monitor_running": is_icloud_monitor_running(),
"fmf_location": updated_location,
}
async def end_icloud_monitor():
logger.info("iCloud monitor stop requested")
self.context.icloud_monitor_enabled = False
@@ -604,29 +714,42 @@ class TunneldRunnerSio:
async def pause_simulation_queue():
"""Pauses asyncio.Queue playback"""
self.context.queue_state = "PAUSED"
self.context.simulation_queue_state = "PAUSED"
async def resume_simulation_queue():
"""Resumes asyncio.Queue playback"""
self.context.queue_state = "RUNNING"
"""Resumes asyncio.simulation_queue playback"""
self.context.simulation_queue_state = "RUNNING"
update_queue_times()
def update_queue_times():
current_index = get_item_index(self.context.loc_id)
remaining_items = self.context.queue_order[current_index + 1:]
remaining_items = self.context.simulation_queue_order[current_index + 1:]
new_delay = self.context.next_move or 0
now_time = datetime.now(timezone.utc)
for item in remaining_items:
item_delay = parse_delay_seconds(self.context.queue_data[item].get("delay")) or 0
item_delay = parse_delay_seconds(self.context.simulation_queue_data[item].get("delay")) or 0
new_delay += item_delay
new_time = (now_time + timedelta(seconds=new_delay)).isoformat()
self.context.queue_data[item].start = new_time
self.context.simulation_queue_data[item].start = new_time
update_queue_data()
def update_queue_data():
data = {
"simulation_queue": {
"active": self.context.simulation_active,
"data": self.context.simulation_queue_data,
"order": self.context.simulation_queue_order,
"state": self.context.simulation_queue_state,
"worker_task": self.context.simulation_task.get_name() if self.context.simulation_task else None,
}
}
self.context.sio.emit("queue_data_update", { "data": data }, namespace="/" )
async def empty_simulation_queue():
"""Empties all items from an asyncio.Queue."""
logger.info("Clearing location simulation queue...")
q = self.context.queue
q = self.context.simulation_queue
self.context.set_location_enabled = False
while not q.empty():
try:
@@ -639,38 +762,38 @@ class TunneldRunnerSio:
def add_item(item_id, payload):
self.context.queue_data[item_id] = payload
self.context.queue_order.append(item_id)
self.context.simulation_queue_data[item_id] = payload
self.context.simulation_queue_order.append(item_id)
def remove_item(item_id):
if item_id in self.context.queue_order:
# self.context.queue_order.remove(item_id)
self.context.queue_data[item_id]["status"] = "deleted"
if item_id in self.context.simulation_queue_order:
# self.context.simulation_queue_order.remove(item_id)
self.context.simulation_queue_data[item_id]["status"] = "deleted"
def clear_item(item_id):
if item_id in self.context.queue_order:
self.context.queue_order.remove(item_id)
del self.context.queue_data[item_id]
if item_id in self.context.simulation_queue_order:
self.context.simulation_queue_order.remove(item_id)
del self.context.simulation_queue_data[item_id]
def clear_items():
self.context.queue_data = {}
self.context.queue_order = []
self.context.simulation_queue_data = {}
self.context.simulation_queue_order = []
def get_item(item_id):
return self.context.queue_data[item_id]
return self.context.simulation_queue_data[item_id]
def update_item(item_id, **updates):
if item_id in self.context.queue_data:
self.context.queue_data[item_id].update(updates)
if item_id in self.context.simulation_queue_data:
self.context.simulation_queue_data[item_id].update(updates)
def get_item_index(item_id):
return self.context.queue_order.index(item_id)
return self.context.simulation_queue_order.index(item_id)
def get_item_id_by_index(index):
return self.context.queue_order[index]
return self.context.simulation_queue_order[index]
def get_items_in_order():
return [self.context.queue_data[i] for i in self.context.queue_order if self.context.queue_data[i].get("status") != "deleted"]
return [self.context.simulation_queue_data[i] for i in self.context.simulation_queue_order if self.context.simulation_queue_data[i].get("status") != "deleted"]
def parse_delay_seconds(raw_delay) -> int:
if raw_delay is None:
@@ -695,9 +818,9 @@ class TunneldRunnerSio:
"""Ends asyncio.Queue playback and closes tunnel"""
logger.info("End location simulation request")
try:
q = self.context.queue
q = self.context.simulation_queue
self.context.set_location_enabled = False
self.context.queue_state = "SHUTDOWN"
self.context.simulation_queue_state = "SHUTDOWN"
# Drain pending queue entries.
while not q.empty():
@@ -731,23 +854,26 @@ class TunneldRunnerSio:
self.context.simulation_active = False
self.context.simulation_task = None
self.context.simulation_state = "ENDED"
self.context.set_location_enabled = True
self.context.next_move = None
self.context.loc_id = None
self.context.latitude = None
self.context.longitude = None
self.context.queue_state = "STOPPED"
self.context.simulation_queue_state = "STOPPED"
# Recreate queue to discard sentinel wakeup items and unblock clean restarts.
self.context.queue = asyncio.Queue()
self.context.simulation_queue = asyncio.Queue()
await end_icloud_monitor()
return True
except Exception as e:
logger.error(f"Error ending simulation queue: {e}")
return False
def toggle_test_mode() -> dict:
self.context.test_mode = not self.context.test_mode
return {"test_mode": self.context.test_mode }
def get_status() -> dict:
current_item = self.context.queue_data.get(self.context.loc_id) if self.context.loc_id else None
current_item = self.context.simulation_queue_data.get(self.context.loc_id) if self.context.loc_id else None
current_start = (
current_item.get("start")
if isinstance(current_item, dict)
@@ -774,9 +900,9 @@ class TunneldRunnerSio:
"next_move": self.context.next_move,
"simulation_queue": {
"active": self.context.simulation_active,
"data": self.context.queue_data,
"order": self.context.queue_order,
"state": self.context.queue_state,
"data": self.context.simulation_queue_data,
"order": self.context.simulation_queue_order,
"state": self.context.simulation_queue_state,
"worker_task": self.context.simulation_task.get_name() if self.context.simulation_task else None,
},
"set_location_enable": self.context.set_location_enabled,
@@ -786,6 +912,27 @@ class TunneldRunnerSio:
}
return data
import numpy as np
def add_gps_noise(lat, lon, std_dev_meters=5):
"""
Simulates GPS noise by adding Gaussian noise to coordinates.
1 degree of latitude ~ 111,000 meters.
1 degree of longitude ~ 111,000 * cos(latitude) meters.
"""
# Earth's radius, meters
earth_radius = 6378137
# Convert meters to degrees
lat_diff = (std_dev_meters / earth_radius) * (180 / np.pi)
lon_diff = (std_dev_meters / (earth_radius * np.cos(np.radians(lat)))) * (180 / np.pi)
# Generate Gaussian noise
noised_lat = lat + np.random.normal(0, lat_diff)
noised_lon = lon + np.random.normal(0, lon_diff)
return noised_lat, noised_lon
""" FastAPI HTTP Functions"""
def generate_http_response(
@@ -923,7 +1070,7 @@ class TunneldRunnerSio:
task=task, udid=udid
)
created_task = True
except ConnectionFailedError, InvalidServiceError, MuxException:
except (ConnectionFailedError, InvalidServiceError, MuxException):
pass
if connection_type in ("usb", None):
for rsd in await get_rsds(udid=udid):
@@ -1041,6 +1188,11 @@ class TunneldRunnerSio:
}
return generate_http_response(data)
@self._app.get("/icloud-monitor/refresh")
async def app_refresh_icloud_monitor() -> fastapi.Response:
data = await refresh_icloud_location()
return generate_http_response(data)
"""Simulation Functions"""
""" start, add, clear, pause, resume, end, status """
@@ -1055,77 +1207,7 @@ class TunneldRunnerSio:
async def app_add_location(data: SimulationRequestData) -> fastapi.Response:
"""Add a location to the simulation queue"""
logger.info("Request to add new location to queue")
loc_id = str(uuid.uuid4())
latitude = (
data.get("latitude")
if isinstance(data, dict)
else getattr(data, "latitude", None)
)
longitude = (
data.get("longitude")
if isinstance(data, dict)
else getattr(data, "longitude", None)
)
delay = (
data.get("delay", 0)
if isinstance(data, dict)
else getattr(data, "delay", 0)
)
try:
delay = parse_delay_seconds(delay)
except ValueError as e:
return generate_http_response(
{"status": "error", "message": str(e)},
status_code=400,
)
if latitude is not None and longitude is not None:
logger.info(
"Adding location %s (%s, %s) with %s delay to the queue",
loc_id,
latitude,
longitude,
delay,
)
accrued_delay = 0
if self.context.queue_data:
current_index = get_item_index(self.context.loc_id)
remaining_items = self.context.queue_order[current_index + 1:]
accrued_delay = sum(parse_delay_seconds(self.context.queue_data[loc_id]['delay']) for loc_id in remaining_items if loc_id in self.context.queue_data)
accrued_delay = accrued_delay + parse_delay_seconds(self.context.next_move)
now_time = datetime.now(timezone.utc)
new_time = (
now_time
+ timedelta(seconds=accrued_delay)
+ timedelta(seconds=delay)
)
start_time = new_time.isoformat()
coords = f"{latitude}, {longitude}"
rev_geocode = self.context.reverse_geocode(coords)
if rev_geocode:
address = rev_geocode.address
else:
address = f"{latitude}, {longitude}"
location_item = {
"loc_id": loc_id,
"latitude": latitude,
"longitude": longitude,
"delay": delay,
"start": start_time,
"address": address,
}
resp = {
"status": "added",
"message": f"Location {loc_id} added to the queue",
"item": location_item,
}
await self.context.queue.put(loc_id)
add_item(loc_id, location_item)
logger.info("Location %s added to the queue", loc_id)
else:
logger.error("Invalid location data: %s", data)
resp = {"status": "error", "message": "Invalid location data", "data": data}
resp = await add_location_to_simulation_queue(data)
return generate_http_response(resp)
@self._app.get("/simulation/clear")
@@ -1160,12 +1242,12 @@ class TunneldRunnerSio:
async def app_simulation_test_mode() -> fastapi.Response:
"""Enable test mode for the simulation queue"""
before_toggle = self.context.test_mode
self.context.test_mode = not self.context.test_mode
data = {"status": "Ok", "message": f"Test mode toggled from {before_toggle} to {self.context.test_mode}"}
data = toggle_test_mode()
data['status'] = "OK"
data['message'] = f"Test mode toggled from {before_toggle} to {self.context.test_mode}"
return generate_http_response(data)
"""Status Functions"""
@self._app.get("/status/rsd")
@@ -1289,88 +1371,19 @@ class TunneldRunnerSio:
)
try:
match command:
case "test-mode":
data = toggle_test_mode()
return {
"command": command,
"status": "OK",
"message": "test-mode toggled",
"data": data
}
case "add":
""" Add a location to the simulation queue"""
loc_id = str(uuid.uuid4())
latitude = (
data.get("latitude")
if isinstance(data, dict)
else getattr(data, "latitude", None)
)
longitude = (
data.get("longitude")
if isinstance(data, dict)
else getattr(data, "longitude", None)
)
delay = (
data.get("delay", 0)
if isinstance(data, dict)
else getattr(data, "delay", 0)
)
try:
delay = parse_delay_seconds(delay)
except ValueError as e:
return {
"command": command,
"status": "error",
"message": str(e),
}
if latitude is not None and longitude is not None:
logger.info(
"Adding location %s (%s, %s) with %s delay to the queue",
loc_id,
latitude,
longitude,
delay,
)
accrued_delay = 0
if self.context.queue_data:
current_index = get_item_index(self.context.loc_id)
remaining_items = self.context.queue_order[current_index + 1:]
accrued_delay = sum(parse_delay_seconds(self.context.queue_data[loc_id]['delay']) for loc_id in remaining_items if loc_id in self.context.queue_data)
accrued_delay = accrued_delay + parse_delay_seconds(self.context.next_move)
now_time = datetime.now(timezone.utc)
new_time = (
now_time
+ timedelta(seconds=accrued_delay)
+ timedelta(seconds=delay)
)
start_time = new_time.isoformat()
coords = f"{latitude}, {longitude}"
rev_geocode = self.context.reverse_geocode(coords)
if rev_geocode:
address = rev_geocode.address
else:
address = f"{latitude}, {longitude}"
location_item = {
"loc_id": loc_id,
"latitude": latitude,
"longitude": longitude,
"delay": delay,
"start": start_time,
"address": address,
}
ack = {
"command": command,
"status": "added",
"message": f"Location {loc_id} added to the queue",
"item": location_item,
}
await self.context.queue.put(loc_id)
add_item(loc_id, location_item)
logger.info("Location %s added to the queue", loc_id)
return ack
else:
logger.warning(
"Invalid location data received from %s: %s", sid, data
)
return {
"command": command,
"status": "error",
"message": "Invalid location data",
"data": data,
}
resp = await add_location_to_simulation_queue(data)
return resp
case "clear":
""" Clear the simulation queue"""
await empty_simulation_queue()
@@ -1448,6 +1461,10 @@ class TunneldRunnerSio:
"icloud_monitor_enabled": self.context.icloud_monitor_enabled,
"icloud_monitor_running": is_icloud_monitor_running(),
}
case "refresh":
data = await refresh_icloud_location()
data["command"] = command
return data
case _:
return {
"command": command,
@@ -1583,111 +1600,175 @@ class LocationSimulationQueue(LocationSimulation):
def __init__(self, dvt, context: LocationSimulationState):
super().__init__(dvt)
self.context = context
self._noise_task: Optional[asyncio.Task] = None
self._noise_loc_id: Optional[str] = None
@staticmethod
def _add_gps_noise(lat: float, lon: float, std_dev_meters: float = 5.0) -> tuple[float, float]:
"""Apply Gaussian jitter in meters and convert to lat/lon deltas."""
earth_radius = 6378137.0
lat_sigma_deg = (std_dev_meters / earth_radius) * (180.0 / math.pi)
cos_lat = math.cos(math.radians(lat))
if abs(cos_lat) < 1e-6:
cos_lat = 1e-6
lon_sigma_deg = (std_dev_meters / (earth_radius * cos_lat)) * (180.0 / math.pi)
noised_lat = lat + random.gauss(0.0, lat_sigma_deg)
noised_lon = lon + random.gauss(0.0, lon_sigma_deg)
return noised_lat, noised_lon
async def _stop_noise_task(self) -> None:
if self._noise_task is not None:
self._noise_task.cancel()
with suppress(asyncio.CancelledError):
await self._noise_task
self._noise_task = None
self._noise_loc_id = None
async def _noise_loop(self, loc_id: str, base_latitude: float, base_longitude: float) -> None:
while True:
await asyncio.sleep(random.randint(45, 180))
if not self.context.simulation_active:
break
if self.context.simulation_queue_state == "SHUTDOWN":
break
if not self.context.set_location_enabled:
continue
if not self.context.simulation_noise:
continue
if self.context.loc_id != loc_id:
break
noised_latitude, noised_longitude = self._add_gps_noise(
base_latitude, base_longitude
)
await self.set(noised_latitude, noised_longitude)
logger.info(
"Applied simulation noise to active location loc_id=%s sent=%s,%s base=%s,%s",
loc_id,
noised_latitude,
noised_longitude,
base_latitude,
base_longitude,
)
def _start_noise_task(self, loc_id: str, base_latitude: float, base_longitude: float) -> None:
self._noise_loc_id = loc_id
self._noise_task = asyncio.create_task(
self._noise_loop(loc_id, base_latitude, base_longitude),
name=f"simulation-noise-{loc_id}",
)
async def play_queue(
self, disable_sleep: bool = False, timing_randomness_range: int = 0
) -> None:
while True:
if self.context.queue_state == "PAUSED":
await asyncio.sleep(0.1)
continue
if self.context.queue_state == "SHUTDOWN":
break
loc_id = await self.context.queue.get()
if loc_id is None:
self.context.queue.task_done()
break
location_item = self.context.queue_data.get(loc_id)
if location_item is None:
logger.warning(
"Simulation queue item missing for loc_id=%s; skipping stale entry",
loc_id,
try:
while True:
if self.context.simulation_queue_state == "PAUSED":
await asyncio.sleep(0.1)
continue
if self.context.simulation_queue_state == "SHUTDOWN":
break
loc_id = await self.context.simulation_queue.get()
if loc_id is None:
self.context.simulation_queue.task_done()
break
location_item = self.context.simulation_queue_data.get(loc_id)
if location_item is None:
logger.warning(
"Simulation queue item missing for loc_id=%s; skipping stale entry",
loc_id,
)
self.context.simulation_queue.task_done()
continue
new_latitude = location_item.get("latitude")
new_longitude = location_item.get("longitude")
new_delay = location_item.get("delay")
new_delay = 0 if new_delay is None else new_delay
new_start = location_item.get("start")
current_location_item = self.context.simulation_queue_data.get(self.context.loc_id)
current_latitude = (
current_location_item.get("latitude")
if isinstance(current_location_item, dict)
else self.context.latitude
)
current_longitude = (
current_location_item.get("longitude")
if isinstance(current_location_item, dict)
else self.context.longitude
)
current_start = (
current_location_item.get("start")
if isinstance(current_location_item, dict)
else None
)
self.context.queue.task_done()
continue
new_latitude = location_item.get("latitude")
new_longitude = location_item.get("longitude")
new_delay = location_item.get("delay")
new_delay = 0 if new_delay is None else new_delay
new_start = location_item.get("start")
current_location_item = self.context.queue_data.get(self.context.loc_id)
current_latitude = (
current_location_item.get("latitude")
if isinstance(current_location_item, dict)
else self.context.latitude
)
current_longitude = (
current_location_item.get("longitude")
if isinstance(current_location_item, dict)
else self.context.longitude
)
current_start = (
current_location_item.get("start")
if isinstance(current_location_item, dict)
else None
)
if self.context.set_location_enabled:
if new_delay > 0 and not disable_sleep:
countdown_delay = int(round(float(new_delay)))
if timing_randomness_range > 0:
new_delay = new_delay + random.uniform(
-timing_randomness_range, timing_randomness_range
)
if self.context.set_location_enabled:
if new_delay > 0 and not disable_sleep:
countdown_delay = int(round(float(new_delay)))
for i in range(max(0, countdown_delay), 0, -1):
if self.context.queue_state == "SHUTDOWN":
break
while self.context.queue_state == "PAUSED":
await asyncio.sleep(0.1)
if self.context.queue_state == "SHUTDOWN":
if timing_randomness_range > 0:
new_delay = new_delay + random.uniform(
-timing_randomness_range, timing_randomness_range
)
countdown_delay = int(round(float(new_delay)))
for i in range(max(0, countdown_delay), 0, -1):
if self.context.simulation_queue_state == "SHUTDOWN":
break
if self.context.queue_state == "SHUTDOWN":
while self.context.simulation_queue_state == "PAUSED":
await asyncio.sleep(0.1)
if self.context.simulation_queue_state == "SHUTDOWN":
break
if self.context.simulation_queue_state == "SHUTDOWN":
break
self.context.next_move = i
await self.context.sio.emit(
"simulation_status",
{
"status": self.context.simulation_active,
"loc_id": self.context.loc_id,
"latitude": current_latitude,
"longitude": current_longitude,
"start": current_start,
"next_move": i,
},
namespace="/",
)
await asyncio.sleep(1)
if self.context.simulation_queue_state == "SHUTDOWN":
self.context.simulation_queue.task_done()
break
self.context.next_move = i
await self.context.sio.emit(
"simulation_status",
{
"status": self.context.simulation_active,
"loc_id": self.context.loc_id,
"latitude": current_latitude,
"longitude": current_longitude,
"start": current_start,
"next_move": i,
},
namespace="/",
)
await asyncio.sleep(1)
if self.context.queue_state == "SHUTDOWN":
self.context.queue.task_done()
break
self.context.queue_data[loc_id]["start"] = datetime.now(timezone.utc).isoformat()
if self.context.loc_id is not None:
self.context.queue_data[self.context.loc_id]["end"] = datetime.now(timezone.utc).isoformat()
await self.set(new_latitude, new_longitude)
self.context.loc_id = loc_id
self.context.latitude = new_latitude
self.context.longitude = new_longitude
await self.context.sio.emit(
"simulation_status",
{
"status": self.context.simulation_active,
"loc_id": self.context.loc_id,
"latitude": self.context.latitude,
"longitude": self.context.longitude,
"start": new_start,
"next_move": None,
},
namespace="/",
)
logger.info(
"Set simulated location to %s, %s after %ss delay",
new_latitude,
new_longitude,
new_delay,
)
self.context.queue.task_done()
self.context.simulation_queue_data[loc_id]["start"] = datetime.now(timezone.utc).isoformat()
if self.context.loc_id is not None:
self.context.simulation_queue_data[self.context.loc_id]["end"] = datetime.now(timezone.utc).isoformat()
update_queue_data()
await self._stop_noise_task()
await self.set(new_latitude, new_longitude)
self.context.loc_id = loc_id
self.context.latitude = new_latitude
self.context.longitude = new_longitude
if self.context.simulation_noise:
self._start_noise_task(loc_id, new_latitude, new_longitude)
await self.context.sio.emit(
"simulation_status",
{
"status": self.context.simulation_active,
"loc_id": self.context.loc_id,
"latitude": self.context.latitude,
"longitude": self.context.longitude,
"start": new_start,
"next_move": None,
},
namespace="/",
)
logger.info(
"Set simulated location to %s, %s after %ss delay",
new_latitude,
new_longitude,
new_delay,
)
self.context.simulation_queue.task_done()
finally:
await self._stop_noise_task()
class LocationSimulationTestQueue(LocationSimulationBase):
@@ -1706,9 +1787,9 @@ class LocationSimulationTestQueue(LocationSimulationBase):
logger.info("Simulated location set to %s, %s", latitude, longitude)
async def clear(self) -> None:
q = self.context.queue
q = self.context.simulation_queue
self.context.set_location_enabled = False
self.context.queue_state = "SHUTDOWN"
self.context.simulation_queue_state = "SHUTDOWN"
while not q.empty():
try:
item = q.get_nowait()
@@ -1726,36 +1807,36 @@ class LocationSimulationTestQueue(LocationSimulationBase):
with suppress(asyncio.CancelledError):
await self.context.simulation_task
self.context.simulation_active = False
self.context.queue_state = "SHUTDOWN"
self.context.simulation_queue_state = "SHUTDOWN"
async def play_queue(
self, disable_sleep: bool = False, timing_randomness_range: int = 0
) -> None:
while True:
if self.context.queue_state == "PAUSED":
if self.context.simulation_queue_state == "PAUSED":
await asyncio.sleep(0.1)
continue
if self.context.queue_state == "SHUTDOWN":
if self.context.simulation_queue_state == "SHUTDOWN":
break
loc_id = await self.context.queue.get()
loc_id = await self.context.simulation_queue.get()
if loc_id is None:
self.context.queue.task_done()
self.context.simulation_queue.task_done()
break
location_item = self.context.queue_data.get(loc_id)
location_item = self.context.simulation_queue_data.get(loc_id)
if location_item is None:
logger.warning(
"Test simulation queue item missing for loc_id=%s; skipping stale entry",
loc_id,
)
self.context.queue.task_done()
self.context.simulation_queue.task_done()
continue
new_latitude = location_item.get("latitude")
new_longitude = location_item.get("longitude")
new_delay = location_item.get("delay")
new_delay = 0 if delay is None else delay
new_delay = 0 if new_delay is None else new_delay
new_start = location_item.get("start")
current_location_item = self.context.queue_data.get(self.context.loc_id)
current_location_item = self.context.simulation_queue_data.get(self.context.loc_id)
current_latitude = (
current_location_item.get("latitude")
if isinstance(current_location_item, dict)
@@ -1781,13 +1862,13 @@ class LocationSimulationTestQueue(LocationSimulationBase):
)
countdown_delay = int(round(float(new_delay)))
for i in range(max(0, countdown_delay), 0, -1):
if self.context.queue_state == "SHUTDOWN":
if self.context.simulation_queue_state == "SHUTDOWN":
break
while self.context.queue_state == "PAUSED":
while self.context.simulation_queue_state == "PAUSED":
await asyncio.sleep(0.1)
if self.context.queue_state == "SHUTDOWN":
if self.context.simulation_queue_state == "SHUTDOWN":
break
if self.context.queue_state == "SHUTDOWN":
if self.context.simulation_queue_state == "SHUTDOWN":
break
self.context.next_move = i
await self.context.sio.emit(
@@ -1803,14 +1884,15 @@ class LocationSimulationTestQueue(LocationSimulationBase):
namespace="/",
)
await asyncio.sleep(1)
if self.context.queue_state == "SHUTDOWN":
self.context.queue.task_done()
if self.context.simulation_queue_state == "SHUTDOWN":
self.context.simulation_queue.task_done()
break
self.context.queue_data[loc_id]["start"] = datetime.now(timezone.utc).isoformat()
self.context.simulation_queue_data[loc_id]["start"] = datetime.now(timezone.utc).isoformat()
if self.context.loc_id is not None:
self.context.queue_data[self.context.loc_id]["end"] = datetime.now(timezone.utc).isoformat()
self.context.simulation_queue_data[self.context.loc_id]["end"] = datetime.now(timezone.utc).isoformat()
await self.set(new_latitude, new_longitude)
self.context.loc_id = loc_id
self.context.loc_id = loc_id
self.context.latitude = new_latitude
self.context.longitude = new_longitude
await self.context.sio.emit(
@@ -1822,14 +1904,13 @@ class LocationSimulationTestQueue(LocationSimulationBase):
"longitude": self.context.longitude,
"start": new_start,
"next_move": None,
"next_move": None,
},
namespace="/",
)
logger.info(
"Set simulated location to %s, %s after %ss delay",
latitude,
longitude,
delay,
new_latitude,
new_longitude,
new_delay,
)
self.context.queue.task_done()
self.context.simulation_queue.task_done()