From 174b1623085b49bb7068b4c6b6d19cc0149bbe89 Mon Sep 17 00:00:00 2001 From: isra17 Date: Mon, 29 May 2017 14:06:24 -0400 Subject: [PATCH 01/10] Set partition as busy when a new batch is sent to it. This prevent a spider from being overwhelmed by requests because it takes too long to process a batch. The DB Worker will be able to send a new batch once a crawler requests more requests and send the offset message to update its state to the DB Worker. --- frontera/contrib/messagebus/kafkabus.py | 3 +++ frontera/contrib/messagebus/zeromq/__init__.py | 9 ++++++--- frontera/core/messagebus.py | 7 +++++++ frontera/worker/db.py | 9 ++++++++- tests/mocks/message_bus.py | 7 +++++-- 5 files changed, 29 insertions(+), 6 deletions(-) diff --git a/frontera/contrib/messagebus/kafkabus.py b/frontera/contrib/messagebus/kafkabus.py index 490262891..bd86583b5 100644 --- a/frontera/contrib/messagebus/kafkabus.py +++ b/frontera/contrib/messagebus/kafkabus.py @@ -131,6 +131,9 @@ def flush(self): def get_offset(self, partition_id): pass + def partition(self, key): + return self._partitioner(key) + class SpiderLogStream(BaseSpiderLogStream): def __init__(self, messagebus): diff --git a/frontera/contrib/messagebus/zeromq/__init__.py b/frontera/contrib/messagebus/zeromq/__init__.py index 3f99fc973..27dfb8c34 100644 --- a/frontera/contrib/messagebus/zeromq/__init__.py +++ b/frontera/contrib/messagebus/zeromq/__init__.py @@ -8,7 +8,7 @@ import six from frontera.core.messagebus import BaseMessageBus, BaseSpiderLogStream, BaseStreamConsumer, \ - BaseSpiderFeedStream, BaseScoringLogStream + BaseSpiderFeedStream, BaseScoringLogStream, BaseStreamProducer from frontera.contrib.backends.partitioners import FingerprintPartitioner, Crc32NamePartitioner from frontera.contrib.messagebus.zeromq.socket_config import SocketConfig from six.moves import range @@ -61,7 +61,7 @@ def get_offset(self, partition_id): return self.counter -class Producer(object): +class Producer(BaseStreamProducer): def __init__(self, context, location, identity): self.identity = identity self.sender = context.zeromq.socket(zmq.PUB) @@ -80,7 +80,7 @@ def send(self, key, *messages): # Raise TypeError if any message is not encoded as bytes if any(not isinstance(m, six.binary_type) for m in messages): raise TypeError("all produce message payloads must be type bytes") - partition = self.partitioner.partition(key) + partition = self.partition(key) counter = self.counters.get(partition, 0) for msg in messages: self.sender.send_multipart([self.identity + pack(">B", partition), msg, @@ -100,6 +100,9 @@ def flush(self): def get_offset(self, partition_id): return self.counters.get(partition_id, None) + def partition(self, key): + return self.partitioner.partition(key) + class SpiderLogProducer(Producer): def __init__(self, context, location, partitions): diff --git a/frontera/core/messagebus.py b/frontera/core/messagebus.py index 3782f6c00..7886c280f 100644 --- a/frontera/core/messagebus.py +++ b/frontera/core/messagebus.py @@ -67,6 +67,13 @@ def get_offset(self, partition_id): """ raise NotImplementedError + def partition(self, key): + """ + Returns partition id for key. + :param key: str key used for partitioning, None for non-keyed channels + """ + raise NotImplementedError + def close(self): """ Performs all necessary cleanup and closes the producer. diff --git a/frontera/worker/db.py b/frontera/worker/db.py index 6f9abad85..bda24ad8e 100644 --- a/frontera/worker/db.py +++ b/frontera/worker/db.py @@ -174,6 +174,7 @@ def consume_incoming(self, *args, **kwargs): continue else: lag = producer_offset - offset + logger.info('Spider lag: {} - {} = {}'.format(producer_offset, offset, lag)) if lag < 0: # non-sense in general, happens when SW is restarted and not synced yet with Spiders. continue @@ -256,6 +257,7 @@ def get_fingerprint(request): else: raise Exception("Unexpected value in self.spider_feed_partitioning") + busy_partitions = set() for request in self._backend.get_next_requests(self.max_next_requests, partitions=partitions): try: request.meta[b'jid'] = self.job_id @@ -267,7 +269,12 @@ def get_fingerprint(request): continue finally: count += 1 - self.spider_feed_producer.send(get_key(request), eo) + key = get_key(request) + self.spider_feed_producer.send(key, eo) + busy_partitions.add(self.spider_feed_producer.partition(key)) + + for partition_id in busy_partitions: + self.spider_feed.mark_busy(partition_id) self.stats['pushed_since_start'] += count self.stats['last_batch_size'] = count diff --git a/tests/mocks/message_bus.py b/tests/mocks/message_bus.py index f8b6f582b..0b37ec2b1 100644 --- a/tests/mocks/message_bus.py +++ b/tests/mocks/message_bus.py @@ -1,5 +1,5 @@ from frontera.core.messagebus import BaseMessageBus, BaseSpiderLogStream, BaseStreamConsumer, \ - BaseScoringLogStream, BaseSpiderFeedStream + BaseScoringLogStream, BaseSpiderFeedStream, BaseStreamProducer class Consumer(BaseStreamConsumer): @@ -27,7 +27,7 @@ def get_offset(self, partition_id): return self.offset -class Producer(object): +class Producer(BaseStreamProducer): def __init__(self): self.messages = [] @@ -42,6 +42,9 @@ def flush(self): def get_offset(self, partition_id): return self.offset + def partition(self, key): + return 0 + class ScoringLogStream(BaseScoringLogStream): From 2f7e3cd4c8ccb0b9ec3e2a4fdd01c0125fc1695f Mon Sep 17 00:00:00 2001 From: isra17 Date: Mon, 29 May 2017 14:25:56 -0400 Subject: [PATCH 02/10] Add unittest for busy partition on new_batch --- tests/test_worker_db.py | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/tests/test_worker_db.py b/tests/test_worker_db.py index 05b91d0c2..8edddb17f 100644 --- a/tests/test_worker_db.py +++ b/tests/test_worker_db.py @@ -78,12 +78,32 @@ def test_offset(self): dbw.spider_feed_producer.offset = 100 dbw.consume_incoming() assert 2 in dbw.spider_feed.available_partitions() + msg1 = dbw._encoder.encode_offset(2, 20) msg2 = dbw._encoder.encode_offset(3, 0) dbw.spider_log_consumer.put_messages([msg1, msg2]) dbw.consume_incoming() assert 3 in dbw.spider_feed.available_partitions() assert 2 not in dbw.spider_feed.available_partitions() + dbw._backend.queue.put_requests([r1, r2, r3]) assert dbw.new_batch() == 3 assert 3 in dbw._backend.partitions + + def test_busy_on_new_batch(self): + dbw = self.dbw_setup(True) + msg1 = dbw._encoder.encode_offset(0, 64) + msg2 = dbw._encoder.encode_offset(1, 64) + dbw.spider_log_consumer.put_messages([msg1, msg2]) + dbw.spider_feed_producer.offset = 64 + dbw.consume_incoming() + + assert 0 in dbw.spider_feed.available_partitions() + assert 1 in dbw.spider_feed.available_partitions() + # Send 1 request to partition 1, marking it as busy. + # Partition 2 received no request, so it stays available. + dbw._backend.queue.put_requests([r1]) + assert dbw.new_batch() == 1 + assert 0 in dbw._backend.partitions + assert 0 not in dbw.spider_feed.available_partitions() + assert 1 in dbw.spider_feed.available_partitions() From d4aa8db3ea2bb152e419c58309dab6071877ad26 Mon Sep 17 00:00:00 2001 From: isra17 Date: Mon, 29 May 2017 15:07:44 -0400 Subject: [PATCH 03/10] Remove debug log --- frontera/worker/db.py | 1 - 1 file changed, 1 deletion(-) diff --git a/frontera/worker/db.py b/frontera/worker/db.py index bda24ad8e..cce4fcc03 100644 --- a/frontera/worker/db.py +++ b/frontera/worker/db.py @@ -174,7 +174,6 @@ def consume_incoming(self, *args, **kwargs): continue else: lag = producer_offset - offset - logger.info('Spider lag: {} - {} = {}'.format(producer_offset, offset, lag)) if lag < 0: # non-sense in general, happens when SW is restarted and not synced yet with Spiders. continue From 0b646e41bdd6457b02143431bbfb553af4a4ad69 Mon Sep 17 00:00:00 2001 From: isra17 Date: Tue, 4 Jul 2017 18:06:34 -0400 Subject: [PATCH 04/10] Track lag by regularly checking consumer and producer lag --- .travis.yml | 1 + .../contrib/messagebus/zeromq/__init__.py | 25 +++++++++++++------ frontera/core/messagebus.py | 13 +++------- frontera/worker/db.py | 21 ++-------------- tests/mocks/message_bus.py | 25 +++++++++++-------- tests/test_worker_db.py | 22 ++++++++-------- 6 files changed, 51 insertions(+), 56 deletions(-) diff --git a/.travis.yml b/.travis.yml index a2f138639..b87257bcc 100644 --- a/.travis.yml +++ b/.travis.yml @@ -3,6 +3,7 @@ python: 2.7 branches: only: - master + - busy-partitions - /^\d\.\d+$/ - /^\d\.\d+\.\d+(rc\d+|dev\d+)?$/ diff --git a/frontera/contrib/messagebus/zeromq/__init__.py b/frontera/contrib/messagebus/zeromq/__init__.py index 27dfb8c34..46822cc5d 100644 --- a/frontera/contrib/messagebus/zeromq/__init__.py +++ b/frontera/contrib/messagebus/zeromq/__init__.py @@ -179,22 +179,32 @@ def __init__(self, messagebus): self.consumer_hwm = messagebus.spider_feed_rcvhwm self.producer_hwm = messagebus.spider_feed_sndhwm self.hostname_partitioning = messagebus.hostname_partitioning + self.max_next_requests = messagebus.max_next_requests + self._producer = None def consumer(self, partition_id): return Consumer(self.context, self.out_location, partition_id, b'sf', seq_warnings=True, hwm=self.consumer_hwm) def producer(self): - return SpiderFeedProducer(self.context, self.in_location, self.partitions, - self.producer_hwm, self.hostname_partitioning) + if not self._producer: + self._producer = SpiderFeedProducer( + self.context, self.in_location, self.partitions, + self.producer_hwm, self.hostname_partitioning) + return self._producer def available_partitions(self): - return self.ready_partitions + if not self._producer: + return [] - def mark_ready(self, partition_id): - self.ready_partitions.add(partition_id) + partitions = [] + for partition_id, last_offset in self.partitions_offset.items(): + lag = self._producer.get_offset(partition_id) - last_offset + if lag < self.max_next_requests: + partitions.append(partition_id) + return partitions - def mark_busy(self, partition_id): - self.ready_partitions.discard(partition_id) + def set_spider_offset(self, partition_id, offset): + self.partitions_offset[partition_id] = offset class Context(object): @@ -213,6 +223,7 @@ def __init__(self, settings): self.spider_feed_sndhwm = int(settings.get('MAX_NEXT_REQUESTS') * len(self.spider_feed_partitions) * 1.2) self.spider_feed_rcvhwm = int(settings.get('MAX_NEXT_REQUESTS') * 2.0) self.hostname_partitioning = settings.get('QUEUE_HOSTNAME_PARTITIONING') + self.max_next_requests = settings.get('MAX_NEXT_REQUESTS') if self.socket_config.is_ipv6: self.context.zeromq.setsockopt(zmq.IPV6, True) diff --git a/frontera/core/messagebus.py b/frontera/core/messagebus.py index 7886c280f..3c78fee49 100644 --- a/frontera/core/messagebus.py +++ b/frontera/core/messagebus.py @@ -165,18 +165,11 @@ def available_partitions(self): """ raise NotImplementedError - def mark_ready(self, partition_id): + def set_spider_offset(self, partition_id, offset): """ - Marks partition as ready/available for receiving new batches. - :param partition_id: int - :return: nothing - """ - pass - - def mark_busy(self, partition_id): - """ - Marks partition as busy, so that spider assigned to this partition is busy processing previous batches. + Set a partition's message sent offset. :param partition_id: int + :param offset: int :return: nothing """ pass diff --git a/frontera/worker/db.py b/frontera/worker/db.py index cce4fcc03..960d35040 100644 --- a/frontera/worker/db.py +++ b/frontera/worker/db.py @@ -169,18 +169,7 @@ def consume_incoming(self, *args, **kwargs): continue if type == 'offset': _, partition_id, offset = msg - producer_offset = self.spider_feed_producer.get_offset(partition_id) - if producer_offset is None: - continue - else: - lag = producer_offset - offset - if lag < 0: - # non-sense in general, happens when SW is restarted and not synced yet with Spiders. - continue - if lag < self.max_next_requests or offset == 0: - self.spider_feed.mark_ready(partition_id) - else: - self.spider_feed.mark_busy(partition_id) + self.spider_feed.set_spider_offset(partition_id, offset) continue logger.debug('Unknown message type %s', type) except Exception as exc: @@ -256,7 +245,6 @@ def get_fingerprint(request): else: raise Exception("Unexpected value in self.spider_feed_partitioning") - busy_partitions = set() for request in self._backend.get_next_requests(self.max_next_requests, partitions=partitions): try: request.meta[b'jid'] = self.job_id @@ -268,12 +256,7 @@ def get_fingerprint(request): continue finally: count += 1 - key = get_key(request) - self.spider_feed_producer.send(key, eo) - busy_partitions.add(self.spider_feed_producer.partition(key)) - - for partition_id in busy_partitions: - self.spider_feed.mark_busy(partition_id) + self.spider_feed_producer.send(get_key(request), eo) self.stats['pushed_since_start'] += count self.stats['last_batch_size'] = count diff --git a/tests/mocks/message_bus.py b/tests/mocks/message_bus.py index 0b37ec2b1..953e10a4c 100644 --- a/tests/mocks/message_bus.py +++ b/tests/mocks/message_bus.py @@ -73,23 +73,28 @@ def consumer(self, partition_id, type): class SpiderFeedStream(BaseSpiderFeedStream): def __init__(self, messagebus): - self.ready_partitions = set(messagebus.spider_feed_partitions) + self._producer = Producer() + self.max_next_requests = messagebus.max_next_requests + self.partitions_offset = {} + for partition_id in messagebus.spider_feed_partitions: + self.partitions_offset[partition_id] = 0 def producer(self): - return Producer() + return self._producer def consumer(self, partition_id): return Consumer() def available_partitions(self): - return self.ready_partitions - - def mark_ready(self, partition_id): - self.ready_partitions.add(partition_id) - - def mark_busy(self, partition_id): - self.ready_partitions.discard(partition_id) - + partitions = [] + for partition_id, last_offset in self.partitions_offset.items(): + lag = self._producer.get_offset(partition_id) - last_offset + if lag < self.max_next_requests or last_offset == 0: + partitions.append(partition_id) + return partitions + + def set_spider_offset(self, partition_id, offset): + self.partitions_offset[partition_id] = offset class FakeMessageBus(BaseMessageBus): diff --git a/tests/test_worker_db.py b/tests/test_worker_db.py index 8edddb17f..3e114d009 100644 --- a/tests/test_worker_db.py +++ b/tests/test_worker_db.py @@ -11,8 +11,8 @@ class TestDBWorker(object): - def dbw_setup(self, distributed=False): - settings = Settings() + def dbw_setup(self, distributed=False, settings=None): + settings = settings or Settings() settings.MAX_NEXT_REQUESTS = 64 settings.MESSAGE_BUS = 'tests.mocks.message_bus.FakeMessageBus' if distributed: @@ -90,20 +90,22 @@ def test_offset(self): assert dbw.new_batch() == 3 assert 3 in dbw._backend.partitions - def test_busy_on_new_batch(self): + def test_partition_available(self): dbw = self.dbw_setup(True) msg1 = dbw._encoder.encode_offset(0, 64) - msg2 = dbw._encoder.encode_offset(1, 64) + msg2 = dbw._encoder.encode_offset(1, 0) dbw.spider_log_consumer.put_messages([msg1, msg2]) dbw.spider_feed_producer.offset = 64 dbw.consume_incoming() assert 0 in dbw.spider_feed.available_partitions() + assert 1 not in dbw.spider_feed.available_partitions() + + msg3 = dbw._encoder.encode_offset(1, 1) + dbw.spider_log_consumer.put_messages([msg3]) + dbw.consume_incoming() assert 1 in dbw.spider_feed.available_partitions() - # Send 1 request to partition 1, marking it as busy. - # Partition 2 received no request, so it stays available. - dbw._backend.queue.put_requests([r1]) - assert dbw.new_batch() == 1 - assert 0 in dbw._backend.partitions + + dbw.spider_feed_producer.offset = 128 assert 0 not in dbw.spider_feed.available_partitions() - assert 1 in dbw.spider_feed.available_partitions() + assert 1 not in dbw.spider_feed.available_partitions() From dacd2b88532ccbf2e1c6359d9f8bab68d3934800 Mon Sep 17 00:00:00 2001 From: isra17 Date: Thu, 6 Jul 2017 12:48:11 -0400 Subject: [PATCH 05/10] Fix partition test --- tests/test_worker_db.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/tests/test_worker_db.py b/tests/test_worker_db.py index 3e114d009..626a162cd 100644 --- a/tests/test_worker_db.py +++ b/tests/test_worker_db.py @@ -92,20 +92,25 @@ def test_offset(self): def test_partition_available(self): dbw = self.dbw_setup(True) - msg1 = dbw._encoder.encode_offset(0, 64) + msg1 = dbw._encoder.encode_offset(0, 128) msg2 = dbw._encoder.encode_offset(1, 0) dbw.spider_log_consumer.put_messages([msg1, msg2]) - dbw.spider_feed_producer.offset = 64 + dbw.spider_feed_producer.offset = 128 dbw.consume_incoming() assert 0 in dbw.spider_feed.available_partitions() + assert 1 in dbw.spider_feed.available_partitions() + + msg3 = dbw._encoder.encode_offset(1, 1) + dbw.spider_log_consumer.put_messages([msg3]) + dbw.consume_incoming() assert 1 not in dbw.spider_feed.available_partitions() msg3 = dbw._encoder.encode_offset(1, 1) dbw.spider_log_consumer.put_messages([msg3]) dbw.consume_incoming() - assert 1 in dbw.spider_feed.available_partitions() + assert 1 not in dbw.spider_feed.available_partitions() - dbw.spider_feed_producer.offset = 128 + dbw.spider_feed_producer.offset = 256 assert 0 not in dbw.spider_feed.available_partitions() assert 1 not in dbw.spider_feed.available_partitions() From f7d2cd0db6665d0b52541e0570eba0048d11276b Mon Sep 17 00:00:00 2001 From: isra17 Date: Thu, 6 Jul 2017 13:01:18 -0400 Subject: [PATCH 06/10] revert travis --- .travis.yml | 1 - 1 file changed, 1 deletion(-) diff --git a/.travis.yml b/.travis.yml index b87257bcc..a2f138639 100644 --- a/.travis.yml +++ b/.travis.yml @@ -3,7 +3,6 @@ python: 2.7 branches: only: - master - - busy-partitions - /^\d\.\d+$/ - /^\d\.\d+\.\d+(rc\d+|dev\d+)?$/ From 64b61de39985cb6b358d998326a4e54f60878f08 Mon Sep 17 00:00:00 2001 From: isra17 Date: Thu, 6 Jul 2017 13:11:38 -0400 Subject: [PATCH 07/10] Review code --- frontera/contrib/messagebus/kafkabus.py | 3 --- frontera/contrib/messagebus/zeromq/__init__.py | 7 ++----- frontera/core/messagebus.py | 11 +++-------- tests/mocks/message_bus.py | 3 --- tests/test_worker_db.py | 6 ++---- 5 files changed, 7 insertions(+), 23 deletions(-) diff --git a/frontera/contrib/messagebus/kafkabus.py b/frontera/contrib/messagebus/kafkabus.py index bd86583b5..490262891 100644 --- a/frontera/contrib/messagebus/kafkabus.py +++ b/frontera/contrib/messagebus/kafkabus.py @@ -131,9 +131,6 @@ def flush(self): def get_offset(self, partition_id): pass - def partition(self, key): - return self._partitioner(key) - class SpiderLogStream(BaseSpiderLogStream): def __init__(self, messagebus): diff --git a/frontera/contrib/messagebus/zeromq/__init__.py b/frontera/contrib/messagebus/zeromq/__init__.py index 46822cc5d..306e48fc7 100644 --- a/frontera/contrib/messagebus/zeromq/__init__.py +++ b/frontera/contrib/messagebus/zeromq/__init__.py @@ -80,7 +80,7 @@ def send(self, key, *messages): # Raise TypeError if any message is not encoded as bytes if any(not isinstance(m, six.binary_type) for m in messages): raise TypeError("all produce message payloads must be type bytes") - partition = self.partition(key) + partition = self.partitioner.partition(key) counter = self.counters.get(partition, 0) for msg in messages: self.sender.send_multipart([self.identity + pack(">B", partition), msg, @@ -100,9 +100,6 @@ def flush(self): def get_offset(self, partition_id): return self.counters.get(partition_id, None) - def partition(self, key): - return self.partitioner.partition(key) - class SpiderLogProducer(Producer): def __init__(self, context, location, partitions): @@ -223,7 +220,7 @@ def __init__(self, settings): self.spider_feed_sndhwm = int(settings.get('MAX_NEXT_REQUESTS') * len(self.spider_feed_partitions) * 1.2) self.spider_feed_rcvhwm = int(settings.get('MAX_NEXT_REQUESTS') * 2.0) self.hostname_partitioning = settings.get('QUEUE_HOSTNAME_PARTITIONING') - self.max_next_requests = settings.get('MAX_NEXT_REQUESTS') + self.max_next_requests = int(settings.get('MAX_NEXT_REQUESTS')) if self.socket_config.is_ipv6: self.context.zeromq.setsockopt(zmq.IPV6, True) diff --git a/frontera/core/messagebus.py b/frontera/core/messagebus.py index 3c78fee49..53d98c2a5 100644 --- a/frontera/core/messagebus.py +++ b/frontera/core/messagebus.py @@ -67,13 +67,6 @@ def get_offset(self, partition_id): """ raise NotImplementedError - def partition(self, key): - """ - Returns partition id for key. - :param key: str key used for partitioning, None for non-keyed channels - """ - raise NotImplementedError - def close(self): """ Performs all necessary cleanup and closes the producer. @@ -167,7 +160,9 @@ def available_partitions(self): def set_spider_offset(self, partition_id, offset): """ - Set a partition's message sent offset. + Set the message processed offset for a given partition. Used to + calculate the lag between the message sent and message processed + to prevent overflowing the queue of an unresponsive partition. :param partition_id: int :param offset: int :return: nothing diff --git a/tests/mocks/message_bus.py b/tests/mocks/message_bus.py index 953e10a4c..528c7ca72 100644 --- a/tests/mocks/message_bus.py +++ b/tests/mocks/message_bus.py @@ -42,9 +42,6 @@ def flush(self): def get_offset(self, partition_id): return self.offset - def partition(self, key): - return 0 - class ScoringLogStream(BaseScoringLogStream): diff --git a/tests/test_worker_db.py b/tests/test_worker_db.py index 626a162cd..a4f93d1b8 100644 --- a/tests/test_worker_db.py +++ b/tests/test_worker_db.py @@ -11,8 +11,8 @@ class TestDBWorker(object): - def dbw_setup(self, distributed=False, settings=None): - settings = settings or Settings() + def dbw_setup(self, distributed=False): + settings = Settings() settings.MAX_NEXT_REQUESTS = 64 settings.MESSAGE_BUS = 'tests.mocks.message_bus.FakeMessageBus' if distributed: @@ -78,14 +78,12 @@ def test_offset(self): dbw.spider_feed_producer.offset = 100 dbw.consume_incoming() assert 2 in dbw.spider_feed.available_partitions() - msg1 = dbw._encoder.encode_offset(2, 20) msg2 = dbw._encoder.encode_offset(3, 0) dbw.spider_log_consumer.put_messages([msg1, msg2]) dbw.consume_incoming() assert 3 in dbw.spider_feed.available_partitions() assert 2 not in dbw.spider_feed.available_partitions() - dbw._backend.queue.put_requests([r1, r2, r3]) assert dbw.new_batch() == 3 assert 3 in dbw._backend.partitions From 58ff5f4f0f459437cbd8a15f2009e9b3d021fad3 Mon Sep 17 00:00:00 2001 From: isra17 Date: Tue, 11 Jul 2017 10:06:56 -0400 Subject: [PATCH 08/10] Init partitions_offset dict --- frontera/contrib/messagebus/zeromq/__init__.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/frontera/contrib/messagebus/zeromq/__init__.py b/frontera/contrib/messagebus/zeromq/__init__.py index 306e48fc7..a6e6ff71d 100644 --- a/frontera/contrib/messagebus/zeromq/__init__.py +++ b/frontera/contrib/messagebus/zeromq/__init__.py @@ -172,7 +172,9 @@ def __init__(self, messagebus): self.in_location = messagebus.socket_config.db_out() self.out_location = messagebus.socket_config.spiders_in() self.partitions = messagebus.spider_feed_partitions - self.ready_partitions = set(self.partitions) + self.partitions_offset = {} + for partition_id in self.partitions: + self.partitions_offset[partition_id] = 0 self.consumer_hwm = messagebus.spider_feed_rcvhwm self.producer_hwm = messagebus.spider_feed_sndhwm self.hostname_partitioning = messagebus.hostname_partitioning From d2ef60e3a046d9d4ea6e6e966c5b7f46f52e3376 Mon Sep 17 00:00:00 2001 From: isra17 Date: Tue, 11 Jul 2017 10:54:05 -0400 Subject: [PATCH 09/10] Handle new partition --- frontera/contrib/messagebus/zeromq/__init__.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/frontera/contrib/messagebus/zeromq/__init__.py b/frontera/contrib/messagebus/zeromq/__init__.py index a6e6ff71d..1f407768b 100644 --- a/frontera/contrib/messagebus/zeromq/__init__.py +++ b/frontera/contrib/messagebus/zeromq/__init__.py @@ -197,7 +197,10 @@ def available_partitions(self): partitions = [] for partition_id, last_offset in self.partitions_offset.items(): - lag = self._producer.get_offset(partition_id) - last_offset + producer_offset = self._producer.get_offset(partition_id) + if producer_offset is None: + producer_offset = 0 + lag = producer_offset - last_offset if lag < self.max_next_requests: partitions.append(partition_id) return partitions From a848c739e870d424c753b3942f0870d5c5d071fe Mon Sep 17 00:00:00 2001 From: isra17 Date: Fri, 21 Jul 2017 14:14:41 -0400 Subject: [PATCH 10/10] Fix bug where no spider is running --- frontera/contrib/messagebus/zeromq/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/frontera/contrib/messagebus/zeromq/__init__.py b/frontera/contrib/messagebus/zeromq/__init__.py index 1f407768b..6e28f4c5e 100644 --- a/frontera/contrib/messagebus/zeromq/__init__.py +++ b/frontera/contrib/messagebus/zeromq/__init__.py @@ -201,7 +201,7 @@ def available_partitions(self): if producer_offset is None: producer_offset = 0 lag = producer_offset - last_offset - if lag < self.max_next_requests: + if lag < self.max_next_requests or not producer_offset: partitions.append(partition_id) return partitions