Skip to content

API reference

Core public surface for building Signal bots. See Getting started for setup and Advanced usage for patterns.

Quick usage

from signal_client import SignalClient, command


@command("!ping")
async def ping(ctx):
    await ctx.reply_text("pong")


client = SignalClient()
client.register(ping)
await client.start()
sequenceDiagram
    participant Bot
    participant API as signal-cli-rest-api
    Bot->>API: subscribe websocket
    API-->>Bot: message event
    Bot-->>Bot: handler via @command
    Bot->>API: REST reply

Runtime

The main class for interacting with the Signal API and processing messages.

This class orchestrates the various components of the signal-client, including API clients, message listeners, and worker pools.

Source code in signal_client/app/bot.py
class SignalClient:
    """The main class for interacting with the Signal API and processing messages.

    This class orchestrates the various components of the signal-client,
    including API clients, message listeners, and worker pools.
    """

    def __init__(
        self,
        config: dict | None = None,
        app: Application | None = None,
        header_provider: HeaderProvider | None = None,
    ) -> None:
        """Initialize the SignalClient.

        Args:
            config: A dictionary of configuration overrides.
            app: An optional pre-initialized Application instance.
            header_provider: An optional callable or object that provides
                             additional HTTP headers for API requests.

        """
        check_supported_versions()
        settings = Settings.from_sources(config=config)

        ensure_structlog_configured(redaction_enabled=settings.log_redaction_enabled)
        self.app = app or Application(settings, header_provider=header_provider)
        self.settings = settings
        self._commands: list[Command] = []
        self._registered_command_ids: set[int] = set()
        self._middleware: list[
            Callable[[Context, Callable[[Context], Awaitable[None]]], Awaitable[None]]
        ] = []

    def register(self, command: Command) -> None:
        """Register a new command with the client.

        Registered commands will be executed when matching incoming messages.

        Args:
            command: The Command instance to register.

        """
        self._commands.append(command)
        self._register_with_worker_pool(command)

    def use(
        self,
        middleware: Callable[
            [Context, Callable[[Context], Awaitable[None]]], Awaitable[None]
        ],
    ) -> None:
        """Register middleware to wrap command execution.

        Middleware functions are called before command execution and can
        modify the context or prevent command execution.

        Args:
            middleware: The middleware callable to register.

        """
        if middleware in self._middleware:
            return
        self._middleware.append(middleware)
        if self.app.worker_pool is not None:
            self.app.worker_pool.register_middleware(middleware)

    async def __aenter__(self) -> Self:
        """Asynchronous context manager entry."""
        await self.app.initialize()
        return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc: BaseException | None,
        tb: TracebackType | None,
    ) -> None:
        """Asynchronous context manager exit.

        Ensures proper shutdown of the client.
        """
        await self.shutdown()

    async def start(self) -> None:
        """Start the SignalClient, including message listening and worker processing.

        This method will block indefinitely until the client is shut down.
        """
        await self.app.initialize()
        if self.app.worker_pool is None or self.app.message_service is None:
            message = "Runtime not initialized. Call await app.initialize() first."
            raise RuntimeError(message)
        worker_pool = self.app.worker_pool
        message_service = self.app.message_service

        for command in self._commands:
            self._register_with_worker_pool(command)

        for middleware in self._middleware:
            worker_pool.register_middleware(middleware)

        worker_pool.start()

        try:
            await asyncio.gather(
                message_service.listen(),
                worker_pool.join(),
            )
        finally:
            await self.shutdown()

    async def shutdown(self) -> None:
        """Shut down the SignalClient gracefully.

        This involves closing the websocket, waiting for queues to empty,
        stopping workers, and closing the aiohttp session.
        """
        # 1. Stop accepting new messages
        if self.app.websocket_client is not None:
            await self.app.websocket_client.close()

        # 2. Wait for the queue to be empty
        if self.app.queue is not None:
            await self.app.queue.join()

        # 3. Stop the workers
        if self.app.worker_pool is not None:
            self.app.worker_pool.stop()
            await self.app.worker_pool.join()

        # 4. Close the session and shutdown resources
        if self.app.session is not None:
            await self.app.session.close()
        close_storage = getattr(self.app.storage, "close", None)
        if close_storage is not None:
            await close_storage()

    def _register_with_worker_pool(self, command: Command) -> None:
        """Register a command with the worker pool if not already present.

        Args:
            command: The command to register.

        """
        if id(command) in self._registered_command_ids:
            return

        if self.app.worker_pool is None:
            return

        self.app.worker_pool.register(command)
        self._registered_command_ids.add(id(command))

    @property
    def queue(self) -> asyncio.Queue[QueuedMessage]:
        """Get the message queue.

        Raises:
            RuntimeError: If the application is not initialized.

        Returns:
            The asyncio.Queue for messages.

        """
        if self.app.queue is None:
            message = "Runtime not initialized. Call await app.initialize() first."
            raise RuntimeError(message)
        return self.app.queue

    @property
    def worker_pool(self) -> WorkerPool:
        """Get the worker pool.

        Raises:
            RuntimeError: If the application is not initialized.

        Returns:
            The WorkerPool instance.

        """
        if self.app.worker_pool is None:
            message = "Runtime not initialized. Call await app.initialize() first."
            raise RuntimeError(message)
        return self.app.worker_pool

    @property
    def api_clients(self) -> APIClients:
        """Get the API clients.

        Raises:
            RuntimeError: If the application is not initialized.

        Returns:
            The APIClients instance.

        """
        if self.app.api_clients is None:
            message = "Runtime not initialized. Call await app.initialize() first."
            raise RuntimeError(message)
        return self.app.api_clients

    @property
    def websocket_client(self) -> WebSocketClient:
        """Get the WebSocket client.

        Raises:
            RuntimeError: If the application is not initialized.

        Returns:
            The WebSocketClient instance.

        """
        if self.app.websocket_client is None:
            message = "Runtime not initialized. Call await app.initialize() first."
            raise RuntimeError(message)
        return self.app.websocket_client

    @property
    def message_service(self) -> MessageService:
        """Get the message service.

        Raises:
            RuntimeError: If the application is not initialized.

        Returns:
            The MessageService instance.

        """
        if self.app.message_service is None:
            message = "Runtime not initialized. Call await app.initialize() first."
            raise RuntimeError(message)
        return self.app.message_service

    async def set_websocket_client(self, websocket_client: WebSocketClient) -> None:
        """Override the websocket client (primarily for testing purposes).

        Args:
            websocket_client: The new WebSocketClient instance to use.

        """
        await self.app.initialize()
        self.app.websocket_client = websocket_client
        if self.app.message_service is not None:
            self.app.message_service.set_websocket_client(websocket_client)

queue property

queue: Queue[QueuedMessage]

Get the message queue.

Raises:

Type Description
RuntimeError

If the application is not initialized.

Returns:

Type Description
Queue[QueuedMessage]

The asyncio.Queue for messages.

worker_pool property

worker_pool: WorkerPool

Get the worker pool.

Raises:

Type Description
RuntimeError

If the application is not initialized.

Returns:

Type Description
WorkerPool

The WorkerPool instance.

api_clients property

api_clients: APIClients

Get the API clients.

Raises:

Type Description
RuntimeError

If the application is not initialized.

Returns:

Type Description
APIClients

The APIClients instance.

websocket_client property

websocket_client: WebSocketClient

Get the WebSocket client.

Raises:

Type Description
RuntimeError

If the application is not initialized.

Returns:

Type Description
WebSocketClient

The WebSocketClient instance.

message_service property

message_service: MessageService

Get the message service.

Raises:

Type Description
RuntimeError

If the application is not initialized.

Returns:

Type Description
MessageService

The MessageService instance.

__init__

__init__(
    config: dict | None = None,
    app: Application | None = None,
    header_provider: HeaderProvider | None = None,
) -> None

Initialize the SignalClient.

Parameters:

Name Type Description Default
config dict | None

A dictionary of configuration overrides.

None
app Application | None

An optional pre-initialized Application instance.

None
header_provider HeaderProvider | None

An optional callable or object that provides additional HTTP headers for API requests.

None
Source code in signal_client/app/bot.py
def __init__(
    self,
    config: dict | None = None,
    app: Application | None = None,
    header_provider: HeaderProvider | None = None,
) -> None:
    """Initialize the SignalClient.

    Args:
        config: A dictionary of configuration overrides.
        app: An optional pre-initialized Application instance.
        header_provider: An optional callable or object that provides
                         additional HTTP headers for API requests.

    """
    check_supported_versions()
    settings = Settings.from_sources(config=config)

    ensure_structlog_configured(redaction_enabled=settings.log_redaction_enabled)
    self.app = app or Application(settings, header_provider=header_provider)
    self.settings = settings
    self._commands: list[Command] = []
    self._registered_command_ids: set[int] = set()
    self._middleware: list[
        Callable[[Context, Callable[[Context], Awaitable[None]]], Awaitable[None]]
    ] = []

register

register(command: Command) -> None

Register a new command with the client.

Registered commands will be executed when matching incoming messages.

Parameters:

Name Type Description Default
command Command

The Command instance to register.

required
Source code in signal_client/app/bot.py
def register(self, command: Command) -> None:
    """Register a new command with the client.

    Registered commands will be executed when matching incoming messages.

    Args:
        command: The Command instance to register.

    """
    self._commands.append(command)
    self._register_with_worker_pool(command)

use

use(
    middleware: Callable[
        [Context, Callable[[Context], Awaitable[None]]],
        Awaitable[None],
    ],
) -> None

Register middleware to wrap command execution.

Middleware functions are called before command execution and can modify the context or prevent command execution.

Parameters:

Name Type Description Default
middleware Callable[[Context, Callable[[Context], Awaitable[None]]], Awaitable[None]]

The middleware callable to register.

required
Source code in signal_client/app/bot.py
def use(
    self,
    middleware: Callable[
        [Context, Callable[[Context], Awaitable[None]]], Awaitable[None]
    ],
) -> None:
    """Register middleware to wrap command execution.

    Middleware functions are called before command execution and can
    modify the context or prevent command execution.

    Args:
        middleware: The middleware callable to register.

    """
    if middleware in self._middleware:
        return
    self._middleware.append(middleware)
    if self.app.worker_pool is not None:
        self.app.worker_pool.register_middleware(middleware)

__aenter__ async

__aenter__() -> Self

Asynchronous context manager entry.

Source code in signal_client/app/bot.py
async def __aenter__(self) -> Self:
    """Asynchronous context manager entry."""
    await self.app.initialize()
    return self

__aexit__ async

__aexit__(
    exc_type: type[BaseException] | None,
    exc: BaseException | None,
    tb: TracebackType | None,
) -> None

Asynchronous context manager exit.

Ensures proper shutdown of the client.

Source code in signal_client/app/bot.py
async def __aexit__(
    self,
    exc_type: type[BaseException] | None,
    exc: BaseException | None,
    tb: TracebackType | None,
) -> None:
    """Asynchronous context manager exit.

    Ensures proper shutdown of the client.
    """
    await self.shutdown()

start async

start() -> None

Start the SignalClient, including message listening and worker processing.

This method will block indefinitely until the client is shut down.

Source code in signal_client/app/bot.py
async def start(self) -> None:
    """Start the SignalClient, including message listening and worker processing.

    This method will block indefinitely until the client is shut down.
    """
    await self.app.initialize()
    if self.app.worker_pool is None or self.app.message_service is None:
        message = "Runtime not initialized. Call await app.initialize() first."
        raise RuntimeError(message)
    worker_pool = self.app.worker_pool
    message_service = self.app.message_service

    for command in self._commands:
        self._register_with_worker_pool(command)

    for middleware in self._middleware:
        worker_pool.register_middleware(middleware)

    worker_pool.start()

    try:
        await asyncio.gather(
            message_service.listen(),
            worker_pool.join(),
        )
    finally:
        await self.shutdown()

shutdown async

shutdown() -> None

Shut down the SignalClient gracefully.

This involves closing the websocket, waiting for queues to empty, stopping workers, and closing the aiohttp session.

Source code in signal_client/app/bot.py
async def shutdown(self) -> None:
    """Shut down the SignalClient gracefully.

    This involves closing the websocket, waiting for queues to empty,
    stopping workers, and closing the aiohttp session.
    """
    # 1. Stop accepting new messages
    if self.app.websocket_client is not None:
        await self.app.websocket_client.close()

    # 2. Wait for the queue to be empty
    if self.app.queue is not None:
        await self.app.queue.join()

    # 3. Stop the workers
    if self.app.worker_pool is not None:
        self.app.worker_pool.stop()
        await self.app.worker_pool.join()

    # 4. Close the session and shutdown resources
    if self.app.session is not None:
        await self.app.session.close()
    close_storage = getattr(self.app.storage, "close", None)
    if close_storage is not None:
        await close_storage()

set_websocket_client async

set_websocket_client(
    websocket_client: WebSocketClient,
) -> None

Override the websocket client (primarily for testing purposes).

Parameters:

Name Type Description Default
websocket_client WebSocketClient

The new WebSocketClient instance to use.

required
Source code in signal_client/app/bot.py
async def set_websocket_client(self, websocket_client: WebSocketClient) -> None:
    """Override the websocket client (primarily for testing purposes).

    Args:
        websocket_client: The new WebSocketClient instance to use.

    """
    await self.app.initialize()
    self.app.websocket_client = websocket_client
    if self.app.message_service is not None:
        self.app.message_service.set_websocket_client(websocket_client)

Explicit wiring of Signal client runtime components.

This class is responsible for initializing and managing the lifecycle of all components within the Signal client, including API clients, storage, message queues, and worker pools.

Source code in signal_client/app/application.py
class Application:
    """Explicit wiring of Signal client runtime components.

    This class is responsible for initializing and managing the lifecycle
    of all components within the Signal client, including API clients,
    storage, message queues, and worker pools.
    """

    def __init__(
        self, settings: Settings, *, header_provider: HeaderProvider | None = None
    ) -> None:
        """Initialize the Application instance.

        Args:
            settings: The application settings.
            header_provider: An optional callable or object that provides
                             additional HTTP headers for API requests.

        """
        ensure_structlog_configured(redaction_enabled=settings.log_redaction_enabled)
        self._log = structlog.get_logger()
        self.settings = settings
        self._header_provider = header_provider
        self.session: aiohttp.ClientSession | None = None
        self.rate_limiter = RateLimiter(
            rate_limit=settings.rate_limit, period=settings.rate_limit_period
        )
        self.circuit_breaker = CircuitBreaker(
            failure_threshold=settings.circuit_breaker_failure_threshold,
            reset_timeout=settings.circuit_breaker_reset_timeout,
            failure_rate_threshold=settings.circuit_breaker_failure_rate_threshold,
            min_requests_for_rate_calc=(
                settings.circuit_breaker_min_requests_for_rate_calc
            ),
        )

        self.storage = self._create_storage()
        if isinstance(self.storage, MemoryStorage):
            self._log_warning(
                "Using transient in-memory storage. No data will be persisted.",
                event_slug="storage.in_memory.active",
            )

        self.api_clients: APIClients | None = None

        self.queue: asyncio.Queue[QueuedMessage] | None = None
        self.websocket_client: WebSocketClient | None = None
        self.dead_letter_queue: DeadLetterQueue | None = None
        self.persistent_queue: PersistentQueue | None = None
        self.ingest_checkpoint_store: IngestCheckpointStore | None = None
        self.intake_controller: IntakeController | None = None
        self.message_parser = MessageParser()
        self.lock_manager: LockManager | None = None
        self.context_dependencies: ContextDependencies | None = None
        self.context_factory: Callable[[Message], Context] | None = None
        self.message_service: MessageService | None = None
        self.worker_pool: WorkerPool | None = None
        self._circuit_state_lock: asyncio.Lock | None = None
        self._open_circuit_endpoints: set[str] = set()

    def _log_warning(self, message: str, **kwargs: object) -> None:
        """Emit a warning, tolerating minimal structlog configurations.

        Falls back to stdlib logging if the current structlog logger does not
        accept keyword arguments (e.g., when using PrintLogger).
        """
        safe_log(self._log, "warning", message, **kwargs)

    async def initialize(self) -> None:
        """Initialize all components of the application.

        This method sets up the AIOHTTP client session, API clients,
        message queues, storage, and worker pools. It must be called
        before the application can start processing messages.
        """
        if self.queue is not None:
            return

        self.session = aiohttp.ClientSession()
        self.api_clients = self._create_api_clients(self.session)

        self.queue = asyncio.Queue(maxsize=self.settings.queue_size)
        if self.settings.durable_queue_enabled:
            self.persistent_queue = PersistentQueue(
                storage=self.storage,
                key=self.settings.ingest_queue_name,
                max_length=self.settings.durable_queue_max_length,
            )
        self.ingest_checkpoint_store = IngestCheckpointStore(
            storage=self.storage,
            key=self.settings.ingest_checkpoint_key,
            window_size=self.settings.ingest_checkpoint_window,
        )
        self.intake_controller = IntakeController(
            default_pause_seconds=self.settings.ingest_pause_seconds
        )
        self.rate_limiter.set_wait_listener(self._handle_rate_limit_wait)
        self._circuit_state_lock = asyncio.Lock()
        self.websocket_client = WebSocketClient(
            signal_service_url=self.settings.signal_service,
            phone_number=self.settings.phone_number,
            websocket_path=self.settings.websocket_path,
        )
        redis_client = (
            self.storage.client
            if self.settings.distributed_locks_enabled
            and isinstance(self.storage, RedisStorage)
            else None
        )
        self.lock_manager = LockManager(
            redis_client=redis_client,
            lock_timeout_seconds=self.settings.distributed_lock_timeout,
        )
        self.dead_letter_queue = DeadLetterQueue(
            storage=self.storage,
            queue_name=self.settings.dlq_name,
            max_retries=self.settings.dlq_max_retries,
        )
        self.context_dependencies = ContextDependencies(
            accounts_client=self.api_clients.accounts,
            attachments_client=self.api_clients.attachments,
            contacts_client=self.api_clients.contacts,
            devices_client=self.api_clients.devices,
            general_client=self.api_clients.general,
            groups_client=self.api_clients.groups,
            identities_client=self.api_clients.identities,
            messages_client=self.api_clients.messages,
            profiles_client=self.api_clients.profiles,
            reactions_client=self.api_clients.reactions,
            receipts_client=self.api_clients.receipts,
            search_client=self.api_clients.search,
            sticker_packs_client=self.api_clients.sticker_packs,
            lock_manager=self.lock_manager,
            phone_number=self.settings.phone_number,
            settings=self.settings,
        )
        self.context_factory = partial(Context, dependencies=self.context_dependencies)
        self.message_service = MessageService(
            websocket_client=self.websocket_client,
            queue=self.queue,
            dead_letter_queue=self.dead_letter_queue,
            persistent_queue=self.persistent_queue,
            intake_controller=self.intake_controller,
            enqueue_timeout=self.settings.queue_put_timeout,
            backpressure_policy=(
                BackpressurePolicy.DROP_OLDEST
                if self.settings.queue_drop_oldest_on_timeout
                else BackpressurePolicy.FAIL_FAST
            ),
        )
        self.circuit_breaker.register_state_listener(self._handle_circuit_state_change)
        self.worker_pool = WorkerPool(
            context_factory=self.context_factory,
            queue=self.queue,
            message_parser=self.message_parser,
            dead_letter_queue=self.dead_letter_queue,
            checkpoint_store=self.ingest_checkpoint_store,
            pool_size=self.settings.worker_pool_size,
            shard_count=self.settings.worker_shard_count,
            lock_manager=self.lock_manager,
        )
        if self.persistent_queue:
            replay = await self.persistent_queue.replay()
            for item in replay:
                queued = QueuedMessage(raw=item.raw, enqueued_at=item.enqueued_at)
                try:
                    self.queue.put_nowait(queued)
                except asyncio.QueueFull:
                    self._log_warning(
                        "persistent_queue.replay_dropped",
                        reason="queue_full",
                        queue_depth=self.queue.qsize(),
                        queue_maxsize=self.queue.maxsize,
                    )
                    break

    def _create_storage(self) -> Storage:
        """Create and return the appropriate storage backend based on settings.

        Returns:
            An instance of a concrete Storage implementation (RedisStorage,
            SQLiteStorage, or MemoryStorage).

        """
        storage_type = self.settings.storage_type.lower()
        if storage_type == "redis":
            return RedisStorage(
                host=self.settings.redis_host,
                port=self.settings.redis_port,
            )
        if storage_type == "sqlite":
            return SQLiteStorage(database=self.settings.sqlite_database)
        return MemoryStorage()

    def _create_api_clients(self, session: aiohttp.ClientSession) -> APIClients:
        """Create and return a collection of API clients.

        Args:
            session: The aiohttp client session to use for requests.

        Returns:
            An APIClients instance containing all initialized API clients.

        """
        client_config = ClientConfig(
            session=session,
            base_url=self.settings.base_url,
            retries=self.settings.api_retries,
            backoff_factor=self.settings.api_backoff_factor,
            timeout=self.settings.api_timeout,
            rate_limiter=self.rate_limiter,
            circuit_breaker=self.circuit_breaker,
            default_headers=self._default_api_headers(),
            header_provider=self._header_provider,
            endpoint_timeouts=self.settings.api_endpoint_timeouts,
            idempotency_header_name=self.settings.api_idempotency_header,
        )
        return APIClients(
            accounts=AccountsClient(client_config=client_config),
            attachments=AttachmentsClient(client_config=client_config),
            contacts=ContactsClient(client_config=client_config),
            devices=DevicesClient(client_config=client_config),
            general=GeneralClient(client_config=client_config),
            groups=GroupsClient(client_config=client_config),
            identities=IdentitiesClient(client_config=client_config),
            messages=MessagesClient(client_config=client_config),
            profiles=ProfilesClient(client_config=client_config),
            reactions=ReactionsClient(client_config=client_config),
            receipts=ReceiptsClient(client_config=client_config),
            search=SearchClient(client_config=client_config),
            sticker_packs=StickerPacksClient(client_config=client_config),
        )

    def _default_api_headers(self) -> dict[str, str]:
        """Construct a dictionary of default headers for API requests.

        Includes authorization header if an API token is configured.

        Returns:
            A dictionary of HTTP headers.

        """
        headers = dict(self.settings.api_default_headers)
        token = (self.settings.api_auth_token or "").strip()
        if token:
            scheme = (self.settings.api_auth_scheme or "").strip()
            auth_value = f"{scheme} {token}".strip() if scheme else token
            headers.setdefault("Authorization", auth_value)
        return headers

    async def shutdown(self) -> None:
        """Shut down the application gracefully."""
        if self.websocket_client is not None:
            await self.websocket_client.close()
        if self.session is not None:
            await self.session.close()
        close_storage = getattr(self.storage, "close", None)
        if close_storage is not None:
            await close_storage()

    async def _handle_circuit_state_change(
        self, endpoint: str, state: CircuitBreakerState
    ) -> None:
        """Handle changes in the circuit breaker state for a given endpoint.

        If a circuit opens, the intake controller may be paused. If all
        circuits close, the intake controller may be resumed.

        Args:
            endpoint: The API endpoint whose circuit breaker state changed.
            state: The new state of the circuit breaker.

        """
        if self.intake_controller is None or self._circuit_state_lock is None:
            return

        pause = False
        resume = False
        pause_duration = float(
            max(
                self.settings.ingest_pause_seconds,
                self.settings.circuit_breaker_reset_timeout,
            )
        )
        async with self._circuit_state_lock:
            if state is CircuitBreakerState.OPEN:
                self._open_circuit_endpoints.add(endpoint)
                pause = True
            elif state is CircuitBreakerState.HALF_OPEN:
                self._open_circuit_endpoints.add(endpoint)
            elif state is CircuitBreakerState.CLOSED:
                self._open_circuit_endpoints.discard(endpoint)
                if not self._open_circuit_endpoints:
                    resume = True

        if pause:
            await self.intake_controller.pause(
                reason="circuit_open", duration=pause_duration
            )
        elif resume:
            await self.intake_controller.resume_now()

    async def _handle_rate_limit_wait(self, wait_time: float) -> None:
        """Handle a rate limit wait by pausing the intake controller.

        Args:
            wait_time: The duration (in seconds) to wait due to rate limiting.

        """
        if self.intake_controller is None:
            return

        pause_for = max(wait_time, self.settings.ingest_pause_seconds)
        await self.intake_controller.pause(reason="rate_limited", duration=pause_for)

__init__

__init__(
    settings: Settings,
    *,
    header_provider: HeaderProvider | None = None
) -> None

Initialize the Application instance.

Parameters:

Name Type Description Default
settings Settings

The application settings.

required
header_provider HeaderProvider | None

An optional callable or object that provides additional HTTP headers for API requests.

None
Source code in signal_client/app/application.py
def __init__(
    self, settings: Settings, *, header_provider: HeaderProvider | None = None
) -> None:
    """Initialize the Application instance.

    Args:
        settings: The application settings.
        header_provider: An optional callable or object that provides
                         additional HTTP headers for API requests.

    """
    ensure_structlog_configured(redaction_enabled=settings.log_redaction_enabled)
    self._log = structlog.get_logger()
    self.settings = settings
    self._header_provider = header_provider
    self.session: aiohttp.ClientSession | None = None
    self.rate_limiter = RateLimiter(
        rate_limit=settings.rate_limit, period=settings.rate_limit_period
    )
    self.circuit_breaker = CircuitBreaker(
        failure_threshold=settings.circuit_breaker_failure_threshold,
        reset_timeout=settings.circuit_breaker_reset_timeout,
        failure_rate_threshold=settings.circuit_breaker_failure_rate_threshold,
        min_requests_for_rate_calc=(
            settings.circuit_breaker_min_requests_for_rate_calc
        ),
    )

    self.storage = self._create_storage()
    if isinstance(self.storage, MemoryStorage):
        self._log_warning(
            "Using transient in-memory storage. No data will be persisted.",
            event_slug="storage.in_memory.active",
        )

    self.api_clients: APIClients | None = None

    self.queue: asyncio.Queue[QueuedMessage] | None = None
    self.websocket_client: WebSocketClient | None = None
    self.dead_letter_queue: DeadLetterQueue | None = None
    self.persistent_queue: PersistentQueue | None = None
    self.ingest_checkpoint_store: IngestCheckpointStore | None = None
    self.intake_controller: IntakeController | None = None
    self.message_parser = MessageParser()
    self.lock_manager: LockManager | None = None
    self.context_dependencies: ContextDependencies | None = None
    self.context_factory: Callable[[Message], Context] | None = None
    self.message_service: MessageService | None = None
    self.worker_pool: WorkerPool | None = None
    self._circuit_state_lock: asyncio.Lock | None = None
    self._open_circuit_endpoints: set[str] = set()

initialize async

initialize() -> None

Initialize all components of the application.

This method sets up the AIOHTTP client session, API clients, message queues, storage, and worker pools. It must be called before the application can start processing messages.

Source code in signal_client/app/application.py
async def initialize(self) -> None:
    """Initialize all components of the application.

    This method sets up the AIOHTTP client session, API clients,
    message queues, storage, and worker pools. It must be called
    before the application can start processing messages.
    """
    if self.queue is not None:
        return

    self.session = aiohttp.ClientSession()
    self.api_clients = self._create_api_clients(self.session)

    self.queue = asyncio.Queue(maxsize=self.settings.queue_size)
    if self.settings.durable_queue_enabled:
        self.persistent_queue = PersistentQueue(
            storage=self.storage,
            key=self.settings.ingest_queue_name,
            max_length=self.settings.durable_queue_max_length,
        )
    self.ingest_checkpoint_store = IngestCheckpointStore(
        storage=self.storage,
        key=self.settings.ingest_checkpoint_key,
        window_size=self.settings.ingest_checkpoint_window,
    )
    self.intake_controller = IntakeController(
        default_pause_seconds=self.settings.ingest_pause_seconds
    )
    self.rate_limiter.set_wait_listener(self._handle_rate_limit_wait)
    self._circuit_state_lock = asyncio.Lock()
    self.websocket_client = WebSocketClient(
        signal_service_url=self.settings.signal_service,
        phone_number=self.settings.phone_number,
        websocket_path=self.settings.websocket_path,
    )
    redis_client = (
        self.storage.client
        if self.settings.distributed_locks_enabled
        and isinstance(self.storage, RedisStorage)
        else None
    )
    self.lock_manager = LockManager(
        redis_client=redis_client,
        lock_timeout_seconds=self.settings.distributed_lock_timeout,
    )
    self.dead_letter_queue = DeadLetterQueue(
        storage=self.storage,
        queue_name=self.settings.dlq_name,
        max_retries=self.settings.dlq_max_retries,
    )
    self.context_dependencies = ContextDependencies(
        accounts_client=self.api_clients.accounts,
        attachments_client=self.api_clients.attachments,
        contacts_client=self.api_clients.contacts,
        devices_client=self.api_clients.devices,
        general_client=self.api_clients.general,
        groups_client=self.api_clients.groups,
        identities_client=self.api_clients.identities,
        messages_client=self.api_clients.messages,
        profiles_client=self.api_clients.profiles,
        reactions_client=self.api_clients.reactions,
        receipts_client=self.api_clients.receipts,
        search_client=self.api_clients.search,
        sticker_packs_client=self.api_clients.sticker_packs,
        lock_manager=self.lock_manager,
        phone_number=self.settings.phone_number,
        settings=self.settings,
    )
    self.context_factory = partial(Context, dependencies=self.context_dependencies)
    self.message_service = MessageService(
        websocket_client=self.websocket_client,
        queue=self.queue,
        dead_letter_queue=self.dead_letter_queue,
        persistent_queue=self.persistent_queue,
        intake_controller=self.intake_controller,
        enqueue_timeout=self.settings.queue_put_timeout,
        backpressure_policy=(
            BackpressurePolicy.DROP_OLDEST
            if self.settings.queue_drop_oldest_on_timeout
            else BackpressurePolicy.FAIL_FAST
        ),
    )
    self.circuit_breaker.register_state_listener(self._handle_circuit_state_change)
    self.worker_pool = WorkerPool(
        context_factory=self.context_factory,
        queue=self.queue,
        message_parser=self.message_parser,
        dead_letter_queue=self.dead_letter_queue,
        checkpoint_store=self.ingest_checkpoint_store,
        pool_size=self.settings.worker_pool_size,
        shard_count=self.settings.worker_shard_count,
        lock_manager=self.lock_manager,
    )
    if self.persistent_queue:
        replay = await self.persistent_queue.replay()
        for item in replay:
            queued = QueuedMessage(raw=item.raw, enqueued_at=item.enqueued_at)
            try:
                self.queue.put_nowait(queued)
            except asyncio.QueueFull:
                self._log_warning(
                    "persistent_queue.replay_dropped",
                    reason="queue_full",
                    queue_depth=self.queue.qsize(),
                    queue_maxsize=self.queue.maxsize,
                )
                break

shutdown async

shutdown() -> None

Shut down the application gracefully.

Source code in signal_client/app/application.py
async def shutdown(self) -> None:
    """Shut down the application gracefully."""
    if self.websocket_client is not None:
        await self.websocket_client.close()
    if self.session is not None:
        await self.session.close()
    close_storage = getattr(self.storage, "close", None)
    if close_storage is not None:
        await close_storage()

Commands and context

signal_client.core.command.command

command(
    *triggers: str | Pattern,
    whitelisted: Sequence[str] | None = None,
    case_sensitive: bool = False,
    name: str | None = None,
    description: str | None = None,
    usage: str | None = None
) -> Callable[
    [Callable[[Context], Awaitable[None]]], Command
]

Define a new command via decorator.

This decorator simplifies the creation of Command instances by allowing you to define triggers, whitelisted senders, and other metadata directly on the handler function.

Parameters:

Name Type Description Default
*triggers str | Pattern

One or more strings or regular expressions that will trigger this command.

()
whitelisted Sequence[str] | None

An optional list of sender IDs that are allowed to execute this command.

None
case_sensitive bool

If True, string triggers will be matched case-sensitively.

False
name str | None

An optional name for the command.

None
description str | None

An optional description for the command.

None
usage str | None

Optional usage instructions for the command.

None

Returns:

Type Description
Callable[[Callable[[Context], Awaitable[None]]], Command]

A decorator that transforms an asynchronous function into a Command object.

Raises:

Type Description
ValueError

If no triggers are provided.

Source code in signal_client/core/command.py
def command(
    *triggers: str | re.Pattern,
    whitelisted: Sequence[str] | None = None,
    case_sensitive: bool = False,
    name: str | None = None,
    description: str | None = None,
    usage: str | None = None,
) -> Callable[[Callable[[Context], Awaitable[None]]], Command]:
    """Define a new command via decorator.

    This decorator simplifies the creation of Command instances by allowing
    you to define triggers, whitelisted senders, and other metadata directly
    on the handler function.

    Args:
        *triggers: One or more strings or regular expressions that will
                   trigger this command.
        whitelisted: An optional list of sender IDs that are allowed to
                     execute this command.
        case_sensitive: If True, string triggers will be matched case-sensitively.
        name: An optional name for the command.
        description: An optional description for the command.
        usage: Optional usage instructions for the command.

    Returns:
        A decorator that transforms an asynchronous function into a Command object.

    Raises:
        ValueError: If no triggers are provided.

    """
    if not triggers:
        message = "At least one trigger must be provided."
        raise ValueError(message)

    metadata = CommandMetadata(name=name, description=description, usage=usage)

    def decorator(handler: Callable[[Context], Awaitable[None]]) -> Command:
        cmd = Command(
            triggers=list(triggers),
            whitelisted=list(whitelisted) if whitelisted is not None else None,
            case_sensitive=case_sensitive,
            metadata=metadata,
        )
        return cmd.with_handler(handler)

    return decorator

Provide helpers for command handlers interacting with the Signal API.

Instances of this class are passed to command handler functions, encapsulating the incoming message and all necessary API clients and utilities.

Source code in signal_client/core/context.py
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
class Context:
    """Provide helpers for command handlers interacting with the Signal API.

    Instances of this class are passed to command handler functions,
    encapsulating the incoming message and all necessary API clients
    and utilities.
    """

    def __init__(
        self,
        message: Message,
        dependencies: ContextDependencies,
    ) -> None:
        """Initialize a Context instance.

        Args:
            message: The incoming message that triggered the command.
            dependencies: An instance of ContextDependencies providing
                          access to various clients and managers.

        """
        self.message = message
        self.accounts = dependencies.accounts_client
        self.attachments = dependencies.attachments_client
        self.contacts = dependencies.contacts_client
        self.devices = dependencies.devices_client
        self.general = dependencies.general_client
        self.groups = dependencies.groups_client
        self.identities = dependencies.identities_client
        self.messages = dependencies.messages_client
        self.profiles = dependencies.profiles_client
        self.reactions = dependencies.reactions_client
        self.receipts = dependencies.receipts_client
        self.search = dependencies.search_client
        self.sticker_packs = dependencies.sticker_packs_client
        self.settings = dependencies.settings
        self._phone_number = dependencies.phone_number
        self._lock_manager = dependencies.lock_manager

        self._attachment_downloader = AttachmentDownloader(self.attachments)

    async def send(self, request: SendMessageRequest) -> SendMessageResponse | None:
        """Send a message to a recipient.

        Args:
            request: The SendMessageRequest object containing message details.

        Returns:
            A SendMessageResponse object if successful, otherwise None.

        """
        normalized = self._prepare_send_request(request)
        request_options = (
            RequestOptions(idempotency_key=normalized.idempotency_key)
            if normalized.idempotency_key
            else None
        )
        response = await self.messages.send(
            normalized.model_dump(exclude_none=True, by_alias=True),
            request_options=request_options,
        )
        return SendMessageResponse.from_raw(response)

    async def reply(self, request: SendMessageRequest) -> SendMessageResponse | None:
        """Reply to the incoming message, quoting it.

        The original message's author, content, and timestamp are
        automatically included in the quote.

        Args:
            request: The SendMessageRequest object containing message details.

        Returns:
            A SendMessageResponse object if successful, otherwise None.

        """
        request.quote_author = self.message.source
        request.quote_message = self.message.message
        request.quote_timestamp = self.message.timestamp
        return await self.send(request)

    async def send_text(  # noqa: PLR0913
        self,
        text: str,
        *,
        recipients: Sequence[str] | None = None,
        mentions: list[MessageMention] | None = None,
        quote_mentions: list[MessageMention] | None = None,
        base64_attachments: list[str] | None = None,
        link_preview: LinkPreview | None = None,
        text_mode: Literal["normal", "styled"] | None = None,
        notify_self: bool | None = None,
        edit_timestamp: int | None = None,
        sticker: str | None = None,
        view_once: bool = False,
    ) -> SendMessageResponse | None:
        """Send a plain text message.

        Args:
            text: The text content of the message.
            recipients: Optional list of recipient IDs (phone numbers or group
                IDs). Defaults to the sender of the incoming message.
            mentions: Optional list of MessageMention objects for @mentions.
            quote_mentions: Optional list of MessageMention objects for mentions
                within a quote.
            base64_attachments: Optional list of base64 encoded attachments.
            link_preview: Optional LinkPreview object for a URL preview.
            text_mode: 'normal' for plain text, 'styled' for markdown.
            notify_self: Whether to send a notification to self.
            edit_timestamp: Timestamp of the message to edit.
            sticker: ID of a sticker to send.
            view_once: If True, the message/attachments can only be viewed once.

        Returns:
            A SendMessageResponse object if successful, otherwise None.

        """
        request = SendMessageRequest(
            message=text,
            recipients=list(recipients) if recipients else None,
            base64_attachments=base64_attachments or [],
            mentions=mentions,
            quote_mentions=quote_mentions,
            link_preview=link_preview,
            text_mode=text_mode,
            notify_self=notify_self,
            edit_timestamp=edit_timestamp,
            sticker=sticker,
            view_once=view_once,
        )
        return await self.send(request)

    async def reply_text(  # noqa: PLR0913
        self,
        text: str,
        *,
        recipients: Sequence[str] | None = None,
        mentions: list[MessageMention] | None = None,
        quote_mentions: list[MessageMention] | None = None,
        base64_attachments: list[str] | None = None,
        link_preview: LinkPreview | None = None,
        text_mode: Literal["normal", "styled"] | None = None,
        notify_self: bool | None = None,
        edit_timestamp: int | None = None,
        sticker: str | None = None,
        view_once: bool = False,
    ) -> SendMessageResponse | None:
        """Reply to the incoming message with plain text, quoting it.

        Args:
            text: The text content of the reply message.
            recipients: Optional list of recipient IDs (phone numbers or group
                IDs). Defaults to the sender of the incoming message.
            mentions: Optional list of MessageMention objects for @mentions.
            quote_mentions: Optional list of MessageMention objects for mentions
                within a quote.
            base64_attachments: Optional list of base64 encoded attachments.
            link_preview: Optional LinkPreview object for a URL preview.
            text_mode: 'normal' for plain text, 'styled' for markdown.
            notify_self: Whether to send a notification to self.
            edit_timestamp: Timestamp of the message to edit.
            sticker: ID of a sticker to send.
            view_once: If True, the message/attachments can only be viewed once.

        Returns:
            A SendMessageResponse object if successful, otherwise None.

        """
        request = SendMessageRequest(
            message=text,
            recipients=list(recipients) if recipients else None,
            base64_attachments=base64_attachments or [],
            mentions=mentions,
            quote_mentions=quote_mentions,
            link_preview=link_preview,
            text_mode=text_mode,
            notify_self=notify_self,
            edit_timestamp=edit_timestamp,
            sticker=sticker,
            view_once=view_once,
        )
        return await self.reply(request)

    async def send_markdown(
        self,
        text: str,
        *,
        recipients: Sequence[str] | None = None,
        mentions: list[MessageMention] | None = None,
    ) -> SendMessageResponse | None:
        """Send a message with markdown formatting.

        Args:
            text: The markdown formatted text content of the message.
            recipients: Optional list of recipient IDs.
            mentions: Optional list of MessageMention objects for @mentions.

        Returns:
            A SendMessageResponse object if successful, otherwise None.

        """
        return await self.send_text(
            text,
            recipients=recipients,
            mentions=mentions,
            text_mode="styled",
        )

    async def send_sticker(
        self,
        pack_id: str,
        sticker_id: int | str,
        *,
        caption: str | None = None,
        recipients: Sequence[str] | None = None,
        notify_self: bool | None = None,
    ) -> SendMessageResponse | None:
        """Send a sticker message.

        Args:
            pack_id: The ID of the sticker pack.
            sticker_id: The ID of the sticker within the pack.
            caption: Optional caption for the sticker.
            recipients: Optional list of recipient IDs.
            notify_self: Whether to send a notification to self.

        Returns:
            A SendMessageResponse object if successful, otherwise None.

        """
        sticker_ref = f"{pack_id}:{sticker_id}"
        return await self.send_text(
            caption or "",
            recipients=recipients,
            sticker=sticker_ref,
            notify_self=notify_self,
        )

    async def send_view_once(
        self,
        attachments: list[str],
        *,
        message: str = "",
        recipients: Sequence[str] | None = None,
        notify_self: bool | None = None,
    ) -> SendMessageResponse | None:
        """Send a view-once message with attachments.

        Args:
            attachments: List of base64 encoded attachments.
            message: Optional text message to accompany the view-once attachments.
            recipients: Optional list of recipient IDs.
            notify_self: Whether to send a notification to self.

        Returns:
            A SendMessageResponse object if successful, otherwise None.

        """
        return await self.send_text(
            message,
            recipients=recipients,
            base64_attachments=attachments,
            view_once=True,
            notify_self=notify_self,
        )

    async def send_with_preview(  # noqa: PLR0913
        self,
        url: str,
        *,
        message: str | None = None,
        title: str | None = None,
        description: str | None = None,
        recipients: Sequence[str] | None = None,
        text_mode: Literal["normal", "styled"] | None = None,
    ) -> SendMessageResponse | None:
        """Send a message with a link preview.

        Args:
            url: The URL for which to generate a preview.
            message: Optional text message to accompany the link preview.
            title: Optional title for the link preview.
            description: Optional description for the link preview.
            recipients: Optional list of recipient IDs.
            text_mode: 'normal' for plain text, 'styled' for markdown.

        Returns:
            A SendMessageResponse object if successful, otherwise None.

        """
        preview = LinkPreview(
            url=url,
            title=title,
            description=description,
        )
        return await self.send_text(
            message or url,
            recipients=recipients,
            link_preview=preview,
            text_mode=text_mode,
        )

    @asynccontextmanager
    async def download_attachments(
        self,
        attachments: Sequence[AttachmentPointer] | None = None,
        *,
        max_total_bytes: int = DEFAULT_MAX_TOTAL_BYTES,
        dest_dir: str | Path | None = None,
    ) -> AsyncGenerator[list[Path], None]:
        """Download attachments associated with the current message or a provided list.

        This is an asynchronous context manager that yields a list of `Path`
        objects pointing to the downloaded files. The files are cleaned up
        automatically upon exiting the context.

        Args:
            attachments: An optional sequence of `AttachmentPointer` objects
                to download.
                If not provided, attachments from `self.message` are used.
            max_total_bytes: The maximum total size (in bytes) of attachments
                to download.
                Defaults to `DEFAULT_MAX_TOTAL_BYTES`.
            dest_dir: Optional directory to save the attachments. If not
                provided, a temporary directory is used.

        Yields:
            A list of `pathlib.Path` objects to the downloaded attachment files.

        """
        pointers: list[AttachmentPointer] = []
        if attachments is not None:
            pointers = list(attachments)
        elif self.message.attachments:
            pointers = list(self.message.attachments)

        downloader = (
            self._attachment_downloader
            if max_total_bytes == self._attachment_downloader.max_total_bytes
            else AttachmentDownloader(self.attachments, max_total_bytes=max_total_bytes)
        )
        async with downloader.download(pointers, dest_dir=dest_dir) as files:
            yield files

    def mention_author(
        self, text: str, mention_text: str | None = None
    ) -> MessageMention:
        """Create a MessageMention object for the author of the current message.

        Args:
            text: The full text content where the mention is located.
            mention_text: The specific text used for the mention (e.g., "@Alice").
                          If None, defaults to `self.message.source`.

        Returns:
            A `MessageMention` object suitable for use in `SendMessageRequest`.

        Raises:
            ValueError: If the `mention_text` is not found within the provided `text`.

        """
        mention_value = mention_text or self.message.source
        start = text.find(mention_value)
        if start < 0:
            message = "mention text must exist within the provided text"
            raise ValueError(message)
        return MessageMention(
            author=self.message.source, start=start, length=len(mention_value)
        )

    async def reply_with_quote_mentions(
        self,
        text: str,
        mentions: list[MessageMention] | None = None,
    ) -> SendMessageResponse | None:
        """Reply with text while quoting and mentioning the original author.

        Attempts to mention the author of the original message when possible.

        Args:
            text: The text content of the reply message.
            mentions: Optional list of additional MessageMention objects for @mentions.

        Returns:
            A SendMessageResponse object if successful, otherwise None.

        """
        quote_mentions = mentions
        if quote_mentions is None and self.message.message:
            try:
                quote_mentions = [self.mention_author(self.message.message)]
            except ValueError:
                quote_mentions = None

        return await self.reply_text(
            text,
            quote_mentions=quote_mentions,
        )

    async def react(self, emoji: str) -> None:
        """Add a reaction (emoji) to the incoming message.

        Args:
            emoji: The emoji string to react with (e.g., "👍").

        """
        request = ReactionRequest(
            reaction=emoji,
            target_author=self.message.source,
            timestamp=self.message.timestamp,
            recipient=self._recipient(),
        )
        await self.reactions.send_reaction(
            self._phone_number,
            request.model_dump(by_alias=True, exclude_none=True),
        )

    async def remove_reaction(self) -> None:
        """Remove a reaction from the incoming message.

        This method will only attempt removal if `self.message.reaction_emoji` is
        set, indicating a reaction was present on the original message.
        """
        if not self.message.reaction_emoji:
            return

        request = ReactionRequest(
            reaction=self.message.reaction_emoji,
            target_author=self.message.source,
            timestamp=self.message.timestamp,
            recipient=self._recipient(),
        )
        await self.reactions.remove_reaction(
            self._phone_number,
            request.model_dump(by_alias=True, exclude_none=True),
        )

    async def show_typing(self) -> None:
        """Send a typing indicator to the sender of the incoming message."""
        request = TypingIndicatorRequest(recipient=self._recipient())
        await self.messages.set_typing_indicator(
            self._phone_number, request.model_dump(exclude_none=True, by_alias=True)
        )

    async def hide_typing(self) -> None:
        """Hide the typing indicator from the sender of the incoming message."""
        request = TypingIndicatorRequest(recipient=self._recipient())
        await self.messages.unset_typing_indicator(
            self._phone_number, request.model_dump(exclude_none=True, by_alias=True)
        )

    async def start_typing(self) -> None:
        """Alias for show_typing for backward compatibility."""
        await self.show_typing()

    async def stop_typing(self) -> None:
        """Alias for hide_typing for backward compatibility."""
        await self.hide_typing()

    async def send_receipt(
        self,
        target_timestamp: int,
        *,
        receipt_type: Literal["read", "viewed"] = "read",
        recipient: str | None = None,
    ) -> None:
        """Send a read or viewed receipt for a message.

        Args:
            target_timestamp: The timestamp of the message for which to send
                the receipt.
            receipt_type: The type of receipt to send ('read' or 'viewed').
                Defaults to 'read'.
            recipient: Optional recipient ID. Defaults to the sender of the
                incoming message.

        """
        payload = ReceiptRequest(
            recipient=recipient or self._recipient(),
            timestamp=target_timestamp,
            receipt_type=receipt_type,
        )
        await self.receipts.send_receipt(
            self._phone_number,
            payload.model_dump(exclude_none=True, by_alias=True),
        )

    async def remote_delete(
        self,
        target_timestamp: int,
        *,
        recipient: str | None = None,
        idempotency_key: str | None = None,
    ) -> RemoteDeleteResponse | None:
        """Remotely delete a message.

        Args:
            target_timestamp: The timestamp of the message to delete.
            recipient: Optional recipient ID. Defaults to the sender of the
                incoming message.
            idempotency_key: An optional idempotency key for the request.

        Returns:
            A RemoteDeleteResponse object if successful, otherwise None.

        """
        request = RemoteDeleteRequest(
            recipient=recipient or self._recipient(),
            timestamp=target_timestamp,
            idempotency_key=idempotency_key,
        )
        request_options = (
            RequestOptions(idempotency_key=request.idempotency_key)
            if request.idempotency_key
            else None
        )
        response = await self.messages.remote_delete(
            self._phone_number,
            request.model_dump(exclude_none=True, by_alias=True),
            request_options=request_options,
        )
        return RemoteDeleteResponse.from_raw(response)

    @asynccontextmanager
    async def lock(self, resource_id: str) -> AsyncGenerator[None, None]:
        """Acquire a distributed lock for a specific resource.

        This is an asynchronous context manager that ensures exclusive access
        to a resource across multiple instances of the application.

        Args:
            resource_id: A unique identifier for the resource to lock.

        Yields:
            None, while the lock is held.

        """
        async with self._lock_manager.lock(resource_id):
            yield

    def _prepare_send_request(self, request: SendMessageRequest) -> SendMessageRequest:
        """Prepare a SendMessageRequest by backfilling recipient and sender number.

        Args:
            request: The SendMessageRequest to prepare.

        Returns:
            The prepared SendMessageRequest.

        """
        if not request.recipients:
            request.recipients = [self._recipient()]
        if request.number is None:
            request.number = self._phone_number
        return request

    def _recipient(self) -> str:
        """Determine the default recipient for a message based on the incoming message.

        If the incoming message is from a group, the group ID is returned.
        Otherwise, the sender's source ID is returned.

        Returns:
            The recipient ID (phone number or group ID).

        """
        if self.message.is_group() and self.message.group:
            return self.message.group["groupId"]
        return self.message.source

reply_text async

reply_text(
    text: str,
    *,
    recipients: Sequence[str] | None = None,
    mentions: list[MessageMention] | None = None,
    quote_mentions: list[MessageMention] | None = None,
    base64_attachments: list[str] | None = None,
    link_preview: LinkPreview | None = None,
    text_mode: Literal["normal", "styled"] | None = None,
    notify_self: bool | None = None,
    edit_timestamp: int | None = None,
    sticker: str | None = None,
    view_once: bool = False
) -> SendMessageResponse | None

Reply to the incoming message with plain text, quoting it.

Parameters:

Name Type Description Default
text str

The text content of the reply message.

required
recipients Sequence[str] | None

Optional list of recipient IDs (phone numbers or group IDs). Defaults to the sender of the incoming message.

None
mentions list[MessageMention] | None

Optional list of MessageMention objects for @mentions.

None
quote_mentions list[MessageMention] | None

Optional list of MessageMention objects for mentions within a quote.

None
base64_attachments list[str] | None

Optional list of base64 encoded attachments.

None
link_preview LinkPreview | None

Optional LinkPreview object for a URL preview.

None
text_mode Literal['normal', 'styled'] | None

'normal' for plain text, 'styled' for markdown.

None
notify_self bool | None

Whether to send a notification to self.

None
edit_timestamp int | None

Timestamp of the message to edit.

None
sticker str | None

ID of a sticker to send.

None
view_once bool

If True, the message/attachments can only be viewed once.

False

Returns:

Type Description
SendMessageResponse | None

A SendMessageResponse object if successful, otherwise None.

Source code in signal_client/core/context.py
async def reply_text(  # noqa: PLR0913
    self,
    text: str,
    *,
    recipients: Sequence[str] | None = None,
    mentions: list[MessageMention] | None = None,
    quote_mentions: list[MessageMention] | None = None,
    base64_attachments: list[str] | None = None,
    link_preview: LinkPreview | None = None,
    text_mode: Literal["normal", "styled"] | None = None,
    notify_self: bool | None = None,
    edit_timestamp: int | None = None,
    sticker: str | None = None,
    view_once: bool = False,
) -> SendMessageResponse | None:
    """Reply to the incoming message with plain text, quoting it.

    Args:
        text: The text content of the reply message.
        recipients: Optional list of recipient IDs (phone numbers or group
            IDs). Defaults to the sender of the incoming message.
        mentions: Optional list of MessageMention objects for @mentions.
        quote_mentions: Optional list of MessageMention objects for mentions
            within a quote.
        base64_attachments: Optional list of base64 encoded attachments.
        link_preview: Optional LinkPreview object for a URL preview.
        text_mode: 'normal' for plain text, 'styled' for markdown.
        notify_self: Whether to send a notification to self.
        edit_timestamp: Timestamp of the message to edit.
        sticker: ID of a sticker to send.
        view_once: If True, the message/attachments can only be viewed once.

    Returns:
        A SendMessageResponse object if successful, otherwise None.

    """
    request = SendMessageRequest(
        message=text,
        recipients=list(recipients) if recipients else None,
        base64_attachments=base64_attachments or [],
        mentions=mentions,
        quote_mentions=quote_mentions,
        link_preview=link_preview,
        text_mode=text_mode,
        notify_self=notify_self,
        edit_timestamp=edit_timestamp,
        sticker=sticker,
        view_once=view_once,
    )
    return await self.reply(request)

send_markdown async

send_markdown(
    text: str,
    *,
    recipients: Sequence[str] | None = None,
    mentions: list[MessageMention] | None = None
) -> SendMessageResponse | None

Send a message with markdown formatting.

Parameters:

Name Type Description Default
text str

The markdown formatted text content of the message.

required
recipients Sequence[str] | None

Optional list of recipient IDs.

None
mentions list[MessageMention] | None

Optional list of MessageMention objects for @mentions.

None

Returns:

Type Description
SendMessageResponse | None

A SendMessageResponse object if successful, otherwise None.

Source code in signal_client/core/context.py
async def send_markdown(
    self,
    text: str,
    *,
    recipients: Sequence[str] | None = None,
    mentions: list[MessageMention] | None = None,
) -> SendMessageResponse | None:
    """Send a message with markdown formatting.

    Args:
        text: The markdown formatted text content of the message.
        recipients: Optional list of recipient IDs.
        mentions: Optional list of MessageMention objects for @mentions.

    Returns:
        A SendMessageResponse object if successful, otherwise None.

    """
    return await self.send_text(
        text,
        recipients=recipients,
        mentions=mentions,
        text_mode="styled",
    )

send_with_preview async

send_with_preview(
    url: str,
    *,
    message: str | None = None,
    title: str | None = None,
    description: str | None = None,
    recipients: Sequence[str] | None = None,
    text_mode: Literal["normal", "styled"] | None = None
) -> SendMessageResponse | None

Send a message with a link preview.

Parameters:

Name Type Description Default
url str

The URL for which to generate a preview.

required
message str | None

Optional text message to accompany the link preview.

None
title str | None

Optional title for the link preview.

None
description str | None

Optional description for the link preview.

None
recipients Sequence[str] | None

Optional list of recipient IDs.

None
text_mode Literal['normal', 'styled'] | None

'normal' for plain text, 'styled' for markdown.

None

Returns:

Type Description
SendMessageResponse | None

A SendMessageResponse object if successful, otherwise None.

Source code in signal_client/core/context.py
async def send_with_preview(  # noqa: PLR0913
    self,
    url: str,
    *,
    message: str | None = None,
    title: str | None = None,
    description: str | None = None,
    recipients: Sequence[str] | None = None,
    text_mode: Literal["normal", "styled"] | None = None,
) -> SendMessageResponse | None:
    """Send a message with a link preview.

    Args:
        url: The URL for which to generate a preview.
        message: Optional text message to accompany the link preview.
        title: Optional title for the link preview.
        description: Optional description for the link preview.
        recipients: Optional list of recipient IDs.
        text_mode: 'normal' for plain text, 'styled' for markdown.

    Returns:
        A SendMessageResponse object if successful, otherwise None.

    """
    preview = LinkPreview(
        url=url,
        title=title,
        description=description,
    )
    return await self.send_text(
        message or url,
        recipients=recipients,
        link_preview=preview,
        text_mode=text_mode,
    )

react async

react(emoji: str) -> None

Add a reaction (emoji) to the incoming message.

Parameters:

Name Type Description Default
emoji str

The emoji string to react with (e.g., "👍").

required
Source code in signal_client/core/context.py
async def react(self, emoji: str) -> None:
    """Add a reaction (emoji) to the incoming message.

    Args:
        emoji: The emoji string to react with (e.g., "👍").

    """
    request = ReactionRequest(
        reaction=emoji,
        target_author=self.message.source,
        timestamp=self.message.timestamp,
        recipient=self._recipient(),
    )
    await self.reactions.send_reaction(
        self._phone_number,
        request.model_dump(by_alias=True, exclude_none=True),
    )

send_receipt async

send_receipt(
    target_timestamp: int,
    *,
    receipt_type: Literal["read", "viewed"] = "read",
    recipient: str | None = None
) -> None

Send a read or viewed receipt for a message.

Parameters:

Name Type Description Default
target_timestamp int

The timestamp of the message for which to send the receipt.

required
receipt_type Literal['read', 'viewed']

The type of receipt to send ('read' or 'viewed'). Defaults to 'read'.

'read'
recipient str | None

Optional recipient ID. Defaults to the sender of the incoming message.

None
Source code in signal_client/core/context.py
async def send_receipt(
    self,
    target_timestamp: int,
    *,
    receipt_type: Literal["read", "viewed"] = "read",
    recipient: str | None = None,
) -> None:
    """Send a read or viewed receipt for a message.

    Args:
        target_timestamp: The timestamp of the message for which to send
            the receipt.
        receipt_type: The type of receipt to send ('read' or 'viewed').
            Defaults to 'read'.
        recipient: Optional recipient ID. Defaults to the sender of the
            incoming message.

    """
    payload = ReceiptRequest(
        recipient=recipient or self._recipient(),
        timestamp=target_timestamp,
        receipt_type=receipt_type,
    )
    await self.receipts.send_receipt(
        self._phone_number,
        payload.model_dump(exclude_none=True, by_alias=True),
    )

download_attachments async

download_attachments(
    attachments: Sequence[AttachmentPointer] | None = None,
    *,
    max_total_bytes: int = DEFAULT_MAX_TOTAL_BYTES,
    dest_dir: str | Path | None = None
) -> AsyncGenerator[list[Path], None]

Download attachments associated with the current message or a provided list.

This is an asynchronous context manager that yields a list of Path objects pointing to the downloaded files. The files are cleaned up automatically upon exiting the context.

Parameters:

Name Type Description Default
attachments Sequence[AttachmentPointer] | None

An optional sequence of AttachmentPointer objects to download. If not provided, attachments from self.message are used.

None
max_total_bytes int

The maximum total size (in bytes) of attachments to download. Defaults to DEFAULT_MAX_TOTAL_BYTES.

DEFAULT_MAX_TOTAL_BYTES
dest_dir str | Path | None

Optional directory to save the attachments. If not provided, a temporary directory is used.

None

Yields:

Type Description
AsyncGenerator[list[Path], None]

A list of pathlib.Path objects to the downloaded attachment files.

Source code in signal_client/core/context.py
@asynccontextmanager
async def download_attachments(
    self,
    attachments: Sequence[AttachmentPointer] | None = None,
    *,
    max_total_bytes: int = DEFAULT_MAX_TOTAL_BYTES,
    dest_dir: str | Path | None = None,
) -> AsyncGenerator[list[Path], None]:
    """Download attachments associated with the current message or a provided list.

    This is an asynchronous context manager that yields a list of `Path`
    objects pointing to the downloaded files. The files are cleaned up
    automatically upon exiting the context.

    Args:
        attachments: An optional sequence of `AttachmentPointer` objects
            to download.
            If not provided, attachments from `self.message` are used.
        max_total_bytes: The maximum total size (in bytes) of attachments
            to download.
            Defaults to `DEFAULT_MAX_TOTAL_BYTES`.
        dest_dir: Optional directory to save the attachments. If not
            provided, a temporary directory is used.

    Yields:
        A list of `pathlib.Path` objects to the downloaded attachment files.

    """
    pointers: list[AttachmentPointer] = []
    if attachments is not None:
        pointers = list(attachments)
    elif self.message.attachments:
        pointers = list(self.message.attachments)

    downloader = (
        self._attachment_downloader
        if max_total_bytes == self._attachment_downloader.max_total_bytes
        else AttachmentDownloader(self.attachments, max_total_bytes=max_total_bytes)
    )
    async with downloader.download(pointers, dest_dir=dest_dir) as files:
        yield files

lock async

lock(resource_id: str) -> AsyncGenerator[None, None]

Acquire a distributed lock for a specific resource.

This is an asynchronous context manager that ensures exclusive access to a resource across multiple instances of the application.

Parameters:

Name Type Description Default
resource_id str

A unique identifier for the resource to lock.

required

Yields:

Type Description
AsyncGenerator[None, None]

None, while the lock is held.

Source code in signal_client/core/context.py
@asynccontextmanager
async def lock(self, resource_id: str) -> AsyncGenerator[None, None]:
    """Acquire a distributed lock for a specific resource.

    This is an asynchronous context manager that ensures exclusive access
    to a resource across multiple instances of the application.

    Args:
        resource_id: A unique identifier for the resource to lock.

    Yields:
        None, while the lock is held.

    """
    async with self._lock_manager.lock(resource_id):
        yield

show_typing async

show_typing() -> None

Send a typing indicator to the sender of the incoming message.

Source code in signal_client/core/context.py
async def show_typing(self) -> None:
    """Send a typing indicator to the sender of the incoming message."""
    request = TypingIndicatorRequest(recipient=self._recipient())
    await self.messages.set_typing_indicator(
        self._phone_number, request.model_dump(exclude_none=True, by_alias=True)
    )

hide_typing async

hide_typing() -> None

Hide the typing indicator from the sender of the incoming message.

Source code in signal_client/core/context.py
async def hide_typing(self) -> None:
    """Hide the typing indicator from the sender of the incoming message."""
    request = TypingIndicatorRequest(recipient=self._recipient())
    await self.messages.unset_typing_indicator(
        self._phone_number, request.model_dump(exclude_none=True, by_alias=True)
    )

Represents a single command that the bot can respond to.

A command is defined by its triggers (patterns that match incoming messages), an optional whitelist of allowed senders, and a handler function that executes the command's logic.

Source code in signal_client/core/command.py
class Command:
    """Represents a single command that the bot can respond to.

    A command is defined by its triggers (patterns that match incoming messages),
    an optional whitelist of allowed senders, and a handler function that
    executes the command's logic.
    """

    def __init__(
        self,
        triggers: list[str | re.Pattern],
        whitelisted: list[str] | None = None,
        *,
        case_sensitive: bool = False,
        metadata: CommandMetadata | None = None,
    ) -> None:
        """Initialize a Command instance.

        Args:
            triggers: A list of strings or regular expressions that will
                      trigger this command.
            whitelisted: An optional list of sender IDs (phone numbers or group IDs)
                         that are allowed to execute this command. If empty or None,
                         all senders are allowed.
            case_sensitive: If True, string triggers will be matched case-sensitively.
                            Defaults to False.
            metadata: Optional CommandMetadata to provide name, description, and usage.

        """
        self.triggers = triggers
        self.whitelisted = whitelisted or []
        self.case_sensitive = case_sensitive
        meta = metadata or CommandMetadata()
        self.name = meta.name
        self.description = meta.description
        self.usage = meta.usage
        self.handle: Callable[[Context], Awaitable[None]] | None = None

    def with_handler(self, handler: Callable[[Context], Awaitable[None]]) -> Command:
        """Assign a handler function to this command.

        If name or description are not already set, they will be inferred
        from the handler function's name and docstring.

        Args:
            handler: The asynchronous function that will be executed when
                     this command is triggered. It must accept a Context object.

        Returns:
            The Command instance with the handler assigned.

        """
        self.handle = handler
        if self.name is None:
            self.name = handler.__name__
        if self.description is None:
            doc = inspect.getdoc(handler)
            self.description = doc.strip() if doc else None
        return self

    async def __call__(self, context: Context) -> None:
        """Execute the command's handler function.

        Args:
            context: The Context object containing message details and API clients.

        Raises:
            CommandError: If no handler has been assigned to the command.

        """
        if self.handle is None:
            message = _COMMAND_HANDLER_NOT_SET
            raise CommandError(message)
        await self.handle(context)

__init__

__init__(
    triggers: list[str | Pattern],
    whitelisted: list[str] | None = None,
    *,
    case_sensitive: bool = False,
    metadata: CommandMetadata | None = None
) -> None

Initialize a Command instance.

Parameters:

Name Type Description Default
triggers list[str | Pattern]

A list of strings or regular expressions that will trigger this command.

required
whitelisted list[str] | None

An optional list of sender IDs (phone numbers or group IDs) that are allowed to execute this command. If empty or None, all senders are allowed.

None
case_sensitive bool

If True, string triggers will be matched case-sensitively. Defaults to False.

False
metadata CommandMetadata | None

Optional CommandMetadata to provide name, description, and usage.

None
Source code in signal_client/core/command.py
def __init__(
    self,
    triggers: list[str | re.Pattern],
    whitelisted: list[str] | None = None,
    *,
    case_sensitive: bool = False,
    metadata: CommandMetadata | None = None,
) -> None:
    """Initialize a Command instance.

    Args:
        triggers: A list of strings or regular expressions that will
                  trigger this command.
        whitelisted: An optional list of sender IDs (phone numbers or group IDs)
                     that are allowed to execute this command. If empty or None,
                     all senders are allowed.
        case_sensitive: If True, string triggers will be matched case-sensitively.
                        Defaults to False.
        metadata: Optional CommandMetadata to provide name, description, and usage.

    """
    self.triggers = triggers
    self.whitelisted = whitelisted or []
    self.case_sensitive = case_sensitive
    meta = metadata or CommandMetadata()
    self.name = meta.name
    self.description = meta.description
    self.usage = meta.usage
    self.handle: Callable[[Context], Awaitable[None]] | None = None

with_handler

with_handler(
    handler: Callable[[Context], Awaitable[None]],
) -> Command

Assign a handler function to this command.

If name or description are not already set, they will be inferred from the handler function's name and docstring.

Parameters:

Name Type Description Default
handler Callable[[Context], Awaitable[None]]

The asynchronous function that will be executed when this command is triggered. It must accept a Context object.

required

Returns:

Type Description
Command

The Command instance with the handler assigned.

Source code in signal_client/core/command.py
def with_handler(self, handler: Callable[[Context], Awaitable[None]]) -> Command:
    """Assign a handler function to this command.

    If name or description are not already set, they will be inferred
    from the handler function's name and docstring.

    Args:
        handler: The asynchronous function that will be executed when
                 this command is triggered. It must accept a Context object.

    Returns:
        The Command instance with the handler assigned.

    """
    self.handle = handler
    if self.name is None:
        self.name = handler.__name__
    if self.description is None:
        doc = inspect.getdoc(handler)
        self.description = doc.strip() if doc else None
    return self

__call__ async

__call__(context: Context) -> None

Execute the command's handler function.

Parameters:

Name Type Description Default
context Context

The Context object containing message details and API clients.

required

Raises:

Type Description
CommandError

If no handler has been assigned to the command.

Source code in signal_client/core/command.py
async def __call__(self, context: Context) -> None:
    """Execute the command's handler function.

    Args:
        context: The Context object containing message details and API clients.

    Raises:
        CommandError: If no handler has been assigned to the command.

    """
    if self.handle is None:
        message = _COMMAND_HANDLER_NOT_SET
        raise CommandError(message)
    await self.handle(context)

Settings

Bases: BaseSettings

Single, explicit configuration surface for the Signal client.

Settings are loaded from environment variables and an optional .env file. All settings can be overridden via constructor arguments.

Source code in signal_client/core/config.py
class Settings(BaseSettings):
    """Single, explicit configuration surface for the Signal client.

    Settings are loaded from environment variables and an optional .env file.
    All settings can be overridden via constructor arguments.
    """

    phone_number: str = Field(..., validation_alias="SIGNAL_PHONE_NUMBER")
    signal_service: str = Field(..., validation_alias="SIGNAL_SERVICE_URL")
    base_url: str = Field(..., validation_alias="SIGNAL_API_URL")

    api_retries: int = Field(
        3, description="Number of times to retry API requests on transient errors."
    )
    api_backoff_factor: float = Field(
        0.5, description="Factor for exponential backoff between API retries."
    )
    api_timeout: int = Field(
        30, description="Default timeout (in seconds) for API requests."
    )
    api_auth_token: str | None = Field(
        default=None,
        validation_alias="SIGNAL_API_TOKEN",
        description="API authentication token.",
    )
    api_auth_scheme: str = Field(
        "Bearer",
        description="Authentication scheme (e.g., 'Bearer') for the API token.",
    )
    api_default_headers: dict[str, str] = Field(
        default_factory=dict,
        description="Default headers to send with all API requests.",
    )
    api_endpoint_timeouts: dict[str, float] = Field(
        default_factory=dict,
        description="Per-endpoint timeout overrides (e.g., {'/messages': 60}).",
    )
    api_idempotency_header: str = Field(
        "Idempotency-Key", description="Header name for idempotency keys."
    )

    queue_size: int = Field(
        1000, description="Maximum number of messages to queue for processing."
    )
    worker_pool_size: int = Field(
        4, description="Number of worker tasks to process messages concurrently."
    )
    worker_shard_count: int = Field(
        0,
        description="Number of shards for worker pool. "
        "Defaults to worker_pool_size if 0.",
    )
    queue_put_timeout: float = Field(
        1.0, description="Timeout (in seconds) for putting messages into the queue."
    )
    queue_drop_oldest_on_timeout: bool = Field(
        default=True,
        description="If True, drop oldest message if queue is full and put times out.",
    )
    durable_queue_enabled: bool = Field(
        default=False, description="Enable persistent queueing for messages."
    )
    durable_queue_max_length: int = Field(
        10000, description="Maximum length of the durable queue."
    )
    ingest_checkpoint_window: int = Field(
        5000, description="Number of messages after which to save ingest checkpoint."
    )
    ingest_queue_name: str = Field(
        "signal_client_ingest",
        description="Name of the ingest queue in persistent storage.",
    )
    ingest_checkpoint_key: str = Field(
        "signal_client_ingest_checkpoint",
        description="Key for storing ingest checkpoint in persistent storage.",
    )
    ingest_pause_seconds: float = Field(
        1.0, description="Default duration (in seconds) to pause message ingestion."
    )
    distributed_locks_enabled: bool = Field(
        default=False, description="Enable distributed locking for worker coordination."
    )
    distributed_lock_timeout: int = Field(
        30, description="Timeout (in seconds) for acquiring distributed locks."
    )

    rate_limit: int = Field(
        50, description="Maximum number of API requests per rate_limit_period."
    )
    rate_limit_period: int = Field(
        1, description="Time window (in seconds) for rate limiting."
    )
    websocket_path: str | None = Field(
        default=None,
        validation_alias="SIGNAL_WS_PATH",
        description="WebSocket path for Signal service.",
    )

    circuit_breaker_failure_threshold: int = Field(
        5, description="Number of failures before circuit opens."
    )
    circuit_breaker_reset_timeout: int = Field(
        30, description="Time (in seconds) before a half-open state is attempted."
    )
    circuit_breaker_failure_rate_threshold: float = Field(
        0.5, description="Failure rate threshold (0.0-1.0) to open the circuit."
    )
    circuit_breaker_min_requests_for_rate_calc: int = Field(
        10, description="Minimum requests needed to calculate failure rate."
    )

    storage_type: str = Field(
        "memory", description="Type of storage backend: 'memory', 'sqlite', or 'redis'."
    )
    redis_host: str = Field(
        "localhost", description="Redis host for 'redis' storage type."
    )
    redis_port: int = Field(6379, description="Redis port for 'redis' storage type.")
    sqlite_database: str = Field(
        "signal_client.db",
        description="SQLite database file for 'sqlite' storage type.",
    )

    dlq_name: str = Field(
        "signal_client_dlq",
        description="Name of the Dead Letter Queue in persistent storage.",
    )
    dlq_max_retries: int = Field(
        5, description="Maximum number of retries for messages in the DLQ."
    )

    log_redaction_enabled: bool = Field(
        default=True, description="Enable or disable PII redaction in logs."
    )

    model_config = SettingsConfigDict(
        env_file=".env",
        env_file_encoding="utf-8",
        populate_by_name=True,
        extra="ignore",
        case_sensitive=False,
    )

    @model_validator(mode="after")
    def validate_storage(self) -> Self:
        """Validate storage-related settings based on the chosen storage_type.

        Ensures that required fields for 'redis' and 'sqlite' storage are provided
        and that numeric fields have valid positive values.

        Raises:
            ValueError: If storage configuration is invalid.

        Returns:
            The validated Settings instance.

        """
        self._validate_storage_type()
        self._validate_queue_limits()
        self._normalize_worker_shards()
        self._validate_endpoint_timeouts()
        self._ensure_idempotency_header()
        return self

    def _validate_storage_type(self) -> None:
        storage_type = self.storage_type.lower()
        validators = {
            "redis": self._validate_redis_storage,
            "sqlite": self._validate_sqlite_storage,
            "memory": lambda: None,
        }
        validator = validators.get(storage_type)
        if validator is None:
            message = f"Unsupported storage_type '{self.storage_type}'."
            raise ValueError(message)
        validator()

    def _validate_redis_storage(self) -> None:
        if not self.redis_host:
            message = "Redis storage requires 'redis_host'."
            raise ValueError(message)
        if self.redis_port is None:
            message = "Redis storage requires 'redis_port'."
            raise ValueError(message)
        if isinstance(self.redis_port, int) and self.redis_port <= 0:
            message = "'redis_port' must be a positive integer."
            raise ValueError(message)

    def _validate_sqlite_storage(self) -> None:
        if not self.sqlite_database:
            message = "SQLite storage requires 'sqlite_database'."
            raise ValueError(message)

    def _validate_queue_limits(self) -> None:
        if self.durable_queue_max_length <= 0:
            message = "'durable_queue_max_length' must be positive."
            raise ValueError(message)
        if self.ingest_checkpoint_window <= 0:
            message = "'ingest_checkpoint_window' must be positive."
            raise ValueError(message)
        if self.ingest_pause_seconds < 0:
            message = "'ingest_pause_seconds' must be non-negative."
            raise ValueError(message)
        if self.distributed_lock_timeout <= 0:
            message = "'distributed_lock_timeout' must be positive."
            raise ValueError(message)

    def _normalize_worker_shards(self) -> None:
        if (
            self.worker_shard_count <= 0
            or self.worker_shard_count > self.worker_pool_size
        ):
            self.worker_shard_count = self.worker_pool_size

    def _validate_endpoint_timeouts(self) -> None:
        for path, timeout in self.api_endpoint_timeouts.items():
            if timeout is None or float(timeout) <= 0:
                message = (
                    f"'api_endpoint_timeouts' entry for '{path}' must be positive."
                )
                raise ValueError(message)

    def _ensure_idempotency_header(self) -> None:
        if not self.api_idempotency_header:
            message = "'api_idempotency_header' cannot be empty."
            raise ValueError(message)

    @classmethod
    def from_sources(cls: type[Self], config: dict[str, Any] | None = None) -> Self:
        """Load settings from environment variables and an optional dictionary.

        Args:
            config: An optional dictionary to override environment-loaded settings.

        Raises:
            ConfigurationError: If any required settings are missing or invalid.

        Returns:
            A validated Settings instance.

        """
        try:
            env_payload: dict[str, Any] = {}
            try:
                env_payload = cls().model_dump()  # type: ignore[call-arg]
            except ValidationError:
                env_payload = {}

            payload: dict[str, Any] = (
                env_payload if config is None else {**env_payload, **config}
            )
            with _without_required_env():
                settings = cls.model_validate(payload)
            cls._validate_required_fields(settings)
        except ValidationError as validation_error:
            raise cls._wrap_validation_error(validation_error) from validation_error
        else:
            return settings

    @classmethod
    def _wrap_validation_error(cls, error: ValidationError) -> ConfigurationError:
        """Wrap a Pydantic ValidationError in a custom ConfigurationError.

        Args:
            error: The original Pydantic ValidationError.

        Returns:
            A ConfigurationError with a more user-friendly message.

        """

        def _error_field(err: Mapping[str, object]) -> str:
            loc = err.get("loc")
            if not isinstance(loc, (list, tuple)):
                return ""
            return str(loc[-1]) if loc else ""

        missing = cls._missing_fields(error)
        errors = error.errors(include_url=False)
        fields = {_error_field(err) for err in errors if _error_field(err)}
        invalid_fields = sorted(fields - {field.split("/")[-1] for field in missing})
        invalid_errors = [err for err in errors if _error_field(err) in invalid_fields]

        if missing and invalid_fields:
            missing_list = ", ".join(sorted(missing))
            invalid_list = ", ".join(invalid_fields)
            first_error = (
                invalid_errors[0]["msg"] if invalid_errors else errors[0]["msg"]
            )
            message = (
                f"Invalid configuration overrides. Missing: {missing_list}. "
                f"Invalid: {invalid_list} ({first_error})."
            )
            return ConfigurationError(message)

        if missing:
            missing_list = ", ".join(sorted(missing))
            message = f"Missing required configuration values: {missing_list}."
            return ConfigurationError(message)

        first_error = errors[0]["msg"]
        field_list = ", ".join(sorted(fields)) if fields else "configuration"
        message = f"Invalid configuration for {field_list}: {first_error}."
        return ConfigurationError(message)

    @classmethod
    def _missing_fields(cls, error: ValidationError) -> set[str]:
        """Extract missing field names from a Pydantic ValidationError.

        Args:
            error: The Pydantic ValidationError instance.

        Returns:
            A set of strings representing the names of missing fields.

        """
        missing: set[str] = set()
        for err in error.errors(include_url=False):
            if err.get("type") not in {"missing", "value_error.missing"}:
                continue
            loc = err.get("loc")
            if not loc:
                continue
            field_name = str(loc[-1])
            alias = cls._env_alias_for_field(field_name)
            missing.add(alias or field_name)
        return missing

    @classmethod
    def _env_alias_for_field(cls, field_name: str) -> str | None:
        """Get the environment variable alias for a given field name.

        Args:
            field_name: The name of the setting field.

        Returns:
            The environment variable alias if found, otherwise None.

        """
        field = cls.model_fields.get(field_name)
        if not field:
            return None
        alias = field.validation_alias
        if not alias:
            return None
        return (
            str(alias)
            if not isinstance(alias, tuple)
            else "/".join(str(item) for item in alias)
        )

    @classmethod
    def _validate_required_fields(cls, settings: Self) -> None:
        """Validate that essential required fields are present.

        Args:
            settings: The Settings instance to validate.

        Raises:
            ConfigurationError: If any essential required fields are missing.

        """
        missing = [
            field
            for field in ("phone_number", "signal_service", "base_url")
            if not getattr(settings, field)
        ]
        if missing:
            missing_list = ", ".join(missing)
            message = f"Missing required configuration values: {missing_list}."
            raise ConfigurationError(message)

validate_storage

validate_storage() -> Self

Validate storage-related settings based on the chosen storage_type.

Ensures that required fields for 'redis' and 'sqlite' storage are provided and that numeric fields have valid positive values.

Raises:

Type Description
ValueError

If storage configuration is invalid.

Returns:

Type Description
Self

The validated Settings instance.

Source code in signal_client/core/config.py
@model_validator(mode="after")
def validate_storage(self) -> Self:
    """Validate storage-related settings based on the chosen storage_type.

    Ensures that required fields for 'redis' and 'sqlite' storage are provided
    and that numeric fields have valid positive values.

    Raises:
        ValueError: If storage configuration is invalid.

    Returns:
        The validated Settings instance.

    """
    self._validate_storage_type()
    self._validate_queue_limits()
    self._normalize_worker_shards()
    self._validate_endpoint_timeouts()
    self._ensure_idempotency_header()
    return self

from_sources classmethod

from_sources(config: dict[str, Any] | None = None) -> Self

Load settings from environment variables and an optional dictionary.

Parameters:

Name Type Description Default
config dict[str, Any] | None

An optional dictionary to override environment-loaded settings.

None

Raises:

Type Description
ConfigurationError

If any required settings are missing or invalid.

Returns:

Type Description
Self

A validated Settings instance.

Source code in signal_client/core/config.py
@classmethod
def from_sources(cls: type[Self], config: dict[str, Any] | None = None) -> Self:
    """Load settings from environment variables and an optional dictionary.

    Args:
        config: An optional dictionary to override environment-loaded settings.

    Raises:
        ConfigurationError: If any required settings are missing or invalid.

    Returns:
        A validated Settings instance.

    """
    try:
        env_payload: dict[str, Any] = {}
        try:
            env_payload = cls().model_dump()  # type: ignore[call-arg]
        except ValidationError:
            env_payload = {}

        payload: dict[str, Any] = (
            env_payload if config is None else {**env_payload, **config}
        )
        with _without_required_env():
            settings = cls.model_validate(payload)
        cls._validate_required_fields(settings)
    except ValidationError as validation_error:
        raise cls._wrap_validation_error(validation_error) from validation_error
    else:
        return settings

Observability

signal_client.observability.health_server.start_health_server async

start_health_server(
    application: Application,
    *,
    host: str = "127.0.0.1",
    port: int = 8081
) -> HealthServer

Create and start a HealthServer for the given application.

Source code in signal_client/observability/health_server.py
async def start_health_server(
    application: Application,
    *,
    host: str = "127.0.0.1",
    port: int = 8081,
) -> HealthServer:
    """Create and start a HealthServer for the given application."""
    server = HealthServer(application, host=host, port=port)
    await server.start()
    return server

signal_client.observability.metrics_server.start_metrics_server

start_metrics_server(
    port: int = 8000,
    addr: str = "127.0.0.1",
    *,
    registry: CollectorRegistry | None = None
) -> object

Start an HTTP server that exposes Prometheus metrics at /.

Returns the server object so callers can stop it if desired.

Source code in signal_client/observability/metrics.py
def start_metrics_server(
    port: int = 8000,
    addr: str = "127.0.0.1",
    *,
    registry: CollectorRegistry | None = None,
) -> object:
    """Start an HTTP server that exposes Prometheus metrics at `/`.

    Returns the server object so callers can stop it if desired.
    """
    return start_http_server(port, addr=addr, registry=registry or REGISTRY)

signal_client.observability.logging.ensure_structlog_configured

ensure_structlog_configured(
    *,
    json_output: bool = False,
    redaction_enabled: bool = True
) -> None

Idempotently configure structlog unless already configured externally.

Source code in signal_client/observability/logging.py
def ensure_structlog_configured(
    *,
    json_output: bool = False,
    redaction_enabled: bool = True,
) -> None:
    """Idempotently configure structlog unless already configured externally."""
    _StructlogGuard.ensure_configured(
        json_output=json_output,
        redaction_enabled=redaction_enabled,
    )

Troubleshooting

  • Missing members in the reference? Run poetry run mkdocs build --strict to surface mkdocstrings import errors (often missing deps).
  • Import path errors: prefer signal_client.<module> paths shown above; avoid private modules not exported in __all__.

Next steps

  • Use the reference alongside Examples to map API calls to runnable scripts.
  • See Advanced usage for middleware and resiliency tuning using these APIs.

Exceptions

signal_client.core.exceptions

Custom exceptions for the Signal client.

AuthenticationError

Bases: SignalAPIError

Raised for authentication failures (HTTP status code 401 Unauthorized).

Source code in signal_client/core/exceptions.py
class AuthenticationError(SignalAPIError):
    """Raised for authentication failures (HTTP status code 401 Unauthorized)."""

    def __init__(self, message: str, status_code: int | None = 401) -> None:
        """Initialize an authentication error."""
        super().__init__(message, status_code)

__init__

__init__(
    message: str, status_code: int | None = 401
) -> None

Initialize an authentication error.

Source code in signal_client/core/exceptions.py
def __init__(self, message: str, status_code: int | None = 401) -> None:
    """Initialize an authentication error."""
    super().__init__(message, status_code)

ConfigurationError

Bases: Exception

Raised when there is an issue with the application's configuration.

Source code in signal_client/core/exceptions.py
class ConfigurationError(Exception):
    """Raised when there is an issue with the application's configuration."""

GroupNotFoundError

Bases: SignalAPIError

Raised when a requested group is not found (HTTP status 404.

For group-related operations).

Source code in signal_client/core/exceptions.py
class GroupNotFoundError(SignalAPIError):
    """Raised when a requested group is not found (HTTP status 404.

    For group-related operations).
    """

    def __init__(self, message: str, status_code: int | None = 404) -> None:
        """Initialize a group-not-found error."""
        super().__init__(message, status_code)

__init__

__init__(
    message: str, status_code: int | None = 404
) -> None

Initialize a group-not-found error.

Source code in signal_client/core/exceptions.py
def __init__(self, message: str, status_code: int | None = 404) -> None:
    """Initialize a group-not-found error."""
    super().__init__(message, status_code)

InvalidRecipientError

Bases: SignalAPIError

Raised when a message cannot be sent due to an invalid recipient.

Applies to HTTP status 404 on send operations.

Source code in signal_client/core/exceptions.py
class InvalidRecipientError(SignalAPIError):
    """Raised when a message cannot be sent due to an invalid recipient.

    Applies to HTTP status 404 on send operations.
    """

    def __init__(self, message: str, status_code: int | None = 404) -> None:
        """Initialize an invalid recipient error."""
        super().__init__(message, status_code)

__init__

__init__(
    message: str, status_code: int | None = 404
) -> None

Initialize an invalid recipient error.

Source code in signal_client/core/exceptions.py
def __init__(self, message: str, status_code: int | None = 404) -> None:
    """Initialize an invalid recipient error."""
    super().__init__(message, status_code)

RateLimitError

Bases: SignalAPIError

Raised when the API rate limit is exceeded (HTTP status codes 413 or 429).

Source code in signal_client/core/exceptions.py
class RateLimitError(SignalAPIError):
    """Raised when the API rate limit is exceeded (HTTP status codes 413 or 429)."""

    def __init__(self, message: str, status_code: int | None = 429) -> None:
        """Initialize a rate limit error."""
        super().__init__(message, status_code)

__init__

__init__(
    message: str, status_code: int | None = 429
) -> None

Initialize a rate limit error.

Source code in signal_client/core/exceptions.py
def __init__(self, message: str, status_code: int | None = 429) -> None:
    """Initialize a rate limit error."""
    super().__init__(message, status_code)

ServerError

Bases: SignalAPIError

Raised for server-side errors (HTTP status codes 5xx).

Source code in signal_client/core/exceptions.py
class ServerError(SignalAPIError):
    """Raised for server-side errors (HTTP status codes 5xx)."""

    def __init__(self, message: str, status_code: int | None = 500) -> None:
        """Initialize a server error."""
        super().__init__(message, status_code)

__init__

__init__(
    message: str, status_code: int | None = 500
) -> None

Initialize a server error.

Source code in signal_client/core/exceptions.py
def __init__(self, message: str, status_code: int | None = 500) -> None:
    """Initialize a server error."""
    super().__init__(message, status_code)

SignalAPIError

Bases: Exception

Base exception for all API-related errors.

This exception is raised for general API errors that do not fall into more specific categories (e.g., unexpected status codes).

Source code in signal_client/core/exceptions.py
class SignalAPIError(Exception):
    """Base exception for all API-related errors.

    This exception is raised for general API errors that do not fall into
    more specific categories (e.g., unexpected status codes).
    """

    def __init__(self, message: str, status_code: int | None = None) -> None:
        """Store an API error message and optional status code."""
        super().__init__(message)
        self.status_code = status_code

__init__

__init__(
    message: str, status_code: int | None = None
) -> None

Store an API error message and optional status code.

Source code in signal_client/core/exceptions.py
def __init__(self, message: str, status_code: int | None = None) -> None:
    """Store an API error message and optional status code."""
    super().__init__(message)
    self.status_code = status_code

UnsupportedMessageError

Bases: SignalClientError

Custom exception for unsupported message types encountered during processing.

Source code in signal_client/core/exceptions.py
class UnsupportedMessageError(SignalClientError):
    """Custom exception for unsupported message types encountered during processing."""