Source code for intake_accumulo.source

from collections import namedtuple, OrderedDict

from . import __version__
from intake.source import base

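# Column names and dtypes of the DataFrame produced by this driver; each
# Accumulo key/value entry becomes one row with these fields.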
dtypes = [('row', 'str'),
          ('column_family', 'str'),
          ('column_qualifier', 'str'),
          ('column_visibility', 'str'),
          ('time', 'datetime64[ns]'),
          ('value', 'object')]

KeyValue = namedtuple('KeyValue', [dtype[0] for dtype in dtypes])
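# Lightweight record for a single Accumulo entry; field names and order
# mirror the dtypes declaration above.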


class AccumuloSource(base.DataSource):
    """Read data from Accumulo table.

    Parameters
    ----------
    table : str
        The database table that will act as source
    host : str
        The server hostname for the given table
    port : int
        The server port for the given table
    username : str
        The username used to connect to the Accumulo cluster
    password : str
        The password used to connect to the Accumulo cluster
    """
    name = 'accumulo'
    container = 'dataframe'
    version = __version__
    partition_access = False

    def __init__(self, table, host="localhost", port=42424, username="root",
                 password="secret", metadata=None):
        self._table = table
        self._host = host
        self._port = port
        self._username = username
        self._password = password
        self._client = None

        super(AccumuloSource, self).__init__(metadata=metadata)

    def _get_schema(self):
        # The row count is unknown until the table is scanned, so the shape
        # is reported as (None, number_of_columns) in a single partition.
        return base.Schema(datashape=None,
                           dtype=dtypes,
                           shape=(None, len(dtypes)),
                           npartitions=1,
                           extra_metadata={})

    def _get_partition(self, i):
        import pandas as pd

        if self._client is None:
            from .accumulo import Accumulo
            self._client = Accumulo(self._host, self._port, self._username,
                                    self._password)

        # Scan the table in batches, collecting one KeyValue record per entry
        # until the server reports that no more results remain.
        data = []
        scanner = self._client.create_scanner(self._table)
        while True:
            chunk = self._client.nextk(scanner)
            for entry in chunk.results:
                kv = KeyValue(entry.key.row,
                              entry.key.colFamily,
                              entry.key.colQualifier,
                              entry.key.colVisibility,
                              entry.key.timestamp,
                              entry.value)
                data.append(kv)
            if not chunk.more:
                break

        df = pd.DataFrame(data, columns=KeyValue._fields)
        return df.astype(dtype=OrderedDict(dtypes))

    def _close(self):
        self._client = None
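
A minimal usage sketch, assuming the intake-accumulo plugin is installed (so
intake registers this driver under the name 'accumulo') and an Accumulo proxy
is reachable at the given host and port; the table name and credentials below
are placeholders, not values taken from this module:

import intake

# intake generates open_accumulo from the driver's 'name' attribute
# once the plugin is installed; all arguments here are illustrative.
source = intake.open_accumulo('my_table', host='localhost', port=42424,
                              username='root', password='secret')

source.discover()   # returns the schema built by _get_schema()
df = source.read()  # scans the table and returns a pandas DataFrame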