node-kafka中获取消息
发布于 8 年前 作者 songqinghehe 12749 次浏览 来自 问答

最近在写node-kafka的应用,但是遇到一个问题,就是读取消息的时候,node读取kafka的消息还没读完,如果不延时获取信息的话,是拿不到数据的,请问该如何操作? 以下为我写的代码:

exports.ognvPullMsg			= function (req, res) {
var userId				= req.body.userId;
if (userId == 'undefined' || userId <= 0) {
	res.send('{"state":"301","message":"\u975e\u6cd5userId"}');
}
var Consumer			= kafka.Consumer;
var client				= new kafka.Client();
var consumer			= new Consumer(client,[{ topic: 'ognv-backend-log-'+userId, partition: 0 }],{autoCommit: false});
var realData			= [];
consumer.on('message', function (message) {
	console.log(message);
	realData.push(message);
});

setTimeout(function() {
	res.send(JSON.stringify(realData));
}, 200);

}

12 回复

‘use strict’;

var kafka = require('kafka-node');
var middleconsumer = require("./model/middleconsumer");
var events = require('events');
var emitter = new events.EventEmitter();
//var HighLevelConsumer = kafka.HighLevelConsumer;
var Consumer = kafka.Consumer;
var Offset = kafka.Offset;
var Client = kafka.Client;
var argv = {
    topic: "hongbao_subscribe_wx70955ddd32ccecc7"
};
var topic = argv.topic || 'topic1';

var client = new Client('localhost:2181');
var topics = [{
        topic: topic,
        partition: 0,
        offset: 8000
    }],
    options = {
        autoCommit: false,
        fetchMaxWaitMs: 1000,
        fetchMaxBytes: 1024 * 1024,
        fromOffset: true
    };

var consumer = new Consumer(client, topics, options);
var offset = new Offset(client);

//consumer.setOffset(topic, 0, 36);
consumer.on('message', function(message) {
    var obj = message;
    var message = JSON.parse(message.value);
    var args = [];
    args.push(message.openId);
    args.push(message.fromUserName);
    args.push(message.toUserName);
    args.push(message.money);
    args.push(message.attach);
    args.push(message.appId);
    args.push(message.cTime);
    emitter.emit('load', args);

});

consumer.on('error', function(err) {
    console.log('error', err);
});

emitter.on('load', function(args) {
        console.log('listener2', args);

        // function insert(args) {
        //     middleconsumer.saveRecord(args)
        //         .then(function(data) {
        //             insert(args);
        //         }, function(er) {

        //         });
        // }
        // insert(args);
});
    /*
     * If consumer get `offsetOutOfRange` event, fetch data from the smallest(oldest) offset
     */
consumer.on('offsetOutOfRange', function(topic) {
    topic.maxNum = 2;
    offset.fetch([topic], function(err, offsets) {
        var min = Math.min.apply(null, offsets[topic.topic][topic.partition]);
        consumer.setOffset(topic.topic, topic.partition, min);
    });
});

@yxz1025 你好,我第一次看你的代码是在csdn上,给你私信了,没有回复我,没想到在这里看到你了。

@yxz1025 我看你的代码时,每向args push信息的时候 都会调用load事件,最后可以拿到全部的数据么?

之前一直很忙 不好意思

你是要从kafka取数据是吧,kafka-node这种需要用co函数包装一下 不然你回调的数据保存不到数据库的

@yxz1025 目前的想法是获取此用户下面的所有数据并展示给用户,没有考虑存到数据库中

我这里通过emit函数将获取到的数据转到loader了,你可以引入一个 var Promise = require(“bluebird”); 然后写一个promise函数,详细代码如下:

'use strict';
var Promise = require("bluebird");
var kafka = require('kafka-node');
var middleconsumer = require("./model/middleconsumer");
var events = require('events');
var emitter = new events.EventEmitter();
//var HighLevelConsumer = kafka.HighLevelConsumer;
var Consumer = kafka.Consumer;
var Offset = kafka.Offset;
var Client = kafka.Client;
var argv = {
    topic: "hongbao_subscribe_wx70955ddd32ccecc7"
};
var topic = argv.topic || 'topic1';

var client = new Client('localhost:2181');
var topics = [{
        topic: topic,
        partition: 0,
        offset: 8000
    }],
    options = {
        autoCommit: false,
        fetchMaxWaitMs: 1000,
        fetchMaxBytes: 1024 * 1024,
        fromOffset: true
    };

var consumer = new Consumer(client, topics, options);
var offset = new Offset(client);

//consumer.setOffset(topic, 0, 36);

/*
 * If consumer get `offsetOutOfRange` event, fetch data from the smallest(oldest) offset
 */
consumer.on('offsetOutOfRange', function(topic) {
    topic.maxNum = 2;
    offset.fetch([topic], function(err, offsets) {
        var min = Math.min.apply(null, offsets[topic.topic][topic.partition]);
        consumer.setOffset(topic.topic, topic.partition, min);
    });
});


//此处返回一个promise
function convertPost(consumer) {
    return new Promise(function(resolve, reject) {
        consumer.on('message', function(message) {
            var obj = message;
            var message = JSON.parse(message.value);
            resolve(message);
        });

        consumer.on('error', function(err) {
            reject(err);
        });
    });

}

@songqinghehe 下面写了一个 你看看,

@yxz1025 谢谢,我先看看,不懂得在请教你一下。

@songqinghehe 好的 只要把获取的流返回到promise就好了

@yxz1025 上面的代码有完整的代码库么,谢谢!

@yxz1025 consumerGroup也可以这些实现么?

回到顶部