-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathworker04.js
More file actions
125 lines (104 loc) · 3.46 KB
/
worker04.js
File metadata and controls
125 lines (104 loc) · 3.46 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
// Generated by CoffeeScript 1.8.0
(function() {
'use strict';
/*
WorkQueueMgr Example -- worker04
This app consumes work requests that become available in the 'demo:urlq' queue,
as provided by provider04. For each one it receives, this app computes an
SHA1 value on the request URL (req.url) and outputs that and the request
URL value (req.url) to the result queue (req.q) specified in the work request.
provider04 consumes the data in the result queue.
However, if this app receives a '***stop***' message, it closes the connection
and quits immediately.
Usage:
cd demo/lib
node worker04.js
or
node worker04.js 3
to demonstrate arity feature.
or
node worker04.js 1 5
to demonstrate the timeout feature.
Use this app in conjunction with provider04.js. See the provider04 source code
for more details.
*/
var SHA1, WorkQueueMgr, arity, consumeUrlQueue, createUrlQueue, initEventHandlers, mgr, onReady, request, shutDown, timeout, urlQueue, urlQueueName;
WorkQueueMgr = require('node-redis-queue').WorkQueueMgr;
request = require('request');
SHA1 = require('./lib/helpers/tinySHA1.r4.js').SHA1;
urlQueueName = 'demo:urlq';
urlQueue = null;
arity = parseInt(process.argv[2]) || 1;
timeout = parseInt(process.argv[3]) || 0;
console.log('arity=' + arity + ', timeout=' + timeout);
mgr = new WorkQueueMgr();
onReady = function() {
console.log('channel connected');
initEventHandlers();
createUrlQueue();
consumeUrlQueue();
return console.log('waiting for work...');
};
if (arity === 1) {
console.log('connecting half-duplex');
mgr.connect(onReady);
} else {
console.log('connecting full-duplex');
mgr.connect2(onReady);
}
initEventHandlers = function() {
mgr.on('end', function() {
console.log('worker04 detected Redis connection ended');
return shutDown();
});
mgr.on('error', function(error) {
console.log('worker04 stopping due to error');
throw error;
return shutDown();
});
return mgr.on('timeout', function(keys, cancel) {
return console.log('>>>timeout, keys=', keys);
});
};
createUrlQueue = function() {
urlQueue = mgr.createQueue(urlQueueName);
};
consumeUrlQueue = function() {
return urlQueue.consume(function(req, ack) {
if (typeof req === 'object') {
console.log('worker04 processing request ', req, ' (' + mgr.channel.outstanding + ')');
return request(req.url, function(error, response, body) {
var sha1;
if (!error && response.statusCode === 200) {
sha1 = SHA1(body);
console.log('sending ' + req.url + ' SHA1 = ' + sha1, ' (' + mgr.channel.outstanding + ')');
mgr.channel.push(req.q, {
url: req.url,
sha1: sha1
});
return ack();
} else {
console.log('>>>error: ', error);
mgr.channel.push(req.q, {
url: req.url,
err: error
});
return ack();
}
});
} else {
if (typeof req === 'string' && req === '***stop***') {
console.log('worker04 stopping');
shutDown();
}
console.log('Unexpected message: ', req);
console.log('Type of message = ' + typeof req);
return shutDown();
}
}, arity, timeout);
};
shutDown = function() {
mgr.end();
return process.exit();
};
}).call(this);