Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions pubnub/endpoints/endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,9 +204,6 @@ def callback(params_to_merge):
custom_params['pnsdk'] = self.pubnub.sdk_name
custom_params['uuid'] = self.pubnub.uuid

for query_key, query_value in self.pubnub._telemetry_manager.operation_latencies().items():
custom_params[query_key] = query_value

if self.is_auth_required():
if self.pubnub._get_token():
custom_params["auth"] = self.pubnub._get_token()
Expand Down
1 change: 0 additions & 1 deletion pubnub/enums.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,6 @@ class PNOperationType(object):
PNRemoveSpaceUsersOperation = 82
PNFetchUserMembershipsOperation = 85
PNFetchSpaceMembershipsOperation = 86
# NOTE: remember to update PubNub.managers.TelemetryManager.endpoint_name_for_operation() when adding operations


class PNHeartbeatNotificationOptions(object):
Expand Down
185 changes: 9 additions & 176 deletions pubnub/managers.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,20 @@
import logging
from abc import abstractmethod, ABCMeta

import time
import copy
import base64
import random

from cbor2 import loads

from . import utils
from .enums import PNStatusCategory, PNReconnectionPolicy, PNOperationType
from .models.consumer.common import PNStatus
from .models.server.subscribe import SubscribeEnvelope
from .dtos import SubscribeOperation, UnsubscribeOperation
from .callbacks import SubscribeCallback, ReconnectionCallback
from .models.subscription_item import SubscriptionItem
from .errors import PNERR_INVALID_ACCESS_TOKEN
from .exceptions import PubNubException
from pubnub import utils
from pubnub.enums import PNStatusCategory, PNReconnectionPolicy
from pubnub.models.consumer.common import PNStatus
from pubnub.models.server.subscribe import SubscribeEnvelope
from pubnub.dtos import SubscribeOperation, UnsubscribeOperation
from pubnub.callbacks import SubscribeCallback, ReconnectionCallback
from pubnub.models.subscription_item import SubscriptionItem
from pubnub.errors import PNERR_INVALID_ACCESS_TOKEN
from pubnub.exceptions import PubNubException

logger = logging.getLogger("pubnub")

Expand Down Expand Up @@ -398,171 +396,6 @@ def get_custom_params(self):
return {}


class TelemetryManager:
TIMESTAMP_DIVIDER = 1000
MAXIMUM_LATENCY_DATA_AGE = 60
CLEAN_UP_INTERVAL = 1
CLEAN_UP_INTERVAL_MULTIPLIER = 1000

def __init__(self):
self.latencies = {}

@abstractmethod
def _start_clean_up_timer(self):
pass

@abstractmethod
def _stop_clean_up_timer(self):
pass

def operation_latencies(self):
operation_latencies = {}

for endpoint_name, endpoint_latencies in self.latencies.items():
latency_key = 'l_' + endpoint_name

endpoint_average_latency = self.average_latency_from_data(endpoint_latencies)

if endpoint_average_latency > 0:
operation_latencies[latency_key] = endpoint_average_latency

return operation_latencies

def clean_up_telemetry_data(self):
current_timestamp = time.time()
copy_latencies = copy.deepcopy(self.latencies)

for endpoint_name, endpoint_latencies in copy_latencies.items():
for latency_information in endpoint_latencies:
if current_timestamp - latency_information["timestamp"] > self.MAXIMUM_LATENCY_DATA_AGE:
self.latencies[endpoint_name].remove(latency_information)

if len(self.latencies[endpoint_name]) == 0:
del self.latencies[endpoint_name]

def store_latency(self, latency, operation_type):
if operation_type != PNOperationType.PNSubscribeOperation and latency > 0:
endpoint_name = self.endpoint_name_for_operation(operation_type)

store_timestamp = time.time()

if endpoint_name not in self.latencies:
self.latencies[endpoint_name] = []

latency_entry = {
"timestamp": store_timestamp,
"latency": latency,
}

self.latencies[endpoint_name].append(latency_entry)

@staticmethod
def average_latency_from_data(endpoint_latencies):
total_latency = 0

for latency_data in endpoint_latencies:
total_latency += latency_data['latency']

return total_latency / len(endpoint_latencies)

@staticmethod
def endpoint_name_for_operation(operation_type):
endpoint = {
PNOperationType.PNPublishOperation: 'pub',
PNOperationType.PNFireOperation: 'pub',
PNOperationType.PNSendFileNotification: "pub",

PNOperationType.PNHistoryOperation: 'hist',
PNOperationType.PNHistoryDeleteOperation: 'hist',
PNOperationType.PNMessageCountOperation: 'mc',

PNOperationType.PNUnsubscribeOperation: 'pres',
PNOperationType.PNWhereNowOperation: 'pres',
PNOperationType.PNHereNowOperation: 'pres',
PNOperationType.PNGetState: 'pres',
PNOperationType.PNSetStateOperation: 'pres',
PNOperationType.PNHeartbeatOperation: 'pres',

PNOperationType.PNAddChannelsToGroupOperation: 'cg',
PNOperationType.PNRemoveChannelsFromGroupOperation: 'cg',
PNOperationType.PNChannelGroupsOperation: 'cg',
PNOperationType.PNChannelsForGroupOperation: 'cg',
PNOperationType.PNRemoveGroupOperation: 'cg',

PNOperationType.PNAddPushNotificationsOnChannelsOperation: 'push',
PNOperationType.PNPushNotificationEnabledChannelsOperation: 'push',
PNOperationType.PNRemoveAllPushNotificationsOperation: 'push',
PNOperationType.PNRemovePushNotificationsFromChannelsOperation: 'push',

PNOperationType.PNAccessManagerAudit: 'pam',
PNOperationType.PNAccessManagerGrant: 'pam',
PNOperationType.PNAccessManagerRevoke: 'pam',
PNOperationType.PNTimeOperation: 'pam',

PNOperationType.PNAccessManagerGrantToken: 'pamv3',
PNOperationType.PNAccessManagerRevokeToken: 'pamv3',

PNOperationType.PNSignalOperation: 'sig',

PNOperationType.PNSetUuidMetadataOperation: 'obj',
PNOperationType.PNGetUuidMetadataOperation: 'obj',
PNOperationType.PNRemoveUuidMetadataOperation: 'obj',
PNOperationType.PNGetAllUuidMetadataOperation: 'obj',

PNOperationType.PNSetChannelMetadataOperation: 'obj',
PNOperationType.PNGetChannelMetadataOperation: 'obj',
PNOperationType.PNRemoveChannelMetadataOperation: 'obj',
PNOperationType.PNGetAllChannelMetadataOperation: 'obj',

PNOperationType.PNSetChannelMembersOperation: 'obj',
PNOperationType.PNGetChannelMembersOperation: 'obj',
PNOperationType.PNRemoveChannelMembersOperation: 'obj',
PNOperationType.PNManageChannelMembersOperation: 'obj',

PNOperationType.PNSetMembershipsOperation: 'obj',
PNOperationType.PNGetMembershipsOperation: 'obj',
PNOperationType.PNRemoveMembershipsOperation: 'obj',
PNOperationType.PNManageMembershipsOperation: 'obj',

PNOperationType.PNAddMessageAction: 'msga',
PNOperationType.PNGetMessageActions: 'msga',
PNOperationType.PNDeleteMessageAction: 'msga',

PNOperationType.PNGetFilesAction: 'file',
PNOperationType.PNDeleteFileOperation: 'file',
PNOperationType.PNGetFileDownloadURLAction: 'file',
PNOperationType.PNFetchFileUploadS3DataAction: 'file',
PNOperationType.PNDownloadFileAction: 'file',
PNOperationType.PNSendFileAction: 'file',


PNOperationType.PNFetchMessagesOperation: "hist",

PNOperationType.PNCreateSpaceOperation: "obj",
PNOperationType.PNUpdateSpaceOperation: "obj",
PNOperationType.PNFetchSpaceOperation: "obj",
PNOperationType.PNFetchSpacesOperation: "obj",
PNOperationType.PNRemoveSpaceOperation: "obj",

PNOperationType.PNCreateUserOperation: "obj",
PNOperationType.PNUpdateUserOperation: "obj",
PNOperationType.PNFetchUserOperation: "obj",
PNOperationType.PNFetchUsersOperation: "obj",
PNOperationType.PNRemoveUserOperation: "obj",

PNOperationType.PNAddUserSpacesOperation: "obj",
PNOperationType.PNAddSpaceUsersOperation: "obj",
PNOperationType.PNUpdateUserSpacesOperation: "obj",

PNOperationType.PNUpdateSpaceUsersOperation: "obj",
PNOperationType.PNFetchUserMembershipsOperation: "obj",
PNOperationType.PNFetchSpaceMembershipsOperation: "obj",

}[operation_type]

return endpoint


class TokenManager:
def __init__(self):
self.token = None
Expand Down
11 changes: 1 addition & 10 deletions pubnub/pubnub.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
- Message queueing and worker thread management
- Automatic reconnection handling
- Custom request handler support
- Telemetry tracking

Usage Example:
```python
Expand Down Expand Up @@ -71,7 +70,7 @@
from pubnub.endpoints.presence.leave import Leave
from pubnub.endpoints.pubsub.subscribe import Subscribe
from pubnub.enums import PNStatusCategory, PNHeartbeatNotificationOptions, PNOperationType, PNReconnectionPolicy
from pubnub.managers import SubscriptionManager, PublishSequenceManager, ReconnectionManager, TelemetryManager
from pubnub.managers import SubscriptionManager, PublishSequenceManager, ReconnectionManager
from pubnub.models.consumer.common import PNStatus
from pubnub.pnconfiguration import PNConfiguration
from pubnub.pubnub_core import PubNubCore
Expand Down Expand Up @@ -127,8 +126,6 @@ def __init__(self, config: PNConfiguration, *, custom_request_handler: Type[Base

self._publish_sequence_manager = PublishSequenceManager(PubNubCore.MAX_SEQUENCE)

self._telemetry_manager = NativeTelemetryManager()

def sdk_platform(self) -> str:
"""Get the SDK platform identifier.

Expand Down Expand Up @@ -716,9 +713,3 @@ def reset(self):
self.result = None
self.status = None
self.done_event.clear()


class NativeTelemetryManager(TelemetryManager):
def store_latency(self, latency, operation_type):
super(NativeTelemetryManager, self).store_latency(latency, operation_type)
self.clean_up_telemetry_data()
23 changes: 1 addition & 22 deletions pubnub/pubnub_asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ async def main():
from pubnub.request_handlers.base import BaseRequestHandler
from pubnub.request_handlers.async_httpx import AsyncHttpxRequestHandler
from pubnub.workers import SubscribeMessageWorker
from pubnub.managers import SubscriptionManager, PublishSequenceManager, ReconnectionManager, TelemetryManager
from pubnub.managers import SubscriptionManager, PublishSequenceManager, ReconnectionManager
from pubnub import utils
from pubnub.enums import PNStatusCategory, PNHeartbeatNotificationOptions, PNOperationType, PNReconnectionPolicy
from pubnub.callbacks import SubscribeCallback, ReconnectionCallback
Expand Down Expand Up @@ -153,7 +153,6 @@ def __init__(self, config, custom_event_loop=None, subscription_manager=None, *,
self._subscription_manager = subscription_manager(self)

self._publish_sequence_manager = AsyncioPublishSequenceManager(self.event_loop, PubNubCore.MAX_SEQUENCE)
self._telemetry_manager = AsyncioTelemetryManager()

@property
def _connector(self):
Expand Down Expand Up @@ -835,23 +834,3 @@ async def wait_for_presence_on(self, *channel_names):
continue
finally:
self.presence_queue.task_done()


class AsyncioTelemetryManager(TelemetryManager):
def __init__(self):
TelemetryManager.__init__(self)
self.loop = asyncio.get_event_loop()
self._schedule_next_cleanup()

def _schedule_next_cleanup(self):
self._timer = self.loop.call_later(
self.CLEAN_UP_INTERVAL * self.CLEAN_UP_INTERVAL_MULTIPLIER / 1000,
self._clean_up_schedule_next
)

def _clean_up_schedule_next(self):
self.clean_up_telemetry_data()
self._schedule_next_cleanup()

def _stop_clean_up_timer(self):
self._timer.cancel()
2 changes: 0 additions & 2 deletions pubnub/pubnub_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,6 @@ def my_listener(message, event):
from pubnub.endpoints.push.remove_channels_from_push import RemoveChannelsFromPush
from pubnub.endpoints.push.remove_device import RemoveDeviceFromPush
from pubnub.endpoints.push.list_push_provisions import ListPushProvisions
from pubnub.managers import TelemetryManager

if TYPE_CHECKING:
from pubnub.endpoints.file_operations.send_file_asyncio import AsyncioSendFile
Expand Down Expand Up @@ -192,7 +191,6 @@ def __init__(self, config: PNConfiguration) -> None:

self._subscription_manager = None
self._publish_sequence_manager = None
self._telemetry_manager = TelemetryManager()
self._base_path_manager = BasePathManager(config)
self._token_manager = TokenManager()
self._subscription_registry = PNSubscriptionRegistry(self)
Expand Down
4 changes: 0 additions & 4 deletions pubnub/request_handlers/async_aiohttp.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import aiohttp
import asyncio
import logging
import time
import json # noqa # pylint: disable=W0611
import urllib

Expand Down Expand Up @@ -98,7 +97,6 @@ async def async_request(self, options_func, cancellation_event):
try:
if not self._session:
await self.create_session()
start_timestamp = time.time()
response = await asyncio.wait_for(
self._session.request(
options.method_string,
Expand Down Expand Up @@ -205,8 +203,6 @@ async def async_request(self, options_func, cancellation_event):
)
)
else:
self.pubnub._telemetry_manager.store_latency(time.time() - start_timestamp, options.operation_type)

return AsyncioEnvelope(
result=create_response(data) if not options.non_json_response else create_response(response, data),
status=create_status(
Expand Down
4 changes: 0 additions & 4 deletions pubnub/request_handlers/async_httpx.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
from asyncio import Event
import asyncio
import logging
import time
import httpx
import json # noqa # pylint: disable=W0611
import urllib
Expand Down Expand Up @@ -113,7 +112,6 @@ async def async_request(self, options_func, cancellation_event):
try:
if not self._session:
await self.create_session()
start_timestamp = time.time()
response = await asyncio.wait_for(
self._session.request(**request_arguments),
options.request_timeout
Expand Down Expand Up @@ -215,8 +213,6 @@ async def async_request(self, options_func, cancellation_event):
)
)
else:
self.pubnub._telemetry_manager.store_latency(time.time() - start_timestamp, options.operation_type)

return AsyncioEnvelope(
result=create_response(data) if not options.non_json_response else create_response(response, data),
status=create_status(
Expand Down
2 changes: 1 addition & 1 deletion requirements-dev.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ pytest-cov>=6.0.0
pycryptodomex>=3.21.0
flake8>=7.1.2
pytest>=8.3.5
pytest-asyncio>=0.24.0,<1.0.0
pytest-asyncio>=1.0.0
httpx>=0.28
h2>=4.1
requests>=2.32.2
Expand Down
4 changes: 3 additions & 1 deletion tests/examples/native_sync/test_examples.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
# flake8: noqa
import os
from examples.native_sync.file_handling import main as test_file_handling
from examples.native_sync.message_reactions import main as test_message_reactions

from examples.native_sync.message_reactions import main as test_message_reactions
os.environ['CI'] = '1'
2 changes: 0 additions & 2 deletions tests/functional/push/test_add_channels_to_push.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@

from pubnub.endpoints.push.add_channels_to_push import AddChannelsToPush
from tests.helper import pnconf, pnconf_env_copy, sdk_name
from pubnub.managers import TelemetryManager
from pubnub.enums import PNPushType, PNPushEnvironment


Expand All @@ -26,7 +25,6 @@ def setUp(self):
)

self.pubnub.uuid = "UUID_AddChannelsTest"
self.pubnub._telemetry_manager = TelemetryManager()
self.add_channels = AddChannelsToPush(self.pubnub)

def test_push_add_single_channel(self):
Expand Down
Loading
Loading