"""
Psycopg null connection pool module (async version).
"""

# Copyright (C) 2022 The Psycopg Team

from __future__ import annotations

import logging
from typing import Any, cast, Dict, Optional, Type

from psycopg import AsyncConnection
from psycopg.pq import TransactionStatus

from .abc import ACT, AsyncConnectionCB, AsyncConnectFailedCB
from .errors import PoolTimeout, TooManyRequests
from ._compat import ConnectionTimeout
from ._acompat import AEvent
from .base_null_pool import _BaseNullConnectionPool
from .pool_async import AsyncConnectionPool, AddConnection

# Logger registered under the "psycopg.pool" name; used below to report the
# pool wait and resize events.
logger = logging.getLogger("psycopg.pool")


class AsyncNullConnectionPool(_BaseNullConnectionPool, AsyncConnectionPool[ACT]):
    """
    Async connection pool that keeps no connection in its state.

    A connection is created when a client asks for one and it is closed when
    it is returned, unless another client is already waiting for it.
    """

    def __init__(
        self,
        conninfo: str = "",
        *,
        connection_class: Type[ACT] = cast(Type[ACT], AsyncConnection),
        kwargs: Optional[Dict[str, Any]] = None,
        min_size: int = 0,  # Note: min_size default value changed to 0.
        max_size: Optional[int] = None,
        open: bool | None = None,
        configure: Optional[AsyncConnectionCB[ACT]] = None,
        check: Optional[AsyncConnectionCB[ACT]] = None,
        reset: Optional[AsyncConnectionCB[ACT]] = None,
        name: Optional[str] = None,
        timeout: float = 30.0,
        max_waiting: int = 0,
        max_lifetime: float = 60 * 60.0,
        max_idle: float = 10 * 60.0,
        reconnect_timeout: float = 5 * 60.0,
        reconnect_failed: Optional[AsyncConnectFailedCB] = None,
        num_workers: int = 3,
    ):
        """
        Initialise the pool, forwarding every option to the parent constructor.

        All the arguments are passed through unchanged; *min_size* defaults
        to 0 in this class.
        """
        super().__init__(
            conninfo,
            open=open,
            connection_class=connection_class,
            check=check,
            configure=configure,
            reset=reset,
            kwargs=kwargs,
            min_size=min_size,
            max_size=max_size,
            name=name,
            timeout=timeout,
            max_waiting=max_waiting,
            max_lifetime=max_lifetime,
            max_idle=max_idle,
            reconnect_timeout=reconnect_timeout,
            # Forward the callback too: accepting it in the signature without
            # passing it on would silently discard it.
            reconnect_failed=reconnect_failed,
            num_workers=num_workers,
        )

    async def wait(self, timeout: float = 30.0) -> None:
        """
        Create a connection for test.

        Calling this function will verify that the connectivity with the
        database works as expected. However the connection will not be stored
        in the pool.

        Close the pool, and raise `PoolTimeout`, if not ready within *timeout*
        sec.
        """
        self._check_open_getconn()

        # Install the event that `_add_to_pool()` sets after closing a
        # connection nobody was waiting for. The assert documents that only
        # one `wait()` is expected to be in progress at a time.
        async with self._lock:
            assert not self._pool_full_event
            self._pool_full_event = AEvent()

        logger.info("waiting for pool %r initialization", self.name)
        # NOTE(review): AddConnection is defined in pool_async; presumably it
        # connects and hands the connection to `_add_to_pool()` - confirm.
        self.run_task(AddConnection(self))
        if not await self._pool_full_event.wait_timeout(timeout):
            await self.close()  # stop all the tasks
            raise PoolTimeout(f"pool initialization incomplete after {timeout} sec")

        # The test succeeded: remove the event so that `wait()` can be called
        # again and `_add_to_pool()` goes back to maintaining the counter.
        async with self._lock:
            assert self._pool_full_event
            self._pool_full_event = None

        logger.info("pool %r is ready to use", self.name)

    async def _get_ready_connection(self, timeout: Optional[float]) -> Optional[ACT]:
        """
        Return a new connection for the client, if the pool size allows it.

        Return `None` if the pool already has *max_size* connections out and
        the request is allowed to wait.

        Raise `PoolTimeout` if *timeout* is not `None` and not positive, or if
        the connection attempt times out. Raise `TooManyRequests` if the size
        limit is reached and *max_waiting* requests are already queued.
        """
        if timeout is not None and timeout <= 0.0:
            raise PoolTimeout()

        conn: Optional[ACT] = None
        # A max_size of 0 is treated as "no limit": always connect.
        if self.max_size == 0 or self._nconns < self.max_size:
            # Create a new connection for the client
            try:
                conn = await self._connect(timeout=timeout)
            except ConnectionTimeout as ex:
                raise PoolTimeout(str(ex)) from None
            # Count the connection only once it has been established.
            self._nconns += 1

        elif self.max_waiting and len(self._waiting) >= self.max_waiting:
            self._stats[self._REQUESTS_ERRORS] += 1
            raise TooManyRequests(
                f"the pool {self.name!r} has already"
                + f" {len(self._waiting)} requests waiting"
            )
        return conn

    async def _maybe_close_connection(self, conn: ACT) -> bool:
        """
        Close *conn* unless the pool is open and a client is waiting for it.

        Return `True` if the connection was closed, `False` if it was left
        open for a waiting client.
        """
        # Close the connection if no client is waiting for it, or if the pool
        # is closed. For extra care remove the pool reference from it.
        # Maintain the stats.
        async with self._lock:
            if not self._closed and self._waiting:
                return False

            conn._pool = None
            # A connection in UNKNOWN transaction status is counted as
            # returned in a bad state.
            if conn.pgconn.transaction_status == TransactionStatus.UNKNOWN:
                self._stats[self._RETURNS_BAD] += 1
            await conn.close()
            self._nconns -= 1
            return True

    async def resize(self, min_size: int, max_size: Optional[int] = None) -> None:
        """Change the size of the pool during runtime.

        Only *max_size* can be changed; *min_size* must remain 0.
        """
        # `_check_size()` returns the pair of values to store.
        min_size, max_size = self._check_size(min_size, max_size)

        logger.info(
            "resizing %r to min_size=%s max_size=%s", self.name, min_size, max_size
        )
        async with self._lock:
            self._min_size = min_size
            self._max_size = max_size

    async def check(self) -> None:
        """No-op, as the pool doesn't have connections in its state."""

    async def _add_to_pool(self, conn: ACT) -> None:
        """
        Hand *conn* to the first client still waiting, or close it.

        If the connection is closed while a `wait()` is in progress, notify
        the waiter instead of decrementing the connections counter.
        """
        # Remove the pool reference from the connection before returning it
        # to the state, to avoid to create a reference loop.
        # Also disable the warning for open connection in conn.__del__
        conn._pool = None

        # Critical section: if there is a client waiting give it the connection
        # otherwise close it.
        async with self._lock:
            while self._waiting:
                # If there is a client waiting (which is still waiting and
                # hasn't timed out), give it the connection and notify it.
                pos = self._waiting.popleft()
                if await pos.set(conn):
                    break
            else:
                # Reached only if the loop ended without `break`, i.e. no
                # client accepted the connection: close it.
                await conn.close()

                # If we have been asked to wait for pool init, notify the
                # waiter if the pool is ready.
                if self._pool_full_event:
                    self._pool_full_event.set()
                else:
                    # Decrement the number of connections in use only outside
                    # `wait()`: the connection created by `wait()` shouldn't
                    # decrease the count.
                    self._nconns -= 1