import asyncio import json import os from pyicloud import PyiCloudService from pyicloud.exceptions import PyiCloud2FARequiredException class FindMyMonitor: def __init__(self, username, password, queue: asyncio.Queue, token_file="icloud_token.txt"): self.username = username self.password = password self.token_file = token_file self.queue = queue self.api = None self.device = None self.running = True async def authenticate(self): """Authenticates with iCloud, handling 2FA and token storage.""" if os.path.exists(self.token_file): print("Loading stored session...") self.api = PyiCloudService(self.username, cookie_directory="./cookies") else: print("No stored session. Authenticating...") self.api = PyiCloudService(self.username, self.password, cookie_directory="./cookies") if self.api.requires_2fa: print("Two-factor authentication required.") code = input("Enter the code you received: ") result = self.api.validate_2fa_code(code) print(f"Code validation result: {result}") if not result: print("Failed to verify 2FA code") return False # Trust the session self.api.trust_session() print("Successfully authenticated.") return True async def get_location(self): """Fetches the latest latitude and longitude.""" if not self.api: await self.authenticate() # Refresh API data self.api.refresh_client() # Find the device (modify name to match your iPhone name in iCloud) if not self.device: # Assuming you have devices, pick the first or match by name self.device = self.api.devices[0] print(f"Monitoring device: {self.device.name()}") location = self.device.location() if location: return location['latitude'], location['longitude'], location['timeStamp'] return None def start(self): self.running = True def stop(self): self.running = False async def run_monitor(self, interval=60): """Runs the monitor loop.""" if not await self.authenticate(): return if not self.running: self.start() while self.running: try: lat, lon, ts = await self.get_location() print(f"[{ts}] Location: {lat}, {lon}") # Add your logic to update database/API here await self.queue.put(lat, lng, ts) except Exception as e: print(f"Error: {e}") # Re-authenticate if session expired await self.authenticate() await asyncio.sleep(interval)