diff --git a/argos/noSQLdask/cassandraBag.py b/argos/noSQLdask/cassandraBag.py index 7f7d46e..366c8e2 100755 --- a/argos/noSQLdask/cassandraBag.py +++ b/argos/noSQLdask/cassandraBag.py @@ -4,22 +4,35 @@ Provides the ``CassandraBag`` class for querying device telemetry from a Cassandra database (typically the ThingsBoard backend) using Dask for parallel, partitioned reads. + +Performance notes: + +- Uses a single shared Cassandra session (connection pooling) instead of + opening/closing a connection per partition. +- Queries all metric keys in a single CQL query with ``key IN (...)`` + instead of looping per key. +- Uses ``dask.delayed`` → ``pd.DataFrame`` instead of ``dask.bag`` to + avoid the bag→DataFrame→pivot materialization overhead. +- Uses ``extend()`` instead of list concatenation to avoid O(n^2). """ from cassandra.cluster import Cluster import dask -import dask.bag +import dask.dataframe as dd import pandas import numpy as np class CassandraBag: """ - Dask bag interface for querying Cassandra time-series data. + Dask interface for querying Cassandra time-series data. Designed for querying the ThingsBoard ``ts_kv_cf`` table, which stores device telemetry as key-value pairs partitioned by month. + Uses a single shared Cassandra session for all queries (created on + init, closed on ``close()`` or garbage collection). + Parameters ---------- deviceID : str @@ -30,16 +43,24 @@ class CassandraBag: The database (keyspace) name. Defaults to ``"thingsboard"``. set_name : str, optional The table name. Defaults to ``"ts_kv_cf"``. + fetch_size : int, optional + Number of rows per Cassandra fetch page. Higher values reduce + round-trips but use more memory. Defaults to 50000. Examples -------- >>> bag = CassandraBag(deviceID="727b0e40-5b96-11e9-989b-eb5e36f2a0b8") >>> df = bag.getDataFrame("2024-01-01", "2024-01-31") + >>> bag.close() """ - def __init__(self, deviceID, IP='127.0.0.1', db_name='thingsboard', set_name='ts_kv_cf'): + def __init__(self, deviceID, IP='127.0.0.1', db_name='thingsboard', + set_name='ts_kv_cf', fetch_size=50000): """ - Initialize CassandraBag and fetch available metric keys. + Initialize CassandraBag with a persistent session. + + Creates a Cassandra cluster connection and session that is reused + across all queries. Call ``close()`` when done to release resources. Parameters ---------- @@ -51,35 +72,114 @@ def __init__(self, deviceID, IP='127.0.0.1', db_name='thingsboard', set_name='ts The database (keyspace) name. Defaults to ``"thingsboard"``. set_name : str, optional The table name. Defaults to ``"ts_kv_cf"``. + fetch_size : int, optional + Number of rows per Cassandra fetch page. Defaults to 50000. """ self.IP = IP self.db_name = db_name self.set_name = set_name self.deviceID = deviceID + self.fetch_size = fetch_size + + # Persistent connection — reused across all queries + self._cluster = Cluster([self.IP]) + self._session = self._cluster.connect(self.db_name) + self._session.default_fetch_size = self.fetch_size + self.keys = self._keys() + def close(self): + """ + Close the Cassandra session and cluster connection. + + Call this when you are done querying to release resources. + Safe to call multiple times. 
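+
+        The class also supports ``with``-statement usage, which calls
+        this automatically (device ID reused from the class example):
+
+        >>> with CassandraBag(deviceID="727b0e40-5b96-11e9-989b-eb5e36f2a0b8") as bag:
+        ...     df = bag.getDataFrame("2024-01-01", "2024-01-31")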
+ """ + if self._session is not None: + self._session.shutdown() + self._session = None + if self._cluster is not None: + self._cluster.shutdown() + self._cluster = None + + def __del__(self): + """Close the connection on garbage collection.""" + self.close() + + def __enter__(self): + """Support context manager usage.""" + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + """Close on context manager exit.""" + self.close() + def _keys(self): - """Fetch available metric keys for the device from ts_kv_latest_cf.""" - cluster = Cluster([self.IP]) - session = cluster.connect(self.db_name) - keysQuery = "SELECT key FROM ts_kv_latest_cf WHERE entity_type='DEVICE' AND entity_id=%s" % (self.deviceID) - keys_data_set = session.execute(keysQuery) - session.shutdown() - cluster.shutdown() + """ + Fetch available metric keys for the device from ts_kv_latest_cf. - keys_row_list = list(keys_data_set) - keys = [] - for keys_row in keys_row_list: - keys.append(keys_row.key) + Returns + ------- + list[str] + List of metric key names available for this device. + """ + query = ( + "SELECT key FROM ts_kv_latest_cf " + "WHERE entity_type='DEVICE' AND entity_id=%s" + ) % self.deviceID + rows = self._session.execute(query) + return [row.key for row in rows] - return keys + def _read_partition(self, start_ts, end_ts): + """ + Read all data for a single time partition from Cassandra. + + Queries all metric keys in a single CQL query per monthly + Cassandra partition using ``key IN (...)``. + + Parameters + ---------- + start_ts : int + Start timestamp in milliseconds. + end_ts : int + End timestamp in milliseconds. + + Returns + ------- + pandas.DataFrame + A DataFrame with columns ``ts``, ``key``, ``dbl_v``. + Empty DataFrame if no data found. + """ + monthly_partitions = self._splitTimesToPartitions(start_ts, end_ts) + keys_csv = ", ".join(f"'{k}'" for k in self.keys) + + all_rows = [] + for part_start, part_end in monthly_partitions: + query = ( + f"SELECT ts, key, dbl_v FROM {self.set_name} " + f"WHERE entity_type='DEVICE' AND entity_id={self.deviceID} " + f"AND key IN ({keys_csv}) " + f"AND partition={part_start} " + f"AND ts>={max(part_start, start_ts)} " + f"AND ts<={min(part_end, end_ts)}" + ) + rows = self._session.execute(query) + all_rows.extend(rows) + + if not all_rows: + return pandas.DataFrame(columns=['ts', 'key', 'dbl_v']) + + return pandas.DataFrame( + [(r.ts, r.key, r.dbl_v) for r in all_rows], + columns=['ts', 'key', 'dbl_v'] + ) def bag(self, start_time, end_time, npartitions=10): """ - Create a Dask bag for parallel reads over a time range. + Create a Dask DataFrame for parallel reads over a time range. Splits the time range into ``npartitions`` equal intervals and - creates a Dask bag that reads each interval in parallel. + reads each interval in parallel using ``dask.delayed``. Parameters ---------- @@ -93,70 +193,33 @@ def bag(self, start_time, end_time, npartitions=10): Returns ------- - dask.bag.Bag - A Dask bag of ``(ts, key, dbl_v)`` named tuples, suitable - for further processing or conversion to DataFrame. + dask.dataframe.DataFrame + A Dask DataFrame with columns ``ts``, ``key``, ``dbl_v``. 
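+
+        Examples
+        --------
+        The result is lazy: no I/O happens until ``compute()``, and
+        per-partition operations such as filtering run alongside the
+        parallel reads (``'temperature'`` is a hypothetical metric key):
+
+        >>> ddf = bag.bag("2024-01-01", "2024-01-31", npartitions=20)
+        >>> df = ddf[ddf['key'] == 'temperature'].compute()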
""" - if type(start_time) == str: - start_time = int(pandas.Timestamp(start_time).tz_localize("Israel").timestamp()*1000) - if type(end_time) == str: - end_time = int(pandas.Timestamp(end_time).tz_localize("Israel").timestamp()*1000) - times = np.linspace(start_time, end_time, npartitions+1) - partition_times = list(zip(times[0:-1], times[1:])) - - b = (dask.bag.from_sequence(partition_times) - .map(self._read_datetime_interval_from_set) - .flatten()) - return b - - def _read_datetime_interval_from_set(self, args): - """Read data for a single time interval from Cassandra, handling monthly partitions.""" - start_ts = int(args[0]) - end_ts = int(args[1]) - partitionsIntervals = self._splitTimesToPartitions(start_ts, end_ts) - cluster = Cluster([self.IP]) - session = cluster.connect(self.db_name) - items = [] - for key in list(self.keys): - for partitionsInterval in partitionsIntervals: - query = "SELECT ts, key, dbl_v FROM %s WHERE entity_type='DEVICE' AND entity_id=%s AND key='%s' AND partition=%s AND ts>=%s AND ts<=%s" % (self.set_name, self.deviceID, key, str(partitionsInterval[0]),str(max(partitionsInterval[0], start_ts)), str(min(partitionsInterval[1], end_ts))) - data_set = session.execute(query) - items = items + list(data_set) - session.shutdown() - cluster.shutdown() - return items + if isinstance(start_time, str): + start_time = int(pandas.Timestamp(start_time).tz_localize("Israel").timestamp() * 1000) + if isinstance(end_time, str): + end_time = int(pandas.Timestamp(end_time).tz_localize("Israel").timestamp() * 1000) - def _splitTimesToPartitions(self, start_ts, end_ts): - """Split a time range into monthly Cassandra partition boundaries.""" - start_date = pandas.Timestamp.fromtimestamp(start_ts/1000.0) - end_date = pandas.Timestamp.fromtimestamp(end_ts/1000.0) - startPartitionTimestamp = pandas.Timestamp(year=start_date.year, month=start_date.month, day=1, unit='ms') - if end_date.month == 12: - endPartitionTimestamp = pandas.Timestamp(year=end_date.year, month=1, day=1, unit='ms') - else: - endPartitionTimestamp = pandas.Timestamp(year=end_date.year, month=end_date.month+1, day=1, unit='ms') - partitions = self._ts_partitions(startPartitionTimestamp, endPartitionTimestamp) - return list(zip(partitions[0:-1], partitions[1:])) - - def _ts_partitions(self, startTimestamp, endTimestamp): - """Generate a list of monthly partition timestamps between start and end.""" - partitionsAsDate = [startTimestamp.date()] - partitionsAsTs = [int(startTimestamp.timestamp()*1000)] - while(partitionsAsDate[-1] != endTimestamp.date()): - if partitionsAsDate[-1].month == 12: - partitionTimestamp = pandas.Timestamp(year=partitionsAsDate[-1].year + 1, month=1, day=1, unit='ms') - else: - partitionTimestamp = pandas.Timestamp(year=partitionsAsDate[-1].year, month=partitionsAsDate[-1].month + 1, day=1, unit='ms') - partitionsAsDate.append(partitionTimestamp.date()) - partitionsAsTs.append(int(partitionTimestamp.timestamp()*1000)) - return partitionsAsTs + boundaries = np.linspace(start_time, end_time, npartitions + 1, dtype=np.int64) + + delayed_parts = [] + for i in range(npartitions): + part = dask.delayed(self._read_partition)(int(boundaries[i]), int(boundaries[i + 1])) + delayed_parts.append(part) + + meta = pandas.DataFrame({'ts': pandas.Series(dtype='int64'), + 'key': pandas.Series(dtype='str'), + 'dbl_v': pandas.Series(dtype='float64')}) + + return dd.from_delayed(delayed_parts, meta=meta) def getDataFrame(self, start_time, end_time, npartitions=10): """ Query device data and return as a pivoted 
         Pandas DataFrame.
 
-        Reads all metric keys for the device over the time range, then
-        pivots the result so that rows are timestamps and columns are
+        Reads all metric keys for the device over the time range in
+        parallel, then pivots so rows are timestamps and columns are
         metric keys.
 
         Parameters
@@ -174,7 +237,48 @@ def getDataFrame(self, start_time, end_time, npartitions=10):
             A pivoted DataFrame with timestamps as the index, metric keys
             as columns, and ``dbl_v`` (double) as values.
         """
-        bag = self.bag(start_time=start_time, end_time=end_time, npartitions=npartitions)
-        df = bag.to_dataframe(meta={'ts': int, 'key': str, 'dbl_v': float})
-        df = df.compute().pivot_table(index='ts',columns='key',values='dbl_v')
-        return df
+        ddf = self.bag(start_time=start_time, end_time=end_time, npartitions=npartitions)
+        df = ddf.compute()
+        if df.empty:
+            return df
+        return df.pivot_table(index='ts', columns='key', values='dbl_v')
+
+    def _splitTimesToPartitions(self, start_ts, end_ts):
+        """
+        Split a time range into monthly Cassandra partition boundaries.
+
+        Parameters
+        ----------
+        start_ts : int
+            Start timestamp in milliseconds.
+        end_ts : int
+            End timestamp in milliseconds.
+
+        Returns
+        -------
+        list[tuple[int, int]]
+            List of (partition_start_ms, partition_end_ms) tuples aligned
+            to monthly boundaries.
+        """
+        start_date = pandas.Timestamp.fromtimestamp(start_ts / 1000.0)
+        end_date = pandas.Timestamp.fromtimestamp(end_ts / 1000.0)
+
+        # First day of the start month
+        start_partition = pandas.Timestamp(year=start_date.year, month=start_date.month, day=1)
+        # First day of the month after the end
+        if end_date.month == 12:
+            end_partition = pandas.Timestamp(year=end_date.year + 1, month=1, day=1)
+        else:
+            end_partition = pandas.Timestamp(year=end_date.year, month=end_date.month + 1, day=1)
+
+        # Build the list of monthly boundaries, inclusive of end_partition
+        boundaries_ms = []
+        current = start_partition
+        while current <= end_partition:
+            boundaries_ms.append(int(current.timestamp() * 1000))
+            if current.month == 12:
+                current = pandas.Timestamp(year=current.year + 1, month=1, day=1)
+            else:
+                current = pandas.Timestamp(year=current.year, month=current.month + 1, day=1)
+
+        return list(zip(boundaries_ms[:-1], boundaries_ms[1:]))
diff --git a/argos/noSQLdask/mongoBag.py b/argos/noSQLdask/mongoBag.py
index 031d29f..3ddade8 100644
--- a/argos/noSQLdask/mongoBag.py
+++ b/argos/noSQLdask/mongoBag.py
@@ -3,18 +3,33 @@
 Provides the ``MongoBag`` class for querying time-series data from
 MongoDB using Dask for parallel, partitioned reads.
+
+Performance notes:
+
+- Uses a single shared MongoClient (connection pooling) instead of
+  opening/closing a connection per partition.
+- Uses datetime objects in queries instead of string comparison, allowing
+  MongoDB to use time-based indexes efficiently.
+- Uses ``dask.delayed`` → ``pd.DataFrame`` instead of ``dask.bag`` to
+  avoid unnecessary materialization overhead.
+- Supports projections to limit returned fields.
 """
 import dask
+import dask.dataframe as dd
 import pymongo
 import pandas
 
+
 class MongoBag:
     """
-    Dask bag interface for querying MongoDB time-series collections.
+    Dask interface for querying MongoDB time-series collections.
 
     Partitions a time range into intervals and reads each interval in
-    parallel using Dask bags.
+    parallel using ``dask.delayed``.
+
+    Uses a single shared ``MongoClient`` for all queries (connection
+    pooled by pymongo). Call ``close()`` when done.
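+    ``MongoClient`` is thread-safe, so a single shared client can safely
+    serve Dask's concurrent partition reads.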
 
     Parameters
     ----------
     db_name : str
         The MongoDB database name.
     collection_name : str
         The collection name.
     datetimeField : str, optional
         The name of the timestamp field in the documents. Defaults to
         ``"timestamp"``.
+    host : str, optional
+        The MongoDB host name or connection URI. Defaults to
+        ``"localhost"``.
 
     Examples
     --------
     >>> bag = MongoBag(db_name="mydb", collection_name="sensor_data")
-    >>> dask_bag = bag.bag("2024-01-01", "2024-01-31", periods=20)
-    >>> df = dask_bag.to_dataframe().compute()
+    >>> df = bag.getDataFrame("2024-01-01", "2024-01-31", periods=20)
+    >>> bag.close()
+
+    Or as a context manager:
+
+    >>> with MongoBag("mydb", "sensor_data") as bag:
+    ...     df = bag.getDataFrame("2024-01-01", "2024-01-31")
     """
 
-    _db_name = None
-    _collection_name = None
+    def __init__(self, db_name, collection_name, datetimeField="timestamp",
+                 host="localhost"):
+        """
+        Initialize the MongoBag with a persistent MongoClient.
 
-    _timestamp_field = None
+        Parameters
+        ----------
+        db_name : str
+            The MongoDB database name.
+        collection_name : str
+            The collection name.
+        datetimeField : str, optional
+            The timestamp field name. Defaults to ``"timestamp"``.
+        host : str, optional
+            The MongoDB host name or connection URI. Defaults to
+            ``"localhost"``.
+        """
+        self._db_name = db_name
+        self._collection_name = collection_name
+        self._timestamp_field = datetimeField
+        self._client = pymongo.MongoClient(host)
+
+    def close(self):
+        """
+        Close the MongoDB client connection.
+
+        Safe to call multiple times.
+        """
+        if self._client is not None:
+            self._client.close()
+            self._client = None
+
+    def __del__(self):
+        """Close the connection on garbage collection."""
+        self.close()
+
+    def __enter__(self):
+        """Support context manager usage."""
+        return self
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        """Close on context manager exit."""
+        self.close()
 
     @property
     def db_name(self):
@@ -74,29 +134,56 @@ def timestamp_field(self):
         """
         return self._timestamp_field
 
-    def __init__(self,db_name, collection_name,datetimeField="timestamp"):
+    def _read_partition(self, start_ts, end_ts, projection=None, **qry):
         """
-        Initialize the MongoBag.
+        Read documents from MongoDB for a single time interval.
+
+        Uses datetime objects for the query (not strings) so MongoDB
+        can use time-based indexes. The interval is half-open
+        (``start_ts <= t < end_ts``), so adjacent partitions do not
+        double-count boundary documents.
 
         Parameters
         ----------
-        db_name : str
-            The MongoDB database name.
-        collection_name : str
-            The collection name.
-        datetimeField : str, optional
-            The timestamp field name. Defaults to ``"timestamp"``.
+        start_ts : datetime-like
+            Start of the time interval (inclusive).
+        end_ts : datetime-like
+            End of the time interval (exclusive).
+        projection : dict, optional
+            MongoDB projection to limit returned fields.
+            Example: ``{"_id": 0, "timestamp": 1, "value": 1}``.
+        **qry : dict
+            Additional MongoDB query filters.
+
+        Returns
+        -------
+        pandas.DataFrame
+            A DataFrame of documents matching the query.
+            Empty DataFrame if no results.
         """
-        self._db_name = db_name
-        self._collection_name = collection_name
-        self._timestamp_field = datetimeField
+        full_qry = {
+            self._timestamp_field: {
+                '$gte': start_ts,
+                '$lt': end_ts
+            }
+        }
+        full_qry.update(qry)
+
+        collection = self._client[self._db_name][self._collection_name]
+
+        cursor = collection.find(full_qry, projection=projection)
+        items = list(cursor)
+
+        if not items:
+            return pandas.DataFrame()
 
-    def bag(self, start_time, end_time, periods: int = 10 ,freq : str =None,**qry):
+        return pandas.DataFrame(items)
+
+    def bag(self, start_time, end_time, periods=10, freq=None,
+            projection=None, **qry):
         """
-        Create a Dask bag for parallel reads over a time range.
+        Create a Dask DataFrame for parallel reads over a time range.
 
         Splits the time range into partitions using ``pandas.date_range``
-        and reads each partition in parallel.
+        and reads each partition in parallel using ``dask.delayed``.
 
         Parameters
         ----------
@@ -109,31 +196,74 @@ def bag(self, start_time, end_time, periods: int = 10 ,freq : str =None,**qry):
         freq : str, optional
             Partition frequency (e.g., ``"1D"``, ``"1H"``). If set,
             ``periods`` is ignored.
+        projection : dict, optional
+            MongoDB projection to limit returned fields.
         **qry : dict
-            Additional MongoDB query filters merged into each partition's
-            query.
+            Additional MongoDB query filters.
 
         Returns
         -------
-        dask.bag.Bag
-            A Dask bag of document dicts from the collection.
+        dask.dataframe.DataFrame
+            A Dask DataFrame of documents from the collection.
         """
         start_time = pandas.to_datetime(start_time)
         end_time = pandas.to_datetime(end_time)
+        if freq is not None:
+            # ``date_range`` rejects start, end, periods and freq all at
+            # once; freq takes precedence, as documented above.
+            periods = None
 
-        dateRange = pandas.date_range(start_time,end_time,periods=periods,freq=freq,tz="israel")
+        date_range = pandas.date_range(
+            start_time, end_time, periods=periods, freq=freq, tz="Israel"
+        )
 
-        partitions_requests = list(zip(dateRange[:-1], dateRange[1:]))
-        b = (dask.bag.from_sequence(partitions_requests)
-             .map(lambda x: self.read_datetime_interval_from_collection(x,**qry))
-             .flatten())
-        return b
+        delayed_parts = []
+        for i in range(len(date_range) - 1):
+            part = dask.delayed(self._read_partition)(
+                date_range[i].to_pydatetime(),
+                date_range[i + 1].to_pydatetime(),
+                projection=projection,
+                **qry
+            )
+            delayed_parts.append(part)
 
-    def read_datetime_interval_from_collection(self, args,**qry):
+        # No meta is supplied: documents have no fixed schema, so dask
+        # infers the columns by computing the first partition.
+        return dd.from_delayed(delayed_parts)
+
+    def getDataFrame(self, start_time, end_time, periods=10, freq=None,
+                     projection=None, **qry):
+        """
+        Query data and return as a Pandas DataFrame.
+
+        Convenience method that calls ``bag()`` and computes immediately.
+
+        Parameters
+        ----------
+        start_time : str
+            Start of the time range.
+        end_time : str
+            End of the time range.
+        periods : int, optional
+            Number of partitions. Defaults to 10.
+        freq : str, optional
+            Partition frequency.
+        projection : dict, optional
+            MongoDB projection to limit returned fields.
+        **qry : dict
+            Additional MongoDB query filters.
+
+        Returns
+        -------
+        pandas.DataFrame
+            A DataFrame of all matching documents.
+        """
+        ddf = self.bag(start_time, end_time, periods=periods, freq=freq,
+                       projection=projection, **qry)
+        return ddf.compute()
+
+    def read_datetime_interval_from_collection(self, args, **qry):
         """
         Read documents from MongoDB for a single time interval.
 
+        .. deprecated::
+            Use ``_read_partition()`` instead. This method is kept for
+            backwards compatibility.
+
         Parameters
         ----------
         args : tuple[Timestamp, Timestamp]
@@ -146,14 +276,4 @@ def read_datetime_interval_from_collection(self, args,**qry):
         list[dict]
             A list of document dicts matching the time range and filters.
         """
-
-        start_ts = args[0].strftime("%Y-%-m-%-d %-H:%-M:%-S.%f")
-        end_ts = args[1].strftime("%Y-%-m-%-d %-H:%-M:%-S.%f")
-
-        full_qry = {self._timestamp_field: {'$gte': start_ts, '$lte': end_ts}}
-        full_qry.update(qry)
-
-        with pymongo.MongoClient() as mongo_client:
-            collection = mongo_client[self.db_name][self.collection_name]
-            items = list(collection.find(full_qry))
-        return items
+        return self._read_partition(args[0], args[1], **qry).to_dict('records')
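+
+# A minimal usage sketch; assumes a MongoDB instance on localhost and
+# reuses the placeholder names ("mydb", "sensor_data") from the class
+# docstring:
+#
+#     with MongoBag("mydb", "sensor_data") as bag:
+#         df = bag.getDataFrame("2024-01-01", "2024-01-31", periods=20,
+#                               projection={"_id": 0})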