From d9d0211f8467f2ed837f1c12acee338d9d453163 Mon Sep 17 00:00:00 2001 From: ashwinismath Date: Wed, 12 Dec 2018 11:33:55 +0530 Subject: [PATCH 1/2] Update group_consumer.js --- lib/group_consumer.js | 187 +++++++++++++++++++++++++----------------- 1 file changed, 110 insertions(+), 77 deletions(-) diff --git a/lib/group_consumer.js b/lib/group_consumer.js index 483a085..988d50f 100644 --- a/lib/group_consumer.js +++ b/lib/group_consumer.js @@ -1,11 +1,11 @@ 'use strict'; -var Promise = require('./bluebird-configured'); -var _ = require('lodash'); +var Promise = require('./bluebird-configured'); +var _ = require('lodash'); var BaseConsumer = require('./base_consumer'); -var Kafka = require('./index'); -var util = require('util'); -var errors = require('./errors'); +var Kafka = require('./index'); +var util = require('util'); +var errors = require('./errors'); function GroupConsumer(options) { this.options = _.defaultsDeep(options || {}, { @@ -78,39 +78,60 @@ GroupConsumer.prototype.init = function (strategies) { self.topics = self.topics.concat(s.subscriptions); }); - return self._fullRejoin(); + return self._fullRejoin() + .catch(function (err) { + throw err; + }); + }); }; GroupConsumer.prototype._joinGroup = function () { + var self = this; return (function _tryJoinGroup(attempt) { + attempt = attempt || 0; if (attempt > 3) { throw new Error('Failed to join the group: GroupCoordinatorNotAvailable'); } return self.client.joinConsumerGroupRequest(self.options.groupId, self.memberId, self.options.sessionTimeout, _.values(self.strategies)) - .catch({ code: 'GroupCoordinatorNotAvailable' }, function () { - return Promise.delay(1000).then(function () { - return _tryJoinGroup(++attempt); + + .catch(function (err) { + return Promise.delay(1000).then(function () { + return _tryJoinGroup(++attempt); + }); + }) + .catch({ code: 'GroupCoordinatorNotAvailable' }, function () { + + return Promise.delay(1000).then(function () { + return _tryJoinGroup(++attempt); + }); }); - }); }()) - .then(function (response) { - if (self.memberId) { - self.client.log('Joined group', self.options.groupId, 'generationId', response.generationId, 'as', response.memberId); - if (response.memberId === response.leaderId) { - self.client.log('Elected as group leader'); + .then(function (response) { + if (self.memberId) { + self.client.log('Joined group', self.options.groupId, 'generationId', response.generationId, 'as', response.memberId); + if (response.memberId === response.leaderId) { + self.client.log('Elected as group leader'); + } } - } - self.memberId = response.memberId; - self.leaderId = response.leaderId; - self.generationId = response.generationId; - self.members = response.members; - self.strategyName = response.groupProtocol; - }); + self.memberId = response.memberId; + self.leaderId = response.leaderId; + self.generationId = response.generationId; + self.members = response.members; + self.strategyName = response.groupProtocol; + }) + .catch(function (err) { + + if (err.length === 1) { + throw err[0]; + } + throw err; + + }); }; GroupConsumer.prototype._syncGroup = function () { @@ -141,29 +162,29 @@ GroupConsumer.prototype._syncGroup = function () { } return []; }) - .then(function (result) { - var assignments = _(result).groupBy('memberId').mapValues(function (mv, mk) { - return { - memberId: mk, - memberAssignment: { - version: 0, - metadata: null, - partitionAssignment: _(mv).groupBy('topic').map(function (tv, tk) { - return { - topic: tk, - partitions: _.map(tv, 'partition') - }; - }).value() - } - }; - }).values().value(); + .then(function (result) { + var assignments = _(result).groupBy('memberId').mapValues(function (mv, mk) { + return { + memberId: mk, + memberAssignment: { + version: 0, + metadata: null, + partitionAssignment: _(mv).groupBy('topic').map(function (tv, tk) { + return { + topic: tk, + partitions: _.map(tv, 'partition') + }; + }).value() + } + }; + }).values().value(); - // console.log(require('util').inspect(assignments, true, 10, true)); - return self.client.syncConsumerGroupRequest(self.options.groupId, self.memberId, self.generationId, assignments); - }) - .then(function (response) { - return self._updateSubscriptions(_.get(response, 'memberAssignment.partitionAssignment', [])); - }); + // console.log(require('util').inspect(assignments, true, 10, true)); + return self.client.syncConsumerGroupRequest(self.options.groupId, self.memberId, self.generationId, assignments); + }) + .then(function (response) { + return self._updateSubscriptions(_.get(response, 'memberAssignment.partitionAssignment', [])); + }); }; GroupConsumer.prototype._rejoin = function () { @@ -179,57 +200,69 @@ GroupConsumer.prototype._rejoin = function () { return self._joinGroup().then(function () { return self._syncGroup(); }) - .catch({ code: 'RebalanceInProgress' }, function () { - return Promise.delay(1000).then(function () { - return _tryRebalance(++attempt); + .catch({ code: 'RebalanceInProgress' }, function () { + + return Promise.delay(1000).then(function () { + return _tryRebalance(++attempt); + }); }); - }); }()); }; GroupConsumer.prototype._fullRejoin = function () { var self = this; - + var fullRejoinAttempt = fullRejoinAttempt || 1; return (function _tryFullRejoin() { self.memberId = null; return self.client.updateGroupCoordinator(self.options.groupId).then(function () { return self._joinGroup().then(function () { // join group return self._rejoin(); // rejoin and sync with received memberId + }).catch(function (err) { + return Promise.reject(err); }); }) - .catch(function (err) { - self.client.error('Full rejoin attempt failed:', err); - return Promise.delay(1000).then(_tryFullRejoin); - }); + .catch(function (err) { + self.client.error('Full rejoin attempt failed:', err); + fullRejoinAttempt++; + if (err.message == "Failed to join the group: GroupCoordinatorNotAvailable" && fullRejoinAttempt > 3) { + if (err.length === 1) { + throw err[0]; + } + throw err; + } else { + return Promise.delay(1000).then(_tryFullRejoin); + } + + }); }()) - .tap(function () { - self._heartbeatPromise = self._heartbeat(); // start sending heartbeats - return null; - }); + .tap(function () { + self._heartbeatPromise = self._heartbeat(); // start sending heartbeats + return null; + }); }; GroupConsumer.prototype._heartbeat = function () { var self = this; return self.client.heartbeatRequest(self.options.groupId, self.memberId, self.generationId) - .catch({ code: 'RebalanceInProgress' }, function () { - // new group member has joined or existing member has left - self.client.log('Rejoining group on RebalanceInProgress'); - return self._rejoin(); - }) - .tap(function () { - self._heartbeatTimeout = setTimeout(function () { - self._heartbeatPromise = self._heartbeat(); - }, self.options.heartbeatTimeout); - }) - .catch(function (err) { - // some severe error, such as GroupCoordinatorNotAvailable or network error - // in this case we should start trying to rejoin from scratch - self.client.error('Sending heartbeat failed: ', err); - return self._fullRejoin().catch(function (_err) { - self.client.error(_err); + .catch({ code: 'RebalanceInProgress' }, function () { + // new group member has joined or existing member has left + self.client.log('Rejoining group on RebalanceInProgress'); + return self._rejoin(); + }) + .tap(function () { + self._heartbeatTimeout = setTimeout(function () { + self._heartbeatPromise = self._heartbeat(); + }, self.options.heartbeatTimeout); + }) + .catch(function (err) { + // some severe error, such as GroupCoordinatorNotAvailable or network error + // in this case we should start trying to rejoin from scratch + self.client.error('Sending heartbeat failed: ', err); + return self._fullRejoin().catch(function (_err) { + self.client.error(_err); + }); }); - }); }; /** @@ -246,9 +279,9 @@ GroupConsumer.prototype.end = function () { clearTimeout(self._heartbeatTimeout); return self.client.leaveGroupRequest(self.options.groupId, self.memberId) - .then(function () { - return BaseConsumer.prototype.end.call(self); - }); + .then(function () { + return BaseConsumer.prototype.end.call(self); + }); }; GroupConsumer.prototype._prepareOffsetRequest = function (type, commits) { From 603184eea77f657c21a611106a3650d3e533b021 Mon Sep 17 00:00:00 2001 From: ashwinismath Date: Wed, 12 Dec 2018 11:36:01 +0530 Subject: [PATCH 2/2] Update client.js --- lib/client.js | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/lib/client.js b/lib/client.js index 4d593a9..5074416 100644 --- a/lib/client.js +++ b/lib/client.js @@ -632,7 +632,7 @@ Client.prototype._findGroupCoordinator = function (groupId) { Client.prototype.joinConsumerGroupRequest = function (groupId, memberId, sessionTimeout, strategies) { var self = this; - return self._findGroupCoordinator(groupId).then(function (connection) { + return self._findGroupCoordinator(groupId).then(function (connection) { var correlationId = self.nextCorrelationId(); var buffer = self.protocol.write().JoinConsumerGroupRequest({ correlationId: correlationId, @@ -650,7 +650,16 @@ Client.prototype.joinConsumerGroupRequest = function (groupId, memberId, session } return result; }); + }) + .catch(function(err){ + if(err[0].code == 'GroupAuthorizationFailed'){ + if (err.length === 1) { + throw err[0]; + } + throw err; + } }); + }; Client.prototype.heartbeatRequest = function (groupId, memberId, generationId) {