Files
tueit_Transkriptor/api/state.py
T

37 lines
882 B
Python

import asyncio
import inspect
from dataclasses import dataclass, field
from enum import Enum
from typing import Callable
class Status(str, Enum):
    """Closed set of application status values.

    Members also subclass ``str``, so each one compares equal to its
    plain string value (for example ``Status.IDLE == "idle"``).
    """

    # Declaration order is the iteration order of the enum.
    IDLE = "idle"
    RECORDING = "recording"
    PROCESSING = "processing"
    ERROR = "error"
@dataclass
class AppState:
    """Observable application state: a status value plus change listeners.

    Callbacks registered with :meth:`subscribe` are invoked with this
    instance whenever :meth:`notify` runs. :meth:`set_status` is the only
    method here that triggers a notification by itself; assigning
    ``recording_user`` or ``last_error`` directly does not notify anyone.
    """

    status: Status = Status.IDLE
    recording_user: str | None = None  # which user triggered the current recording
    # Not written by any method of this class; callers assign it directly.
    last_error: str | None = None
    # Registered listener callbacks, kept out of the generated repr.
    _listeners: list[Callable] = field(default_factory=list, repr=False)

    def subscribe(self, callback: Callable) -> None:
        """Register *callback* to be called with this state on every notify.

        Args:
            callback: A callable taking this ``AppState`` as its only
                argument. It may be synchronous or return an awaitable.
        """
        self._listeners.append(callback)

    async def notify(self) -> None:
        """Invoke every registered listener with this state.

        Listeners run one after another in subscription order. If a
        listener returns an awaitable, it is awaited before the next
        listener is called. An exception raised by a listener propagates
        to the caller and the remaining listeners are not called.
        """
        # Iterate over a snapshot: a listener that calls subscribe() while
        # being notified must not extend the loop that is currently running
        # (a self-resubscribing listener would otherwise never terminate).
        for listener in tuple(self._listeners):
            outcome = listener(self)
            # Decide by what the call returned rather than by inspecting the
            # callable: objects with an async __call__ and sync wrappers that
            # return a coroutine are not recognised as coroutine functions,
            # and their coroutine would otherwise be dropped un-awaited.
            if inspect.isawaitable(outcome):
                await outcome

    async def set_status(self, status: Status) -> None:
        """Store *status* and then notify all listeners.

        Args:
            status: The new status value to assign to ``self.status``.
        """
        self.status = status
        await self.notify()
# Module-level AppState instance, constructed with default field values when
# this module is imported.
# NOTE(review): presumably the shared instance other modules import — confirm.
state = AppState()