feat: app state module with status enum and subscriber pattern
This commit is contained in:
@@ -0,0 +1,36 @@
|
||||
import asyncio
|
||||
from dataclasses import dataclass, field
|
||||
from enum import Enum
|
||||
from typing import Callable
|
||||
|
||||
|
||||
class Status(str, Enum):
|
||||
IDLE = "idle"
|
||||
RECORDING = "recording"
|
||||
PROCESSING = "processing"
|
||||
ERROR = "error"
|
||||
|
||||
|
||||
@dataclass
|
||||
class AppState:
|
||||
status: Status = Status.IDLE
|
||||
recording_user: str | None = None # which user triggered the current recording
|
||||
last_error: str | None = None
|
||||
_listeners: list[Callable] = field(default_factory=list, repr=False)
|
||||
|
||||
def subscribe(self, callback: Callable):
|
||||
self._listeners.append(callback)
|
||||
|
||||
async def notify(self):
|
||||
for cb in self._listeners:
|
||||
if asyncio.iscoroutinefunction(cb):
|
||||
await cb(self)
|
||||
else:
|
||||
cb(self)
|
||||
|
||||
async def set_status(self, status: Status):
|
||||
self.status = status
|
||||
await self.notify()
|
||||
|
||||
|
||||
state = AppState()
|
||||
Reference in New Issue
Block a user