Pure-Python NetXMS NXCP client library. Implements the NXCP v5 binary protocol including message marshaling, zlib compression, and RSA+AES encryption.
Requires Python 3.10+.
NOTE: This is NOT production-ready code. It is just an experiment for now, to collect feedback.
uv add git+https://github.com/netxms/netxms-python.git
from netxms import NXClient
with NXClient.connect("your-netxms-server.example.com", "admin", "password") as client:
    for alarm in client.alarms:
        print(f"[{alarm.severity.name}] {alarm.message}")
Specify client type and language:
from netxms.nxcp.constants import ClientType
with NXClient.connect("server", "admin", "password",
                      client_type=ClientType.WEB, language="de") as client:
    ...
Manual connection (advanced):
client = NXClient("server.example.com", encrypt=False)
client.open()
client.login("admin", "password")
# ...
client.disconnect()
with NXClient.connect("server", "admin", "password") as client:
    if client.check_connection():
        print("Connected")
    # Server time (Unix timestamp, updated by keepalive messages)
    print(client.server_time)
Alarms are accessed through the client.alarms collection. Individual alarms
have action methods for lifecycle operations and comment management.
from netxms import NXClient
from netxms.nxcp.constants import AlarmState, Severity
with NXClient.connect("server", "admin", "password") as client:
# Iterate all active alarms
for alarm in client.alarms:
print(f"[{alarm.severity.name}] {alarm.message}")
# Get a specific alarm by ID
alarm = client.alarms.get(id=42)
# Filter by severity, state, or any field
critical = client.alarms.filter(severity=Severity.CRITICAL)
outstanding = client.alarms.filter(state=AlarmState.OUTSTANDING)
# Comments
alarm.add_comment("Investigating this issue")
for comment in alarm.comments:
print(f"{comment.user_name}: {comment.text}")
comment.update("Updated note")
comment.delete()
# Alarm actions (called directly on the alarm object)
alarm.acknowledge()
alarm.acknowledge(sticky=True, timeout=3600)
alarm.resolve()
alarm.terminate()
alarm.delete()
Users and groups are accessed through the client.users collection. User
objects support dirty tracking -- modify fields directly and call update()
to send only the changed fields to the server.
from netxms import NXClient
with NXClient.connect("server", "admin", "password") as client:
# Iterate all users and groups (auto-syncs on first access)
for user in client.users:
print(f"{user.name} (id={user.id})")
# Look up by name or ID
admin = client.users.get(name="admin")
user = client.users.get(id=1)
# Create users and groups
user = client.users.create("operator")
group = client.users.create_group("operators")
# Modify and update (dirty tracking sends only changed fields)
user.description = "API operator account"
user.full_name = "API Operator"
user.update()
# Group membership
group.members = (user.id,)
group.update()
# Password management (action methods on User)
user.set_password("new_secret")
is_valid = user.validate_password("check_this")
# Detach from LDAP
user.detach_ldap()
# Current user attributes (session-level, not on User model)
client.set_current_user_attr("theme", "dark")
value = client.get_current_user_attr("theme")
# Delete
client.users.delete(user)
client.users.delete(group)
# Explicit sync
client.users.sync()
Infrastructure objects (nodes, interfaces, containers, etc.) are accessed
through the client.objects collection. Objects are read-only (no dirty
tracking). The collection auto-syncs from the server on first access.
from netxms import NXClient
from netxms.nxcp.constants import ObjectClass, ObjectStatus
from netxms.objects import Node
with NXClient.connect("server", "admin", "password") as client:
# Iterate all objects (auto-syncs on first access)
for obj in client.objects:
print(f"{obj.name} ({obj.object_class.name}) status={obj.status.name}")
# Look up by name or ID
node = client.objects.get(name="my-server")
obj = client.objects.get(id=42)
# Index by object ID (fetch-on-miss)
node = client.objects[42]
# Filter by any field
nodes = client.objects.filter(object_class=ObjectClass.NODE)
problem_nodes = [n for n in nodes if n.status != ObjectStatus.NORMAL]
# Explicit sync (with optional node component sync)
client.objects.sync(sync_node_components=True)
# Check sync status
print(client.objects_synchronized)
Object types include Node, Interface, Subnet, Container, Zone, Template,
Cluster, Dashboard, NetworkMap, Rack, Sensor, AccessPoint, MobileDevice,
and many more -- all available from netxms.objects.
Models and objects returned by the client have lazy-resolving properties that automatically look up referenced entities. If the referenced entity is not in the local cache, it is fetched from the server on demand.
with NXClient.connect("server", "admin", "password") as client:
client.objects.sync()
for alarm in client.alarms:
# alarm.source resolves alarm.object_id to an AbstractObject
if alarm.source:
print(f"{alarm.message} on {alarm.source.name}")
# alarm.acknowledged_user resolves alarm.acknowledged_by to a User
if alarm.acknowledged_user:
print(f" Acknowledged by {alarm.acknowledged_user.name}")
# Object parent/child navigation
node = client.objects.get(id=42)
for parent in node.parent_objects:
print(f"Parent: {parent.name}")
for child in node.child_objects:
print(f"Child: {child.name}")
Available lazy properties:
- Alarm: source, acknowledged_user, resolved_user, terminated_user
- AlarmComment: author
- BulkAlarmStateChangeData: user
- AbstractObject (and all subclasses): parent_objects, child_objects
All library errors inherit from NXError:
from netxms.exceptions import NXError, NXAuthError, NXConnectionError, NXRequestError
try:
with NXClient.connect("server", "admin", "wrong_password") as client:
pass
except NXAuthError as e:
print(f"Authentication failed: {e} (RCC={e.rcc})")
except NXConnectionError as e:
print(f"Connection failed: {e}")
except NXRequestError as e:
print(f"Request failed: {e} (RCC={e.rcc}, {e.rcc_name})")
Exception types: NXConnectionError, NXAuthError, NXRequestError, NXProtocolError, NXTimeoutError.
Subscribe to notification channels and register listeners to receive asynchronous server notifications:
from netxms.nxcp.constants import NXCChannel
with NXClient.connect("server", "admin", "password") as client:
client.subscribe(NXCChannel.ALARMS)
def on_notification(notification_type, data):
print(f"{notification_type}: {data}")
client.add_listener(on_notification)
# Listener fires on a background thread when the server pushes notifications.
# Notification types:
# "alarm_changed" -- data is an Alarm instance
# "user_changed" -- data is a dict with keys: "type" (UserDBUpdateType),
# "user_id" (int), "object" (User or UserGroup, None for deletes)
# "notify" -- data is a dict with keys: "code" (int), "data" (int)
# "bulk_alarm_state_change" -- data is a BulkAlarmStateChangeData instance
# with alarm_ids, user_id, notification_code, change_time
# "object_changed" -- data is an object instance (Node, Interface, etc.)
# "object_deleted" -- data is the object ID (int)
#
# SESSION_KILLED and SERVER_SHUTDOWN notifications trigger automatic disconnect
# and are NOT dispatched to listeners.
client.unsubscribe(NXCChannel.ALARMS)
client.remove_listener(on_notification)
Available channels: ALARMS, OBJECTS, EVENTS, AGENT_TUNNELS, AUDIT_LOG,
DC_THRESHOLDS, GEO_AREAS, PACKAGE_JOBS, SNMP_TRAPS, SYSLOG, USERDB.
Subscriptions are reference-counted and automatically cleared on disconnect().
uv sync --dev
pytest
pytest -m integration # run integration tests against a live NetXMS server