Source code for aiogremlin.driver.pool
import asyncio
import collections
import aiohttp
from aiogremlin.driver import connection
class PooledConnection:
    """
    Wrapper for :py:class:`Connection<aiogremlin.driver.connection.Connection>`
    that helps manage tomfoolery associated with connection pooling.

    The wrapper keeps a counter of how many times it is currently acquired
    and hands itself back to the owning pool on :py:meth:`release`.

    :param aiogremlin.driver.connection.Connection conn:
    :param aiogremlin.driver.pool.ConnectionPool pool:
    """

    def __init__(self, conn, pool):
        self._conn = conn
        self._pool = pool
        self._times_acquired = 0

    @property
    def times_acquired(self):
        """
        Readonly property.

        :returns: int
        """
        return self._times_acquired

    def increment_acquired(self):
        """Increment times acquired attribute by 1"""
        self._times_acquired += 1

    def decrement_acquired(self):
        """Decrement times acquired attribute by 1"""
        self._times_acquired -= 1

    async def write(self, message):
        """
        **coroutine** Forward a message to the wrapped connection.

        :param message: passed unchanged to the wrapped connection's
            ``write`` method
        :returns: whatever the wrapped connection's ``write`` returns
        """
        return await self._conn.write(message)

    # ``submit`` is an alias kept for callers that use the older name.
    submit = write

    async def release_task(self, resp):
        """
        **coroutine** Wait for ``resp.done`` and then release this connection.

        :param resp: object exposing a ``done`` attribute with an awaitable
            ``wait()`` method
        """
        await resp.done.wait()
        self.release()

    def release(self):
        """
        Hand this connection back to the pool that created it.

        Does nothing once :py:meth:`close` has run, because the reference to
        the pool has been dropped by then.
        """
        if self._pool is None:
            return
        self._pool.release(self)

    async def close(self):
        """
        **coroutine** Close underlying connection.

        Calling it again after a successful close is a no-op.
        """
        if self._conn is None:
            return
        await self._conn.close()
        # Drop both references so the wrapper cannot be reused by accident.
        self._conn = None
        self._pool = None

    @property
    def closed(self):
        """
        Readonly property.

        :returns: bool -- ``True`` once :py:meth:`close` has run, otherwise
            the ``closed`` state reported by the wrapped connection
        """
        if self._conn is None:
            # close() cleared the reference; report closed instead of
            # failing with AttributeError on ``None.closed``.
            return True
        return self._conn.closed
class ConnectionPool:
    """
    A pool of connections to a Gremlin Server host.

    Connections live in one of two deques: ``_available`` (idle) and
    ``_acquired`` (handed out at least once and not yet fully released).

    :param str url: url for host Gremlin Server
    :param asyncio.BaseEventLoop loop:
    :param ssl.SSLContext ssl_context:
    :param str username: Username for database auth
    :param str password: Password for database auth
    :param int max_conns: Maximum number of conns to a host
    :param int min_conns: Minimum number of conns to a host
    :param int max_times_acquired: Maximum number of times a conn can be
        shared by multiple coroutines (clients)
    :param int max_inflight: Maximum number of unprocessed requests at any
        one time on the connection
    :param float response_timeout: forwarded to every connection opened by
        the pool
    :param message_serializer: forwarded to every connection opened by the
        pool
    :param provider: forwarded to every connection opened by the pool
    """

    def __init__(self, url, loop, ssl_context, username, password, max_conns,
                 min_conns, max_times_acquired, max_inflight, response_timeout,
                 message_serializer, provider):
        self._url = url
        self._loop = loop
        self._ssl_context = ssl_context
        self._username = username
        self._password = password
        self._max_conns = max_conns
        self._min_conns = min_conns
        self._max_times_acquired = max_times_acquired
        self._max_inflight = max_inflight
        self._response_timeout = response_timeout
        self._message_serializer = message_serializer
        # Python 3.10 removed the ``loop`` argument from asyncio primitives.
        # Keep passing it where it is still accepted so the condition stays
        # tied to the caller's loop on older interpreters.
        try:
            self._condition = asyncio.Condition(loop=self._loop)
        except TypeError:
            self._condition = asyncio.Condition()
        self._available = collections.deque()
        self._acquired = collections.deque()
        self._provider = provider

    @property
    def url(self):
        """
        Readonly property.

        :returns: str
        """
        return self._url

    async def init_pool(self):
        """**coroutine** Open minimum number of connections to host"""
        for _ in range(self._min_conns):
            conn = await self._get_connection(self._username,
                                              self._password,
                                              self._max_inflight,
                                              self._response_timeout,
                                              self._message_serializer,
                                              self._provider)
            self._available.append(conn)

    def release(self, conn):
        """
        Release connection back to pool after use.

        A closed connection is dropped from the in-use deque. An open one has
        its acquire counter decremented and moves back to the idle deque once
        the counter reaches zero. In both cases one waiter blocked in
        :py:meth:`acquire` is woken.

        :param PooledConnection conn:
        """
        if conn.closed:
            # A shared connection is released once per holder; only the
            # first release finds it in the deque, so later ones must not
            # fail with ValueError.
            if conn in self._acquired:
                self._acquired.remove(conn)
        else:
            conn.decrement_acquired()
            if not conn.times_acquired:
                self._acquired.remove(conn)
                self._available.append(conn)
        # notify() needs the condition lock, which can only be taken from a
        # coroutine, so the wake-up is scheduled as a task.
        self._loop.create_task(self._notify())

    async def _notify(self):
        """Wake one coroutine waiting on the pool condition."""
        async with self._condition:
            self._condition.notify()

    async def acquire(self):
        """
        **coroutine** Acquire a new connection from the pool.

        Order of preference: an idle connection that is still open, then a
        freshly opened connection while fewer than ``max_conns`` are in use,
        then an in-use connection acquired fewer than ``max_times_acquired``
        times. If none qualifies, wait until :py:meth:`release` notifies.

        :returns: :py:class:`PooledConnection`
        """
        async with self._condition:
            while True:
                # Idle connections first; closed ones are discarded.
                while self._available:
                    conn = self._available.popleft()
                    if not conn.closed:
                        conn.increment_acquired()
                        self._acquired.append(conn)
                        return conn
                if len(self._acquired) < self._max_conns:
                    conn = await self._get_connection(
                        self._username, self._password,
                        self._max_inflight,
                        self._response_timeout,
                        self._message_serializer,
                        self._provider)
                    conn.increment_acquired()
                    self._acquired.append(conn)
                    return conn
                # Rotate once through the in-use deque looking for a
                # connection that can still be shared.
                # NOTE(review): in-use connections are not checked for
                # ``closed`` here -- confirm that is intended.
                for _ in range(len(self._acquired)):
                    conn = self._acquired.popleft()
                    if conn.times_acquired < self._max_times_acquired:
                        conn.increment_acquired()
                        self._acquired.append(conn)
                        return conn
                    self._acquired.append(conn)
                else:
                    # Every connection is saturated (or none are in use
                    # and max_conns is 0); sleep until a release notifies.
                    await self._condition.wait()

    async def close(self):
        """**coroutine** Close connection pool."""
        waiters = []
        while self._available:
            waiters.append(self._available.popleft().close())
        while self._acquired:
            waiters.append(self._acquired.popleft().close())
        # gather() uses the running loop; its ``loop`` argument was removed
        # in Python 3.10.
        await asyncio.gather(*waiters)

    async def _get_connection(self, username, password, max_inflight,
                              response_timeout, message_serializer, provider):
        """
        **coroutine** Open a connection to the pool's url and wrap it.

        :returns: :py:class:`PooledConnection`
        """
        # NOTE(review): ``max_inflight`` is accepted but never forwarded to
        # ``Connection.open`` -- confirm whether that is intentional.
        conn = await connection.Connection.open(
            self._url, self._loop, ssl_context=self._ssl_context,
            username=username, password=password,
            response_timeout=response_timeout,
            message_serializer=message_serializer, provider=provider)
        return PooledConnection(conn, self)