diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..4890598 --- /dev/null +++ b/.gitignore @@ -0,0 +1,113 @@ +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.nox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +.hypothesis/ +.pytest_cache/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py +db.sqlite3 + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# IPython +profile_default/ +ipython_config.py + +# pyenv +.python-version + +# celery beat schedule file +celerybeat-schedule + +# SageMath parsed files +*.sage.py + +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ +.dmypy.json +dmypy.json + +.idea \ No newline at end of file diff --git a/__init__.py b/__init__.py deleted file mode 100644 index 38d745b..0000000 --- a/__init__.py +++ /dev/null @@ -1,2 +0,0 @@ -from client import NewClient -assert NewClient # silence pyflakes diff --git a/helpers/__init__.py b/helpers/__init__.py deleted file mode 100644 index 4db2fe6..0000000 --- a/helpers/__init__.py +++ /dev/null @@ -1,3 +0,0 @@ -import varint -# Silence pyflakes -assert varint diff --git a/pybase/__init__.py b/pybase/__init__.py new file mode 100644 index 0000000..48bd80a --- /dev/null +++ b/pybase/__init__.py @@ -0,0 +1 @@ +from .client import NewClient # noqa diff --git a/client.py b/pybase/client.py similarity index 74% rename from client.py rename to pybase/client.py index 35a0770..1551f9b 100644 --- a/client.py +++ b/pybase/client.py @@ -13,26 +13,37 @@ See the License for the specific language governing permissions and limitations under the License. """ -import zk.client as zk -import region.client as region -from region.region import region_from_cell -from request import request +from __future__ import absolute_import, print_function, unicode_literals +from collections import defaultdict + import logging -import logging.config -from intervaltree import IntervalTree -from threading import Lock -from time import sleep +from builtins import str +from concurrent.futures import as_completed from itertools import chain -from filters import _to_filter -from exceptions import * +from threading import Condition, Lock + +import pybase.region.client as region +import pybase.zk.client as zk +from intervaltree import IntervalTree + +from .exceptions import ( + MasterServerException, + NoSuchTableException, + PyBaseException, + RegionException, + RegionServerException, + ZookeeperException +) +from .filters import _to_filter +from .region.region import region_from_cell +from .request import request -# Using a tiered logger such that all submodules propagate through to this -# logger. 
Changing the logging level here should affect all other modules. -logger = logging.getLogger('pybase') +logger = logging.getLogger(__name__) -class MainClient: - def __init__(self, zkquorum, pool_size): +class MainClient(object): + + def __init__(self, zkquorum, pool_size, secondary, call_timeout=60): # Location of the ZooKeeper quorum (csv) self.zkquorum = zkquorum # Connection pool size per region server (and master!) @@ -50,6 +61,38 @@ def __init__(self, zkquorum, pool_size): # Mutex used so only one thread can request meta information from # the master at a time. self._master_lookup_lock = Lock() + # Capture if this client is being used for secondary operations + # We don't really care if it fails, best effort only. + self.secondary = secondary + # How long to wait before a call times out + self.call_timeout = call_timeout + + self.zk_client = zk.connect(zkquorum) + + wait_for_master = Condition() + wait_for_master.acquire() + + # register a callback handler when master znode data changes + @self.zk_client.DataWatch(zk.master_znode) + def _update_master_info(data, stat): + initial = self.master_client is None + if data: + with self._master_lookup_lock: + self.update_master_client(*zk.parse_master_info(data)) + + if initial: + wait_for_master.acquire() + wait_for_master.notify_all() + wait_for_master.release() + + wait_time = 0.0 + # wait up to 10s + while self.master_client is None and wait_time < 10.0: + wait_for_master.wait(1.0) + wait_time += 1.0 + + if self.master_client is None: + raise ZookeeperException("Timed out waiting for master server watch to fire") """ HERE LAY CACHE OPERATIONS @@ -57,15 +100,15 @@ def __init__(self, zkquorum, pool_size): def _add_to_region_cache(self, new_region): stop_key = new_region.stop_key - if stop_key == '': + if stop_key == b'': # This is hacky but our interval tree requires hard interval stops. # So what's the largest char out there? chr(255) -> '\xff'. If # you're using '\xff' as a prefix for your rows then this'll cause # a cache miss on every request. - stop_key = '\xff' + stop_key = b'\xff' # Keys are formatted like: 'tablename,key' - start_key = new_region.table + ',' + new_region.start_key - stop_key = new_region.table + ',' + stop_key + start_key = new_region.table + b',' + new_region.start_key + stop_key = new_region.table + b',' + stop_key # Only let one person touch the cache at once. with self._cache_lock: @@ -101,7 +144,7 @@ def _get_from_region_cache(self, table, key): def _delete_from_region_cache(self, table, start_key): # Don't acquire the lock because the calling function should have done # so already - self.region_cache.remove_overlap(table + "," + start_key) + self.region_cache.remove_overlap(table + b"," + start_key) """ HERE LAY REQUESTS @@ -142,6 +185,59 @@ def get(self, table, key, families={}, filters=None): e._handle_exception(self, dest_region=dest_region) # Everything should be dandy now. Repeat the request! 
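+            # (Hedged usage sketch with made-up table/row names:
+            #      rsp = client.get('t1', 'row7', families={'cf': ['a']})
+            #      rsp.cells  # list of protobuf Cells for row7
+            #  the handle-and-retry below is transparent to such a caller.)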
             return self.get(table, key, families=families, filters=filters)
+
+    def get_many(self, table, keys, families=None):
+        """
+        get row or specified cell with optional filter for all provided keys
+        :param table: hbase table
+        :param keys: list of row keys
+        :param families: (optional) specifies columns to get,
+            e.g., {"columnFamily1":["col1","col2"], "colFamily2": "col3"}
+        :return: tuple of (list of responses with cells, list of exceptions that occurred)
+        """
+        if len(keys) == 0:
+            return [], []
+
+        grouped_by_server = defaultdict(lambda: defaultdict(list))
+        for key in keys:
+            dest_region = self._find_hosting_region(table, key)
+            # we must call each region server, which can serve many key ranges
+            grouped_by_server[dest_region.region_client.host][dest_region].append(key)
+
+        results = []
+        errors = []
+        tasks = []
+        for grouped_by_region in grouped_by_server.values():
+            try:
+                dest_region = next(iter(grouped_by_region.keys()))
+                client = dest_region.region_client
+                rq = request.multi_get(grouped_by_region, families)
+                tasks.append(client._send_request(rq, _async=True))
+            except PyBaseException as e:
+                e._handle_exception(self, dest_region=dest_region)
+                errors.append(e)
+
+        try:
+            for f in as_completed(tasks, timeout=self.call_timeout * len(grouped_by_server)):
+                try:
+                    response = f.result()
+                    for ra_result in response.regionActionResult:
+                        if ra_result.exception.name != "":
+                            errors.append(client._parse_exception(ra_result.exception.name,
+                                                                  ra_result.exception.value))
+                        else:
+                            for res_or_err in ra_result.resultOrException:
+                                if res_or_err.exception.name != "":
+                                    errors.append(client._parse_exception(res_or_err.exception.name,
+                                                                          res_or_err.exception.value))
+                                else:
+                                    results.append(Result(res_or_err))
+                except PyBaseException as e:
+                    e._handle_exception(self, dest_region=dest_region)
+                    errors.append(e)
+        except TimeoutError as e:
+            errors.append(e)
+        return results, errors

     def put(self, table, key, values):
         return self._mutate(table, key, values, request.put_request)

@@ -206,13 +302,15 @@ def scan(self, table, start_key='', stop_key=None, families={}, filters=None):
                 # or merged so this recursive call may be scanning multiple regions or only half
                 # of one region).
                 result_set._append_response(self.scan(
-                    table, start_key=previous_stop_key, stop_key=cur_region.stop_key, families=families, filters=filters))
+                    table, start_key=previous_stop_key, stop_key=cur_region.stop_key,
+                    families=families, filters=filters))
                 # We continue here because we don't want to append the
                 # first_response results to the result_set. When we did the
                 # recursive scan it rescanned whatever the first_response
                 # initially contained. Appending both will produce duplicates.
                 previous_stop_key = cur_region.stop_key
-                if previous_stop_key == '' or (stop_key is not None and previous_stop_key > stop_key):
+                if previous_stop_key == b'' or \
+                        (stop_key is not None and previous_stop_key > stop_key):
                     break
                 continue
             # Both calls succeeded! Append the results to the result_set.
@@ -223,11 +321,12 @@ def scan(self, table, start_key='', stop_key=None, families={}, filters=None):
             previous_stop_key = cur_region.stop_key
             # Stopping criteria. This region is either the end ('') or the end of this region is
             # beyond the specific stop_key.
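+            # (Hedged example with made-up keys: scanning [b'a', b'x') over
+            #  regions that end at b'm' and b'' breaks out below once
+            #  previous_stop_key is b'' -- table end -- or already past b'x'.)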
- if previous_stop_key == '' or (stop_key is not None and previous_stop_key > stop_key): + if previous_stop_key == b'' or (stop_key is not None and previous_stop_key > stop_key): break return result_set - def _scan_hit_region_once(self, previous_stop_key, table, start_key, stop_key, families, filters): + def _scan_hit_region_once(self, previous_stop_key, table, start_key, stop_key, families, + filters): try: # Lookup the next region to scan by searching for the # previous_stop_key (region keys are inclusive on the start and @@ -235,10 +334,11 @@ def _scan_hit_region_once(self, previous_stop_key, table, start_key, stop_key, f cur_region = self._find_hosting_region( table, previous_stop_key) except PyBaseException as e: - # This means that either Master is down or something's funky with the META region. Try handling it - # and recursively perform the same call again. + # This means that either Master is down or something's funky with the META region. + # Try handling it and recursively perform the same call again. e._handle_exception(self) - return self._scan_hit_region_once(previous_stop_key, table, start_key, stop_key, families, filters) + return self._scan_hit_region_once(previous_stop_key, table, start_key, stop_key, + families, filters) # Create the scan request object. The last two values are 'Close' and # 'Scanner_ID' respectively. rq = request.scan_request( @@ -250,7 +350,8 @@ def _scan_hit_region_once(self, previous_stop_key, table, start_key, stop_key, f # Uh oh. Probably a region/region server issue. Handle it and try # again. e._handle_exception(self, dest_region=cur_region) - return self._scan_hit_region_once(previous_stop_key, table, start_key, stop_key, families, filters) + return self._scan_hit_region_once(previous_stop_key, table, start_key, stop_key, + families, filters) return response, cur_region def _scan_region_while_more_results(self, cur_region, response): @@ -273,7 +374,7 @@ def _scan_region_while_more_results(self, cur_region, response): # Now close the scanner. rq = request.scan_request( cur_region, None, None, None, None, True, scanner_id) - _ = cur_region.region_client._send_request(rq) + cur_region.region_client._send_request(rq) # Close it and return the results! return response_set @@ -301,8 +402,7 @@ def _find_hosting_region(self, table, key): dest_region = self._get_from_region_cache(table, key) if dest_region is None: # Nope, still not in the cache. - logger.debug( - 'Region cache miss! Table: %s, Key: %s', table, key) + logger.debug('Region cache miss! Table: %s, Key: %s', table, key) # Ask master for region information. dest_region = self._discover_region(table, key) return dest_region @@ -318,9 +418,9 @@ def _discover_region(self, table, key): except (AttributeError, RegionServerException, RegionException): if self.master_client is None: # I don't know why this can happen but it does. - raise MasterServerException(None, None) + raise MasterServerException(None, None, secondary=self.secondary) raise MasterServerException( - self.master_client.host, self.master_client.port) + self.master_client.host, self.master_client.port, secondary=self.secondary) # Master gave us a response. We need to run and parse the response, # then do all necessary work for entering it into our structures. return self._create_new_region(response, table) @@ -331,31 +431,33 @@ def _create_new_region(self, response, table): # table doesn't exist! 
if len(cells) == 0: raise NoSuchTableException("Table does not exist.") + server_loc = None # We get ~4 cells back each holding different information. We only care # about two of them. for cell in cells: - if cell.qualifier == "regioninfo": + if cell.qualifier == b"regioninfo": # Take the regioninfo information and parse it into our own # Region representation. new_region = region_from_cell(cell) - elif cell.qualifier == "server": + elif cell.qualifier == b"server": # Grab the host, port of the Region Server that this region is # hosted on. server_loc = cell.value - host, port = cell.value.split(':') + host, port = cell.value.split(b':') else: continue # Do we have an existing client for this region server already? - if server_loc in self.reverse_client_cache: + if server_loc and server_loc in self.reverse_client_cache: # If so, grab it! new_region.region_client = self.reverse_client_cache[server_loc] else: # Otherwise we need to create a new region client instance. - new_client = region.NewClient(host, port, self.pool_size) + new_client = region.NewClient(host, port, self.pool_size, + secondary=self.secondary, call_timeout=self.call_timeout) if new_client is None: # Welp. We can't connect to the server that the Master # supplied. Raise an exception. - raise RegionServerException(host=host, port=port) + raise RegionServerException(host=host, port=port, secondary=self.secondary) logger.info("Created new Client for RegionServer %s", server_loc) # Add it to the host,port -> instance of region client map. self.reverse_client_cache[server_loc] = new_client @@ -367,20 +469,18 @@ def _create_new_region(self, response, table): logger.info("Successfully discovered new region %s", new_region) return new_region - def _recreate_master_client(self): - if self.master_client is not None: - # yep, still no idea why self.master_client can be set to None. + def update_master_client(self, ip, port): + if self.master_client: self.master_client.close() - # Ask ZooKeeper for the location of the Master. - ip, port = zk.LocateMaster(self.zkquorum) + try: - # Try creating a new client instance and setting it as the new - # master_client. - self.master_client = region.NewClient(ip, port, self.pool_size) + # Try creating a new client instance and setting it as the new master_client. + self.master_client = region.NewClient(ip, port, self.pool_size, + secondary=self.secondary, + call_timeout=self.call_timeout) + logger.info("Updated master client to %s:%s", ip, port) except RegionServerException: - # We can't connect to the address that ZK supplied. Raise an - # exception. 
-            raise MasterServerException(ip, port)
+            raise MasterServerException(ip, port, secondary=self.secondary)

     """
     HERE LAY THE MISCELLANEOUS
     """

@@ -402,7 +502,7 @@ def _purge_client(self, region_client):
         for reg in region_client.regions:
             self._delete_from_region_cache(reg.table, reg.start_key)
         self.reverse_client_cache.pop(
-            region_client.host + ":" + region_client.port, None)
+            # The region client stores host/port as str while the cache is
+            # keyed by the bytes b'host:port' value taken from META, so re-encode.
+            region_client.host.encode('utf8') + b":" + region_client.port.encode('utf8'), None)
         region_client.close()

     def _purge_region(self, reg):
@@ -416,7 +516,11 @@ def _purge_region(self, reg):
             pass

     def _construct_meta_key(self, table, key):
-        return table + "," + key + ",:"
+        if isinstance(table, str):
+            table = table.encode('utf8')
+        if isinstance(key, str):
+            key = key.encode('utf8')
+        return table + b',' + key + b',:'

     def close(self):
         logger.info("Main client received close request.")
@@ -431,7 +535,7 @@ def close(self):
         self.reverse_client_cache = {}


-class Result:
+class Result(object):

     # Called like Result(my_response), takes all the wanted data from
     # my_response and puts it into our own result structure.
@@ -468,7 +572,7 @@ def _append_response(self, rsp):
         try:
             self.cells.extend([result.cell for result in rsp.results])
             self.stale = self.stale or rsp.stale
-        except AttributeError as e:
+        except AttributeError:
             # This is a single result object we're merging instead.
             self.cells.extend(rsp.cells)
             self.stale = self.stale or rsp.stale
@@ -478,10 +582,5 @@ def _append_response(self, rsp):
 # location of ZooKeeper this function will ask ZK for the location of the
 # meta table and create the region client responsible for future meta
 # lookups (masterclient). Returns an instance of MainClient
-def NewClient(zkquorum, socket_pool_size=1):
-    # Create the main client.
-    a = MainClient(zkquorum, socket_pool_size)
-    # Create the master client.
-    a._recreate_master_client()
-    return a
-
+def NewClient(zkquorum, socket_pool_size=1, secondary=False, call_timeout=60):
+    return MainClient(zkquorum, socket_pool_size, secondary, call_timeout=call_timeout)
diff --git a/exceptions.py b/pybase/exceptions.py
similarity index 81%
rename from exceptions.py
rename to pybase/exceptions.py
index 6c022f8..43d0188 100644
--- a/exceptions.py
+++ b/pybase/exceptions.py
@@ -13,13 +13,17 @@ See the License for the specific language governing permissions and
    limitations under the License.
 """
+from __future__ import absolute_import, print_function, unicode_literals
+
 import logging
-from time import sleep
-from threading import Condition, Lock, RLock, Semaphore
-from time import time
 from collections import defaultdict
-logger = logging.getLogger('pybase.' + __name__)
-logger.setLevel(logging.DEBUG)
+from functools import reduce
+from threading import Lock, Semaphore
+from time import sleep, time
+
+from builtins import str
+
+logger = logging.getLogger(__name__)


 # All PyBase exceptions inherit from me. Assumes unrecoverable.
@@ -30,7 +34,7 @@ class PyBaseException(Exception):
     # unrecoverable and thus the _handle method
     # just reraises the exception.
     def _handle_exception(self, main_client, **kwargs):
-        raise self.__class__(self.message)
+        raise self.__class__(str(self))


 # Parent of any exceptions involving Zookeeper
@@ -61,18 +65,24 @@ class ZookeeperResponseException(ZookeeperException):

 # Means an RS is dead or unreachable.
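+# (A hedged sketch of the recovery flow implemented just below: the first
+#  caller through _let_one_through purges the dead client's cached regions
+#  via main_client._purge_client, sleeps per sleep_formula, and retries;
+#  concurrent callers block inside _let_one_through until it finishes.)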
class RegionServerException(PyBaseException):

-    def __init__(self, host=None, port=None, region_client=None):
-        self.host = host
-        self.port = port
+    def __init__(self, host=None, port=None, region_client=None, secondary=False):
+        self.host = host.encode('utf8') if isinstance(host, str) else host
+        self.port = port.encode('utf8') if isinstance(port, str) else port
         self.region_client = region_client
+        self.secondary = secondary

     def _handle_exception(self, main_client, **kwargs):
         # region_client not set? Then host/port must have been. Fetch the
         # client given the host, port
         if self.region_client is None:
-            concat = self.host + ":" + self.port
+            concat = self.host + b":" + self.port
             self.region_client = main_client.reverse_client_cache.get(
                 concat, None)
+
+        # we don't care about secondaries, move on
+        if (self.region_client and self.region_client.secondary) or self.secondary:
+            _let_all_through(self, self.region_client)
+
         # Let one greenlet through per region_client (returns True otherwise
         # blocks and eventually returns False)
         if _let_one_through(self, self.region_client):
@@ -84,12 +94,15 @@ def _handle_exception(self, main_client, **kwargs):
                 if loc in main_client.reverse_client_cache:
                     # We're the first in and it's our job to kill the client.
                     # Purge it.
-                    logger.warn("Region server %s:%s refusing connections. Purging cache, sleeping, retrying.",
-                                self.region_client.host, self.region_client.port)
+                    logger.warning(
+                        "Region server %s:%s refusing connections. "
+                        "Purging cache, sleeping, retrying.",
+                        self.region_client.host, self.region_client.port
+                    )
                     main_client._purge_client(self.region_client)
                     # Sleep for an arbitrary amount of time. If this returns
                     # False then we've hit our max retry threshold. Die.
-                    key = self.region_client.host + ':' + self.region_client.port
+                    key = self.region_client.host + ":" + self.region_client.port
                     if not _dynamic_sleep(self, key):
                         raise self
             finally:
@@ -106,18 +119,27 @@ class RegionServerStoppedException(RegionServerException):


 # All Master exceptions inherit from me
 class MasterServerException(PyBaseException):
-    def __init__(self, host, port):
-        self.host = host
-        self.port = port
+    def __init__(self, host, port, secondary=False):
+        self.host = host.encode('utf8') if isinstance(host, str) else host
+        self.port = port.encode('utf8') if isinstance(port, str) else port
+        self.secondary = secondary

     def _handle_exception(self, main_client, **kwargs):
+        # we don't care about secondaries, move on
+        if self.secondary:
+            _let_all_through(self, None)
+
         # Let one greenlet through. Others block and eventually return False.
         if _let_one_through(self, None):
             try:
                 # Makes sure someone else hasn't already fixed the issue.
-                if main_client.master_client is None or (self.host == main_client.master_client.host and self.port == main_client.master_client.port):
-                    logger.warn(
-                        "Encountered an exception with the Master server. Sleeping then reestablishing.")
+                if main_client.master_client is None or \
+                        (self.host == main_client.master_client.host
+                         and self.port == main_client.master_client.port):
+                    logger.warning(
+                        "Encountered an exception with the Master server. "
+                        "Sleeping then reestablishing."
+                    )
                     if not _dynamic_sleep(self, None):
                         raise self
-                    main_client._recreate_master_client()
+                    # _recreate_master_client() was removed in this change;
+                    # the ZooKeeper DataWatch registered in MainClient now
+                    # re-establishes master_client when the master znode updates.
@@ -127,20 +149,24 @@ def _handle_exception(self, main_client, **kwargs):


 # Master gave us funky data. Unrecoverable.
class MasterMalformedResponseException(MasterServerException):

-    def __init__(self, host, port):
-        self.host = host
-        self.port = port
-
     def _handle_exception(self, main_client, **kwargs):
-        raise self.__class__(self.message)
+        # Unrecoverable; re-raise as-is (the inherited __init__ requires a
+        # host and port, so the exception can't be rebuilt from its message).
+        raise self


 # All region exceptions inherit from me.
 class RegionException(PyBaseException):

+    def __init__(self, region_client=None):
+        self.region_client = region_client
+
     def _handle_exception(self, main_client, **kwargs):
         if "dest_region" in kwargs:
             rg_n = kwargs["dest_region"].region_name
+
+            # we don't care about secondaries, move on
+            if self.region_client and self.region_client.secondary:
+                _let_all_through(self, rg_n)
+
             if _let_one_through(self, rg_n):
                 try:
                     main_client._purge_region(kwargs["dest_region"])
@@ -165,6 +191,10 @@ class NotServingRegionException(RegionException):

 class RegionOpeningException(RegionException):
     def _handle_exception(self, main_client, **kwargs):
+        # we don't care about secondaries, move on
+        if self.region_client and self.region_client.secondary:
+            raise self
+
         if "dest_region" in kwargs:
             rg_n = kwargs["dest_region"].region_name
             # There's nothing to handle here. We just need to give the region
@@ -268,8 +298,10 @@ def _let_all_through(exception, data):

 # We want to sleep more and more with every exception retry.
-def sleep_formula(x): return (x / 1.5)**2
-# [0.0, 0.44, 1.77, 4.0, 7.11, 11.11, 16.0, 21.77, 28.44, 36.0]
+def sleep_formula(x):
+    # [0.0, 0.44, 1.77, 4.0, 7.11, 11.11, 16.0, 21.77, 28.44, 36.0]
+    return (x / 1.5)**2
+

 _exception_count = defaultdict(lambda: (0, time()))
 _max_retries = 7
diff --git a/filters.py b/pybase/filters.py
similarity index 88%
rename from filters.py
rename to pybase/filters.py
index ccdc0f5..e9b0da5 100644
--- a/filters.py
+++ b/pybase/filters.py
@@ -13,10 +13,13 @@ See the License for the specific language governing permissions and
    limitations under the License.
 """
+from __future__ import absolute_import, print_function, unicode_literals
+
 import traceback
-import pb.Filter_pb2 as pbFilter
-import pb.Comparator_pb2 as pbComparator
-from pb.HBase_pb2 import BytesBytesPair as pbBytesBytesPair
+
+from .pb import Comparator_pb2 as pbComparator
+from .pb import Filter_pb2 as pbFilter
+from .pb.HBase_pb2 import BytesBytesPair as pbBytesBytesPair

 # You're brave to venture into this file.
@@ -45,26 +48,21 @@
 # A FilterList is also a Filter. But it's also a list of Filters with an
 # operator. This allows you to build up complicated boolean expressions by
 # chaining FilterLists.
-class FilterList:
+class FilterList(object):

     def __init__(self, operator, *arg):
         self.filter_type = pbFilter.FilterList
         self.name = filter_path + "FilterList"
         self.operator = operator
         self.filters = []
-        try:
-            for incoming_filter in arg:
-                self.filters.append(_to_filter(incoming_filter))
-        except TypeError:
-            # They passed a single filter and not a sequence of filters.
- self.filters.append(_to_filter(filters)) + self.add_filters(*arg) def add_filters(self, *arg): for new_filter in arg: self.filters.append(_to_filter(new_filter)) -class ColumnCountGetFilter: +class ColumnCountGetFilter(object): def __init__(self, limit): self.filter_type = pbFilter.ColumnCountGetFilter @@ -72,7 +70,7 @@ def __init__(self, limit): self.limit = limit -class ColumnPaginationFilter: +class ColumnPaginationFilter(object): def __init__(self, limit, offset, column_offset): self.filter_type = pbFilter.ColumnPaginationFilter @@ -82,7 +80,7 @@ def __init__(self, limit, offset, column_offset): self.column_offset = column_offset -class ColumnPrefixFilter: +class ColumnPrefixFilter(object): def __init__(self, prefix): self.filter_type = pbFilter.ColumnPrefixFilter @@ -90,7 +88,7 @@ def __init__(self, prefix): self.prefix = prefix -class ColumnRangeFilter: +class ColumnRangeFilter(object): def __init__(self, min_column, min_column_inclusive, max_column, max_column_inclusive): self.filter_type = pbFilter.ColumnRangeFilter @@ -101,7 +99,7 @@ def __init__(self, min_column, min_column_inclusive, max_column, max_column_incl self.max_column_inclusive = max_column_inclusive -class CompareFilter: +class CompareFilter(object): def __init__(self, compare_op, comparator): self.filter_type = pbFilter.CompareFilter @@ -110,7 +108,7 @@ def __init__(self, compare_op, comparator): self.comparator = _to_comparator(comparator) -class DependentColumnFilter: +class DependentColumnFilter(object): def __init__(self, compare_filter, column_family, column_qualifier, drop_dependent_column): self.filter_type = pbFilter.DependentColumnFilter @@ -121,7 +119,7 @@ def __init__(self, compare_filter, column_family, column_qualifier, drop_depende self.drop_dependent_column = drop_dependent_column -class FamilyFilter: +class FamilyFilter(object): def __init__(self, compare_filter): self.filter_type = pbFilter.FamilyFilter @@ -129,7 +127,7 @@ def __init__(self, compare_filter): self.compare_filter = _to_filter(compare_filter) -class FilterWrapper: +class FilterWrapper(object): def __init__(self, new_filter): self.filter_type = pbFilter.FilterWrapper @@ -137,14 +135,14 @@ def __init__(self, new_filter): self.filter = _to_filter(new_filter) -class FirstKeyOnlyFilter: +class FirstKeyOnlyFilter(object): def __init__(self): self.filter_type = pbFilter.FirstKeyOnlyFilter self.name = filter_path + "FirstKeyOnlyFilter" -class FirstKeyValueMatchingQualifiersFilter: +class FirstKeyValueMatchingQualifiersFilter(object): def __init__(self, qualifiers): self.filter_type = pbFilter.FirstKeyValueMatchingQualifiersFilter @@ -152,7 +150,7 @@ def __init__(self, qualifiers): self.qualifiers = qualifiers -class FuzzyRowFilter: +class FuzzyRowFilter(object): def __init__(self, fuzzy_keys_data): self.filter_type = pbFilter.FuzzyRowFilter @@ -166,7 +164,7 @@ def __init__(self, fuzzy_keys_data): self.fuzzy_keys_data.append(_to_bytes_bytes_pair(fuzzy_keys_data)) -class InclusiveStopFilter: +class InclusiveStopFilter(object): def __init__(self, stop_row_key): self.filter_type = pbFilter.InclusiveStopFilter @@ -174,7 +172,7 @@ def __init__(self, stop_row_key): self.stop_row_key = stop_row_key -class KeyOnlyFilter: +class KeyOnlyFilter(object): def __init__(self, len_as_val): self.filter_type = pbFilter.KeyOnlyFilter @@ -182,7 +180,7 @@ def __init__(self, len_as_val): self.len_as_val = len_as_val -class MultipleColumnPrefixFilter: +class MultipleColumnPrefixFilter(object): def __init__(self, sorted_prefixes): self.filter_type = 
pbFilter.MultipleColumnPrefixFilter @@ -193,7 +191,7 @@ def __init__(self, sorted_prefixes): self.sorted_prefixes = [sorted_prefixes] -class PageFilter: +class PageFilter(object): def __init__(self, page_size): self.filter_type = pbFilter.PageFilter @@ -201,7 +199,7 @@ def __init__(self, page_size): self.page_size = page_size -class PrefixFilter: +class PrefixFilter(object): def __init__(self, prefix): self.filter_type = pbFilter.PrefixFilter @@ -209,7 +207,7 @@ def __init__(self, prefix): self.prefix = prefix -class QualifierFilter: +class QualifierFilter(object): def __init__(self, compare_filter): self.filter_type = pbFilter.QualifierFilter @@ -217,7 +215,7 @@ def __init__(self, compare_filter): self.compare_filter = _to_pb_filter(compare_filter) -class RandomRowFilter: +class RandomRowFilter(object): def __init__(self, chance): self.filter_type = pbFilter.RandomRowFilter @@ -225,7 +223,7 @@ def __init__(self, chance): self.chance = chance -class RowFilter: +class RowFilter(object): def __init__(self, compare_filter): self.filter_type = pbFilter.RowFilter @@ -233,7 +231,7 @@ def __init__(self, compare_filter): self.compare_filter = _to_filter(compare_filter) -class SkipColumnValueExcludeFilter: +class SkipColumnValueExcludeFilter(object): def __init__(self, single_column_value_filter): self.filter_type = pbFilter.SkipColumnValueExcludeFilter @@ -242,9 +240,10 @@ def __init__(self, single_column_value_filter): single_column_value_filter) -class SkipColumnValueFilter: +class SkipColumnValueFilter(object): - def __init__(self, compare_op, comparator, column_family, column_qualifier, filter_if_missing, latest_version_only): + def __init__(self, compare_op, comparator, column_family, column_qualifier, filter_if_missing, + latest_version_only): self.filter_type = pbFilter.SkipColumnValueFilter self.name = filter_path + "SkipColumnValueFilter" self.compare_op = compare_op @@ -255,7 +254,7 @@ def __init__(self, compare_op, comparator, column_family, column_qualifier, filt self.latest_version_only = latest_version_only -class SkipFilter: +class SkipFilter(object): def __init__(self, orig_filter): self.filter_type = pbFilter.SkipFilter @@ -263,7 +262,7 @@ def __init__(self, orig_filter): self.filter = orig_filter -class TimestampsFilter: +class TimestampsFilter(object): def __init__(self, timestamps): self.filter_type = pbFilter.TimestampsFilter @@ -274,7 +273,7 @@ def __init__(self, timestamps): self.timestamps = [timestamps] -class ValueFilter: +class ValueFilter(object): def __init__(self, compare_filter): self.filter_type = pbFilter.ValueFilter @@ -282,7 +281,7 @@ def __init__(self, compare_filter): self.compare_filter = _to_filter(compare_filter) -class WhileMatchFilter: +class WhileMatchFilter(object): def __init__(self, origFilter): self.filter_type = pbFilter.WhileMatchFilter @@ -290,14 +289,14 @@ def __init__(self, origFilter): self.filter = _to_filter(origFilter) -class FilterAllFilter: +class FilterAllFilter(object): def __init__(self): self.filter_type = pbFilter.FilterAllFilter self.name = filter_path + "FilterAllFilter" -class MultiRowRangeFilter: +class MultiRowRangeFilter(object): def __init__(self, row_range_list): self.filter_type = pbFilter.MultiRowRangeFilter @@ -327,11 +326,14 @@ def _to_filter(orig_filter): ft.serialized_filter = _to_pb_filter(orig_filter).SerializeToString() return ft + def _to_pb_filter(orig_filter): try: ft2 = orig_filter.filter_type() - members = [attr for attr in dir(orig_filter) if not callable( - attr) and not attr.startswith("__") and attr not in 
["name", "filter_type", "add_filters"]] + members = [ + attr for attr in dir(orig_filter) + if not callable(attr) and not attr.startswith("__") and + attr not in ["name", "filter_type", "add_filters"]] for member in members: try: val = getattr(orig_filter, member) @@ -351,8 +353,7 @@ def _to_pb_filter(orig_filter): raise ValueError("Malformed Filter provided, %s %s" % (ex, traceback.format_exc())) - -class ByteArrayComparable: +class ByteArrayComparable(object): def __init__(self, value): self.comparable_type = pbComparator.ByteArrayComparable @@ -375,7 +376,7 @@ def _to_comparable(orig_cmp): raise ValueError("Malformed Comparable provided %s %s" % (ex, traceback.format_exc())) -class BinaryComparator: +class BinaryComparator(object): def __init__(self, comparable): self.comparator_type = pbComparator.BinaryComparator @@ -383,7 +384,7 @@ def __init__(self, comparable): self.comparable = _to_comparable(comparable) -class LongComparator: +class LongComparator(object): def __init__(self, comparable): self.comparator_type = pbComparator.LongComparator @@ -391,7 +392,7 @@ def __init__(self, comparable): self.comparable = _to_comparable(comparable) -class BinaryPrefixComparator: +class BinaryPrefixComparator(object): def __init__(self, comparable): self.comparator_type = pbComparator.BinaryPrefixComparator @@ -399,7 +400,7 @@ def __init__(self, comparable): self.comparable = _to_comparable(comparable) -class BitComparator: +class BitComparator(object): def __init__(self, comparable, bitwise_op): self.comparator_type = pbComparator.BitComparator @@ -408,14 +409,14 @@ def __init__(self, comparable, bitwise_op): self.bitwise_op = bitwise_op -class NullComparator: +class NullComparator(object): def __init__(self): self.comparator_type = pbComparator.NullComparator self.name = comparator_path + "NullComparator" -class RegexStringComparator: +class RegexStringComparator(object): def __init__(self, pattern, pattern_flags, charset, engine): self.comparator_type = pbComparator.RegexStringComparator @@ -426,7 +427,7 @@ def __init__(self, pattern, pattern_flags, charset, engine): self.engine = engine -class StringComparator: +class StringComparator(object): def __init__(self, substr): self.comparator_type = pbComparator.BinaryPrefixComparator @@ -457,7 +458,8 @@ def _to_comparator(orig_cmp): except Exception as ex: raise ValueError("Malformed Comparator provided %s %s" % (ex, traceback.format_exc())) -class BytesBytesPair: + +class BytesBytesPair(object): def __init__(self, first, second): self.first = first @@ -474,7 +476,7 @@ def _to_bytes_bytes_pair(bbp): raise ValueError("Malformed BytesBytesPair provided") -class RowRange: +class RowRange(object): def __init__(self, start_row, start_row_inclusive, stop_row, stop_row_inclusive): self.filter_type = pbFilter.RowRange @@ -495,4 +497,3 @@ def _to_row_range(rr): return new except Exception: raise ValueError("Malformed RowRange provided") - diff --git a/helpers/README.md b/pybase/helpers/README.md similarity index 100% rename from helpers/README.md rename to pybase/helpers/README.md diff --git a/pybase/helpers/__init__.py b/pybase/helpers/__init__.py new file mode 100644 index 0000000..c36125f --- /dev/null +++ b/pybase/helpers/__init__.py @@ -0,0 +1,3 @@ +from __future__ import absolute_import, print_function + +from . 
import varint # noqa
diff --git a/helpers/varint.py b/pybase/helpers/varint.py
similarity index 74%
rename from helpers/varint.py
rename to pybase/helpers/varint.py
index 13b8a58..4711a2e 100644
--- a/helpers/varint.py
+++ b/pybase/helpers/varint.py
@@ -27,78 +27,69 @@
 # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+from __future__ import absolute_import, print_function, unicode_literals
+
+import six


 class NotEnoughDataExcption(Exception):
     pass


-def _VarintDecoder(mask):
-    """Return an encoder for a basic varint value (does not include tag).
+def _VarintDecoder(mask, result_type=int):
+    """Return a decoder for a basic varint value (does not include tag).
-
-    Decoded values will be bitwise-anded with the given mask before being
-    returned, e.g. to limit them to 32 bits. The returned decoder does not
-    take the usual "end" parameter -- the caller is expected to do bounds checking
-    after the fact (often the caller can defer such checking until later). The
-    decoder returns a (value, new_pos) pair.
-    """
-
-    local_ord = ord
-
+    Decoded values will be bitwise-anded with the given mask before being
+    returned, e.g. to limit them to 32 bits. The returned decoder does not
+    take the usual "end" parameter -- the caller is expected to do bounds checking
+    after the fact (often the caller can defer such checking until later). The
+    decoder returns a (value, new_pos) pair.
+    """
    def DecodeVarint(buffer, pos):
        result = 0
        shift = 0
        while 1:
-            if pos > len(buffer) - 1:
-                raise NotEnoughDataExcption("Not enough data to decode varint")
-            b = local_ord(buffer[pos])
+            b = six.indexbytes(buffer, pos)
            result |= ((b & 0x7f) << shift)
            pos += 1
            if not (b & 0x80):
                result &= mask
+                result = result_type(result)
                return (result, pos)
            shift += 7
            if shift >= 64:
                raise ValueError('Too many bytes when decoding varint.')
-
    return DecodeVarint


-def _SignedVarintDecoder(mask):
+def _SignedVarintDecoder(bits, result_type=int):
    """Like _VarintDecoder() but decodes signed values."""
-
-    local_ord = ord
+    signbit = 1 << (bits - 1)
+    mask = (1 << bits) - 1

    def DecodeVarint(buffer, pos):
        result = 0
        shift = 0
        while 1:
-            if pos > len(buffer) - 1:
-                raise NotEnoughDataExcption("Not enough data to decode varint")
-            b = local_ord(buffer[pos])
+            b = six.indexbytes(buffer, pos)
            result |= ((b & 0x7f) << shift)
            pos += 1
            if not (b & 0x80):
-                if result > 0x7fffffffffffffff:
-                    result -= (1 << 64)
-                    result |= ~mask
-                else:
-                    result &= mask
+                result &= mask
+                result = (result ^ signbit) - signbit
+                result = result_type(result)
                return (result, pos)
            shift += 7
            if shift >= 64:
                raise ValueError('Too many bytes when decoding varint.')
-
    return DecodeVarint


decodeVarint = _VarintDecoder((1 << 64) - 1)
-decodeSignedVarint = _SignedVarintDecoder((1 << 64) - 1)
-
+decodeSignedVarint = _SignedVarintDecoder(64, int)

# Use these versions for values which must be limited to 32 bits.
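+# (Hedged illustration: 300 encodes to b'\xac\x02', so
+#  decodeVarint(b'\xac\x02', 0) == (300, 2); the 32-bit variants below
+#  behave the same but mask their result to 32 bits.)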
decodeVarint32 = _VarintDecoder((1 << 32) - 1) -decodeSignedVarint32 = _SignedVarintDecoder((1 << 32) - 1) +decodeSignedVarint32 = _SignedVarintDecoder(32, int) def varintSize(value): @@ -150,41 +141,33 @@ def signedVarintSize(value): def _VarintEncoder(): - """Return an encoder for a basic varint value.""" - - local_chr = chr - - def EncodeVarint(write, value): + """Return an encoder for a basic varint value (does not include tag).""" + def EncodeVarint(write, value, unused_deterministic=None): bits = value & 0x7f value >>= 7 while value: - write(local_chr(0x80 | bits)) + write(six.int2byte(0x80|bits)) bits = value & 0x7f value >>= 7 - return write(local_chr(bits)) - + return write(six.int2byte(bits)) return EncodeVarint def _SignedVarintEncoder(): - """Return an encoder for a basic signed varint value.""" - - local_chr = chr - - def EncodeSignedVarint(write, value): + """Return an encoder for a basic signed varint value (does not include + tag).""" + def EncodeSignedVarint(write, value, unused_deterministic=None): if value < 0: value += (1 << 64) bits = value & 0x7f value >>= 7 while value: - write(local_chr(0x80 | bits)) + write(six.int2byte(0x80|bits)) bits = value & 0x7f value >>= 7 - return write(local_chr(bits)) - + return write(six.int2byte(bits)) return EncodeSignedVarint encodeVarint = _VarintEncoder() encodeSignedVarint = _SignedVarintEncoder() - diff --git a/pb/Cell.proto b/pybase/pb/Cell.proto similarity index 100% rename from pb/Cell.proto rename to pybase/pb/Cell.proto diff --git a/pb/Cell_pb2.py b/pybase/pb/Cell_pb2.py similarity index 99% rename from pb/Cell_pb2.py rename to pybase/pb/Cell_pb2.py index a64f8ae..6de3f38 100644 --- a/pb/Cell_pb2.py +++ b/pybase/pb/Cell_pb2.py @@ -1,14 +1,18 @@ # Generated by the protocol buffer compiler. DO NOT EDIT! # source: Cell.proto +from __future__ import absolute_import, print_function + import sys -_b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1')) -from google.protobuf.internal import enum_type_wrapper + from google.protobuf import descriptor as _descriptor +from google.protobuf import descriptor_pb2 from google.protobuf import message as _message from google.protobuf import reflection as _reflection from google.protobuf import symbol_database as _symbol_database -from google.protobuf import descriptor_pb2 +from google.protobuf.internal import enum_type_wrapper + +_b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1')) # @@protoc_insertion_point(imports) _sym_db = _symbol_database.Default() diff --git a/pb/Client.proto b/pybase/pb/Client.proto similarity index 100% rename from pb/Client.proto rename to pybase/pb/Client.proto diff --git a/pb/Client_pb2.py b/pybase/pb/Client_pb2.py similarity index 99% rename from pb/Client_pb2.py rename to pybase/pb/Client_pb2.py index df9092e..e59ca92 100644 --- a/pb/Client_pb2.py +++ b/pybase/pb/Client_pb2.py @@ -1,23 +1,28 @@ # Generated by the protocol buffer compiler. DO NOT EDIT! 
# source: Client.proto +from __future__ import absolute_import, print_function + import sys -_b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1')) -from google.protobuf.internal import enum_type_wrapper + from google.protobuf import descriptor as _descriptor +from google.protobuf import descriptor_pb2 from google.protobuf import message as _message from google.protobuf import reflection as _reflection from google.protobuf import symbol_database as _symbol_database -from google.protobuf import descriptor_pb2 +from google.protobuf.internal import enum_type_wrapper + +from . import Cell_pb2 +from . import Comparator_pb2 +from . import Filter_pb2 +from . import HBase_pb2 + +_b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1')) # @@protoc_insertion_point(imports) _sym_db = _symbol_database.Default() -import HBase_pb2 -import Filter_pb2 -import Cell_pb2 -import Comparator_pb2 DESCRIPTOR = _descriptor.FileDescriptor( diff --git a/pb/ClusterId.proto b/pybase/pb/ClusterId.proto similarity index 100% rename from pb/ClusterId.proto rename to pybase/pb/ClusterId.proto diff --git a/pb/ClusterId_pb2.py b/pybase/pb/ClusterId_pb2.py similarity index 97% rename from pb/ClusterId_pb2.py rename to pybase/pb/ClusterId_pb2.py index 2b9517e..20e17d2 100644 --- a/pb/ClusterId_pb2.py +++ b/pybase/pb/ClusterId_pb2.py @@ -1,13 +1,17 @@ # Generated by the protocol buffer compiler. DO NOT EDIT! # source: ClusterId.proto +from __future__ import absolute_import, print_function + import sys -_b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1')) + from google.protobuf import descriptor as _descriptor +from google.protobuf import descriptor_pb2 from google.protobuf import message as _message from google.protobuf import reflection as _reflection from google.protobuf import symbol_database as _symbol_database -from google.protobuf import descriptor_pb2 + +_b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1')) # @@protoc_insertion_point(imports) _sym_db = _symbol_database.Default() diff --git a/pb/ClusterStatus.proto b/pybase/pb/ClusterStatus.proto similarity index 100% rename from pb/ClusterStatus.proto rename to pybase/pb/ClusterStatus.proto diff --git a/pb/ClusterStatus_pb2.py b/pybase/pb/ClusterStatus_pb2.py similarity index 99% rename from pb/ClusterStatus_pb2.py rename to pybase/pb/ClusterStatus_pb2.py index 6d00ad4..303fb8c 100644 --- a/pb/ClusterStatus_pb2.py +++ b/pybase/pb/ClusterStatus_pb2.py @@ -1,21 +1,26 @@ # Generated by the protocol buffer compiler. DO NOT EDIT! # source: ClusterStatus.proto +from __future__ import absolute_import, print_function + import sys -_b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1')) + from google.protobuf import descriptor as _descriptor +from google.protobuf import descriptor_pb2 from google.protobuf import message as _message from google.protobuf import reflection as _reflection from google.protobuf import symbol_database as _symbol_database -from google.protobuf import descriptor_pb2 + +from . import ClusterId_pb2 +from . import FS_pb2 +from . 
import HBase_pb2 + +_b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1')) # @@protoc_insertion_point(imports) _sym_db = _symbol_database.Default() -import HBase_pb2 -import ClusterId_pb2 -import FS_pb2 DESCRIPTOR = _descriptor.FileDescriptor( diff --git a/pb/Comparator.proto b/pybase/pb/Comparator.proto similarity index 100% rename from pb/Comparator.proto rename to pybase/pb/Comparator.proto diff --git a/pb/Comparator_pb2.py b/pybase/pb/Comparator_pb2.py similarity index 99% rename from pb/Comparator_pb2.py rename to pybase/pb/Comparator_pb2.py index cfcaa1e..b519d7e 100644 --- a/pb/Comparator_pb2.py +++ b/pybase/pb/Comparator_pb2.py @@ -1,13 +1,17 @@ # Generated by the protocol buffer compiler. DO NOT EDIT! # source: Comparator.proto +from __future__ import absolute_import, print_function + import sys -_b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1')) + from google.protobuf import descriptor as _descriptor +from google.protobuf import descriptor_pb2 from google.protobuf import message as _message from google.protobuf import reflection as _reflection from google.protobuf import symbol_database as _symbol_database -from google.protobuf import descriptor_pb2 + +_b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1')) # @@protoc_insertion_point(imports) _sym_db = _symbol_database.Default() diff --git a/pb/ErrorHandling.proto b/pybase/pb/ErrorHandling.proto similarity index 100% rename from pb/ErrorHandling.proto rename to pybase/pb/ErrorHandling.proto diff --git a/pb/ErrorHandling_pb2.py b/pybase/pb/ErrorHandling_pb2.py similarity index 99% rename from pb/ErrorHandling_pb2.py rename to pybase/pb/ErrorHandling_pb2.py index d6e8443..6d9779a 100644 --- a/pb/ErrorHandling_pb2.py +++ b/pybase/pb/ErrorHandling_pb2.py @@ -1,13 +1,17 @@ # Generated by the protocol buffer compiler. DO NOT EDIT! # source: ErrorHandling.proto +from __future__ import absolute_import, print_function + import sys -_b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1')) + from google.protobuf import descriptor as _descriptor +from google.protobuf import descriptor_pb2 from google.protobuf import message as _message from google.protobuf import reflection as _reflection from google.protobuf import symbol_database as _symbol_database -from google.protobuf import descriptor_pb2 + +_b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1')) # @@protoc_insertion_point(imports) _sym_db = _symbol_database.Default() diff --git a/pb/FS.proto b/pybase/pb/FS.proto similarity index 100% rename from pb/FS.proto rename to pybase/pb/FS.proto diff --git a/pb/FS_pb2.py b/pybase/pb/FS_pb2.py similarity index 98% rename from pb/FS_pb2.py rename to pybase/pb/FS_pb2.py index 72a61d7..5972995 100644 --- a/pb/FS_pb2.py +++ b/pybase/pb/FS_pb2.py @@ -1,13 +1,17 @@ # Generated by the protocol buffer compiler. DO NOT EDIT! 
# source: FS.proto +from __future__ import absolute_import, print_function + import sys -_b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1')) + from google.protobuf import descriptor as _descriptor +from google.protobuf import descriptor_pb2 from google.protobuf import message as _message from google.protobuf import reflection as _reflection from google.protobuf import symbol_database as _symbol_database -from google.protobuf import descriptor_pb2 + +_b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1')) # @@protoc_insertion_point(imports) _sym_db = _symbol_database.Default() diff --git a/pb/Filter.proto b/pybase/pb/Filter.proto similarity index 100% rename from pb/Filter.proto rename to pybase/pb/Filter.proto diff --git a/pb/Filter_pb2.py b/pybase/pb/Filter_pb2.py similarity index 99% rename from pb/Filter_pb2.py rename to pybase/pb/Filter_pb2.py index 2700503..5090652 100644 --- a/pb/Filter_pb2.py +++ b/pybase/pb/Filter_pb2.py @@ -1,20 +1,25 @@ # Generated by the protocol buffer compiler. DO NOT EDIT! # source: Filter.proto +from __future__ import absolute_import, print_function + import sys -_b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1')) + from google.protobuf import descriptor as _descriptor +from google.protobuf import descriptor_pb2 from google.protobuf import message as _message from google.protobuf import reflection as _reflection from google.protobuf import symbol_database as _symbol_database -from google.protobuf import descriptor_pb2 + +from . import Comparator_pb2 +from . import HBase_pb2 + +_b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1')) # @@protoc_insertion_point(imports) _sym_db = _symbol_database.Default() -import HBase_pb2 -import Comparator_pb2 DESCRIPTOR = _descriptor.FileDescriptor( diff --git a/pb/HBase.proto b/pybase/pb/HBase.proto similarity index 100% rename from pb/HBase.proto rename to pybase/pb/HBase.proto diff --git a/pb/HBase_pb2.py b/pybase/pb/HBase_pb2.py similarity index 99% rename from pb/HBase_pb2.py rename to pybase/pb/HBase_pb2.py index 035ba7c..a7acdad 100644 --- a/pb/HBase_pb2.py +++ b/pybase/pb/HBase_pb2.py @@ -1,20 +1,25 @@ # Generated by the protocol buffer compiler. DO NOT EDIT! # source: HBase.proto +from __future__ import absolute_import, print_function + import sys -_b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1')) -from google.protobuf.internal import enum_type_wrapper + from google.protobuf import descriptor as _descriptor +from google.protobuf import descriptor_pb2 from google.protobuf import message as _message from google.protobuf import reflection as _reflection from google.protobuf import symbol_database as _symbol_database -from google.protobuf import descriptor_pb2 +from google.protobuf.internal import enum_type_wrapper + +from . import Cell_pb2 + +_b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1')) # @@protoc_insertion_point(imports) _sym_db = _symbol_database.Default() -import Cell_pb2 DESCRIPTOR = _descriptor.FileDescriptor( diff --git a/pb/Master.proto b/pybase/pb/Master.proto similarity index 100% rename from pb/Master.proto rename to pybase/pb/Master.proto diff --git a/pb/Master_pb2.py b/pybase/pb/Master_pb2.py similarity index 99% rename from pb/Master_pb2.py rename to pybase/pb/Master_pb2.py index adc1157..b9af77f 100644 --- a/pb/Master_pb2.py +++ b/pybase/pb/Master_pb2.py @@ -1,23 +1,28 @@ # Generated by the protocol buffer compiler. DO NOT EDIT! 
# source: Master.proto +from __future__ import absolute_import, print_function + import sys -_b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1')) + from google.protobuf import descriptor as _descriptor +from google.protobuf import descriptor_pb2 from google.protobuf import message as _message from google.protobuf import reflection as _reflection from google.protobuf import symbol_database as _symbol_database -from google.protobuf import descriptor_pb2 + +from . import Client_pb2 +from . import ClusterStatus_pb2 +from . import ErrorHandling_pb2 +from . import HBase_pb2 +from . import Quota_pb2 + +_b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1')) # @@protoc_insertion_point(imports) _sym_db = _symbol_database.Default() -import HBase_pb2 -import Client_pb2 -import ClusterStatus_pb2 -import ErrorHandling_pb2 -import Quota_pb2 DESCRIPTOR = _descriptor.FileDescriptor( diff --git a/pb/MultiRowMutation.proto b/pybase/pb/MultiRowMutation.proto similarity index 100% rename from pb/MultiRowMutation.proto rename to pybase/pb/MultiRowMutation.proto diff --git a/pb/MultiRowMutation_pb2.py b/pybase/pb/MultiRowMutation_pb2.py similarity index 98% rename from pb/MultiRowMutation_pb2.py rename to pybase/pb/MultiRowMutation_pb2.py index ed3f4c5..c3bc07c 100644 --- a/pb/MultiRowMutation_pb2.py +++ b/pybase/pb/MultiRowMutation_pb2.py @@ -1,19 +1,24 @@ # Generated by the protocol buffer compiler. DO NOT EDIT! # source: MultiRowMutation.proto +from __future__ import absolute_import, print_function + import sys -_b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1')) + from google.protobuf import descriptor as _descriptor +from google.protobuf import descriptor_pb2 from google.protobuf import message as _message from google.protobuf import reflection as _reflection from google.protobuf import symbol_database as _symbol_database -from google.protobuf import descriptor_pb2 + +from . import Client_pb2 + +_b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1')) # @@protoc_insertion_point(imports) _sym_db = _symbol_database.Default() -import Client_pb2 DESCRIPTOR = _descriptor.FileDescriptor( diff --git a/pb/Quota.proto b/pybase/pb/Quota.proto similarity index 100% rename from pb/Quota.proto rename to pybase/pb/Quota.proto diff --git a/pb/Quota_pb2.py b/pybase/pb/Quota_pb2.py similarity index 99% rename from pb/Quota_pb2.py rename to pybase/pb/Quota_pb2.py index 6ca78d5..fc45a25 100644 --- a/pb/Quota_pb2.py +++ b/pybase/pb/Quota_pb2.py @@ -1,20 +1,25 @@ # Generated by the protocol buffer compiler. DO NOT EDIT! # source: Quota.proto +from __future__ import absolute_import, print_function + import sys -_b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1')) -from google.protobuf.internal import enum_type_wrapper + from google.protobuf import descriptor as _descriptor +from google.protobuf import descriptor_pb2 from google.protobuf import message as _message from google.protobuf import reflection as _reflection from google.protobuf import symbol_database as _symbol_database -from google.protobuf import descriptor_pb2 +from google.protobuf.internal import enum_type_wrapper + +from . 
import HBase_pb2 + +_b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1')) # @@protoc_insertion_point(imports) _sym_db = _symbol_database.Default() -import HBase_pb2 DESCRIPTOR = _descriptor.FileDescriptor( diff --git a/pb/README.txt b/pybase/pb/README.txt similarity index 100% rename from pb/README.txt rename to pybase/pb/README.txt diff --git a/pb/RPC.proto b/pybase/pb/RPC.proto similarity index 100% rename from pb/RPC.proto rename to pybase/pb/RPC.proto diff --git a/pb/RPC_pb2.py b/pybase/pb/RPC_pb2.py similarity index 99% rename from pb/RPC_pb2.py rename to pybase/pb/RPC_pb2.py index 612f68d..7562452 100644 --- a/pb/RPC_pb2.py +++ b/pybase/pb/RPC_pb2.py @@ -1,20 +1,25 @@ # Generated by the protocol buffer compiler. DO NOT EDIT! # source: RPC.proto +from __future__ import absolute_import, print_function + import sys -_b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1')) + from google.protobuf import descriptor as _descriptor +from google.protobuf import descriptor_pb2 from google.protobuf import message as _message from google.protobuf import reflection as _reflection from google.protobuf import symbol_database as _symbol_database -from google.protobuf import descriptor_pb2 + +from . import HBase_pb2 +from . import Tracing_pb2 + +_b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1')) # @@protoc_insertion_point(imports) _sym_db = _symbol_database.Default() -import Tracing_pb2 -import HBase_pb2 DESCRIPTOR = _descriptor.FileDescriptor( diff --git a/pb/Tracing.proto b/pybase/pb/Tracing.proto similarity index 100% rename from pb/Tracing.proto rename to pybase/pb/Tracing.proto diff --git a/pb/Tracing_pb2.py b/pybase/pb/Tracing_pb2.py similarity index 97% rename from pb/Tracing_pb2.py rename to pybase/pb/Tracing_pb2.py index 90f7442..a1eb33a 100644 --- a/pb/Tracing_pb2.py +++ b/pybase/pb/Tracing_pb2.py @@ -1,13 +1,17 @@ # Generated by the protocol buffer compiler. DO NOT EDIT! # source: Tracing.proto +from __future__ import absolute_import, print_function + import sys -_b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1')) + from google.protobuf import descriptor as _descriptor +from google.protobuf import descriptor_pb2 from google.protobuf import message as _message from google.protobuf import reflection as _reflection from google.protobuf import symbol_database as _symbol_database -from google.protobuf import descriptor_pb2 + +_b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1')) # @@protoc_insertion_point(imports) _sym_db = _symbol_database.Default() diff --git a/pb/ZooKeeper.proto b/pybase/pb/ZooKeeper.proto similarity index 100% rename from pb/ZooKeeper.proto rename to pybase/pb/ZooKeeper.proto diff --git a/pb/ZooKeeper_pb2.py b/pybase/pb/ZooKeeper_pb2.py similarity index 99% rename from pb/ZooKeeper_pb2.py rename to pybase/pb/ZooKeeper_pb2.py index 78762d9..6a19f6b 100644 --- a/pb/ZooKeeper_pb2.py +++ b/pybase/pb/ZooKeeper_pb2.py @@ -1,20 +1,25 @@ # Generated by the protocol buffer compiler. DO NOT EDIT! # source: ZooKeeper.proto +from __future__ import absolute_import, print_function + import sys -_b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1')) + from google.protobuf import descriptor as _descriptor +from google.protobuf import descriptor_pb2 from google.protobuf import message as _message from google.protobuf import reflection as _reflection from google.protobuf import symbol_database as _symbol_database -from google.protobuf import descriptor_pb2 + +from . 
import ClusterStatus_pb2 +from . import HBase_pb2 + +_b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1')) # @@protoc_insertion_point(imports) _sym_db = _symbol_database.Default() -import HBase_pb2 -import ClusterStatus_pb2 DESCRIPTOR = _descriptor.FileDescriptor( diff --git a/pb/__init__.py b/pybase/pb/__init__.py similarity index 100% rename from pb/__init__.py rename to pybase/pb/__init__.py diff --git a/region/__init__.py b/pybase/region/__init__.py similarity index 100% rename from region/__init__.py rename to pybase/region/__init__.py diff --git a/region/client.py b/pybase/region/client.py similarity index 53% rename from region/client.py rename to pybase/region/client.py index 12d1c70..608a96f 100644 --- a/region/client.py +++ b/pybase/region/client.py @@ -13,21 +13,23 @@ See the License for the specific language governing permissions and limitations under the License. """ +from __future__ import absolute_import, print_function, unicode_literals + +import logging import socket +from concurrent.futures import ThreadPoolExecutor +from contextlib import contextmanager +from io import BytesIO from struct import pack, unpack -from ..pb.RPC_pb2 import ConnectionHeader, RequestHeader, ResponseHeader -from ..pb.Client_pb2 import GetResponse, MutateResponse, ScanResponse +from threading import current_thread, Condition, Lock + +from ..exceptions import (NoSuchColumnFamilyException, NotServingRegionException, PyBaseException, + RegionMovedException, RegionOpeningException, RegionServerException) from ..helpers import varint -from threading import Lock, Condition -import logging -from time import sleep -from cStringIO import StringIO -from ..exceptions import * +from ..pb.Client_pb2 import GetResponse, MutateResponse, ScanResponse, MultiResponse +from ..pb.RPC_pb2 import ConnectionHeader, RequestHeader, ResponseHeader -logger = logging.getLogger('pybase.' + __name__) -logger.setLevel(logging.DEBUG) -# socket.setdefaulttimeout interfers with gevent. -#socket.setdefaulttimeout(2) +logger = logging.getLogger(__name__) # Used to encode and decode varints in a format protobuf expects. encoder = varint.encodeVarint @@ -36,15 +38,26 @@ # We need to know how to interpret an incoming proto.Message. This maps # the request_type to the response_type. response_types = { - "Get": GetResponse, - "Mutate": MutateResponse, - "Scan": ScanResponse + b"Get": GetResponse, + b"Mutate": MutateResponse, + b"Scan": ScanResponse, + b"Multi": MultiResponse } +@contextmanager +def acquire_timeout(lock, timeout): + result = lock.acquire(timeout=timeout) + try: + yield result + finally: + if result: + lock.release() + + # This Client is created once per RegionServer. Handles all communication # to and from this specific RegionServer. -class Client: +class Client(object): # Variables are as follows: # - Host: The hostname of the RegionServer # - Port: The port of the RegionServer @@ -52,35 +65,19 @@ class Client: # - call_id: A monotonically increasing int used as a sequence number for rpcs. This way # we can match incoming responses with the rpc that made the request. - def __init__(self, host, port): - self.host = host - self.port = port + def __init__(self, host, port, secondary, call_timeout=60): + self.host = host.decode('utf8') if isinstance(host, bytes) else host + self.port = port.decode('utf8') if isinstance(port, bytes) else port self.pool_size = 0 - # We support connection pools so have lists of sockets and read/write - # mutexes on them. 
+ + self.thread_pool = None + self.thread_pool_timeout = call_timeout self.sock_pool = [] - self.write_lock_pool = [] - self.read_lock_pool = [] + # Why yes, we do have a mutex protecting a single variable. self.call_lock = Lock() self.call_id = 0 - # This dictionary and associated sync primitives are for when _receive_rpc - # receives an RPC that isn't theirs. If a thread gets one that isn't - # theirs it means there's another thread who also just sent an RPC. The - # other thread will also get the wrong call_id. So how do we make them - # switch RPCs? - # - # Receive an RPC with incorrect call_id? - # 1. Acquire lock - # 2. Place raw data into missed_rpcs with key call_id - # 3. Notify all other threads to wake up (nothing will happen until you release the lock) - # 4. WHILE: Your call_id is not in the dictionary - # 4.5 Call wait() on the conditional and get comfy. - # 5. Pop your data out - # 6. Release the lock - self.missed_rpcs = {} - self.missed_rpcs_lock = Lock() - self.missed_rpcs_condition = Condition(self.missed_rpcs_lock) + # Set to true when .close is called - this allows threads/greenlets # stuck in _bad_call_id to escape into the error handling code. self.shutting_down = False @@ -90,6 +87,10 @@ def __init__(self, host, port): # amount of meta lookups). self.regions = [] + # Capture if this client is being used for secondary operations + # We don't really care if it fails, best effort only. + self.secondary = secondary + # Sends an RPC over the wire then calls _receive_rpc and returns the # response RPC. # @@ -101,10 +102,14 @@ def __init__(self, host, port): # 4. A varint representing the length of the serialized RPC. # 5. The serialized RPC. # - def _send_request(self, rq): - with self.call_lock: - my_id = self.call_id - self.call_id += 1 + def _send_request(self, rq, lock_timeout=10, _async=False): + with acquire_timeout(self.call_lock, lock_timeout) as acquired: + if acquired: + my_id = self.call_id + self.call_id += 1 + else: + logger.warning('Lock timeout %s RPC to %s:%s', rq.type, self.host, self.port) + raise RegionServerException(region_client=self) serialized_rpc = rq.pb.SerializeToString() header = RequestHeader() header.call_id = my_id @@ -122,20 +127,13 @@ def _send_request(self, rq): to_send = pack(">IB", total_length - 4, len(serialized_header)) to_send += serialized_header + rpc_length_bytes + serialized_rpc - pool_id = my_id % self.pool_size - try: - with self.write_lock_pool[pool_id]: - logger.debug( - 'Sending %s RPC to %s:%s on pool port %s', rq.type, self.host, self.port, pool_id) - self.sock_pool[pool_id].send(to_send) - except socket.error: - # RegionServer dead? - raise RegionServerException(region_client=self) - # Message is sent! Now go listen for the results. - return self._receive_rpc(my_id, rq) + # send and receive the request + future = self.thread_pool.submit(self.send_and_receive_rpc, my_id, rq, to_send) + if _async: + return future + return future.result(timeout=self.thread_pool_timeout) - # Called after sending an RPC, listens for the response and builds the - # correct pbResponse object. + # Sending an RPC, listens for the response and builds the correct pbResponse object. # # The raw bytes we receive are composed (in order) - # @@ -145,27 +143,51 @@ def _send_request(self, rq): # 4. A varint representing the length of the serialized ResponseMessage. # 5. The ResponseMessage. 
     #
-    def _receive_rpc(self, call_id, rq, data=None):
+    def send_and_receive_rpc(self, call_id, rq, to_send):
+        thread_name = current_thread().name
+        sp = thread_name.split("_")  # i.e. splitting "ThreadPoolExecutor-1_0"
+        pool_id = int(sp[1])  # thread number is now responsible for only using its matching socket
+        try:
+            self.sock_pool[pool_id].send(to_send)
+        except socket.error:
+            raise RegionServerException(region_client=self)
+
+        return self.receive_rpc(pool_id=pool_id, call_id=call_id, rq=rq)
+
+    def _parse_exception(self, exception_class, stack_trace):
+        if exception_class in ('org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException',
+                               "java.io.IOException"):
+            return NoSuchColumnFamilyException()
+        elif exception_class == 'org.apache.hadoop.hbase.exceptions.RegionMovedException':
+            return RegionMovedException(region_client=self)
+        elif exception_class == 'org.apache.hadoop.hbase.NotServingRegionException':
+            return NotServingRegionException(region_client=self)
+        elif exception_class == \
+                'org.apache.hadoop.hbase.regionserver.RegionServerStoppedException':
+            return RegionServerException(region_client=self)
+        elif exception_class == 'org.apache.hadoop.hbase.exceptions.RegionOpeningException':
+            return RegionOpeningException(region_client=self)
+        else:
+            return PyBaseException(
+                exception_class + ". Remote traceback:\n%s" % stack_trace)
+
+    def receive_rpc(self, pool_id, call_id, rq):
         # If the field data is populated that means we should process from that
         # instead of the socket.
-        full_data = data
-        if data is None:
-            pool_id = call_id % self.pool_size
-            # Total message length is going to be the first four bytes
-            # (little-endian uint32)
-            with self.read_lock_pool[pool_id]:
-                try:
-                    msg_length = self._recv_n(self.sock_pool[pool_id], 4)
-                    if msg_length is None:
-                        raise
-                    msg_length = unpack(">I", msg_length)[0]
-                    # The message is then going to be however many bytes the first four
-                    # bytes specified. We don't want to overread or underread as that'll
-                    # cause havoc.
-                    full_data = self._recv_n(
-                        self.sock_pool[pool_id], msg_length)
-                except socket.error:
-                    raise RegionServerException(region_client=self)
+        full_data = None
+        # Total message length is going to be the first four bytes
+        # (big-endian uint32)
+        try:
+            msg_length = Client._recv_n(self.sock_pool[pool_id], 4)
+            if msg_length is None:
+                raise
+            msg_length = unpack(">I", msg_length)[0]
+            # The message is then going to be however many bytes the first four
+            # bytes specified. We don't want to overread or underread as that'll
+            # cause havoc.
+            full_data = Client._recv_n(self.sock_pool[pool_id], msg_length)
+        except socket.error:
+            raise RegionServerException(region_client=self)
         # Pass in the full data as well as your current position to the
         # decoder. It'll then return two variables:
         # - next_pos: The number of bytes of data specified by the varint
@@ -175,58 +197,26 @@
         header.ParseFromString(full_data[pos: pos + next_pos])
         pos += next_pos
         if header.call_id != call_id:
-            # call_ids don't match? Looks like a different thread nabbed our
-            # response.
-            return self._bad_call_id(call_id, rq, header.call_id, full_data)
-        elif header.exception.exception_class_name != u'':
+            # Received an RPC with an incorrect call_id, so call receive again to read the
+            # next data on the socket. Likely, this means some caller abandoned their request.
+            return self.receive_rpc(pool_id, call_id, rq)
+        elif header.exception.exception_class_name != '':
             # If we're in here it means a remote exception has happened.
             exception_class = header.exception.exception_class_name
-            if exception_class == 'org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException' or exception_class == "java.io.IOException":
-                raise NoSuchColumnFamilyException()
-            elif exception_class == 'org.apache.hadoop.hbase.exceptions.RegionMovedException':
-                raise RegionMovedException()
-            elif exception_class == 'org.apache.hadoop.hbase.NotServingRegionException':
-                raise NotServingRegionException()
-            elif exception_class == 'org.apache.hadoop.hbase.regionserver.RegionServerStoppedException':
-                raise RegionServerException(region_client=self)
-            elif exception_class == 'org.apache.hadoop.hbase.exceptions.RegionOpeningException':
-                raise RegionOpeningException()
-            else:
-                raise PyBaseException(
-                    exception_class + ". Remote traceback:\n%s" % header.exception.stack_trace)
+            err = self._parse_exception(exception_class, header.exception.stack_trace)
+            raise err
         next_pos, pos = decoder(full_data, pos)
         rpc = response_types[rq.type]()
         rpc.ParseFromString(full_data[pos: pos + next_pos])
         # The rpc is fully built!
         return rpc
 
-    # Receive an RPC with incorrect call_id?
-    #   1. Acquire lock
-    #   2. Place raw data into missed_rpcs with key call_id
-    #   3. Notify all other threads to wake up (nothing will happen until you release the lock)
-    #   4. WHILE: Your call_id is not in the dictionary
-    #       4.5 Call wait() on the conditional and get comfy.
-    #   5. Pop your data out
-    #   6. Release the lock
-    def _bad_call_id(self, my_id, my_request, msg_id, data):
-        with self.missed_rpcs_lock:
-            logger.debug(
-                "Received invalid RPC ID. Got: %s, Expected: %s.", msg_id, my_id)
-            self.missed_rpcs[msg_id] = data
-            self.missed_rpcs_condition.notifyAll()
-            while my_id not in self.missed_rpcs:
-                if self.shutting_down:
-                    raise RegionServerException(region_client=self)
-                self.missed_rpcs_condition.wait()
-            new_data = self.missed_rpcs.pop(my_id)
-            logger.debug("Another thread found my RPC! RPC ID: %s", my_id)
-        return self._receive_rpc(my_id, my_request, data=new_data)
-
     # Receives exactly n bytes from the socket. Will block until n bytes are
     # received. If a socket is closed (RegionServer died) then raise an
     # exception that goes all the way back to the main client
-    def _recv_n(self, sock, n):
-        partial_str = StringIO()
+    @staticmethod
+    def _recv_n(sock, n):
+        partial_str = BytesIO()
         partial_len = 0
         while partial_len < n:
             packet = sock.recv(n - partial_len)
@@ -243,25 +233,21 @@ def close(self):
             sock.close()
         # We could still have greenlets waiting in the bad_call_id pools! Wake
         # them up so they can fail to error handling as well.
-        self.missed_rpcs_condition.acquire()
-        self.missed_rpcs_condition.notifyAll()
-        self.missed_rpcs_condition.release()
 
 
 # Creates a new RegionServer client. Creates the socket, initializes the
 # connection and returns an instance of Client.
-def NewClient(host, port, pool_size):
-    c = Client(host, port)
+def NewClient(host, port, pool_size, secondary=False, call_timeout=60):
+    c = Client(host, port, secondary, call_timeout)
     try:
         c.pool_size = pool_size
+        c.thread_pool = ThreadPoolExecutor(pool_size)
         for x in range(pool_size):
             s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-            s.connect((host, int(port)))
+            s.connect((c.host, int(c.port)))
             _send_hello(s)
-            s.settimeout(2)
+            s.settimeout(call_timeout)
             c.sock_pool.append(s)
-            c.read_lock_pool.append(Lock())
-            c.write_lock_pool.append(Lock())
     except (socket.error, socket.timeout):
         return None
     return c
@@ -278,7 +264,7 @@ def _send_hello(sock):
     # 1. "HBas\x00\x50". Magic prefix that HBase requires.
     # 2. Little-endian uint32 indicating length of serialized ConnectionHeader
     # 3. Serialized ConnectionHeader
-    message = "HBas\x00\x50" + pack(">I", len(serialized)) + serialized
+    message = b"HBas\x00\x50" + pack(">I", len(serialized)) + serialized
     sock.send(message)
@@ -287,5 +273,4 @@ def _to_varint(val):
     temp = []
     encoder(temp.append, val)
-    return "".join(temp)
-
+    return b"".join(temp)
diff --git a/region/region.py b/pybase/region/region.py
similarity index 91%
rename from region/region.py
rename to pybase/region/region.py
index e1c01f6..5479b02 100644
--- a/region/region.py
+++ b/pybase/region/region.py
@@ -13,11 +13,14 @@ See the License for the specific language governing permissions and
     limitations under the License.
 """
+from __future__ import absolute_import, print_function, unicode_literals
+
 from struct import unpack
+
 from ..pb.HBase_pb2 import RegionInfo as pbRegionInfo
 
 
-class Region:
+class Region(object):
 
     def __init__(self, table, name, start, stop):
         self.table = table
@@ -42,7 +45,8 @@ def region_from_cell(cell):
     if magic != 1346524486:
         # Either it's a corrupt message or an unsupported region info version.
raise RuntimeError( - "HBase returned an invalid response (are you running a version of HBase supporting Protobufs?)") + "HBase returned an invalid response (are you running a version of HBase supporting " + "Protobufs?)") region_info = pbRegionInfo() region_info.ParseFromString(cell.value[4:-4]) table = region_info.table_name.qualifier @@ -50,4 +54,3 @@ def region_from_cell(cell): start_key = region_info.start_key stop_key = region_info.end_key return Region(table, region_name, start_key, stop_key) - diff --git a/request/__init__.py b/pybase/request/__init__.py similarity index 100% rename from request/__init__.py rename to pybase/request/__init__.py diff --git a/request/request.py b/pybase/request/request.py similarity index 76% rename from request/request.py rename to pybase/request/request.py index 4602e8d..8b8a886 100644 --- a/request/request.py +++ b/pybase/request/request.py @@ -1,28 +1,34 @@ -from ..pb.Client_pb2 import GetRequest, MutateRequest, ScanRequest, Column, MutationProto -from ..filters import _to_filter +from __future__ import absolute_import, print_function, unicode_literals + +from builtins import str + from ..exceptions import MalformedFamilies, MalformedValues +from ..filters import _to_filter +from ..pb.Client_pb2 import Action, Column, GetRequest, MultiRequest, MutateRequest, MutationProto, RegionAction, ScanRequest # Table + Family used when requesting meta information from the # MetaRegionServer -metaTableName = "hbase:meta,,1" -metaInfoFamily = {"info": []} +metaTableName = b"hbase:meta,,1" +metaInfoFamily = {b"info": []} -class Request: +class Request(object): - def __init__(self, type, pb): + def __init__(self, type, pb): # noqa: B002 self.type = type self.pb = pb def master_request(meta_key): + if isinstance(meta_key, str): + meta_key = meta_key.encode('utf8') rq = GetRequest() rq.get.row = meta_key rq.get.column.extend(families_to_columns(metaInfoFamily)) rq.get.closest_row_before = True rq.region.type = 1 rq.region.value = metaTableName - return Request("Get", rq) + return Request(b"Get", rq) def get_request(region, key, families, filters): @@ -34,8 +40,25 @@ def get_request(region, key, families, filters): rq.region.value = region.region_name if pbFilter is not None: rq.get.filter.CopyFrom(pbFilter) - return Request("Get", rq) - + return Request(b"Get", rq) + +def region_action(region, keys, families): + ra = RegionAction() + ra.region.type = 1 + ra.region.value = region.region_name + ra.atomic = False + for key in keys: + action = Action() + action.get.row = key + action.get.column.extend(families_to_columns(families)) + ra.action.append(action) + return ra + +def multi_get(regions_and_keys, families): + rq = MultiRequest() + rq.regionAction.extend([region_action(region, keys, families) + for region, keys in regions_and_keys.items()]) + return Request(b"Multi", rq) def put_request(region, key, values): rq = MutateRequest() @@ -44,7 +67,7 @@ def put_request(region, key, values): rq.mutation.row = key rq.mutation.mutate_type = 2 rq.mutation.column_value.extend(values_to_column_values(values)) - return Request("Mutate", rq) + return Request(b"Mutate", rq) def delete_request(region, key, values): @@ -55,7 +78,7 @@ def delete_request(region, key, values): rq.mutation.mutate_type = 3 rq.mutation.column_value.extend( values_to_column_values(values, delete=True)) - return Request("Mutate", rq) + return Request(b"Mutate", rq) def append_request(region, key, values): @@ -65,7 +88,7 @@ def append_request(region, key, values): rq.mutation.row = key 
     rq.mutation.mutate_type = 0
     rq.mutation.column_value.extend(values_to_column_values(values))
-    return Request("Mutate", rq)
+    return Request(b"Mutate", rq)
 
 
 def increment_request(region, key, values):
@@ -75,7 +98,7 @@
     rq.mutation.row = key
     rq.mutation.mutate_type = 1
     rq.mutation.column_value.extend(values_to_column_values(values))
-    return Request("Mutate", rq)
+    return Request(b"Mutate", rq)
 
 
 def scan_request(region, start_key, stop_key, families, filters, close, scanner_id):
@@ -87,14 +110,14 @@ def scan_request(region, start_key, stop_key, families, filters, close, scanner_
     rq.close_scanner = close
     if scanner_id is not None:
         rq.scanner_id = int(scanner_id)
-        return Request("Scan", rq)
+        return Request(b"Scan", rq)
     rq.scan.column.extend(families_to_columns(families))
     rq.scan.start_row = start_key
     if stop_key is not None:
         rq.scan.stop_row = stop_key
     if filters is not None:
         rq.scan.filter.CopyFrom(filters)
-    return Request("Scan", rq)
+    return Request(b"Scan", rq)
 
 
 # Converts a dictionary specifying ColumnFamilys -> Qualifiers into the Column pb type.
@@ -164,4 +187,3 @@ def values_to_column_values(val, delete=False):
         return col_vals
     except Exception:
         raise MalformedValues()
-
diff --git a/zk/__init__.py b/pybase/zk/__init__.py
similarity index 100%
rename from zk/__init__.py
rename to pybase/zk/__init__.py
diff --git a/pybase/zk/client.py b/pybase/zk/client.py
new file mode 100644
index 0000000..7b2c347
--- /dev/null
+++ b/pybase/zk/client.py
@@ -0,0 +1,71 @@
+"""
+    Copyright 2015 Samuel Curley
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+"""
+from __future__ import absolute_import, print_function, unicode_literals
+
+import logging
+from struct import unpack
+from time import sleep
+
+from kazoo.client import KazooClient
+from kazoo.exceptions import NoNodeError
+from kazoo.handlers.threading import KazooTimeoutError
+
+from ..exceptions import (ZookeeperConnectionException,
+                          ZookeeperResponseException, ZookeeperZNodeException)
+from ..pb.ZooKeeper_pb2 import MetaRegionServer
+
+logger = logging.getLogger(__name__)
+
+znode = "/hbase"
+master_znode = znode + "/meta-region-server"
+
+
+def connect(zkquorum, establish_connection_timeout=5):
+    zk = KazooClient(hosts=zkquorum)
+
+    try:
+        zk.start(timeout=establish_connection_timeout)
+    except KazooTimeoutError:
+        raise ZookeeperConnectionException("Cannot connect to ZooKeeper at {}".format(zkquorum))
+
+    return zk
+
+
+def parse_master_info(resp):
+    if len(resp) == 0:
+        # Empty response is bad.
+        raise ZookeeperResponseException("ZooKeeper returned an empty response")
+    # The first byte must be \xff and the next four bytes are a big-endian
+    # uint32 containing the length of the meta.
+    first_byte, meta_length = unpack(">cI", resp[:5])
+    if first_byte != b'\xff':
+        # Malformed response
+        raise ZookeeperResponseException("ZooKeeper returned an invalid response")
+    if meta_length < 1 or meta_length > 65000:
+        # Is this really an error?
+        raise ZookeeperResponseException("ZooKeeper returned too much meta information")
+    # ZNode data in HBase are serialized protobufs with a four byte magic
+    # 'PBUF' prefix.
+    magic = unpack(">I", resp[meta_length + 5:meta_length + 9])[0]
+    if magic != 1346524486:
+        # 4 bytes: PBUF
+        raise ZookeeperResponseException("ZooKeeper returned an invalid response (are you running "
+                                         "a version of HBase supporting Protobufs?)")
+    rsp = resp[meta_length + 9:]
+    meta = MetaRegionServer()
+    meta.ParseFromString(rsp)
+    logger.info('Discovered Master at %s:%s', meta.server.host_name, meta.server.port)
+    return meta.server.host_name, meta.server.port
\ No newline at end of file
diff --git a/setup.cfg b/setup.cfg
new file mode 100644
index 0000000..78bbac2
--- /dev/null
+++ b/setup.cfg
@@ -0,0 +1,12 @@
+[flake8]
+max-line-length = 100
+ignore = E123,E133,E226,E241,E242,T003
+exclude = pybase/pb,build,dist
+
+[isort]
+line_length=100
+indent='    '
+balanced_wrapping=True
+
+[bdist_wheel]
+universal=1
diff --git a/setup.py b/setup.py
index 7b304cc..a903560 100644
--- a/setup.py
+++ b/setup.py
@@ -1,14 +1,24 @@
-from distutils.core import setup
+from __future__ import unicode_literals
+
+from setuptools import find_packages, setup
 
 setup(name='pybase',
-      version='0.1',
+      version='4.1.0',
       description='Native python client to hbase 1.0+',
       url='https://github.com/CurleySamuel/PyBase',
       author='Sam Curley',
       author_email='CurleySamuel@gmail.com',
       license='Apache License 2.0',
-      packages=['pybase', 'pybase.zk', 'pybase.pb', 'pybase.request',
-                'pybase.region', 'pybase.helpers', 'pybase.tests'],
-      package_dir={'pybase': '.'},
-      install_requires=["intervaltree","kazoo","six", "zope.interface", "protobuf"],
+      packages=find_packages('.', exclude=['tests']),
+      install_requires=["intervaltree >= 3.0, < 4.0", "future", "futures; python_version < '3.2'",
+                        "kazoo", "six", "zope.interface", "protobuf"],
+      classifiers=[
+          "Programming Language :: Python",
+          "Programming Language :: Python :: 2",
+          "Programming Language :: Python :: 2.7",
+          "Programming Language :: Python :: 3",
+          "Programming Language :: Python :: 3.4",
+          "Programming Language :: Python :: 3.5",
+          "Programming Language :: Python :: 3.6",
+      ],
       zip_safe=False)
diff --git a/tests/test_integration.py b/tests/test_integration.py
index 3aea68a..2053494 100644
--- a/tests/test_integration.py
+++ b/tests/test_integration.py
@@ -1,9 +1,12 @@
+from __future__ import absolute_import, print_function, unicode_literals
+
 import unittest
-import pybase
 from collections import defaultdict
 from time import sleep
 
-from pybase.exceptions import *
+import pybase
+from pybase.exceptions import (MalformedValues, NoSuchColumnFamilyException,
+                               NoSuchTableException, ZookeeperException)
 
 # Please note that all below unit tests require the existence of a table
 # to play with. Table must contain two column families specified below as well.
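(Provisioning note: the table and the two column families that these tests assume must exist before the suite runs. One way to create them is the same non-interactive `hbase shell` pipe used by the `hbase_shell` helper in tests/test_integration_availability.py; a minimal sketch follows, where the table name 'test' is illustrative — match it to the module-level `table`, `cf1` and `cf2` constants:)

import subprocess

def hbase_shell(cmd):
    # Pipe a command into a non-interactive `hbase shell` session and wait for it to finish.
    echo = subprocess.Popen(('echo', '"' + cmd + ';exit"'), stdout=subprocess.PIPE)
    subprocess.check_output(('hbase', 'shell'), stdin=echo.stdout)
    echo.wait()

# Create the table the integration tests expect: two column families.
hbase_shell("create 'test', 'cf1', 'cf2'")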
@@ -22,11 +25,8 @@ def test_new_client_good(self): self.assertIsNotNone(c.master_client.host) def test_new_client_bad(self): - try: - c = pybase.NewClient("badzkquorum") - self.assertEqual(1, 0) - except ZookeeperException: - pass + with self.assertRaises(ZookeeperException): + pybase.NewClient("badzkquorum") def test_client_close(self): c = pybase.NewClient(zkquorum) @@ -80,11 +80,8 @@ def test_get_specific_cell(self): self.assertNotIn(cf2, resd.keys()) def test_get_bad_table(self): - try: - res = self.c.get("asdasdasd", "plsfail") - self.assertEqual(1, 0) - except NoSuchTableException: - pass + with self.assertRaises(NoSuchTableException): + self.c.get("asdasdasd", "plsfail") def test_get_bad_row(self): res = self.c.get(table, "plsfail") @@ -112,11 +109,8 @@ def test_get_with_filter(self): def test_get_with_bad_filter(self): ft = "badfilter" - try: - res = self.c.get(table, self.row_prefix, filters=ft) - self.assertEqual(1, 0) - except ValueError: - pass + with self.assertRaises(ValueError): + self.c.get(table, self.row_prefix, filters=ft) class TestPut(unittest.TestCase): @@ -214,17 +208,14 @@ def test_scan_simple(self): def test_scan_with_range(self): rsp = self.c.scan( - table, start_key=self.row_prefix + "0", stop_key=self.row_prefix + "50", filters=self.pFilter) + table, start_key=self.row_prefix + "0", stop_key=self.row_prefix + "50", + filters=self.pFilter) # It's not 100 because rows are compared lexicographically. self.assertEqual(len(rsp.flatten_cells()), 92) def test_scan_with_bad_range(self): - try: - rsp = self.c.scan( - table, start_key="hmm", stop_key=24, filters=self.pFilter) - self.assertEqual(1, 0) - except TypeError: - pass + with self.assertRaises(TypeError): + self.c.scan(table, start_key="hmm", stop_key=24, filters=self.pFilter) def test_scan_with_families(self): fam = {cf1: ["oberyn"]} @@ -233,11 +224,8 @@ def test_scan_with_families(self): def test_scan_with_bad_column_family(self): fam = {"hodor": ["stillhodor"]} - try: - rsp = self.c.scan(table, filters=self.pFilter, families=fam) - self.assertEqual(1, 0) - except NoSuchColumnFamilyException: - pass + with self.assertRaises(NoSuchColumnFamilyException): + self.c.scan(table, filters=self.pFilter, families=fam) def test_scan_with_bad_column_qualifier(self): fam = {cf1: ["badqual"], cf2: ["one"]} @@ -314,11 +302,8 @@ def test_delete_bad_column_family(self): "i am hodor": "" } } - try: - rsp = self.c.delete(table, self.row_prefix + "2", value) - self.assertEqual(0, 1) - except NoSuchColumnFamilyException: - pass + with self.assertRaises(NoSuchColumnFamilyException): + self.c.delete(table, self.row_prefix + "2", value) def test_delete_bad_column_qualifier(self): value = { @@ -396,11 +381,8 @@ def test_append_bad_column_family(self): "oberyn": "is the", } } - try: - rsp = self.c.append(table, self.row_prefix + "3", values) - self.assertEqual(1, 0) - except NoSuchColumnFamilyException: - pass + with self.assertRaises(NoSuchColumnFamilyException): + self.c.append(table, self.row_prefix + "3", values) def test_append_bad_column_qualifier(self): values = { @@ -529,11 +511,8 @@ def test_increment_bad_column_family(self): } # TODO: Throwing RuntimeError: java.io.IOException when it should be throwing # column family exception. 
- try: - rsp = self.c.increment(table, self.row_prefix + "2", new_values) - self.assertEqual(1, 0) - except NoSuchColumnFamilyException: - pass + with self.assertRaises(NoSuchColumnFamilyException): + self.c.increment(table, self.row_prefix + "2", new_values) def test_increment_new_column_qualifier(self): new_values = { diff --git a/tests/test_integration_availability.py b/tests/test_integration_availability.py index 8c97dff..679cdc0 100644 --- a/tests/test_integration_availability.py +++ b/tests/test_integration_availability.py @@ -1,9 +1,11 @@ +from __future__ import absolute_import, print_function, unicode_literals + +import os +import subprocess import unittest + import pybase -from collections import defaultdict -from pybase.exceptions import * -import subprocess -import os +from pybase.exceptions import ZookeeperException # Please note that all below unit tests require the existence of a table # to play with. Table must contain two column families specified below as well. @@ -13,6 +15,7 @@ cf1 = "cf1" cf2 = "cf2" + class TestAvailability(unittest.TestCase): @classmethod @@ -98,25 +101,24 @@ def test_region_server_musical_chairs(self): pass - # Currently no admin functionality. Have to go through the hbase shell to # do things like moving regions, rebalancing, etc. def hbase_shell(cmd): echo = subprocess.Popen( ('echo', '"' + cmd + ';exit"'), stdout=subprocess.PIPE) - output = subprocess.check_output(('hbase', 'shell'), stdin=echo.stdout) + subprocess.check_output(('hbase', 'shell'), stdin=echo.stdout) echo.wait() def start_region_servers(server_ids): - print "" + print("") a = [os.environ['HBASE_HOME'] + "/bin/local-regionservers.sh", "start", ' '.join(server_ids)] subprocess.call(a) def stop_region_servers(server_ids): - print "" + print("") a = [os.environ['HBASE_HOME'] + "/bin/local-regionservers.sh", "stop", ' '.join(server_ids)] subprocess.call(a) diff --git a/zk/client.py b/zk/client.py deleted file mode 100644 index 5c396d8..0000000 --- a/zk/client.py +++ /dev/null @@ -1,89 +0,0 @@ -""" - Copyright 2015 Samuel Curley - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. -""" -from kazoo.client import KazooClient -from kazoo.handlers.threading import KazooTimeoutError -from kazoo.exceptions import NoNodeError -from ..pb.ZooKeeper_pb2 import MetaRegionServer -from ..exceptions import * -from struct import unpack -from time import sleep -import logging -logger = logging.getLogger('pybase.' + __name__) -logger.setLevel(logging.DEBUG) - -znode = "/hbase" - - -# LocateMeta takes a string representing the location of the ZooKeeper -# quorum. It then asks ZK for the location of the MetaRegionServer, -# returning a tuple containing (host_name, port). 
-def LocateMaster(zkquorum, establish_connection_timeout=5, missing_znode_retries=5, zk=None): - - if zk is None: - # Using Kazoo for interfacing with ZK - zk = KazooClient(hosts=zkquorum) - try: - zk.start(timeout=establish_connection_timeout) - except KazooTimeoutError: - raise ZookeeperConnectionException( - "Cannot connect to ZooKeeper at {}".format(zkquorum)) - # MetaRegionServer information is located at /hbase/meta-region-server - try: - rsp, znodestat = zk.get(znode + "/meta-region-server") - except NoNodeError: - if missing_znode_retries == 0: - raise ZookeeperZNodeException( - "ZooKeeper does not contain meta-region-server node.") - logger.warn( - "ZooKeeper does not contain meta-region-server node. Retrying in 2 seconds. (%s retries remaining)", missing_znode_retries) - sleep(2.0) - return LocateMeta(zkquorum, establish_connection_timeout=establish_connection_timeout, missing_znode_retries=missing_znode_retries - 1, zk=zk) - # We don't need to maintain a connection to ZK. If we need it again we'll - # recreate the connection. A possible future implementation can subscribe - # to ZK and listen for when RegionServers go down, then pre-emptively - # reestablish those regions instead of waiting for a failed rpc to come - # back. Only issue is that if too many clients subscribe ZK may become - # overloaded. - zk.stop() - if len(rsp) == 0: - # Empty response is bad. - raise ZookeeperResponseException( - "ZooKeeper returned an empty response") - # The first byte must be \xff and the next four bytes are a little-endian - # uint32 containing the length of the meta. - first_byte, meta_length = unpack(">cI", rsp[:5]) - if first_byte != '\xff': - # Malformed response - raise ZookeeperResponseException( - "ZooKeeper returned an invalid response") - if meta_length < 1 or meta_length > 65000: - # Is this really an error? - raise ZookeeperResponseException( - "ZooKeeper returned too much meta information") - # ZNode data in HBase are serialized protobufs with a four byte magic - # 'PBUF' prefix. - magic = unpack(">I", rsp[meta_length + 5:meta_length + 9])[0] - if magic != 1346524486: - # 4 bytes: PBUF - raise ZookeeperResponseException( - "ZooKeeper returned an invalid response (are you running a version of HBase supporting Protobufs?)") - rsp = rsp[meta_length + 9:] - meta = MetaRegionServer() - meta.ParseFromString(rsp) - logger.info('Discovered Master at %s:%s', - meta.server.host_name, meta.server.port) - return meta.server.host_name, meta.server.port -
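(Reference note on the meta-region-server znode format that both the deleted LocateMaster above and the new pybase/zk/client.py parse_master_info decode: one 0xff byte, a big-endian uint32 giving the length of an opaque server-id blob, the blob itself, the four-byte 'PBUF' magic — 0x50425546, i.e. 1346524486 — and finally the serialized MetaRegionServer protobuf. A minimal round-trip sketch under those assumptions; the host name, port and blob contents below are made up for illustration:)

from struct import pack

from pybase.pb.ZooKeeper_pb2 import MetaRegionServer
from pybase.zk.client import parse_master_info

# Build a MetaRegionServer protobuf pointing at a fictional master.
meta = MetaRegionServer()
meta.server.host_name = "hbase-master.example.com"
meta.server.port = 16000

meta_blob = b"fake-server-id"  # opaque metadata; only its length matters for the framing
payload = (
    b"\xff"                       # required first byte
    + pack(">I", len(meta_blob))  # big-endian uint32 length of the metadata
    + meta_blob
    + b"PBUF"                     # 4-byte magic: 0x50425546 == 1346524486
    + meta.SerializeToString()
)

# parse_master_info should recover exactly the (host, port) pair we encoded.
assert parse_master_info(payload) == ("hbase-master.example.com", 16000)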