Source code for qpay_client.v2.clients.async_client

import asyncio
import logging
from typing import Optional, Union

from httpx import AsyncClient, BasicAuth, Response

from ..schemas import (
    EbarimtCreateRequest,
    EbarimtCreateResponse,
    EbarimtGetResponse,
    InvoiceCreateRequest,
    InvoiceCreateResponse,
    InvoiceCreateSimpleRequest,
    InvoiceGetResponse,
    PaymentCancelRequest,
    PaymentCheckRequest,
    PaymentCheckResponse,
    PaymentGetResponse,
    PaymentListRequest,
    PaymentListResponse,
    PaymentRefundRequest,
    SubscriptionGetResponse,
    TokenResponse,
)
from ..settings import QPaySettings
from ..transport import AsyncTransport
from .base import BaseClient
from .decorators import async_auth_required, async_poll_until_paid


[docs] class AsyncQPayClient(BaseClient): """ Asynchronous client for the QPay v2 API. Always use as an async context manager. Authentication runs on ``__aenter__`` and the HTTP connection is closed on ``__aexit__``:: settings = QPaySettings.production( username="...", password="...", invoice_code="..." ) async with AsyncQPayClient(settings) as client: invoice = await client.invoice_create(InvoiceCreateSimpleRequest(...)) result = await client.payment_check(PaymentCheckRequest(...)) Available endpoints: ``invoice_create``, ``invoice_get``, ``invoice_cancel``, ``payment_get``, ``payment_check``, ``payment_cancel``, ``payment_refund``, ``payment_list``, ``ebarimt_create``, ``ebarimt_get``, ``subscription_get``, ``subscription_cancel``. Concurrency-safe: ``authenticate()`` is guarded by an ``asyncio.Lock``, so multiple coroutines sharing one client instance will not race on token refresh. """ def __init__( self, settings: QPaySettings, *, client: Optional[AsyncClient] = None, logger: Optional[logging.Logger] = None, ): """ Initialize AsyncQPayClient object. Args: settings (Settings): QPay client settings. client (Optional[httpx.Client]): Optional custom httpx client. logger (Optional[logging.Logger]): QPay client logger. """ super().__init__(settings, logger=logger) self._transport = AsyncTransport(settings=settings, logger=self._logger, client=client) self._client = self._transport.client self._async_lock = asyncio.Lock() @property def is_closed(self) -> bool: return self._client.is_closed async def __aenter__(self): await self.authenticate() return self async def __aexit__(self, exc_type, exc_val, exc_tb): await self.close()
[docs] async def close(self): """Close connection.""" await self._transport.close()
[docs] async def authenticate(self) -> None: """Authenticate client.""" async with self._async_lock: if self.is_authenticated: return # no need to reauthenticate if not self._auth_state.has_access_token() or self.is_refresh_expired: await self._authenticate_nolock() # first token or refresh token expired else: await self._refresh_access_token_nolock()
async def _request( self, method: str, url: str, **kwargs, ) -> Response: return await self._transport.request( method, url, on_unauthorized=self._refresh_access_token, **kwargs, ) async def _send(self, method: str, url: str, **kwargs) -> Response: return await self._transport._send(method, url, **kwargs) async def _authenticate(self) -> None: """Authenticate the client. Thread safe.""" # locked wrapper async with self._async_lock: await self._authenticate_nolock() async def _refresh_access_token(self) -> None: """Refresh client access. Thread safe.""" # locked wrapper async with self._async_lock: await self._refresh_access_token_nolock() async def _authenticate_nolock(self): """Authenticate the client. Not thread safe.""" response = await self._send( "POST", "/auth/token", auth=BasicAuth( username=self._settings.username, password=self._settings.password, # get password secret ), ) token_response = TokenResponse.model_validate(response.json()) self._auth_state.update(token_response) async def _refresh_access_token_nolock(self): """Refresh client access. Not thread safe.""" if not self._auth_state.is_access_expired(leeway=self._token_leeway): return # access token not expired if self._auth_state.is_refresh_expired(leeway=self._token_leeway): return await self._authenticate_nolock() # Using refresh token response = await self._send( "POST", "/auth/refresh", headers={"Authorization": self._auth_state.refresh_as_header()}, ) if response.is_success: token_response = TokenResponse.model_validate(response.json()) self._auth_state.update(token_response) else: await self._authenticate_nolock()
[docs] @async_auth_required async def invoice_get(self, invoice_id: str) -> InvoiceGetResponse: """Get invoice by Id.""" response = await self._request( "GET", f"/invoice/{invoice_id}", headers=self.headers(), ) data = InvoiceGetResponse.model_validate(response.json()) return data
[docs] @async_auth_required async def invoice_create( self, create_invoice_request: Union[InvoiceCreateRequest, InvoiceCreateSimpleRequest] ) -> InvoiceCreateResponse: """Send invoice create request to Qpay.""" response = await self._request( "POST", "/invoice", headers=self.headers(), json=self._invoice_create_payload(create_invoice_request), ) data = InvoiceCreateResponse.model_validate(response.json()) return data
[docs] @async_auth_required async def invoice_cancel( self, invoice_id: str, ) -> int: """Send cancel invoice request to qpay. Returns status code.""" response = await self._request( "DELETE", f"/invoice/{invoice_id}", headers=self.headers(), ) return response.status_code
[docs] @async_auth_required async def payment_get(self, payment_id: str) -> PaymentGetResponse: """Send get payment requesst to qpay.""" response = await self._request( "GET", f"/payment/{payment_id}", headers=self.headers(), ) data = PaymentGetResponse.model_validate(response.json()) return data
[docs] @async_auth_required @async_poll_until_paid async def payment_check(self, payment_check_request: PaymentCheckRequest) -> PaymentCheckResponse: """Check payment status, polling until a payment is found or retries exhausted.""" response = await self._request( "POST", "/payment/check", headers=self.headers(), json=payment_check_request.model_dump(by_alias=True, exclude_none=True, mode="json"), ) return PaymentCheckResponse.model_validate(response.json())
[docs] @async_auth_required async def payment_cancel( self, payment_id: str, payment_cancel_request: PaymentCancelRequest, ) -> int: """Send payment cancel request. Returns status code.""" response = await self._request( "DELETE", f"/payment/cancel/{payment_id}", headers=self.headers(), json=payment_cancel_request.model_dump(by_alias=True, exclude_none=True, mode="json"), ) return response.status_code
[docs] @async_auth_required async def payment_refund( self, payment_id: str, payment_refund_request: PaymentRefundRequest, ) -> int: """Send refund payment request. Returns status code.""" response = await self._request( "DELETE", f"/payment/refund/{payment_id}", headers=self.headers(), json=payment_refund_request.model_dump(by_alias=True, exclude_none=True, mode="json"), ) return response.status_code
[docs] @async_auth_required async def payment_list(self, payment_list_request: PaymentListRequest) -> PaymentListResponse: """Send list payment request.""" response = await self._request( "POST", "/payment/list", headers=self.headers(), json=payment_list_request.model_dump(by_alias=True, exclude_none=True, mode="json"), ) data = PaymentListResponse.model_validate(response.json()) return data
[docs] @async_auth_required async def ebarimt_create(self, ebarimt_create_request: EbarimtCreateRequest) -> EbarimtCreateResponse: """Send create ebarimt request.""" response = await self._request( "POST", "/ebarimt/create", headers=self.headers(), json=ebarimt_create_request.model_dump(by_alias=True, exclude_none=True, mode="json"), ) data = EbarimtCreateResponse.model_validate(response.json()) return data
[docs] @async_auth_required async def ebarimt_get(self, barimt_id: str) -> EbarimtGetResponse: """Send get ebarimt request.""" response = await self._request( "GET", f"/ebarimt/{barimt_id}", headers=self.headers(), ) data = EbarimtGetResponse.model_validate(response.json()) return data
[docs] @async_auth_required async def subscription_get(self, subscription_id: str) -> SubscriptionGetResponse: """Send get subscription request.""" response = await self._request( "GET", f"/subscription/{subscription_id}", headers=self.headers(), ) data = SubscriptionGetResponse.model_validate(response.json()) return data
[docs] @async_auth_required async def subscription_cancel(self, subscription_id: str) -> int: """Send cancel subscription request.""" response = await self._request( "DELETE", f"/subscription/{subscription_id}", headers=self.headers(), ) return response.status_code