diff --git a/CHANGELOG.md b/CHANGELOG.md index 1770318..839b845 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,11 @@ ## 0.19.0 [unreleased] +### Features + +1. [#198](https://github.com/InfluxCommunity/influxdb3-python/pull/198): Support custom tag order via `tag_order` write option. + See [Sort tags by priority](https://docs.influxdata.com/influxdb3/enterprise/write-data/best-practices/schema-design/#sort-tags-by-query-priority) for more. + ## 0.18.0 [2026-02-19] ### Features diff --git a/README.md b/README.md index 0be7419..522ee1c 100644 --- a/README.md +++ b/README.md @@ -77,6 +77,30 @@ You can write data using the Point class, or supplying line protocol. point = Point("measurement").tag("location", "london").field("temperature", 42) client.write(point) ``` + +### Control tag order for first-write column order (InfluxDB 3 Enterprise) +```python +from influxdb_client_3 import InfluxDBClient3, Point, WriteOptions, WriteType, write_client_options + +point = Point("cpu") \ + .tag("host", "server-a") \ + .tag("region", "us-east") \ + .tag("rack", "r1") \ + .field("usage", 0.42) + +write_options = WriteOptions( + write_type=WriteType.synchronous, + tag_order=["region", "host"], +) + +client = InfluxDBClient3( + token="your-token", + host="your-host", + database="your-database", + write_client_options=write_client_options(write_options=write_options), +) +client.write(point) +``` ### Using Line Protocol ```python point = "measurement fieldname=0" diff --git a/influxdb_client_3/write_client/client/_base.py b/influxdb_client_3/write_client/client/_base.py index 783eb3f..34ff05d 100644 --- a/influxdb_client_3/write_client/client/_base.py +++ b/influxdb_client_3/write_client/client/_base.py @@ -246,7 +246,8 @@ def _serialize(self, record, write_precision, payload, **kwargs): elif isinstance(record, Point): precision_from_point = kwargs.get('precision_from_point', True) precision = record.write_precision if precision_from_point else write_precision - self._serialize(record.to_line_protocol(precision=precision), precision, payload, **kwargs) + self._serialize(record.to_line_protocol(precision=precision, tag_order=kwargs.get('tag_order')), + precision, payload, **kwargs) elif isinstance(record, dict): self._serialize(Point.from_dict(record, write_precision=write_precision, **kwargs), diff --git a/influxdb_client_3/write_client/client/write/dataframe_serializer.py b/influxdb_client_3/write_client/client/write/dataframe_serializer.py index 245d980..55d7319 100644 --- a/influxdb_client_3/write_client/client/write/dataframe_serializer.py +++ b/influxdb_client_3/write_client/client/write/dataframe_serializer.py @@ -10,7 +10,7 @@ from influxdb_client_3.write_client.domain import WritePrecision from influxdb_client_3.write_client.client.write.point import _ESCAPE_KEY, _ESCAPE_STRING, _ESCAPE_MEASUREMENT, \ - DEFAULT_WRITE_PRECISION + DEFAULT_WRITE_PRECISION, ordered_tag_keys logger = logging.getLogger('influxdb_client.client.write.dataframe_serializer') @@ -130,8 +130,8 @@ def __init__(self, data_frame, point_settings, precision=DEFAULT_WRITE_PRECISION # keys holds a list of string keys. keys = [] - # tags holds a list of tag f-string segments ordered alphabetically by tag key. - tags = [] + # tag_segments holds map of tag key -> tag f-string segment. + tag_segments = {} # fields holds a list of field f-string segments ordered alphabetically by field key fields = [] # field_indexes holds the index into each row of all the fields. @@ -188,7 +188,7 @@ def __init__(self, data_frame, point_settings, precision=DEFAULT_WRITE_PRECISION }}""" else: key_value = f',{key_format}={{str({val_format}).translate(_ESCAPE_KEY)}}' - tags.append(key_value) + tag_segments[key] = key_value continue elif timestamp_column is not None and key in timestamp_column: timestamp_index = field_index @@ -225,7 +225,8 @@ def __init__(self, data_frame, point_settings, precision=DEFAULT_WRITE_PRECISION measurement_name = str(data_frame_measurement_name).translate(_ESCAPE_MEASUREMENT) - tags = ''.join(tags) + tag_keys = ordered_tag_keys(list(tag_segments.keys()), kwargs.get('tag_order')) + tag_string = ''.join(tag_segments[tag_key] for tag_key in tag_keys) fields = ''.join(fields) timestamp = '{p[%s].value}' % timestamp_index if precision == WritePrecision.US: @@ -235,7 +236,7 @@ def __init__(self, data_frame, point_settings, precision=DEFAULT_WRITE_PRECISION elif precision == WritePrecision.S: timestamp = '{int(p[%s].value / 1e9)}' % timestamp_index - f = eval(f'lambda p: f"""{{measurement_name}}{tags} {fields} {timestamp}"""', { + f = eval(f'lambda p: f"""{{measurement_name}}{tag_string} {fields} {timestamp}"""', { 'measurement_name': measurement_name, '_ESCAPE_KEY': _ESCAPE_KEY, '_ESCAPE_STRING': _ESCAPE_STRING, diff --git a/influxdb_client_3/write_client/client/write/point.py b/influxdb_client_3/write_client/client/write/point.py index bc7211d..8346b0c 100644 --- a/influxdb_client_3/write_client/client/write/point.py +++ b/influxdb_client_3/write_client/client/write/point.py @@ -3,6 +3,7 @@ import math import warnings from builtins import int +from collections.abc import Iterable from datetime import datetime, timedelta, timezone from decimal import Decimal from numbers import Integral @@ -215,11 +216,12 @@ def field(self, field, value): self._fields[field] = value return self - def to_line_protocol(self, precision=None): + def to_line_protocol(self, precision=None, tag_order=None): """ Create LineProtocol. :param precision: required precision of LineProtocol. If it's not set then use the precision from ``Point``. + :param tag_order: optional list of tag names to prioritize in serialized output """ _measurement = _escape_key(self._name, _ESCAPE_MEASUREMENT) if _measurement.startswith("#"): @@ -229,7 +231,7 @@ def to_line_protocol(self, precision=None): - https://docs.influxdata.com/influxdb/latest/reference/syntax/line-protocol/#comments """ warnings.warn(message, SyntaxWarning) - _tags = _append_tags(self._tags) + _tags = _append_tags(self._tags, tag_order) _fields = _append_fields(self._fields, self._field_types) if not _fields: return "" @@ -252,9 +254,10 @@ def __str__(self): return self.to_line_protocol() -def _append_tags(tags): +def _append_tags(tags, tag_order=None): _return = [] - for tag_key, tag_value in sorted(tags.items()): + for tag_key in ordered_tag_keys(sorted(tags.keys()), tag_order): + tag_value = tags.get(tag_key) if tag_value is None: continue @@ -267,6 +270,49 @@ def _append_tags(tags): return f"{',' if _return else ''}{','.join(_return)} " +def sanitize_tag_order(tag_order): + if tag_order is None: + return [] + + if isinstance(tag_order, (str, bytes)): + raise TypeError("tag_order must be an iterable of strings, not str/bytes") + + if not isinstance(tag_order, Iterable): + raise TypeError("tag_order must be an iterable of strings") + + sanitized = [] + seen = set() + for tag in tag_order: + if tag is None or tag == "": + continue + if not isinstance(tag, str): + raise TypeError("tag_order entries must be strings") + if tag in seen: + continue + seen.add(tag) + sanitized.append(tag) + return sanitized + + +def ordered_tag_keys(existing_keys, tag_order=None): + ordered_keys = list(existing_keys) + if not tag_order: + return ordered_keys + + remaining = set(ordered_keys) + prioritized = [] + for tag_key in tag_order: + if not tag_key: + continue + if tag_key not in remaining: + continue + remaining.remove(tag_key) + prioritized.append(tag_key) + + prioritized.extend([tag_key for tag_key in ordered_keys if tag_key in remaining]) + return prioritized + + def _append_fields(fields, field_types): _return = [] diff --git a/influxdb_client_3/write_client/client/write/polars_dataframe_serializer.py b/influxdb_client_3/write_client/client/write/polars_dataframe_serializer.py index 2156cf7..61f272c 100644 --- a/influxdb_client_3/write_client/client/write/polars_dataframe_serializer.py +++ b/influxdb_client_3/write_client/client/write/polars_dataframe_serializer.py @@ -7,7 +7,8 @@ import logging import math -from influxdb_client_3.write_client.client.write.point import _ESCAPE_KEY, _ESCAPE_STRING, DEFAULT_WRITE_PRECISION +from influxdb_client_3.write_client.client.write.point import _ESCAPE_KEY, _ESCAPE_STRING, DEFAULT_WRITE_PRECISION, \ + ordered_tag_keys logger = logging.getLogger('influxdb_client.client.write.polars_dataframe_serializer') @@ -36,6 +37,7 @@ def __init__(self, data_frame, point_settings, precision=DEFAULT_WRITE_PRECISION self.chunk_size = chunk_size self.measurement_name = kwargs.get("data_frame_measurement_name", "measurement") self.tag_columns = kwargs.get("data_frame_tag_columns", []) + self.tag_order = kwargs.get("tag_order", None) self.timestamp_column = kwargs.get("data_frame_timestamp_column", None) self.timestamp_timezone = kwargs.get("data_frame_timestamp_timezone", None) @@ -62,25 +64,31 @@ def escape_value(self, value): return str(value).translate(_ESCAPE_STRING) def to_line_protocol(self, row): - # Filter out None or empty values for tags - tags = "" + tag_values = {} + tag_keys = [] + for col in self.tag_columns: + value = row[self.column_indices[col]] + if value is None or value == "": + continue + if col not in tag_values: + tag_keys.append(col) + tag_values[col] = value + if self.point_settings.defaultTags: + for key, value in self.point_settings.defaultTags.items(): + if value is None or value == "": + continue + if key in tag_values: + continue + tag_keys.append(key) + tag_values[key] = value + + final_tag_keys = ordered_tag_keys(tag_keys, self.tag_order) tags = ",".join( - f'{self.escape_key(col)}={self.escape_key(row[self.column_indices[col]])}' - for col in self.tag_columns - if row[self.column_indices[col]] is not None and row[self.column_indices[col]] != "" + f'{self.escape_key(key)}={self.escape_key(tag_values[key])}' + for key in final_tag_keys ) - if self.point_settings.defaultTags: - default_tags = ",".join( - f'{self.escape_key(key)}={self.escape_key(value)}' - for key, value in self.point_settings.defaultTags.items() - ) - # Ensure there's a comma between existing tags and default tags if both are present - if tags and default_tags: - tags += "," - tags += default_tags - # add escape symbols for special characters to tags fields = ",".join( diff --git a/influxdb_client_3/write_client/client/write_api.py b/influxdb_client_3/write_client/client/write_api.py index b071856..250a07e 100644 --- a/influxdb_client_3/write_client/client/write_api.py +++ b/influxdb_client_3/write_client/client/write_api.py @@ -22,7 +22,7 @@ from influxdb_client_3.write_client.client._base import _BaseWriteApi, _HAS_DATACLASS from influxdb_client_3.write_client.client.util.helpers import get_org_query_param from influxdb_client_3.write_client.client.write.dataframe_serializer import DataframeSerializer -from influxdb_client_3.write_client.client.write.point import Point, DEFAULT_WRITE_PRECISION +from influxdb_client_3.write_client.client.write.point import Point, DEFAULT_WRITE_PRECISION, sanitize_tag_order from influxdb_client_3.write_client.client.write.retry import WritesRetry from influxdb_client_3.write_client.domain import WritePrecision from influxdb_client_3.write_client.rest import _UTF_8_encoding @@ -43,6 +43,8 @@ 'record_time_key', 'record_tag_keys', 'record_field_keys', + # Point serialization-specific kwargs + 'tag_order', } logger = logging.getLogger('influxdb_client_3.write_client.client.write_api') @@ -81,6 +83,7 @@ def __init__(self, write_type: WriteType = WriteType.batching, max_close_wait=300_000, write_precision=DEFAULT_WRITE_PRECISION, no_sync=DEFAULT_WRITE_NO_SYNC, + tag_order=None, timeout=DEFAULT_WRITE_TIMEOUT, write_scheduler=ThreadPoolScheduler(max_workers=1)) -> None: """ @@ -100,6 +103,7 @@ def __init__(self, write_type: WriteType = WriteType.batching, :param max_close_wait: the maximum time to wait for writes to be flushed if close() is called :param write_precision: precision to use when writing points to InfluxDB :param no_sync: skip waiting for WAL persistence on write + :param tag_order: optional list of tag names used to prioritize tag serialization order :param timeout: timeout to use when writing to the database in milliseconds. Default is 10_000 :param write_scheduler: """ @@ -117,6 +121,7 @@ def __init__(self, write_type: WriteType = WriteType.batching, self.write_precision = write_precision self.timeout = timeout self.no_sync = no_sync + self.tag_order = sanitize_tag_order(tag_order) def to_retry_strategy(self, **kwargs): """ @@ -380,6 +385,11 @@ def write(self, bucket: str, org: str = None, if write_precision is None: write_precision = self._write_options.write_precision + if 'tag_order' in kwargs: + kwargs['tag_order'] = sanitize_tag_order(kwargs.get('tag_order')) + else: + kwargs['tag_order'] = self._write_options.tag_order + if self._write_options.write_type is WriteType.batching: return self._write_batching(bucket, org, record, write_precision, **kwargs) @@ -520,7 +530,9 @@ def _write_batching(self, bucket, org, data, precision, **kwargs) elif isinstance(data, Point): - self._write_batching(bucket, org, data.to_line_protocol(), data.write_precision, **kwargs) + self._write_batching(bucket, org, + data.to_line_protocol(tag_order=kwargs.get('tag_order')), + data.write_precision, **kwargs) elif isinstance(data, dict): self._write_batching(bucket, org, Point.from_dict(data, write_precision=precision, **kwargs), diff --git a/influxdb_client_3/write_client/domain/write_precision.py b/influxdb_client_3/write_client/domain/write_precision.py index 4917201..cf175dd 100644 --- a/influxdb_client_3/write_client/domain/write_precision.py +++ b/influxdb_client_3/write_client/domain/write_precision.py @@ -30,7 +30,7 @@ class WritePrecision(object): def __init__(self): # noqa: E501,D401,D403 """WritePrecision - a model defined in OpenAPI.""" # noqa: E501 self.discriminator = None - def to_dict(self): + def to_dict(self): # pragma: no cover """Return the model properties as a dict.""" result = {} @@ -54,21 +54,21 @@ def to_dict(self): return result - def to_str(self): + def to_str(self): # pragma: no cover """Return the string representation of the model.""" return pprint.pformat(self.to_dict()) - def __repr__(self): + def __repr__(self): # pragma: no cover """For `print` and `pprint`.""" return self.to_str() - def __eq__(self, other): + def __eq__(self, other): # pragma: no cover """Return true if both objects are equal.""" if not isinstance(other, WritePrecision): return False return self.__dict__ == other.__dict__ - def __ne__(self, other): + def __ne__(self, other): # pragma: no cover """Return true if both objects are not equal.""" return not self == other diff --git a/tests/test_dataframe_serializer.py b/tests/test_dataframe_serializer.py index 7cb67e6..419125f 100644 --- a/tests/test_dataframe_serializer.py +++ b/tests/test_dataframe_serializer.py @@ -276,6 +276,30 @@ def test_tags_order(self): self.assertEqual(1, len(points)) self.assertEqual("h2o,a=a,b=b,c=c level=2i 1586048400000000000", points[0]) + points = data_frame_to_list_of_points(data_frame=data_frame, + point_settings=PointSettings(), + data_frame_measurement_name='h2o', + data_frame_tag_columns={"c", "a", "b"}, + tag_order=["c", "a"]) + + self.assertEqual(1, len(points)) + self.assertEqual("h2o,c=c,a=a,b=b level=2i 1586048400000000000", points[0]) + + ps = PointSettings(z="from-default", c="override-ignored") + points_with_defaults = data_frame_to_list_of_points( + data_frame=data_frame, + point_settings=ps, + data_frame_measurement_name='h2o', + data_frame_tag_columns={"c", "a", "b"}, + tag_order=["z", "c", "a"], + ) + + self.assertEqual(1, len(points_with_defaults)) + self.assertEqual( + "h2o,z=from-default,c=c,a=a,b=b level=2i 1586048400000000000", + points_with_defaults[0] + ) + def test_escape_text_value(self): now = pd.Timestamp('2020-04-05 00:00+00:00') an_hour_ago = now - timedelta(hours=1) diff --git a/tests/test_influxdb_client_3.py b/tests/test_influxdb_client_3.py index 4b489d6..c928f60 100644 --- a/tests/test_influxdb_client_3.py +++ b/tests/test_influxdb_client_3.py @@ -1,7 +1,7 @@ import re import unittest +from collections import defaultdict from unittest.mock import patch - from pytest_httpserver import HTTPServer from influxdb_client_3 import InfluxDBClient3, WritePrecision, DefaultWriteOptions, Point, WriteOptions, WriteType, \ @@ -90,7 +90,8 @@ def test_write_options(self): max_retry_time=0, max_retry_delay=0, timeout=30_000, - flush_interval=500,)) + flush_interval=500, + tag_order=["region", "", "host", "region"])) ) self.assertIsInstance(client._write_client_options["write_options"], WriteOptions) @@ -103,6 +104,13 @@ def test_write_options(self): self.assertEqual(0, client._write_client_options["write_options"].max_retry_delay) self.assertEqual(WriteType.synchronous, client._write_client_options["write_options"].write_type) self.assertEqual(500, client._write_client_options["write_options"].flush_interval) + self.assertEqual(["region", "host"], client._write_client_options["write_options"].tag_order) + + with self.assertRaisesRegex(TypeError, "tag_order must be an iterable of strings, not str/bytes"): + WriteOptions(tag_order="region,host") + + with self.assertRaisesRegex(TypeError, "tag_order entries must be strings"): + WriteOptions(tag_order=["region", 1]) def test_default_write_options(self): client = InfluxDBClient3( @@ -118,6 +126,7 @@ def test_default_write_options(self): self.assertEqual(DefaultWriteOptions.write_precision.value, client._write_client_options["write_options"].write_precision) self.assertEqual(DefaultWriteOptions.timeout.value, client._write_client_options["write_options"].timeout) + self.assertEqual([], client._write_client_options["write_options"].tag_order) @asyncio_run async def test_query_async(self): @@ -147,13 +156,60 @@ def test_write_api_custom_options_no_error(self): write_options = WriteOptions(write_type=WriteType.batching) write_client_option = {'write_options': write_options} client = InfluxDBClient3(write_client_options=write_client_option) + sync_client = None try: client._write_api._write_batching("bucket", "org", Point.measurement("test"), None) + client._write_api._write_batching("bucket", "org", { + "measurement": "test", + "fields": {"value": 1} + }, None) + df = pd.DataFrame({ + "value": [1, 2], + }, index=pd.to_datetime(["2024-01-01T00:00:00Z", "2024-01-01T01:00:00Z"])) + client._write_api._write_batching( + "bucket", "org", df, None, + data_frame_measurement_name="test_measurement", + ) + point = Point.measurement("test").tag("host", "h1").field("value", 1).time(1, WritePrecision.S) + payload = defaultdict(list) + client._write_api._serialize(point, WritePrecision.NS, payload, tag_order=["host"]) + self.assertIn(WritePrecision.S, payload) + + payload_forced = defaultdict(list) + client._write_api._serialize(point, WritePrecision.NS, payload_forced, + precision_from_point=False, tag_order=["host"]) + self.assertIn(WritePrecision.NS, payload_forced) + + sync_client = InfluxDBClient3( + host="localhost", + org="my_org", + database="my_db", + token="my_token", + write_client_options=write_client_options( + write_options=WriteOptions(write_type=WriteType.synchronous)) + ) + with patch.object(sync_client._write_api, "_post_write", return_value=None) as mock_post: + sync_point = Point.measurement("measurement") \ + .tag("host", "h1") \ + .tag("region", "us-east") \ + .field("value", 1) + sync_client.write(record=sync_point, tag_order=["region", "", "host", "region"]) + + args, kwargs = mock_post.call_args + body = kwargs.get("body") + if body is None and len(args) >= 4: + body = args[3] + if isinstance(body, bytes): + body = body.decode("utf-8") + self.assertIn("measurement,region=us-east,host=h1 value=1i", body) + self.assertTrue(True) except Exception as e: self.fail(f"Write API with default options raised an exception: {str(e)}") finally: client._write_api._on_complete() # abort batch writes - otherwise test cycles through urllib3 retries + if sync_client is not None: + sync_client.close() def test_default_client(self): expected_precision = DefaultWriteOptions.write_precision.value @@ -181,10 +237,12 @@ def verify_client_write_options(c): self.assertEqual(write_options.write_precision, expected_precision) self.assertEqual(write_options.write_type, expected_write_type) self.assertEqual(write_options.no_sync, expected_no_sync) + self.assertEqual(write_options.tag_order, []) self.assertEqual(c._write_api._write_options.write_precision, expected_precision) self.assertEqual(c._write_api._write_options.write_type, expected_write_type) self.assertEqual(c._write_api._write_options.no_sync, expected_no_sync) + self.assertEqual(c._write_api._write_options.tag_order, []) env_client = InfluxDBClient3.from_env() verify_client_write_options(env_client) diff --git a/tests/test_point.py b/tests/test_point.py index 1559b62..98f724c 100644 --- a/tests/test_point.py +++ b/tests/test_point.py @@ -1,7 +1,8 @@ import datetime import unittest -from influxdb_client_3.write_client.client.write.point import EPOCH, Point +from influxdb_client_3 import WritePrecision +from influxdb_client_3.write_client.client.write.point import EPOCH, Point, _np_is_subtype class TestPoint(unittest.TestCase): @@ -12,3 +13,55 @@ def test_epoch(self): def test_point(self): point = Point.measurement("h2o").tag("location", "europe").field("level", 2.2).time(1_000_000) self.assertEqual('h2o,location=europe level=2.2 1000000', point.to_line_protocol()) + + def test_point_tag_order(self): + point = Point.measurement("h2o") \ + .tag("drop", None) \ + .tag("rack", "r1") \ + .tag("host", "h1") \ + .tag("region", "us-east") \ + .field("level", 2) + + self.assertEqual('h2o,host=h1,rack=r1,region=us-east level=2i', point.to_line_protocol()) + self.assertEqual('h2o,region=us-east,host=h1,rack=r1 level=2i', + point.to_line_protocol(tag_order=["region", "", "host", "region", "missing"])) + + def test_point_field_types_and_time_conversion(self): + point = Point.measurement("m") \ + .field("drop", None) \ + .field("flag", True) \ + .field("name", "abc") \ + .field("value", 1) + + self.assertEqual('m flag=true,name="abc",value=1i', point.to_line_protocol()) + + dt = datetime.datetime(1970, 1, 1, 0, 0, 1, tzinfo=datetime.timezone.utc) + self.assertEqual('m value=1i 1000000000', + Point.measurement("m").field("value", 1).time(dt, WritePrecision.NS).to_line_protocol()) + self.assertEqual('m value=1i 1000000', + Point.measurement("m").field("value", 1).time(dt, WritePrecision.US).to_line_protocol()) + self.assertEqual('m value=1i 1000', + Point.measurement("m").field("value", 1).time(dt, WritePrecision.MS).to_line_protocol()) + self.assertEqual('m value=1i 1', + Point.measurement("m").field("value", 1).time(dt, WritePrecision.S).to_line_protocol()) + self.assertEqual('m value=1i 1000000', + Point.measurement("m").field("value", 1) + .time(datetime.timedelta(seconds=1), WritePrecision.US).to_line_protocol()) + self.assertEqual('m value=1i 1', + Point.measurement("m").field("value", 1) + .time("1970-01-01T00:00:01Z", WritePrecision.S).to_line_protocol()) + + with self.assertRaisesRegex(ValueError, 'not supported'): + Point.measurement("m").field("bad", object()).to_line_protocol() + with self.assertRaises(ValueError): + Point.measurement("m").field("value", 1).time([]).to_line_protocol() + + def test_np_is_subtype(self): + try: + import numpy as np + except ImportError: + self.skipTest("numpy not installed") + + self.assertTrue(_np_is_subtype(np.float64(1.0), 'float')) + self.assertTrue(_np_is_subtype(np.int64(1), 'int')) + self.assertFalse(_np_is_subtype(np.int64(1), 'other')) diff --git a/tests/test_polars.py b/tests/test_polars.py index 70571c9..61ff206 100644 --- a/tests/test_polars.py +++ b/tests/test_polars.py @@ -17,6 +17,7 @@ def test_to_list_of_points(self): df = pl.DataFrame(data={ "name": ['iot-devices', 'iot-devices', 'iot-devices'], "building": ['5a', '5a', '5a'], + "region": ['us-east', 'us-east', 'us-east'], "temperature": [72.3, 72.1, 72.2], "time": pl.Series(["2022-10-01T12:01:00Z", "2022-10-02T12:01:00Z", "2022-10-03T12:01:00Z"]) .str.to_datetime(time_unit='ns') @@ -27,12 +28,101 @@ def test_to_list_of_points(self): data_frame_timestamp_column='time') expected = [ - 'iot-devices,building=5a name="iot-devices",temperature=72.3 1664625660000000000', - 'iot-devices,building=5a name="iot-devices",temperature=72.1 1664712060000000000', - 'iot-devices,building=5a name="iot-devices",temperature=72.2 1664798460000000000' + 'iot-devices,building=5a name="iot-devices",region="us-east",temperature=72.3 1664625660000000000', + 'iot-devices,building=5a name="iot-devices",region="us-east",temperature=72.1 1664712060000000000', + 'iot-devices,building=5a name="iot-devices",region="us-east",temperature=72.2 1664798460000000000' ] self.assertEqual(expected, actual) + def test_to_list_of_points_with_tag_order(self): + import polars as pl + ps = PointSettings() + df = pl.DataFrame(data={ + "name": ['iot-devices', 'iot-devices', 'iot-devices'], + "building": ['5a', '5a', '5a'], + "region": ['us-east', 'us-east', 'us-east'], + "temperature": [72.3, 72.1, 72.2], + "time": pl.Series(["2022-10-01T12:01:00Z", "2022-10-02T12:01:00Z", "2022-10-03T12:01:00Z"]) + .str.to_datetime(time_unit='ns') + }) + actual = polars_data_frame_to_list_of_points(df, ps, + data_frame_measurement_name='iot-devices', + data_frame_tag_columns=['building', 'region'], + data_frame_timestamp_column='time', + tag_order=['region', 'building']) + + expected = [ + 'iot-devices,region=us-east,building=5a name="iot-devices",temperature=72.3 1664625660000000000', + 'iot-devices,region=us-east,building=5a name="iot-devices",temperature=72.1 1664712060000000000', + 'iot-devices,region=us-east,building=5a name="iot-devices",temperature=72.2 1664798460000000000' + ] + self.assertEqual(expected, actual) + + def test_to_list_of_points_with_default_tags(self): + import polars as pl + ps = PointSettings(env="prod", building="ignored", skip_empty="") + df = pl.DataFrame(data={ + "name": ['iot-devices'], + "building": [''], + "temperature": [72.3], + "time": pl.Series(["2022-10-01T12:01:00Z"]).str.to_datetime(time_unit='ns') + }) + + actual = polars_data_frame_to_list_of_points( + df, ps, + data_frame_measurement_name='iot-devices', + data_frame_tag_columns=['building'], + data_frame_timestamp_column='time', + tag_order=['env', 'building'], + ) + + expected = [ + 'iot-devices,env=prod,building=ignored name="iot-devices",temperature=72.3 1664625660000000000' + ] + self.assertEqual(expected, actual) + + def test_to_list_of_points_with_precision_variants(self): + import polars as pl + ps = PointSettings() + df = pl.DataFrame(data={ + "name": ['iot-devices'], + "temperature": [72.3], + "time": pl.Series(["2022-10-01T12:01:00Z"]).str.to_datetime(time_unit='ns') + }) + + actual_us = polars_data_frame_to_list_of_points( + df, ps, precision='us', + data_frame_measurement_name='iot-devices', + data_frame_timestamp_column='time') + self.assertEqual( + ['iot-devices name="iot-devices",temperature=72.3 1664625660000000'], + actual_us + ) + + actual_ms = polars_data_frame_to_list_of_points( + df, ps, precision='ms', + data_frame_measurement_name='iot-devices', + data_frame_timestamp_column='time') + self.assertEqual( + ['iot-devices name="iot-devices",temperature=72.3 1664625660000'], + actual_ms + ) + + actual_s = polars_data_frame_to_list_of_points( + df, ps, precision='s', + data_frame_measurement_name='iot-devices', + data_frame_timestamp_column='time') + self.assertEqual( + ['iot-devices name="iot-devices",temperature=72.3 1664625660'], + actual_s + ) + + with self.assertRaisesRegex(ValueError, "Unsupported precision"): + polars_data_frame_to_list_of_points( + df, ps, precision='bad', + data_frame_measurement_name='iot-devices', + data_frame_timestamp_column='time') + @unittest.skipIf(importlib.util.find_spec("polars") is None, 'Polars package not installed') class TestWritePolars(unittest.TestCase):