Source code for aiogremlin.process.graph_traversal

from aiogremlin.process.traversal import AsyncTraversalStrategies
from aiogremlin.remote.remote_connection import AsyncRemoteStrategy

from gremlin_python.process import graph_traversal, traversal


[docs]class AsyncGraphTraversal(graph_traversal.GraphTraversal): """Implements async iteration protocol and updates relevant methods""" def __aiter__(self): return self async def __anext__(self): if self.traversers is None: await self.traversal_strategies.apply_strategies(self) if self.last_traverser is None: self.last_traverser = await self.traversers.__anext__() object = self.last_traverser.object self.last_traverser.bulk = self.last_traverser.bulk - 1 if self.last_traverser.bulk <= 0: self.last_traverser = None return object
[docs] async def toList(self): """Reture results as ``list``.""" results = [] async for result in self: results.append(result) return results
[docs] async def toSet(self): """Return results as ``set``.""" results = set() async for result in self: results.add(result) return results
[docs] async def iterate(self): """Iterate over results.""" while True: try: await self.nextTraverser() except StopAsyncIteration: return self
[docs] async def nextTraverser(self): """Return next traverser.""" if self.traversers is None: await self.traversal_strategies.apply_strategies(self) if self.last_traverser is None: return await self.traversers.__anext__() else: temp = self.last_traverser self.last_traverser = None return temp
[docs] async def next(self, amount=None): """Return iterator with optionaly defined amount of items.""" if not amount: try: return await self.__anext__() except StopAsyncIteration: return results = [] for i in range(amount): try: result = await self.__anext__() except StopAsyncIteration: return results results.append(result) return results
class __(graph_traversal.__): graph_traversal = AsyncGraphTraversal
[docs]class AsyncGraphTraversalSource(graph_traversal.GraphTraversalSource): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.graph_traversal = AsyncGraphTraversal
[docs] def withRemote(self, remote_connection): source = self.get_graph_traversal_source() source.traversal_strategies.add_strategies([AsyncRemoteStrategy(remote_connection)]) return source
[docs] def get_graph_traversal_source(self): return self.__class__( self.graph, AsyncTraversalStrategies(self.traversal_strategies), traversal.Bytecode(self.bytecode))