前面的文章将puppeteer做爬虫的基础一直到部署都梳理了一遍,现在来看一下分布式的处理
1) 为什么需要分布式
1. 需要抓取的不同数据有很多,会同时开启无头浏览器去抓取,然后获取到数据后又无厘头的一股脑挤进数据库 2. 无法保证同一时刻需要的数据只有一个操作在进行
2) 分布式选择
因为使用的是node,所以尽可能的寻找node支持的分布式框架 ZooKeeper 和 RabbitMQ 的思想百度上有好多说明,读者可以自行搜索作更详细的了解 node版的 node版的
3) 衔接之前的 puppeteer进阶版_爬取书旗小说 文章内容,文章只是放了一些主要的代码,末尾会附上项目地址,大家可以去撸一撸
发布者,给书旗起一个标识为 37 (channel_id),然后是要抓取书的书籍id(channel_book_id)
// 我们以接口的形式接收爬取的参数 // 简易版请求(除了接收参数不做任何处理) -> 发布者app.get('/v1.0/grasp_book', (req, res, next) => { // 抓取时需要的参数 if (!req.query.channel_id && !req.query.channel_book_id) { res.send({ code: 403, msg: 'params error' }) return null; } // 发布者 // 连接rabbitmq amqp.connect('amqp://rabbitmq:12345678@127.0.0.1:5672/').then(function(conn) { return conn.createChannel().then(function(ch) { // 创建 hello 的消息队列 var q = 'hello'; // 解析为json字符串格式作为传递的数据格式 var msg = JSON.stringify({ "channel_id": req.query.channel_id, "book_id": req.query.book_id }) // 连接并保持 var ok = ch.assertQueue(q, {durable: true}); return ok.then(function(_qok) { // 发送数据到消费者 ch.sendToQueue(q, Buffer.from(msg)); console.log(" [x] Sent '%s'", msg); return ch.close(); }); }).finally(function() { conn.close(); }); }).catch(console.warn); res.send({ code: 200, msg: 'success' });});
命令行启动app.js,发出请求并查看结果,{} 里面的就是我们发送出去的数据
消费者以及ZooKeeper
消费者,接收到发布者传递过来的数据建立消息队列,然后用Zookeeper创建临时节点以保持队列依次执行
var zookeeper = require('node-zookeeper-client');// 根据标识动态引入js文件function moduleCustomize(channel_id) { return require(`../${channel_id}.js`)} var client = zookeeper.createClient('127.0.0.1:2181');async function sleep(second) { return new Promise((resolve, reject) => { setTimeout(() => { resolve('sleep') }, second) })}// 连接_zookeeperclient.once('connected', function () { console.log('Connected to the server.'); // 建立连接 rabbitmp amqp.connect('amqp://rabbitmq:12345678@127.0.0.1:5672/').then(function(conn) { return conn.createChannel().then(function(ch) { // 与名为 hello(由发布者创建) 的消息队列建立连接 var ok = ch.assertQueue('hello', {durable: true}); // 预存为1 ok = ok.then(function() { ch.prefetch(1); }); ok = ok.then(function() { // doWork 回调函数 -> 执行接收到数据后的操作 ch.consume('hello', doWork, {noAck: false}); }); return ok; // rabbitmq 处理 function doWork(msg) { // 接收到数据 var body = msg.content.toString(); console.log("[x] Received '%s'", body); let _body = JSON.parse(body) let channel_book_id = _body['channel_book_id']; let channel_id = _body['channel_id']; // zookeeper 节点 let path = `/${channel_id + "_" + channel_book_id}` // 连接_zookeeper 判断是否存在 client.exists(path, function (error, stat) { if (error) { console.log(error.stack); return; } if (stat) { console.log('Node exists.'); // 存在则不执行,但需要将数据传递下去 // ack 可以参考 https://www.jianshu.com/p/a5f7fce67803 ch.ack(msg); } else { console.log('Node does not exist.'); // 操作完成后则释放当前创建的临时节点 client.create(path, null, zookeeper.CreateMode.EPHEMERAL, function (error) { if (error) { console.log('Failed to create node: %s due to: %s.', path, error); } else { console.log('Node: %s is successfully created.', path); // 根据传入标识(如书旗就是37)动态引入js文件(抓书的操作) moduleCustomize(channel_id).init(channel_book_id) // 传递数据 ch.ack(msg); // 释放当前创建的临时节点 client.remove(path, -1, function (error) { if (error) { console.log(error.stack); return; } console.log('Node is deleted.'); }); } }); } console.log('任务执行完毕') }); } }); }).catch(console.warn);}); client.connect();
命令行启动rab_consumer.js,{} 里面就是我们接收到消息队列里面的数据了
4) 查看mongo结果数据
5)