import asyncio
import logging
from typing import Optional, Union
from httpx import AsyncClient, BasicAuth, Response
from ..schemas import (
EbarimtCreateRequest,
EbarimtCreateResponse,
EbarimtGetResponse,
InvoiceCreateRequest,
InvoiceCreateResponse,
InvoiceCreateSimpleRequest,
InvoiceGetResponse,
PaymentCancelRequest,
PaymentCheckRequest,
PaymentCheckResponse,
PaymentGetResponse,
PaymentListRequest,
PaymentListResponse,
PaymentRefundRequest,
SubscriptionGetResponse,
TokenResponse,
)
from ..settings import QPaySettings
from ..transport import AsyncTransport
from .base import BaseClient
from .decorators import async_auth_required, async_poll_until_paid
[docs]
class AsyncQPayClient(BaseClient):
"""
Asynchronous client for the QPay v2 API.
Always use as an async context manager. Authentication runs on ``__aenter__``
and the HTTP connection is closed on ``__aexit__``::
settings = QPaySettings.production(
username="...", password="...", invoice_code="..."
)
async with AsyncQPayClient(settings) as client:
invoice = await client.invoice_create(InvoiceCreateSimpleRequest(...))
result = await client.payment_check(PaymentCheckRequest(...))
Available endpoints: ``invoice_create``, ``invoice_get``, ``invoice_cancel``,
``payment_get``, ``payment_check``, ``payment_cancel``, ``payment_refund``,
``payment_list``, ``ebarimt_create``, ``ebarimt_get``,
``subscription_get``, ``subscription_cancel``.
Concurrency-safe: ``authenticate()`` is guarded by an ``asyncio.Lock``,
so multiple coroutines sharing one client instance will not race on token refresh.
"""
def __init__(
self,
settings: QPaySettings,
*,
client: Optional[AsyncClient] = None,
logger: Optional[logging.Logger] = None,
):
"""
Initialize AsyncQPayClient object.
Args:
settings (Settings): QPay client settings.
client (Optional[httpx.Client]): Optional custom httpx client.
logger (Optional[logging.Logger]): QPay client logger.
"""
super().__init__(settings, logger=logger)
self._transport = AsyncTransport(settings=settings, logger=self._logger, client=client)
self._client = self._transport.client
self._async_lock = asyncio.Lock()
@property
def is_closed(self) -> bool:
return self._client.is_closed
async def __aenter__(self):
await self.authenticate()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.close()
[docs]
async def close(self):
"""Close connection."""
await self._transport.close()
[docs]
async def authenticate(self) -> None:
"""Authenticate client."""
async with self._async_lock:
if self.is_authenticated:
return # no need to reauthenticate
if not self._auth_state.has_access_token() or self.is_refresh_expired:
await self._authenticate_nolock() # first token or refresh token expired
else:
await self._refresh_access_token_nolock()
async def _request(
self,
method: str,
url: str,
**kwargs,
) -> Response:
return await self._transport.request(
method,
url,
on_unauthorized=self._refresh_access_token,
**kwargs,
)
async def _send(self, method: str, url: str, **kwargs) -> Response:
return await self._transport._send(method, url, **kwargs)
async def _authenticate(self) -> None:
"""Authenticate the client. Thread safe."""
# locked wrapper
async with self._async_lock:
await self._authenticate_nolock()
async def _refresh_access_token(self) -> None:
"""Refresh client access. Thread safe."""
# locked wrapper
async with self._async_lock:
await self._refresh_access_token_nolock()
async def _authenticate_nolock(self):
"""Authenticate the client. Not thread safe."""
response = await self._send(
"POST",
"/auth/token",
auth=BasicAuth(
username=self._settings.username,
password=self._settings.password, # get password secret
),
)
token_response = TokenResponse.model_validate(response.json())
self._auth_state.update(token_response)
async def _refresh_access_token_nolock(self):
"""Refresh client access. Not thread safe."""
if not self._auth_state.is_access_expired(leeway=self._token_leeway):
return # access token not expired
if self._auth_state.is_refresh_expired(leeway=self._token_leeway):
return await self._authenticate_nolock()
# Using refresh token
response = await self._send(
"POST",
"/auth/refresh",
headers={"Authorization": self._auth_state.refresh_as_header()},
)
if response.is_success:
token_response = TokenResponse.model_validate(response.json())
self._auth_state.update(token_response)
else:
await self._authenticate_nolock()
[docs]
@async_auth_required
async def invoice_get(self, invoice_id: str) -> InvoiceGetResponse:
"""Get invoice by Id."""
response = await self._request(
"GET",
f"/invoice/{invoice_id}",
headers=self.headers(),
)
data = InvoiceGetResponse.model_validate(response.json())
return data
[docs]
@async_auth_required
async def invoice_create(
self, create_invoice_request: Union[InvoiceCreateRequest, InvoiceCreateSimpleRequest]
) -> InvoiceCreateResponse:
"""Send invoice create request to Qpay."""
response = await self._request(
"POST",
"/invoice",
headers=self.headers(),
json=self._invoice_create_payload(create_invoice_request),
)
data = InvoiceCreateResponse.model_validate(response.json())
return data
[docs]
@async_auth_required
async def invoice_cancel(
self,
invoice_id: str,
) -> int:
"""Send cancel invoice request to qpay. Returns status code."""
response = await self._request(
"DELETE",
f"/invoice/{invoice_id}",
headers=self.headers(),
)
return response.status_code
[docs]
@async_auth_required
async def payment_get(self, payment_id: str) -> PaymentGetResponse:
"""Send get payment requesst to qpay."""
response = await self._request(
"GET",
f"/payment/{payment_id}",
headers=self.headers(),
)
data = PaymentGetResponse.model_validate(response.json())
return data
[docs]
@async_auth_required
@async_poll_until_paid
async def payment_check(self, payment_check_request: PaymentCheckRequest) -> PaymentCheckResponse:
"""Check payment status, polling until a payment is found or retries exhausted."""
response = await self._request(
"POST",
"/payment/check",
headers=self.headers(),
json=payment_check_request.model_dump(by_alias=True, exclude_none=True, mode="json"),
)
return PaymentCheckResponse.model_validate(response.json())
[docs]
@async_auth_required
async def payment_cancel(
self,
payment_id: str,
payment_cancel_request: PaymentCancelRequest,
) -> int:
"""Send payment cancel request. Returns status code."""
response = await self._request(
"DELETE",
f"/payment/cancel/{payment_id}",
headers=self.headers(),
json=payment_cancel_request.model_dump(by_alias=True, exclude_none=True, mode="json"),
)
return response.status_code
[docs]
@async_auth_required
async def payment_refund(
self,
payment_id: str,
payment_refund_request: PaymentRefundRequest,
) -> int:
"""Send refund payment request. Returns status code."""
response = await self._request(
"DELETE",
f"/payment/refund/{payment_id}",
headers=self.headers(),
json=payment_refund_request.model_dump(by_alias=True, exclude_none=True, mode="json"),
)
return response.status_code
[docs]
@async_auth_required
async def payment_list(self, payment_list_request: PaymentListRequest) -> PaymentListResponse:
"""Send list payment request."""
response = await self._request(
"POST",
"/payment/list",
headers=self.headers(),
json=payment_list_request.model_dump(by_alias=True, exclude_none=True, mode="json"),
)
data = PaymentListResponse.model_validate(response.json())
return data
[docs]
@async_auth_required
async def ebarimt_create(self, ebarimt_create_request: EbarimtCreateRequest) -> EbarimtCreateResponse:
"""Send create ebarimt request."""
response = await self._request(
"POST",
"/ebarimt/create",
headers=self.headers(),
json=ebarimt_create_request.model_dump(by_alias=True, exclude_none=True, mode="json"),
)
data = EbarimtCreateResponse.model_validate(response.json())
return data
[docs]
@async_auth_required
async def ebarimt_get(self, barimt_id: str) -> EbarimtGetResponse:
"""Send get ebarimt request."""
response = await self._request(
"GET",
f"/ebarimt/{barimt_id}",
headers=self.headers(),
)
data = EbarimtGetResponse.model_validate(response.json())
return data
[docs]
@async_auth_required
async def subscription_get(self, subscription_id: str) -> SubscriptionGetResponse:
"""Send get subscription request."""
response = await self._request(
"GET",
f"/subscription/{subscription_id}",
headers=self.headers(),
)
data = SubscriptionGetResponse.model_validate(response.json())
return data
[docs]
@async_auth_required
async def subscription_cancel(self, subscription_id: str) -> int:
"""Send cancel subscription request."""
response = await self._request(
"DELETE",
f"/subscription/{subscription_id}",
headers=self.headers(),
)
return response.status_code