Skip to content
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@

## 0.19.0 [unreleased]

### Features

1. [#198](https://github.com/InfluxCommunity/influxdb3-python/pull/198): Support custom tag order via the `tag_order` write option.
See [Sort tags by priority](https://docs.influxdata.com/influxdb3/enterprise/write-data/best-practices/schema-design/#sort-tags-by-query-priority) for more information.

## 0.18.0 [2026-02-19]

### Features
Expand Down
24 changes: 24 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,30 @@ You can write data using the Point class, or supplying line protocol.
point = Point("measurement").tag("location", "london").field("temperature", 42)
client.write(point)
```

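### Control tag order for first-write column order (InfluxDB 3 Enterprise)
Tags listed in `tag_order` are serialized first, in the given order; tags that are not listed (here `rack`) are written after them.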
```python
from influxdb_client_3 import InfluxDBClient3, Point, WriteOptions, WriteType, write_client_options

point = Point("cpu") \
.tag("host", "server-a") \
.tag("region", "us-east") \
.tag("rack", "r1") \
.field("usage", 0.42)

write_options = WriteOptions(
write_type=WriteType.synchronous,
tag_order=["region", "host"],
)

client = InfluxDBClient3(
token="your-token",
host="your-host",
database="your-database",
write_client_options=write_client_options(write_options=write_options),
)
client.write(point)
```
### Using Line Protocol
```python
point = "measurement fieldname=0"
Expand Down
3 changes: 2 additions & 1 deletion influxdb_client_3/write_client/client/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,8 @@ def _serialize(self, record, write_precision, payload, **kwargs):
elif isinstance(record, Point):
precision_from_point = kwargs.get('precision_from_point', True)
precision = record.write_precision if precision_from_point else write_precision
self._serialize(record.to_line_protocol(precision=precision), precision, payload, **kwargs)
self._serialize(record.to_line_protocol(precision=precision, tag_order=kwargs.get('tag_order')),
precision, payload, **kwargs)

elif isinstance(record, dict):
self._serialize(Point.from_dict(record, write_precision=write_precision, **kwargs),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

from influxdb_client_3.write_client.domain import WritePrecision
from influxdb_client_3.write_client.client.write.point import _ESCAPE_KEY, _ESCAPE_STRING, _ESCAPE_MEASUREMENT, \
DEFAULT_WRITE_PRECISION
DEFAULT_WRITE_PRECISION, ordered_tag_keys

logger = logging.getLogger('influxdb_client.client.write.dataframe_serializer')

Expand Down Expand Up @@ -130,8 +130,8 @@ def __init__(self, data_frame, point_settings, precision=DEFAULT_WRITE_PRECISION

# keys holds a list of string keys.
keys = []
# tags holds a list of tag f-string segments ordered alphabetically by tag key.
tags = []
# tag_segments holds map of tag key -> tag f-string segment.
tag_segments = {}
# fields holds a list of field f-string segments ordered alphabetically by field key
fields = []
# field_indexes holds the index into each row of all the fields.
Expand Down Expand Up @@ -188,7 +188,7 @@ def __init__(self, data_frame, point_settings, precision=DEFAULT_WRITE_PRECISION
}}"""
else:
key_value = f',{key_format}={{str({val_format}).translate(_ESCAPE_KEY)}}'
tags.append(key_value)
tag_segments[key] = key_value
continue
elif timestamp_column is not None and key in timestamp_column:
timestamp_index = field_index
Expand Down Expand Up @@ -225,7 +225,8 @@ def __init__(self, data_frame, point_settings, precision=DEFAULT_WRITE_PRECISION

measurement_name = str(data_frame_measurement_name).translate(_ESCAPE_MEASUREMENT)

tags = ''.join(tags)
tag_keys = ordered_tag_keys(list(tag_segments.keys()), kwargs.get('tag_order'))
tag_string = ''.join(tag_segments[tag_key] for tag_key in tag_keys)
fields = ''.join(fields)
timestamp = '{p[%s].value}' % timestamp_index
if precision == WritePrecision.US:
Expand All @@ -235,7 +236,7 @@ def __init__(self, data_frame, point_settings, precision=DEFAULT_WRITE_PRECISION
elif precision == WritePrecision.S:
timestamp = '{int(p[%s].value / 1e9)}' % timestamp_index

f = eval(f'lambda p: f"""{{measurement_name}}{tags} {fields} {timestamp}"""', {
f = eval(f'lambda p: f"""{{measurement_name}}{tag_string} {fields} {timestamp}"""', {
'measurement_name': measurement_name,
'_ESCAPE_KEY': _ESCAPE_KEY,
'_ESCAPE_STRING': _ESCAPE_STRING,
Expand Down
54 changes: 50 additions & 4 deletions influxdb_client_3/write_client/client/write/point.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import math
import warnings
from builtins import int
from collections.abc import Iterable
from datetime import datetime, timedelta, timezone
from decimal import Decimal
from numbers import Integral
Expand Down Expand Up @@ -215,11 +216,12 @@ def field(self, field, value):
self._fields[field] = value
return self

def to_line_protocol(self, precision=None):
def to_line_protocol(self, precision=None, tag_order=None):
"""
Create LineProtocol.

:param precision: required precision of LineProtocol. If it's not set then use the precision from ``Point``.
:param tag_order: optional list of tag names to prioritize in serialized output
"""
_measurement = _escape_key(self._name, _ESCAPE_MEASUREMENT)
if _measurement.startswith("#"):
Expand All @@ -229,7 +231,7 @@ def to_line_protocol(self, precision=None):
- https://docs.influxdata.com/influxdb/latest/reference/syntax/line-protocol/#comments
"""
warnings.warn(message, SyntaxWarning)
_tags = _append_tags(self._tags)
_tags = _append_tags(self._tags, tag_order)
_fields = _append_fields(self._fields, self._field_types)
if not _fields:
return ""
Expand All @@ -252,9 +254,10 @@ def __str__(self):
return self.to_line_protocol()


def _append_tags(tags):
def _append_tags(tags, tag_order=None):
_return = []
for tag_key, tag_value in sorted(tags.items()):
for tag_key in ordered_tag_keys(sorted(tags.keys()), tag_order):
tag_value = tags.get(tag_key)

if tag_value is None:
continue
Expand All @@ -267,6 +270,49 @@ def _append_tags(tags):
return f"{',' if _return else ''}{','.join(_return)} "


def sanitize_tag_order(tag_order):
    """
    Normalize a user supplied ``tag_order`` value into a list of unique tag names.

    :param tag_order: ``None`` or an iterable of tag names
    :return: list of the non-empty string entries, de-duplicated, in first-seen order;
             an empty list when ``tag_order`` is ``None``
    :raises TypeError: if ``tag_order`` is a ``str``/``bytes``, is not iterable,
                       or contains an entry that is not a string
    """
    if tag_order is None:
        return []

    # A bare string is iterable, but would be split into single characters.
    if isinstance(tag_order, (str, bytes)):
        raise TypeError("tag_order must be an iterable of strings, not str/bytes")

    if not isinstance(tag_order, Iterable):
        raise TypeError("tag_order must be an iterable of strings")

    # dict keys keep insertion order, so the first occurrence of each name wins.
    unique_names = {}
    for entry in tag_order:
        # None and empty names are dropped silently, before the type check.
        if entry is None or entry == "":
            continue
        if not isinstance(entry, str):
            raise TypeError("tag_order entries must be strings")
        unique_names.setdefault(entry, None)
    return list(unique_names)


def ordered_tag_keys(existing_keys, tag_order=None):
    """
    Arrange tag keys so that the keys named in ``tag_order`` come first.

    :param existing_keys: iterable of tag keys that are actually present
    :param tag_order: optional iterable of tag names to move to the front
    :return: new list holding the prioritized keys in ``tag_order`` order,
             followed by the other keys in their ``existing_keys`` order;
             a plain copy of ``existing_keys`` when ``tag_order`` is empty or ``None``
    """
    keys = list(existing_keys)
    if not tag_order:
        return keys

    # Keys that have not been moved to the front yet.
    pending = set(keys)
    front = []
    for name in tag_order:
        # Skip falsy names, names that are not present and names already placed.
        if name and name in pending:
            pending.discard(name)
            front.append(name)

    tail = [key for key in keys if key in pending]
    return front + tail


def _append_fields(fields, field_types):
_return = []

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@
import logging
import math

from influxdb_client_3.write_client.client.write.point import _ESCAPE_KEY, _ESCAPE_STRING, DEFAULT_WRITE_PRECISION
from influxdb_client_3.write_client.client.write.point import _ESCAPE_KEY, _ESCAPE_STRING, DEFAULT_WRITE_PRECISION, \
ordered_tag_keys

logger = logging.getLogger('influxdb_client.client.write.polars_dataframe_serializer')

Expand Down Expand Up @@ -36,6 +37,7 @@ def __init__(self, data_frame, point_settings, precision=DEFAULT_WRITE_PRECISION
self.chunk_size = chunk_size
self.measurement_name = kwargs.get("data_frame_measurement_name", "measurement")
self.tag_columns = kwargs.get("data_frame_tag_columns", [])
self.tag_order = kwargs.get("tag_order", None)
self.timestamp_column = kwargs.get("data_frame_timestamp_column", None)
self.timestamp_timezone = kwargs.get("data_frame_timestamp_timezone", None)

Expand All @@ -62,25 +64,31 @@ def escape_value(self, value):
return str(value).translate(_ESCAPE_STRING)

def to_line_protocol(self, row):
# Filter out None or empty values for tags
tags = ""
tag_values = {}
tag_keys = []
for col in self.tag_columns:
value = row[self.column_indices[col]]
if value is None or value == "":
continue
if col not in tag_values:
tag_keys.append(col)
tag_values[col] = value

if self.point_settings.defaultTags:
for key, value in self.point_settings.defaultTags.items():
if value is None or value == "":
continue
if key in tag_values:
continue
tag_keys.append(key)
tag_values[key] = value

final_tag_keys = ordered_tag_keys(tag_keys, self.tag_order)
tags = ",".join(
f'{self.escape_key(col)}={self.escape_key(row[self.column_indices[col]])}'
for col in self.tag_columns
if row[self.column_indices[col]] is not None and row[self.column_indices[col]] != ""
f'{self.escape_key(key)}={self.escape_key(tag_values[key])}'
for key in final_tag_keys
)

if self.point_settings.defaultTags:
default_tags = ",".join(
f'{self.escape_key(key)}={self.escape_key(value)}'
for key, value in self.point_settings.defaultTags.items()
)
# Ensure there's a comma between existing tags and default tags if both are present
if tags and default_tags:
tags += ","
tags += default_tags

# add escape symbols for special characters to tags

fields = ",".join(
Expand Down
16 changes: 14 additions & 2 deletions influxdb_client_3/write_client/client/write_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from influxdb_client_3.write_client.client._base import _BaseWriteApi, _HAS_DATACLASS
from influxdb_client_3.write_client.client.util.helpers import get_org_query_param
from influxdb_client_3.write_client.client.write.dataframe_serializer import DataframeSerializer
from influxdb_client_3.write_client.client.write.point import Point, DEFAULT_WRITE_PRECISION
from influxdb_client_3.write_client.client.write.point import Point, DEFAULT_WRITE_PRECISION, sanitize_tag_order
from influxdb_client_3.write_client.client.write.retry import WritesRetry
from influxdb_client_3.write_client.domain import WritePrecision
from influxdb_client_3.write_client.rest import _UTF_8_encoding
Expand All @@ -43,6 +43,8 @@
'record_time_key',
'record_tag_keys',
'record_field_keys',
# Point serialization-specific kwargs
'tag_order',
}

logger = logging.getLogger('influxdb_client_3.write_client.client.write_api')
Expand Down Expand Up @@ -81,6 +83,7 @@ def __init__(self, write_type: WriteType = WriteType.batching,
max_close_wait=300_000,
write_precision=DEFAULT_WRITE_PRECISION,
no_sync=DEFAULT_WRITE_NO_SYNC,
tag_order=None,
timeout=DEFAULT_WRITE_TIMEOUT,
write_scheduler=ThreadPoolScheduler(max_workers=1)) -> None:
"""
Expand All @@ -100,6 +103,7 @@ def __init__(self, write_type: WriteType = WriteType.batching,
:param max_close_wait: the maximum time to wait for writes to be flushed if close() is called
:param write_precision: precision to use when writing points to InfluxDB
:param no_sync: skip waiting for WAL persistence on write
:param tag_order: optional list of tag names used to prioritize tag serialization order
:param timeout: timeout to use when writing to the database in milliseconds. Default is 10_000
:param write_scheduler:
"""
Expand All @@ -117,6 +121,7 @@ def __init__(self, write_type: WriteType = WriteType.batching,
self.write_precision = write_precision
self.timeout = timeout
self.no_sync = no_sync
self.tag_order = sanitize_tag_order(tag_order)

def to_retry_strategy(self, **kwargs):
"""
Expand Down Expand Up @@ -380,6 +385,11 @@ def write(self, bucket: str, org: str = None,
if write_precision is None:
write_precision = self._write_options.write_precision

if 'tag_order' in kwargs:
kwargs['tag_order'] = sanitize_tag_order(kwargs.get('tag_order'))
else:
kwargs['tag_order'] = self._write_options.tag_order

if self._write_options.write_type is WriteType.batching:
return self._write_batching(bucket, org, record,
write_precision, **kwargs)
Expand Down Expand Up @@ -520,7 +530,9 @@ def _write_batching(self, bucket, org, data,
precision, **kwargs)

elif isinstance(data, Point):
self._write_batching(bucket, org, data.to_line_protocol(), data.write_precision, **kwargs)
self._write_batching(bucket, org,
data.to_line_protocol(tag_order=kwargs.get('tag_order')),
data.write_precision, **kwargs)

elif isinstance(data, dict):
self._write_batching(bucket, org, Point.from_dict(data, write_precision=precision, **kwargs),
Expand Down
10 changes: 5 additions & 5 deletions influxdb_client_3/write_client/domain/write_precision.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class WritePrecision(object):
def __init__(self): # noqa: E501,D401,D403
"""WritePrecision - a model defined in OpenAPI.""" # noqa: E501 self.discriminator = None

def to_dict(self):
def to_dict(self): # pragma: no cover
"""Return the model properties as a dict."""
result = {}

Expand All @@ -54,21 +54,21 @@ def to_dict(self):

return result

def to_str(self):
def to_str(self): # pragma: no cover
"""Return the string representation of the model."""
return pprint.pformat(self.to_dict())

def __repr__(self):
def __repr__(self): # pragma: no cover
"""For `print` and `pprint`."""
return self.to_str()

def __eq__(self, other):
def __eq__(self, other): # pragma: no cover
"""Return true if both objects are equal."""
if not isinstance(other, WritePrecision):
return False

return self.__dict__ == other.__dict__

def __ne__(self, other):
def __ne__(self, other): # pragma: no cover
"""Return true if both objects are not equal."""
return not self == other
24 changes: 24 additions & 0 deletions tests/test_dataframe_serializer.py
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,30 @@ def test_tags_order(self):
self.assertEqual(1, len(points))
self.assertEqual("h2o,a=a,b=b,c=c level=2i 1586048400000000000", points[0])

points = data_frame_to_list_of_points(data_frame=data_frame,
point_settings=PointSettings(),
data_frame_measurement_name='h2o',
data_frame_tag_columns={"c", "a", "b"},
tag_order=["c", "a"])

self.assertEqual(1, len(points))
self.assertEqual("h2o,c=c,a=a,b=b level=2i 1586048400000000000", points[0])

ps = PointSettings(z="from-default", c="override-ignored")
points_with_defaults = data_frame_to_list_of_points(
data_frame=data_frame,
point_settings=ps,
data_frame_measurement_name='h2o',
data_frame_tag_columns={"c", "a", "b"},
tag_order=["z", "c", "a"],
)

self.assertEqual(1, len(points_with_defaults))
self.assertEqual(
"h2o,z=from-default,c=c,a=a,b=b level=2i 1586048400000000000",
points_with_defaults[0]
)

def test_escape_text_value(self):
now = pd.Timestamp('2020-04-05 00:00+00:00')
an_hour_ago = now - timedelta(hours=1)
Expand Down
Loading