博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
puppeteer_node爬虫分布式进阶
阅读量:5977 次
发布时间:2019-06-20

本文共 4350 字,大约阅读时间需要 14 分钟。

前面的文章将puppeteer做爬虫的基础一直到部署都梳理了一遍,现在来看一下分布式的处理

1) 为什么需要分布式

1. 需要抓取的不同数据有很多,会同时开启无头浏览器去抓取,然后获取到数据后又无厘头的一股脑挤进数据库   2. 无法保证同一时刻需要的数据只有一个操作在进行

2) 分布式选择

因为使用的是node,所以尽可能的寻找node支持的分布式框架
ZooKeeper 和 RabbitMQ 的思想百度上有好多说明,读者可以自行搜索作更详细的了解
node版的
node版的

3) 衔接之前的 puppeteer进阶版_爬取书旗小说 文章内容,文章只是放了一些主要的代码,末尾会附上项目地址,大家可以去撸一撸

发布者,给书旗起一个标识为 37 (channel_id),然后是要抓取书的书籍id(channel_book_id)
// 我们以接口的形式接收爬取的参数    // 简易版请求(除了接收参数不做任何处理) -> 发布者app.get('/v1.0/grasp_book', (req, res, next) => {  // 抓取时需要的参数  if (!req.query.channel_id && !req.query.channel_book_id) {    res.send({      code: 403,      msg: 'params error'    })    return null;  }  // 发布者  // 连接rabbitmq  amqp.connect('amqp://rabbitmq:12345678@127.0.0.1:5672/').then(function(conn) {    return conn.createChannel().then(function(ch) {      // 创建 hello 的消息队列      var q = 'hello';      // 解析为json字符串格式作为传递的数据格式      var msg = JSON.stringify({        "channel_id": req.query.channel_id,        "book_id": req.query.book_id      })      // 连接并保持      var ok = ch.assertQueue(q, {durable: true});      return ok.then(function(_qok) {        // 发送数据到消费者        ch.sendToQueue(q, Buffer.from(msg));        console.log(" [x] Sent '%s'", msg);        return ch.close();      });    }).finally(function() { conn.close(); });  }).catch(console.warn);  res.send({    code: 200,    msg: 'success'  });});
命令行启动app.js,发出请求并查看结果,{} 里面的就是我们发送出去的数据

图片描述

图片描述

消费者以及ZooKeeper

消费者,接收到发布者传递过来的数据建立消息队列,然后用Zookeeper创建临时节点以保持队列依次执行
var zookeeper = require('node-zookeeper-client');// 根据标识动态引入js文件function moduleCustomize(channel_id) {    return require(`../${channel_id}.js`)} var client = zookeeper.createClient('127.0.0.1:2181');async function sleep(second) {  return new Promise((resolve, reject) => {    setTimeout(() => {      resolve('sleep')    }, second)  })}// 连接_zookeeperclient.once('connected', function () {  console.log('Connected to the server.');  // 建立连接 rabbitmp  amqp.connect('amqp://rabbitmq:12345678@127.0.0.1:5672/').then(function(conn) {    return conn.createChannel().then(function(ch) {      // 与名为 hello(由发布者创建) 的消息队列建立连接      var ok = ch.assertQueue('hello', {durable: true});      // 预存为1      ok = ok.then(function() { ch.prefetch(1); });      ok = ok.then(function() {        // doWork 回调函数 -> 执行接收到数据后的操作        ch.consume('hello', doWork, {noAck: false});      });      return ok;      // rabbitmq 处理      function doWork(msg) {          // 接收到数据          var body = msg.content.toString();          console.log("[x] Received '%s'", body);          let _body = JSON.parse(body)          let channel_book_id = _body['channel_book_id'];          let channel_id = _body['channel_id'];          // zookeeper 节点          let path = `/${channel_id + "_" + channel_book_id}`          // 连接_zookeeper 判断是否存在          client.exists(path, function (error, stat) {              if (error) {                  console.log(error.stack);                  return;              }              if (stat) {                console.log('Node exists.');                // 存在则不执行,但需要将数据传递下去                // ack 可以参考 https://www.jianshu.com/p/a5f7fce67803                ch.ack(msg);              } else {                  console.log('Node does not exist.');                  // 操作完成后则释放当前创建的临时节点                  client.create(path, null, zookeeper.CreateMode.EPHEMERAL, function (error) {                    if (error) {                        console.log('Failed to create node: %s due to: %s.', path, error);                    } else {                        console.log('Node: %s is successfully created.', path);                                                  // 根据传入标识(如书旗就是37)动态引入js文件(抓书的操作)                        moduleCustomize(channel_id).init(channel_book_id)                        // 传递数据                        ch.ack(msg);                        // 释放当前创建的临时节点                        client.remove(path, -1, function (error) {                            if (error) {                                console.log(error.stack);                                return;                            }                            console.log('Node is deleted.');                        });                    }                  });                }              console.log('任务执行完毕')            });      }    });  }).catch(console.warn);}); client.connect();
命令行启动rab_consumer.js,{} 里面就是我们接收到消息队列里面的数据了

图片描述

4) 查看mongo结果数据

图片描述

5)

转载地址:http://mkpox.baihongyu.com/

你可能感兴趣的文章
程序局部性原理的一些思考
查看>>
AKOJ-2037-出行方案
查看>>
今天依然在切题的我
查看>>
WebApi服务监控 log4net记录监控日志
查看>>
Jenkins加Shell实现最简单的持续部署
查看>>
day11--RabbitMQ、Redis
查看>>
node项目发送邮件失败
查看>>
HTML常见小问题2
查看>>
iOS 学习
查看>>
原型与原型链
查看>>
Spring Boot 配置文件中的花样,看这一篇足矣!
查看>>
5 极限存在准则及两个重要极限
查看>>
数组相同部分
查看>>
Git 基础 —— 安装 配置 别名 对象
查看>>
Spring Cloud Feign 基础
查看>>
Android初级开发笔记-- activity启动模式的学习(1)
查看>>
单点登录系统SSO概述 | 单点登录讲解(1)
查看>>
UI基本原则
查看>>
DOM编程中,提高程序运行速度需要注意的一些点
查看>>
一种获取过程调用堆栈信息的简单方法
查看>>