This commit is contained in:
2026-04-18 08:09:38 -04:00
parent e667af9201
commit da8716cccc
5 changed files with 116 additions and 105 deletions

View File

@@ -14,7 +14,6 @@ import httpx
from contextlib import suppress
from typing import Optional, Dict
from dotenv import load_dotenv
from sqlalchemy.util import await_only
with warnings.catch_warnings():
# Ignore: "Core Pydantic V1 functionality isn't compatible with Python 3.14 or greater."
@@ -205,7 +204,7 @@ class TunneldRunnerSio:
await self.context.sio.shutdown()
self.host = host
self.port = os.getenv("PORT")
self.port = port
self.protocol = protocol
self.context = context or LocationSimulationState()
self._tunneld_api_address = (
@@ -862,18 +861,18 @@ class TunneldRunnerSio:
async def resume_simulation_queue():
"""Resumes asyncio.simulation_queue playback"""
self.context.simulation_queue_state = "RUNNING"
update_queue_times()
await update_queue_times()
def advance_simulation_queue():
async def advance_simulation_queue():
self.context.simulation_queue_state = "NEXT"
update_queue_times()
await update_queue_times()
def update_queue_times():
async def update_queue_times():
current_loc_id = self.context.get_current_loc_id()
current_index = get_item_index(
current_loc_id) if current_loc_id else 0
remaining_items = self.context.simulation_queue_order[current_index + 1:]
new_delay = self.context.next_move or 0
new_delay = 0
now_time = datetime.now(timezone.utc)
for item in remaining_items:
item_delay = parse_delay_seconds(
@@ -881,10 +880,11 @@ class TunneldRunnerSio:
new_delay += item_delay
new_time = (
now_time + timedelta(seconds=new_delay)).isoformat()
self.context.simulation_queue_data[item].start = new_time
update_queue_data()
logger.info("Updating queue time for %s to %s", item, new_time)
self.context.simulation_queue_data[item]["start"] = new_time
await update_queue_data()
def update_queue_data():
async def update_queue_data():
data = {
"simulation_queue": {
"active": self.context.simulation_active,
@@ -894,7 +894,7 @@ class TunneldRunnerSio:
"worker_task": self.context.simulation_task.get_name() if self.context.simulation_task else None,
}
}
self.context.sio.emit("queue_data_update", {
await self.context.sio.emit("queue_data_update", {
"data": data}, namespace="/")
async def empty_simulation_queue():
@@ -924,6 +924,11 @@ class TunneldRunnerSio:
del self.context.simulation_queue_data[item_id]
def reset_queue():
self.context.simulation_queue_data = {}
self.context.simulation_queue_order = []
self.context.set_current_loc_id(None)
def remove_future_items():
current_loc_id = self.context.get_current_loc_id()
if current_loc_id and self.context.simulation_active and current_loc_id in self.context.simulation_queue_data:
self.context.simulation_queue_order = [current_loc_id]
@@ -1122,20 +1127,19 @@ class TunneldRunnerSio:
return data
async def get_reverse_geocode(data):
latitude = (
data.get("latitude")
latitude = float(
data.get("latitude", 999)
if isinstance(data, dict)
else getattr(data, "latitude", None)
else getattr(data, "latitude", 999)
)
longitude = (
data.get("longitude")
longitude = float(
data.get("longitude", 999)
if isinstance(data, dict)
else getattr(data, "longitude", None)
else getattr(data, "longitude", 999)
)
if latitude and longitude:
if latitude != 999 and longitude != 999:
coords = f"{latitude}, {longitude}"
rev_geocode = await self.context.reverse_geocode.get_address(latitude, longitude)
logger.info("Reverse Geocoded %s to %s", coords, rev_geocode)
return rev_geocode
else:
return None
@@ -1422,7 +1426,7 @@ class TunneldRunnerSio:
async def app_clear_queue() -> fastapi.Response:
"""Clear the simulation queue"""
logger.info("Simulation Start Requested ")
await empty_simulation_queue()
clear_future_items()
data = {"command_status": "OK", "command_class": "simulation_control", "command": "clear", "message": "Simulation cleared"}
return generate_http_response(data)
@@ -1644,7 +1648,7 @@ class TunneldRunnerSio:
try:
match command:
case "next":
resp = advance_simulation_queue()
resp = await advance_simulation_queue()
return resp
case "test-mode":
@@ -1664,7 +1668,7 @@ class TunneldRunnerSio:
case "clear":
""" Clear the simulation queue"""
await clear_future_items()
clear_future_items()
return {
"command": command,
"command_class": "simulation_control",
@@ -1741,6 +1745,8 @@ class TunneldRunnerSio:
if loc_id and key and value:
old_val = self.context.simulation_queue_data[loc_id][key]
self.context.simulation_queue_data[loc_id][key] = value
if key == "delay":
await update_queue_times()
logger.info(
"Location Item Update: %s: %s changed from %s to %s", loc_id, key, old_val, value
)
@@ -1933,6 +1939,12 @@ class TunneldRunnerSio:
"command_status": "ERROR",
"message": f"Unknown operation: {command}",
}
@self.context.sio.event
async def reverse_geocode(sid, data: LatLng):
logger.info("OSM Proxy Request from %s, data: %s", sid, data)
rev_geocode = await get_reverse_geocode(data)
return rev_geocode
self._vue_app.include_router(self._app, prefix="/api")
self._vue_app.mount(
@@ -1943,15 +1955,10 @@ class TunneldRunnerSio:
return FileResponse(self._vue_dist + "index.html")
def _run_app(self) -> None:
# api = uvicorn.Server(uvicorn.Config(self._app, host=self.host, port=49151, log_level="info"))
# vue = uvicorn.Server(uvicorn.Config(self._asgi_app, host=self.host, port=8087, log_level="info"))
# await asyncio.gather(api.serve(), vue.serve())
uvicorn.run(
self._asgi_app, host=self.host, port=self.port, loop="asyncio", workers=1
)
class LocationSimulationQueue(LocationSimulation):
def __init__(self, dvt, context: LocationSimulationState):
super().__init__(dvt)
@@ -2055,7 +2062,7 @@ class LocationSimulationQueue(LocationSimulation):
name=f"simulation-noise-{loc_id}",
)
def _update_queue_data(self):
async def _update_queue_data(self):
data = {
"simulation_queue": {
"active": self.context.simulation_active,
@@ -2065,7 +2072,7 @@ class LocationSimulationQueue(LocationSimulation):
"worker_task": self.context.simulation_task.get_name() if self.context.simulation_task else None,
}
}
self.context.sio.emit("queue_data_update", {
await self.context.sio.emit("queue_data_update", {
"data": data}, namespace="/")
async def _update_location_item(self, loc_id: str) -> None:
@@ -2168,18 +2175,18 @@ class LocationSimulationQueue(LocationSimulation):
break
self.context.next_move = location_item.get("delay") - 1
self.context.simulation_queue_data[loc_id]["delay"] -= 1
await self.context.sio.emit(
"simulation_status",
{
"status": self.context.simulation_active,
"loc_id": current_loc_id,
"latitude": current_latitude,
"longitude": current_longitude,
"start": current_start,
"next_move": self.context.next_move,
},
namespace="/",
)
# await self.context.sio.emit(
# "simulation_status",
# {
# "status": self.context.simulation_active,
# "loc_id": current_loc_id,
# "latitude": current_latitude,
# "longitude": current_longitude,
# "start": current_start,
# "next_move": self.context.next_move,
# },
# namespace="/",
# )
await self._update_location_item(loc_id)
await asyncio.sleep(1)
if self.context.simulation_queue_state == "SHUTDOWN":
@@ -2193,7 +2200,7 @@ class LocationSimulationQueue(LocationSimulation):
self.context.simulation_queue_data[current_loc_id]["end"] = datetime.now(
timezone.utc).isoformat()
await self._update_location_item(current_loc_id)
self._update_queue_data()
await self._update_queue_data()
await self._stop_noise_task()
await self.set(new_latitude, new_longitude)
self.context.simulation_queue_data[loc_id]["status"] = "set"
@@ -2202,25 +2209,25 @@ class LocationSimulationQueue(LocationSimulation):
self._start_noise_task(
loc_id, new_latitude, new_longitude)
active_loc_id = self.context.get_current_loc_id()
await self.context.sio.emit(
"simulation_status",
{
"status": self.context.simulation_active,
"loc_id": active_loc_id,
"latitude": new_latitude,
"longitude": new_longitude,
"start": new_start,
"next_move": None,
},
namespace="/",
)
# await self.context.sio.emit(
# "simulation_status",
# {
# "status": self.context.simulation_active,
# "loc_id": active_loc_id,
# "latitude": new_latitude,
# "longitude": new_longitude,
# "start": new_start,
# "next_move": None,
# },
# namespace="/",
# )
logger.info(
"Set simulated location to %s, %s after %ss delay",
new_latitude,
new_longitude,
new_delay,
)
self._update_queue_data()
await self._update_queue_data()
self.context.simulation_queue_pending_ids.discard(loc_id)
self.context.simulation_queue.task_done()
await self._enqueue_next_queue_item()
@@ -2326,7 +2333,7 @@ class LocationSimulationTestQueue(LocationSimulationBase):
name=f"simulation-noise-{loc_id}",
)
def _update_queue_data(self):
async def _update_queue_data(self):
data = {
"simulation_queue": {
"active": self.context.simulation_active,
@@ -2336,7 +2343,7 @@ class LocationSimulationTestQueue(LocationSimulationBase):
"worker_task": self.context.simulation_task.get_name() if self.context.simulation_task else None,
}
}
self.context.sio.emit("queue_data_update", {
await self.context.sio.emit("queue_data_update", {
"data": data}, namespace="/")
async def _update_location_item(self, loc_id: str) -> None:
@@ -2438,18 +2445,20 @@ class LocationSimulationTestQueue(LocationSimulationBase):
break
self.context.next_move = location_item.get("delay") - 1
self.context.simulation_queue_data[loc_id]["delay"] -= 1
await self.context.sio.emit(
"simulation_status",
{
"status": self.context.simulation_active,
"loc_id": current_loc_id,
"latitude": current_latitude,
"longitude": current_longitude,
"start": current_start,
"next_move": self.context.next_move,
},
namespace="/",
)
# await self.context.sio.emit(
# "simulation_status",
# {
# "status": self.context.simulation_active,
# "loc_id": current_loc_id,
# "latitude": current_latitude,
# "longitude": current_longitude,
# "start": current_start,
# "next_move": self.context.next_move,
# },
# namespace="/",
# )
await self._update_location_item(loc_id)
await asyncio.sleep(1)
if self.context.simulation_queue_state == "SHUTDOWN":
@@ -2463,37 +2472,37 @@ class LocationSimulationTestQueue(LocationSimulationBase):
self.context.simulation_queue_data[current_loc_id]["end"] = datetime.now(
timezone.utc).isoformat()
await self._update_location_item(current_loc_id)
self._update_queue_data()
await self._update_queue_data()
await self._stop_noise_task()
await self.set(new_latitude, new_longitude)
self.context.simulation_queue_data[loc_id]["status"] = "set"
await self._update_location_item(loc_id)
self.context.set_current_loc_id(loc_id)
if self.context.simulation_noise:
self._start_noise_talk(
self._start_noise_task(
loc_id, new_latitude, new_longitude)
active_loc_id = self.context.get_current_loc_id()
await self.context.sio.emit(
"simulation_status",
{
"status": self.context.simulation_active,
"loc_id": active_loc_id,
"latitude": new_latitude,
"longitude": new_longitude,
"start": new_start,
"next_move": None,
},
namespace="/",
)
# await self.context.sio.emit(
# "simulation_status",
# {
# "status": self.context.simulation_active,
# "loc_id": active_loc_id,
# "latitude": new_latitude,
# "longitude": new_longitude,
# "start": new_start,
# "next_move": None,
# },
# namespace="/",
# )
logger.info(
"Set simulated location to %s, %s after %ss delay",
new_latitude,
new_longitude,
new_delay,
)
self._update_queue_data()
await self._update_queue_data()
self.context.simulation_queue_pending_ids.discard(loc_id)
self.context.simulation_queue.task_done()
await self._enqueue_next_queue_item()
finally:
await self._stop_noise_talk()
await self._stop_noise_task()