fixes
This commit is contained in:
@@ -114,10 +114,10 @@ class LocationSimulationState:
|
||||
self.simulation_queue: asyncio.Queue = asyncio.Queue()
|
||||
self.simulation_queue_data: Dict = {}
|
||||
self.simulation_queue_order: list[str] = []
|
||||
self.simulation_queue_deleted_items: list[str] = []
|
||||
self.simulation_queue_pending_ids: set[str] = set()
|
||||
self.simulation_queue_state: str = "STOPPED"
|
||||
self.simulation_noise: bool = False
|
||||
self.set_location_enabled: bool = True
|
||||
self.simulation_active: bool = False
|
||||
self.simulation_task: Optional[asyncio.Task] = None
|
||||
self.sio: socketio.AsyncServer = socketio.AsyncServer(
|
||||
@@ -642,6 +642,7 @@ class TunneldRunnerSio:
|
||||
"command": "start",
|
||||
"message": "Simulation started",
|
||||
"data": {
|
||||
|
||||
"simulation_active": self.context.simulation_active,
|
||||
"simulation_queue_state": self.context.simulation_queue_state
|
||||
}
|
||||
@@ -667,10 +668,11 @@ class TunneldRunnerSio:
|
||||
try:
|
||||
if self.context.test_mode:
|
||||
logger.info("Simulation worker: test mode enabled")
|
||||
with LocationSimulationTestQueue(self.context) as locate_simulation:
|
||||
await locate_simulation.play_queue()
|
||||
locate_simulation = LocationSimulationQueue(None, self.context)
|
||||
await locate_simulation.play_queue()
|
||||
return
|
||||
|
||||
if self.context.udid is None and not self.context.test_mode:
|
||||
if self.context.udid is None:
|
||||
active_udids = sorted(
|
||||
{
|
||||
t.udid
|
||||
@@ -841,7 +843,10 @@ class TunneldRunnerSio:
|
||||
"command_class": "simulation_control",
|
||||
"command": "add",
|
||||
"message": f"Location {loc_id} added to the queue",
|
||||
"data": location_item,
|
||||
"data": {
|
||||
"simulation_queue": get_simulation_status(),
|
||||
"location_item": location_item,
|
||||
},
|
||||
}
|
||||
add_item(loc_id, location_item)
|
||||
await enqueue_next_simulation_item()
|
||||
@@ -854,6 +859,49 @@ class TunneldRunnerSio:
|
||||
"message": "Invalid location data", "data": data}
|
||||
return resp
|
||||
|
||||
async def delete_location_from_simulation_queue(data):
|
||||
loc_id = (
|
||||
data.get("loc_id")
|
||||
if isinstance(data, dict)
|
||||
else getattr(data, "loc_id", None)
|
||||
)
|
||||
if loc_id is not None:
|
||||
if not loc_id in self.context.simulation_queue_order:
|
||||
resp = {
|
||||
"command_status": "ERROR",
|
||||
"command_class": "simulation_control",
|
||||
"command": "delete",
|
||||
"message": f"Location {loc_id} not found in the queue",
|
||||
"data": {
|
||||
"simulation_queue": get_simulation_status(),
|
||||
"location_item": self.context.simulation_queue_data[loc_id],
|
||||
},
|
||||
}
|
||||
logger.info("Deleting location %s from the queue", loc_id)
|
||||
await delete_item(loc_id)
|
||||
resp = {
|
||||
"command_status": "OK",
|
||||
"command_class": "simulation_control",
|
||||
"command": "delete",
|
||||
"message": f"Location {loc_id} deleted from the queue",
|
||||
"data": {
|
||||
"simulation_queue": get_simulation_status(),
|
||||
"simulation_queue": get_simulation_status(),
|
||||
"location_item": self.context.simulation_queue_data[loc_id],
|
||||
},
|
||||
}
|
||||
else:
|
||||
resp = {
|
||||
"command_status": "ERROR",
|
||||
"command_class": "simulation_control",
|
||||
"command": "delete",
|
||||
"message": "Invalid location data",
|
||||
"data": {
|
||||
"simulation_queue:": get_simulation_status(),
|
||||
},
|
||||
}
|
||||
return resp
|
||||
|
||||
async def pause_simulation_queue():
|
||||
"""Pauses asyncio.Queue playback"""
|
||||
self.context.simulation_queue_state = "PAUSED"
|
||||
@@ -864,7 +912,8 @@ class TunneldRunnerSio:
|
||||
await update_queue_times()
|
||||
|
||||
async def advance_simulation_queue():
|
||||
self.context.simulation_queue_state = "NEXT"
|
||||
current_loc_id = self.context.get_current_loc_id()
|
||||
self.context.simulation_queue_data[current_loc_id]["delay"] = 0
|
||||
await update_queue_times()
|
||||
|
||||
async def update_queue_times():
|
||||
@@ -885,23 +934,12 @@ class TunneldRunnerSio:
|
||||
await update_queue_data()
|
||||
|
||||
async def update_queue_data():
|
||||
data = {
|
||||
"simulation_queue": {
|
||||
"active": self.context.simulation_active,
|
||||
"data": self.context.simulation_queue_data,
|
||||
"order": self.context.simulation_queue_order,
|
||||
"state": self.context.simulation_queue_state,
|
||||
"worker_task": self.context.simulation_task.get_name() if self.context.simulation_task else None,
|
||||
}
|
||||
}
|
||||
await self.context.sio.emit("queue_data_update", {
|
||||
"data": data}, namespace="/")
|
||||
await self.context.sio.emit("queue_data_update", get_simulation_status(), namespace="/")
|
||||
|
||||
async def empty_simulation_queue():
|
||||
"""Empties all items from an asyncio.Queue."""
|
||||
logger.info("Clearing location simulation queue...")
|
||||
q = self.context.simulation_queue
|
||||
self.context.set_location_enabled = False
|
||||
while not q.empty():
|
||||
try:
|
||||
item = q.get_nowait()
|
||||
@@ -926,6 +964,7 @@ class TunneldRunnerSio:
|
||||
def reset_queue():
|
||||
self.context.simulation_queue_data = {}
|
||||
self.context.simulation_queue_order = []
|
||||
self.context.simulation_queue_deleted_items = []
|
||||
self.context.set_current_loc_id(None)
|
||||
|
||||
def remove_future_items():
|
||||
@@ -938,12 +977,16 @@ class TunneldRunnerSio:
|
||||
else:
|
||||
self.context.simulation_queue_data = {}
|
||||
self.context.simulation_queue_order = []
|
||||
self.context.simulation_queue_deleted_items = []
|
||||
self.context.set_current_loc_id(None)
|
||||
|
||||
|
||||
def clear_item(item_id):
|
||||
async def delete_item(item_id):
|
||||
if item_id in self.context.simulation_queue_order:
|
||||
self.context.simulation_queue_data[item_id]["status"] = "deleted"
|
||||
self.context.simulation_queue_order.remove(item_id)
|
||||
self.context.simulation_queue_deleted_items.append(item_id)
|
||||
await update_queue_times()
|
||||
|
||||
def clear_future_items():
|
||||
current_loc_id = self.context.get_current_loc_id()
|
||||
@@ -951,6 +994,8 @@ class TunneldRunnerSio:
|
||||
current_loc_id) if current_loc_id else 0
|
||||
for item in self.context.simulation_queue_order[current_index + 1:]:
|
||||
self.context.simulation_queue_data[item]["status"] = "deleted"
|
||||
self.context.simulation_queue_order.remove(item)
|
||||
self.context.simulation_queue_deleted_items.append(item)
|
||||
|
||||
def get_item(item_id):
|
||||
return self.context.simulation_queue_data[item_id]
|
||||
@@ -1006,20 +1051,13 @@ class TunneldRunnerSio:
|
||||
end_simulation_worker(), name="end-simulation-worker"
|
||||
)
|
||||
result = await end_task
|
||||
data = {
|
||||
"command_status": "OK",
|
||||
"command_class": "simulation_control",
|
||||
"command": "end",
|
||||
"message": "Simulation ended"
|
||||
}
|
||||
return data
|
||||
return result
|
||||
|
||||
async def end_simulation_worker() -> bool:
|
||||
"""Ends asyncio.Queue playback and closes tunnel"""
|
||||
logger.info("End location simulation request")
|
||||
try:
|
||||
q = self.context.simulation_queue
|
||||
self.context.set_location_enabled = False
|
||||
self.context.simulation_queue_state = "SHUTDOWN"
|
||||
|
||||
# Drain pending queue entries.
|
||||
@@ -1056,7 +1094,6 @@ class TunneldRunnerSio:
|
||||
await locate_simulation.clear()
|
||||
self.context.simulation_active = False
|
||||
self.context.simulation_task = None
|
||||
self.context.set_location_enabled = True
|
||||
self.context.next_move = None
|
||||
self.context.set_current_loc_id(None)
|
||||
self.context.simulation_queue_state = "STOPPED"
|
||||
@@ -1079,6 +1116,19 @@ class TunneldRunnerSio:
|
||||
self.context.simulation_noise = not self.context.simulation_noise
|
||||
return {"simulation_noise": self.context.simulation_noise}
|
||||
|
||||
def get_simulation_status() -> dict:
|
||||
resp = {
|
||||
"active": self.context.simulation_active,
|
||||
"data": self.context.simulation_queue_data,
|
||||
"order": self.context.simulation_queue_order,
|
||||
"deleted_items": self.context.simulation_queue_deleted_items,
|
||||
"state": self.context.simulation_queue_state,
|
||||
"gps_noise": self.context.simulation_noise,
|
||||
"worker_task": self.context.simulation_task.get_name() if self.context.simulation_task else None,
|
||||
"test_mode": self.context.test_mode,
|
||||
}
|
||||
return resp
|
||||
|
||||
def get_status() -> dict:
|
||||
current_loc_id = self.context.get_current_loc_id()
|
||||
current_item = self.context.simulation_queue_data.get(
|
||||
@@ -1111,16 +1161,7 @@ class TunneldRunnerSio:
|
||||
},
|
||||
"tunnel_watcher_running": True if self.context.tunnel_watcher_task and not self.context.tunnel_watcher_task.done() else False,
|
||||
"next_move": self.context.next_move,
|
||||
"simulation_queue": {
|
||||
"active": self.context.simulation_active,
|
||||
"data": self.context.simulation_queue_data,
|
||||
"order": self.context.simulation_queue_order,
|
||||
"state": self.context.simulation_queue_state,
|
||||
"gps_noise": self.context.simulation_noise,
|
||||
"worker_task": self.context.simulation_task.get_name() if self.context.simulation_task else None,
|
||||
},
|
||||
"set_location_enable": self.context.set_location_enabled,
|
||||
"test_mode": self.context.test_mode,
|
||||
"simulation_queue": get_simulation_status(),
|
||||
"tunnel": self.context.tunnel.service.address[0] if self.context.tunnel else None,
|
||||
"udid": self.context.udid,
|
||||
}
|
||||
@@ -1648,58 +1689,96 @@ class TunneldRunnerSio:
|
||||
try:
|
||||
match command:
|
||||
case "next":
|
||||
resp = await advance_simulation_queue()
|
||||
await advance_simulation_queue()
|
||||
resp = {
|
||||
"command": command,
|
||||
"command_class": "simulation_control",
|
||||
"command_status": "OK",
|
||||
"message": "Advanced simulation queue",
|
||||
"data": {
|
||||
"simulation_queue": get_simulation_status(),
|
||||
}
|
||||
}
|
||||
return resp
|
||||
|
||||
case "test-mode":
|
||||
data = toggle_test_mode()
|
||||
return {
|
||||
toggle_test_mode()
|
||||
resp = {
|
||||
"command": command,
|
||||
"command_class": "simulation_control",
|
||||
"command_status": "OK",
|
||||
"message": "test-mode toggled",
|
||||
"data": data
|
||||
"data": {
|
||||
"simulation_queue": get_simulation_status(),
|
||||
}
|
||||
}
|
||||
return resp
|
||||
|
||||
case "add":
|
||||
""" Add a location to the simulation queue"""
|
||||
resp = await add_location_to_simulation_queue(data)
|
||||
return resp
|
||||
|
||||
case "delete":
|
||||
""" Delete a location to the simulation queue"""
|
||||
resp = await delete_location_from_simulation_queue(data)
|
||||
return resp
|
||||
|
||||
case "clear":
|
||||
""" Clear the simulation queue"""
|
||||
clear_future_items()
|
||||
return {
|
||||
resp = {
|
||||
"command": command,
|
||||
"command_class": "simulation_control",
|
||||
"command_status": "OK",
|
||||
"message": "Simulation cleared",
|
||||
"data": {
|
||||
"simulation_queue": get_simulation_status(),
|
||||
}
|
||||
}
|
||||
return resp
|
||||
|
||||
case "pause":
|
||||
""" Pause the simulation queue"""
|
||||
await pause_simulation_queue()
|
||||
return {
|
||||
resp = {
|
||||
"command": command,
|
||||
"command_class": "simulation_control",
|
||||
"command_status": "OK",
|
||||
"message": "Simulation paused",
|
||||
"data": {
|
||||
"simulation_queue": get_simulation_status(),
|
||||
}
|
||||
}
|
||||
return resp
|
||||
case "resume":
|
||||
""" Resume the simulation queue"""
|
||||
await resume_simulation_queue()
|
||||
return {
|
||||
resp = {
|
||||
"command": command,
|
||||
"command_status": "OK",
|
||||
"command_class": "simulation_control",
|
||||
"message": "Simulation resumed",
|
||||
"data": {
|
||||
"simulation_queue": get_simulation_status(),
|
||||
}
|
||||
}
|
||||
return resp
|
||||
case "end":
|
||||
""" End the simulation queue"""
|
||||
logger.info(
|
||||
"End location simulation request from %s", sid)
|
||||
data = await end_simulation_queue()
|
||||
return data
|
||||
cstat = "OK" if await end_simulation_queue() else "ERROR"
|
||||
resp = {
|
||||
"command_status": cstat,
|
||||
"command_class": "simulation_control",
|
||||
"command": command,
|
||||
"message": "Simulation ended",
|
||||
"data": {
|
||||
"simulation_queue": get_simulation_status(),
|
||||
}
|
||||
}
|
||||
return resp
|
||||
case "start":
|
||||
""" Start the simulation queue"""
|
||||
logger.info(
|
||||
@@ -1709,19 +1788,24 @@ class TunneldRunnerSio:
|
||||
case "gps-noise":
|
||||
""" Toggle GPS noise"""
|
||||
before_toggle = self.context.simulation_noise
|
||||
data = toggle_gps_noise()
|
||||
data['command_status'] = "OK"
|
||||
data['command_class'] = "simulation_control"
|
||||
data['message'] = f"GPS noise toggled from {
|
||||
before_toggle} to {self.context.simulation_noise}"
|
||||
data['command'] = command
|
||||
return data
|
||||
toggle_gps_noise()
|
||||
resp = {
|
||||
"command_status": "OK",
|
||||
"command_class": "simulation_control",
|
||||
"command": command,
|
||||
"message": f"GPS noise toggled from {before_toggle} to {self.context.simulation_noise}",
|
||||
"data": {
|
||||
"simulation_queue": get_simulation_status(),
|
||||
}
|
||||
}
|
||||
return resp
|
||||
case _:
|
||||
logger.warning(
|
||||
"Invalid command received from %s: %s", sid, command
|
||||
)
|
||||
return {"command_status": "ERROR", "message": "Invalid command"}
|
||||
finally:
|
||||
logger.info("Simulation Control command: %s completed, sending status update...", command)
|
||||
await sio_send_status(sid)
|
||||
|
||||
@self.context.sio.event
|
||||
@@ -1754,6 +1838,37 @@ class TunneldRunnerSio:
|
||||
else:
|
||||
return {"command_status": "ERROR", "command": "update", "command_class": "location_item", "message": "Invalid request, Location Item Unchanged", "data": self.context.simulation_queue_data[loc_id]}
|
||||
|
||||
@self.context.sio.event
|
||||
async def queue_order_update(sid, data):
|
||||
logger.info("Queue order update received: %s", data)
|
||||
new_simulation_queue_order = []
|
||||
new_order = (
|
||||
data.get("newOrder")
|
||||
if isinstance(data, dict)
|
||||
else getattr(data, "newOrder", None)
|
||||
)
|
||||
if new_order:
|
||||
for item in new_order:
|
||||
if item in self.context.simulation_queue_data:
|
||||
new_simulation_queue_order.append(item)
|
||||
self.context.simulation_queue_order = new_simulation_queue_order
|
||||
resp = {
|
||||
"command_status": "OK",
|
||||
"command": "queue_order_update",
|
||||
"command_class": "queue_order",
|
||||
"message": "Queue Order Updated",
|
||||
"data": self.context.simulation_queue_order
|
||||
}
|
||||
else:
|
||||
resp = {
|
||||
"command_status": "ERROR",
|
||||
"command": "queue_order_update",
|
||||
"command_class": "queue_order",
|
||||
"message": "Invalid request, Queue Order Unchanged",
|
||||
"data": self.context.simulation_queue_order
|
||||
}
|
||||
return resp
|
||||
|
||||
@self.context.sio.event
|
||||
async def icloud_monitor_control(sid, data):
|
||||
command = (
|
||||
@@ -1961,7 +2076,10 @@ class TunneldRunnerSio:
|
||||
|
||||
class LocationSimulationQueue(LocationSimulation):
|
||||
def __init__(self, dvt, context: LocationSimulationState):
|
||||
super().__init__(dvt)
|
||||
if dvt is None:
|
||||
LocationSimulationBase.__init__(self)
|
||||
else:
|
||||
super().__init__(dvt)
|
||||
self.context = context
|
||||
self._dt_simulate_location = None
|
||||
self._prefer_dt_simulate_location = False
|
||||
@@ -1981,6 +2099,10 @@ class LocationSimulationQueue(LocationSimulation):
|
||||
return self._dt_simulate_location
|
||||
|
||||
async def set(self, latitude: float, longitude: float) -> None:
|
||||
if self.context.test_mode:
|
||||
await asyncio.sleep(0.1)
|
||||
logger.info("Test mode enabled, simulated location set to %s, %s", latitude, longitude)
|
||||
return
|
||||
if not self._prefer_dt_simulate_location:
|
||||
try:
|
||||
await super().set(latitude, longitude)
|
||||
@@ -1994,6 +2116,9 @@ class LocationSimulationQueue(LocationSimulation):
|
||||
await fallback.set(latitude, longitude)
|
||||
|
||||
async def clear(self) -> None:
|
||||
if self.context.test_mode:
|
||||
logger.info("Test mode enabled, skipping location clear")
|
||||
return
|
||||
dvt_clear_error = None
|
||||
if not self._prefer_dt_simulate_location:
|
||||
try:
|
||||
@@ -2036,8 +2161,6 @@ class LocationSimulationQueue(LocationSimulation):
|
||||
break
|
||||
if self.context.simulation_queue_state == "SHUTDOWN":
|
||||
break
|
||||
if not self.context.set_location_enabled:
|
||||
continue
|
||||
if not self.context.simulation_noise:
|
||||
continue
|
||||
if self.context.get_current_loc_id() != loc_id:
|
||||
@@ -2064,16 +2187,16 @@ class LocationSimulationQueue(LocationSimulation):
|
||||
|
||||
async def _update_queue_data(self):
|
||||
data = {
|
||||
"simulation_queue": {
|
||||
"active": self.context.simulation_active,
|
||||
"data": self.context.simulation_queue_data,
|
||||
"order": self.context.simulation_queue_order,
|
||||
"deleted_items": self.context.simulation_queue_deleted_items,
|
||||
"state": self.context.simulation_queue_state,
|
||||
"worker_task": self.context.simulation_task.get_name() if self.context.simulation_task else None,
|
||||
"gps_noise": self.context.simulation_noise,
|
||||
"test_mode": self.context.test_mode,
|
||||
}
|
||||
}
|
||||
await self.context.sio.emit("queue_data_update", {
|
||||
"data": data}, namespace="/")
|
||||
await self.context.sio.emit("queue_data_update", data, namespace="/")
|
||||
|
||||
async def _update_location_item(self, loc_id: str) -> None:
|
||||
await self.context.sio.emit(
|
||||
@@ -2085,6 +2208,18 @@ class LocationSimulationQueue(LocationSimulation):
|
||||
namespace="/",
|
||||
)
|
||||
|
||||
async def _update_current_location(self, active_loc_id, new_latitude, new_longitude) -> None:
|
||||
await self.context.sio.emit(
|
||||
"simulation_status",
|
||||
{
|
||||
"status": self.context.simulation_active,
|
||||
"loc_id": active_loc_id,
|
||||
"latitude": new_latitude,
|
||||
"longitude": new_longitude
|
||||
},
|
||||
namespace="/",
|
||||
)
|
||||
|
||||
async def _enqueue_next_queue_item(self) -> Optional[str]:
|
||||
if self.context.simulation_queue_state == "SHUTDOWN":
|
||||
return None
|
||||
@@ -2156,321 +2291,30 @@ class LocationSimulationQueue(LocationSimulation):
|
||||
if isinstance(current_location_item, dict)
|
||||
else None
|
||||
)
|
||||
|
||||
if self.context.set_location_enabled:
|
||||
while self.context.simulation_queue_data[loc_id]["delay"] > 0:
|
||||
if self.context.simulation_queue_state == "NEXT":
|
||||
self.context.simulation_queue_state = "RUNNING"
|
||||
break
|
||||
if self.context.simulation_queue_state == "SHUTDOWN":
|
||||
break
|
||||
while self.context.simulation_queue_state == "PAUSED":
|
||||
await asyncio.sleep(0.1)
|
||||
if self.context.simulation_queue_state == "SHUTDOWN":
|
||||
break
|
||||
if self.context.simulation_queue_state == "NEXT":
|
||||
self.context.simulation_queue_state = "RUNNING"
|
||||
break
|
||||
if self.context.simulation_queue_state == "SHUTDOWN":
|
||||
break
|
||||
self.context.next_move = location_item.get("delay") - 1
|
||||
self.context.simulation_queue_data[loc_id]["delay"] -= 1
|
||||
# await self.context.sio.emit(
|
||||
# "simulation_status",
|
||||
# {
|
||||
# "status": self.context.simulation_active,
|
||||
# "loc_id": current_loc_id,
|
||||
# "latitude": current_latitude,
|
||||
# "longitude": current_longitude,
|
||||
# "start": current_start,
|
||||
# "next_move": self.context.next_move,
|
||||
# },
|
||||
# namespace="/",
|
||||
# )
|
||||
await self._update_location_item(loc_id)
|
||||
await asyncio.sleep(1)
|
||||
while self.context.simulation_queue_data[loc_id]["delay"] > 0:
|
||||
if self.context.simulation_queue_state == "SHUTDOWN":
|
||||
self.context.simulation_queue_pending_ids.discard(loc_id)
|
||||
self.context.simulation_queue.task_done()
|
||||
break
|
||||
self.context.simulation_queue_data[loc_id]["start"] = datetime.now(
|
||||
timezone.utc).isoformat()
|
||||
if current_loc_id is not None:
|
||||
self.context.simulation_queue_data[current_loc_id]["status"] = "done"
|
||||
self.context.simulation_queue_data[current_loc_id]["end"] = datetime.now(
|
||||
timezone.utc).isoformat()
|
||||
await self._update_location_item(current_loc_id)
|
||||
await self._update_queue_data()
|
||||
await self._stop_noise_task()
|
||||
await self.set(new_latitude, new_longitude)
|
||||
self.context.simulation_queue_data[loc_id]["status"] = "set"
|
||||
self.context.set_current_loc_id(loc_id)
|
||||
if self.context.simulation_noise:
|
||||
self._start_noise_task(
|
||||
loc_id, new_latitude, new_longitude)
|
||||
active_loc_id = self.context.get_current_loc_id()
|
||||
# await self.context.sio.emit(
|
||||
# "simulation_status",
|
||||
# {
|
||||
# "status": self.context.simulation_active,
|
||||
# "loc_id": active_loc_id,
|
||||
# "latitude": new_latitude,
|
||||
# "longitude": new_longitude,
|
||||
# "start": new_start,
|
||||
# "next_move": None,
|
||||
# },
|
||||
# namespace="/",
|
||||
# )
|
||||
logger.info(
|
||||
"Set simulated location to %s, %s after %ss delay",
|
||||
new_latitude,
|
||||
new_longitude,
|
||||
new_delay,
|
||||
)
|
||||
await self._update_queue_data()
|
||||
self.context.simulation_queue_pending_ids.discard(loc_id)
|
||||
self.context.simulation_queue.task_done()
|
||||
await self._enqueue_next_queue_item()
|
||||
finally:
|
||||
await self._stop_noise_task()
|
||||
|
||||
|
||||
class LocationSimulationTestQueue(LocationSimulationBase):
|
||||
def __init__(self, context: LocationSimulationState):
|
||||
super().__init__()
|
||||
self.context = context
|
||||
self._noise_task: Optional[asyncio.Task] = None
|
||||
self._noise_loc_id: Optional[str] = None
|
||||
|
||||
def __enter__(self):
|
||||
return self
|
||||
|
||||
def __exit__(self):
|
||||
return self
|
||||
|
||||
async def set(self, latitude: float, longitude: float) -> None:
|
||||
await asyncio.sleep(0.1)
|
||||
logger.info("Simulated location set to %s, %s", latitude, longitude)
|
||||
|
||||
async def clear(self) -> None:
|
||||
q = self.context.simulation_queue
|
||||
self.context.set_location_enabled = False
|
||||
self.context.simulation_queue_state = "SHUTDOWN"
|
||||
while not q.empty():
|
||||
try:
|
||||
item = q.get_nowait()
|
||||
if isinstance(item, str):
|
||||
self.context.simulation_queue_pending_ids.discard(item)
|
||||
q.task_done()
|
||||
logger.info("Discarding item from queue: %s", item)
|
||||
except asyncio.QueueEmpty:
|
||||
break
|
||||
self.context.simulation_queue_pending_ids.clear()
|
||||
await q.put(None)
|
||||
|
||||
if self.context.simulation_task is not None and not self.context.simulation_task.done():
|
||||
try:
|
||||
await asyncio.wait_for(self.context.simulation_task, timeout=5)
|
||||
except TimeoutError:
|
||||
self.context.simulation_task.cancel()
|
||||
with suppress(asyncio.CancelledError):
|
||||
await self.context.simulation_task
|
||||
self.context.simulation_active = False
|
||||
self.context.simulation_queue_state = "SHUTDOWN"
|
||||
|
||||
@staticmethod
|
||||
def _add_gps_noise(lat: float, lon: float, std_dev_meters: float = 5.0) -> tuple[float, float]:
|
||||
"""Apply Gaussian jitter in meters and convert to lat/lon deltas."""
|
||||
earth_radius = 6378137.0
|
||||
lat_sigma_deg = (std_dev_meters / earth_radius) * (180.0 / math.pi)
|
||||
cos_lat = math.cos(math.radians(lat))
|
||||
if abs(cos_lat) < 1e-6:
|
||||
cos_lat = 1e-6
|
||||
lon_sigma_deg = (std_dev_meters / (earth_radius *
|
||||
cos_lat)) * (180.0 / math.pi)
|
||||
noised_lat = lat + random.gauss(0.0, lat_sigma_deg)
|
||||
noised_lon = lon + random.gauss(0.0, lon_sigma_deg)
|
||||
return noised_lat, noised_lon
|
||||
|
||||
async def _stop_noise_task(self) -> None:
|
||||
if self._noise_task is not None:
|
||||
self._noise_task.cancel()
|
||||
with suppress(asyncio.CancelledError):
|
||||
await self._noise_task
|
||||
self._noise_task = None
|
||||
self._noise_loc_id = None
|
||||
|
||||
async def _noise_loop(self, loc_id: str, base_latitude: float, base_longitude: float) -> None:
|
||||
while True:
|
||||
await asyncio.sleep(random.randint(45, 180))
|
||||
if not self.context.simulation_active:
|
||||
break
|
||||
if self.context.simulation_queue_state == "SHUTDOWN":
|
||||
break
|
||||
if not self.context.set_location_enabled:
|
||||
continue
|
||||
if not self.context.simulation_noise:
|
||||
continue
|
||||
if self.context.get_current_loc_id() != loc_id:
|
||||
break
|
||||
noised_latitude, noised_longitude = self._add_gps_noise(
|
||||
base_latitude, base_longitude
|
||||
)
|
||||
await self.set(noised_latitude, noised_longitude)
|
||||
logger.info(
|
||||
"Applied simulation noise to active location loc_id=%s sent=%s,%s base=%s,%s",
|
||||
loc_id,
|
||||
noised_latitude,
|
||||
noised_longitude,
|
||||
base_latitude,
|
||||
base_longitude,
|
||||
)
|
||||
|
||||
def _start_noise_task(self, loc_id: str, base_latitude: float, base_longitude: float) -> None:
|
||||
self._noise_loc_id = loc_id
|
||||
self._noise_task = asyncio.create_task(
|
||||
self._noise_loop(loc_id, base_latitude, base_longitude),
|
||||
name=f"simulation-noise-{loc_id}",
|
||||
)
|
||||
|
||||
async def _update_queue_data(self):
|
||||
data = {
|
||||
"simulation_queue": {
|
||||
"active": self.context.simulation_active,
|
||||
"data": self.context.simulation_queue_data,
|
||||
"order": self.context.simulation_queue_order,
|
||||
"state": self.context.simulation_queue_state,
|
||||
"worker_task": self.context.simulation_task.get_name() if self.context.simulation_task else None,
|
||||
}
|
||||
}
|
||||
await self.context.sio.emit("queue_data_update", {
|
||||
"data": data}, namespace="/")
|
||||
|
||||
async def _update_location_item(self, loc_id: str) -> None:
|
||||
await self.context.sio.emit(
|
||||
"location_item_update",
|
||||
{
|
||||
"loc_id": loc_id,
|
||||
"data": self.context.simulation_queue_data[loc_id]
|
||||
},
|
||||
namespace="/",
|
||||
)
|
||||
|
||||
async def _enqueue_next_queue_item(self) -> Optional[str]:
|
||||
if self.context.simulation_queue_state == "SHUTDOWN":
|
||||
return None
|
||||
for item_id in self.context.simulation_queue_order:
|
||||
if item_id in self.context.simulation_queue_pending_ids:
|
||||
continue
|
||||
item = self.context.simulation_queue_data.get(item_id)
|
||||
if not isinstance(item, dict):
|
||||
continue
|
||||
if item.get("status") != "queued":
|
||||
continue
|
||||
self.context.simulation_queue_pending_ids.add(item_id)
|
||||
await self.context.simulation_queue.put(item_id)
|
||||
logger.info("Worker scheduled queue item %s", item_id)
|
||||
return item_id
|
||||
return None
|
||||
|
||||
async def play_queue(
|
||||
self, disable_sleep: bool = False, timing_randomness_range: int = 0
|
||||
) -> None:
|
||||
try:
|
||||
while True:
|
||||
if self.context.simulation_queue_state == "PAUSED":
|
||||
await asyncio.sleep(0.1)
|
||||
continue
|
||||
while self.context.simulation_queue_state == "PAUSED":
|
||||
await asyncio.sleep(0.1)
|
||||
if self.context.simulation_queue_state == "SHUTDOWN":
|
||||
break
|
||||
if self.context.simulation_queue_state == "SHUTDOWN":
|
||||
break
|
||||
self.context.next_move = location_item.get("delay") - 1
|
||||
self.context.simulation_queue_data[loc_id]["delay"] -= 1
|
||||
await self._update_location_item(loc_id)
|
||||
await asyncio.sleep(1)
|
||||
if self.context.simulation_queue_state == "SHUTDOWN":
|
||||
break
|
||||
await self._enqueue_next_queue_item()
|
||||
loc_id = await self.context.simulation_queue.get()
|
||||
if loc_id is None:
|
||||
self.context.simulation_queue.task_done()
|
||||
break
|
||||
location_item = self.context.simulation_queue_data.get(loc_id)
|
||||
if location_item is None:
|
||||
logger.warning(
|
||||
"Test simulation queue item missing for loc_id=%s; skipping stale entry",
|
||||
loc_id,
|
||||
)
|
||||
self.context.simulation_queue_pending_ids.discard(loc_id)
|
||||
self.context.simulation_queue.task_done()
|
||||
await self._enqueue_next_queue_item()
|
||||
continue
|
||||
new_status = location_item.get("status")
|
||||
if new_status == "deleted":
|
||||
self.context.simulation_queue_pending_ids.discard(loc_id)
|
||||
self.context.simulation_queue.task_done()
|
||||
await self._enqueue_next_queue_item()
|
||||
continue
|
||||
new_latitude = location_item.get("latitude")
|
||||
new_longitude = location_item.get("longitude")
|
||||
new_delay = location_item.get("delay")
|
||||
new_delay = 0 if new_delay is None else new_delay
|
||||
new_start = location_item.get("start")
|
||||
current_loc_id = self.context.get_current_loc_id()
|
||||
current_location_item = self.context.simulation_queue_data.get(
|
||||
current_loc_id) if current_loc_id else None
|
||||
current_latitude = (
|
||||
current_location_item.get("latitude")
|
||||
if isinstance(current_location_item, dict)
|
||||
else None
|
||||
)
|
||||
current_longitude = (
|
||||
current_location_item.get("longitude")
|
||||
if isinstance(current_location_item, dict)
|
||||
else None
|
||||
)
|
||||
current_start = (
|
||||
current_location_item.get("start")
|
||||
if isinstance(current_location_item, dict)
|
||||
else None
|
||||
)
|
||||
if self.context.set_location_enabled:
|
||||
while self.context.simulation_queue_data[loc_id]["delay"] > 0:
|
||||
if self.context.simulation_queue_state == "NEXT":
|
||||
self.context.simulation_queue_state = "RUNNING"
|
||||
break
|
||||
if self.context.simulation_queue_state == "SHUTDOWN":
|
||||
break
|
||||
while self.context.simulation_queue_state == "PAUSED":
|
||||
await asyncio.sleep(0.1)
|
||||
if self.context.simulation_queue_state == "SHUTDOWN":
|
||||
break
|
||||
if self.context.simulation_queue_state == "NEXT":
|
||||
self.context.simulation_queue_state = "RUNNING"
|
||||
break
|
||||
if self.context.simulation_queue_state == "SHUTDOWN":
|
||||
break
|
||||
self.context.next_move = location_item.get("delay") - 1
|
||||
self.context.simulation_queue_data[loc_id]["delay"] -= 1
|
||||
|
||||
# await self.context.sio.emit(
|
||||
# "simulation_status",
|
||||
# {
|
||||
# "status": self.context.simulation_active,
|
||||
# "loc_id": current_loc_id,
|
||||
# "latitude": current_latitude,
|
||||
# "longitude": current_longitude,
|
||||
# "start": current_start,
|
||||
# "next_move": self.context.next_move,
|
||||
# },
|
||||
# namespace="/",
|
||||
# )
|
||||
|
||||
await self._update_location_item(loc_id)
|
||||
await asyncio.sleep(1)
|
||||
if self.context.simulation_queue_state == "SHUTDOWN":
|
||||
self.context.simulation_queue_pending_ids.discard(loc_id)
|
||||
self.context.simulation_queue.task_done()
|
||||
break
|
||||
break
|
||||
self.context.simulation_queue_data[loc_id]["start"] = datetime.now(
|
||||
timezone.utc).isoformat()
|
||||
if current_loc_id is not None:
|
||||
self.context.simulation_queue_data[current_loc_id]["status"] = "done"
|
||||
self.context.simulation_queue_data[current_loc_id]["end"] = datetime.now(
|
||||
timezone.utc).isoformat()
|
||||
self.context.simulation_queue_data[current_loc_id]["delay"] = new_delay
|
||||
await self._update_location_item(current_loc_id)
|
||||
await self._update_queue_data()
|
||||
await self._stop_noise_task()
|
||||
@@ -2478,22 +2322,11 @@ class LocationSimulationTestQueue(LocationSimulationBase):
|
||||
self.context.simulation_queue_data[loc_id]["status"] = "set"
|
||||
await self._update_location_item(loc_id)
|
||||
self.context.set_current_loc_id(loc_id)
|
||||
active_loc_id = self.context.get_current_loc_id()
|
||||
await self._update_current_location(active_loc_id, new_latitude, new_longitude)
|
||||
if self.context.simulation_noise:
|
||||
self._start_noise_task(
|
||||
loc_id, new_latitude, new_longitude)
|
||||
active_loc_id = self.context.get_current_loc_id()
|
||||
# await self.context.sio.emit(
|
||||
# "simulation_status",
|
||||
# {
|
||||
# "status": self.context.simulation_active,
|
||||
# "loc_id": active_loc_id,
|
||||
# "latitude": new_latitude,
|
||||
# "longitude": new_longitude,
|
||||
# "start": new_start,
|
||||
# "next_move": None,
|
||||
# },
|
||||
# namespace="/",
|
||||
# )
|
||||
logger.info(
|
||||
"Set simulated location to %s, %s after %ss delay",
|
||||
new_latitude,
|
||||
|
||||
Reference in New Issue
Block a user