Source code for aiogremlin.driver.aiohttp.transport

import aiohttp

from gremlin_python.driver import transport


[docs]class AiohttpTransport(transport.AbstractBaseTransport): def __init__(self, loop): self._loop = loop self._connected = False
[docs] async def connect(self, url, *, ssl_context=None): await self.close() connector = aiohttp.TCPConnector( ssl_context=ssl_context, loop=self._loop) self._client_session = aiohttp.ClientSession( loop=self._loop, connector=connector) self._ws = await self._client_session.ws_connect(url) self._connected = True
[docs] def write(self, message): self._ws.send_bytes(message)
[docs] async def read(self): data = await self._ws.receive() if data.tp == aiohttp.WSMsgType.close: await self._transport.close() raise RuntimeError("Connection closed by server") elif data.tp == aiohttp.WSMsgType.error: # This won't raise properly, fix raise data.data elif data.tp == aiohttp.WSMsgType.closed: # Hmm raise RuntimeError("Connection closed by server") elif data.tp == aiohttp.WSMsgType.text: # Should return bytes data = data.data.strip().encode('utf-8') else: data = data.data return data
[docs] async def close(self): if self._connected: if not self._ws.closed: await self._ws.close() if not self._client_session.closed: await self._client_session.close()
@property def closed(self): return self._ws.closed or self._client_session.closed