Source code for aiogremlin.driver.resultset

import asyncio
import functools

from aiogremlin import exception


[docs]def error_handler(fn): @functools.wraps(fn) async def wrapper(self): msg = await fn(self) if msg: if msg.status_code not in [200, 206]: self.close() raise exception.GremlinServerError( msg.status_code, "{0}: {1}".format(msg.status_code, msg.message)) msg = msg.data return msg return wrapper
[docs]class ResultSet: """Gremlin Server response implementated as an async iterator.""" def __init__(self, request_id, timeout, loop): self._response_queue = asyncio.Queue(loop=loop) self._request_id = request_id self._loop = loop self._timeout = timeout self._done = asyncio.Event(loop=self._loop) self._aggregate_to = None @property def request_id(self): return self._request_id @property def stream(self): return self._response_queue
[docs] def queue_result(self, result): if result is None: self.close() self._response_queue.put_nowait(result)
@property def done(self): """ Readonly property. :returns: `asyncio.Event` object """ return self._done @property def aggregate_to(self): return self._aggregate_to @aggregate_to.setter def aggregate_to(self, val): self._aggregate_to = val async def __aiter__(self): return self async def __anext__(self): msg = await self.one() if not msg: raise StopAsyncIteration return msg
[docs] def close(self): self.done.set() self._loop = None
[docs] @error_handler async def one(self): """Get a single message from the response stream""" if not self._response_queue.empty(): msg = self._response_queue.get_nowait() elif self.done.is_set(): msg = None else: try: msg = await asyncio.wait_for(self._response_queue.get(), timeout=self._timeout, loop=self._loop) except asyncio.TimeoutError: self.close() raise exception.ResponseTimeoutError('Response timed out') return msg
[docs] async def all(self): results = [] async for result in self: results.append(result) return results