Source code for aiogremlin.driver.aiohttp.transport
import aiohttp
from gremlin_python.driver import transport
[docs]class AiohttpTransport(transport.AbstractBaseTransport):
def __init__(self, loop):
self._loop = loop
self._connected = False
[docs] async def connect(self, url, *, ssl_context=None):
await self.close()
connector = aiohttp.TCPConnector(
ssl_context=ssl_context, loop=self._loop)
self._client_session = aiohttp.ClientSession(
loop=self._loop, connector=connector)
self._ws = await self._client_session.ws_connect(url)
self._connected = True
[docs] def write(self, message):
self._ws.send_bytes(message)
[docs] async def read(self):
data = await self._ws.receive()
if data.tp == aiohttp.WSMsgType.close:
await self._transport.close()
raise RuntimeError("Connection closed by server")
elif data.tp == aiohttp.WSMsgType.error:
# This won't raise properly, fix
raise data.data
elif data.tp == aiohttp.WSMsgType.closed:
# Hmm
raise RuntimeError("Connection closed by server")
elif data.tp == aiohttp.WSMsgType.text:
# Should return bytes
data = data.data.strip().encode('utf-8')
else:
data = data.data
return data
[docs] async def close(self):
if self._connected:
if not self._ws.closed:
await self._ws.close()
if not self._client_session.closed:
await self._client_session.close()
@property
def closed(self):
return self._ws.closed or self._client_session.closed