使用 Kue 实现定时器功能

参考文档: https://www.npmjs.com/package/kue#delayed-jobs

使用 Kue 实现定时器, 首先是使用 Kue 的队列功能, 定时器只是在队列里面的内容当到达队列头部的时候, 发现延时时间未到则先不执行而已.

目录

  1. 消息队列的初始化
  2. 发布消息
  3. 读取队列消息进行处理

初始化

首先要与 redis 进行连接, 此时未创建任何内容.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
const kue = require('kue');

// 初始化链接
const queue = kue.createQueue({
prefix: 'q',
redis: {
port: 6379,
host: 'localhost',
auth: '',
db: 0, // if provided select a non-default redis db
options: {
// see https://github.com/mranney/node_redis#rediscreateclient
}
}
});

生产者

然后分为生产者和消费者部分, 生产者向队列插入数据:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
const randDelay = Math.floor(Math.random() * 10 * 1e3);
const job = queue
// 向队列 queueName 中写入一条数据, 放在队尾
.create('queueName', {
to: 'message receiver',
startTime: Date.now(),
delay: randDelay
})
// 该条消息的延时时间, 以毫秒计算
.delay(randDelay)
// 将数据写入队列后的回调方法
.save(function(err) {
if (!err) console.log(job.id);
});

job
.on('complete', function(result) {
// 该事件的监听函数, complete 在事件执行成功时候触发(被消费者消费)
console.log('Job completed with data ', result);
})
.on('failed attempt', function(errorMessage, doneAttempts) {
console.log('Job failed');
})
.on('failed', function(errorMessage) {
console.log('Job failed');
})
.on('progress', function(progress, data) {
console.log(
'\r job #' + job.id + ' ' + progress + '% complete with data ',
data
);
});

消费者

消费者从队列读取数据, 然后进行相应的处理:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// 监听 queueName 队列, 并行 3 个线程处理数据,
// 如果等待数据小于三个, 一次性执行完,
// 如果多于3个, 那么即使延时时间到了(或者没有延时时间为及时处理的消息), 也需要在队列等待空闲进程
queue.process('queueName', 3, function(job, done) {
// 消息的接受者, data 是上面生产者定义的数据结构
const address = job.data.to;

// 设置的延时期望值
let setValue = job.data.delay;
// 实际延时实际: 执行实际-创建时间
let realValue = Date.now() - job.data.startTime;
// 输出一下, 看看实际执行时间与期望延时时间相差多不多
console.log(
'set %d real %d delay %d',
setValue,
realValue,
realValue - setValue
);

if (!isValidEmail(address)) {
//done('invalid to address') is possible but discouraged
return done(new Error('invalid to address'));
}

// console.log('Send Email To %s, info ', address, job.data)

// 这里可以模拟 io 类型的耗时操作
// setTimeout(function(){
done();
// }, 3e3);
});

简单的说明

生产者生产数据的时候, 生产一条就需要调用一次 queue.create, 而消费者则启动一次之后, 会一直运行下去, 发现有合适的数据就马上处理, 而且是以多个进程共同监听的.

Donate - Support to make this site better.
捐助 - 支持我让我做得更好.