爱妃科技nodejs使用多处理器 | | 爱妃科技
正在加载
请稍等

菜单

红楼飞雪 梦

15526773247

文章

Home nodejs nodejs使用多处理器
Home nodejs nodejs使用多处理器

nodejs使用多处理器

nodejs by

我们说过,Node 是单线程的,这意味着Node 只能利用一个处理器来工作。但是,多数服务器都有多个“多核”处理器,一个多核处理器就包含了几个处理器。有两个物理CPU 插槽的服务器可能有24 个逻辑核,也就是说操作系统看起来有24 个处理器。要充分发挥Node 的作用,需要把这些处理器都利用起来。但如果没有多线程,该如何做呢?
Node 提供了一个cluster 模块,可以把任务分配给子进程,就是说Node 把当前程序复制了一份给另一个进程(在Windows 上,它其实是另外一个线程)。每个子进程有些特殊的能力,比如能够与其他子进程共享socket 连接。这样我们就可以写一个Node 程序,让它创建许多其他Node 程序,并把任务分配给它们。需要重点理解的是,当你用cluster 把工作共享到一组复制的Node 程序时,主进程不会参与到每个具体的事务中。主进程管理所有的子进程,但当子进程与I/O 操作交互时,它们是直接进行操作的,不需要通过主进程。这意味着,如果你用cluster 来创建一个Web 服务器,请求将不会通过你的主进程,而是直接连接到子进程。而且,调度这些请求并不会导致系统出现瓶颈。
通过cluster API,你可以把工作分配给Node 进程,并分布在服务器所有可用的处理器上,这能够充分利用资源。让我们看一个简单的cluster 代码例子

使用集群来分发任务

var cluster = require('cluster');
var http = require('http');
var numCPUs = require('os').cpus().length;
if (cluster.isMaster) {
  // 创建工作进程
  for (var i = 0; i < numCPUs; i++) {
    cluster.fork();
  }
  cluster.on('death', function (worker) {
    console.log('worker ' + worker.pid + ' died');
  });
} else {
  // 工作进程创建http 服务器
  http.Server(function (req, res) {
    res.writeHead(200);
    res.end('hello world\n');
  }).listen(8000);
}

在例子中,我们使用了Node 的一些核心模块来把工作平均分配到所有可用的CPU上,这些模块为:cluster 模块、http 模块和os 模块。从os 模块中,我们可以轻松得到系统CPU 的数量。cluster 工作的原理是每一个Node 进程要么是主进程,要么成为工作进程。当一个主进程调用cluster.fork() 方法时,它会创建与主进程一模一样的子进程,除了两个让每个进程可以检查自己是父/ 子进程的属性以外。在主进程中(Node运行时直接调用的那个脚本),cluster.isMaster 会返回true, 而cluster.isWorker 会返回false。而在子进程,cluster.isMaster 返回false, 且
cluster.isWorker 返回true。例子中的主脚本为每个CPU 创建了一个工作进程。每个子进程创建了一个HTTP 服
务 器,这是cluster 另一个独特的地方。在使用cluster 的地方使用listen()监听一个socket 的时候,多个进程可以同时监听同一个socket。如果通过调用nodemyscript.js 的方法启动多个Node 进程,会导致出错,因为第二个进程在启动时会抛出EADDRINUSE 的异常。cluster 提供了跨平台时让多个进程共享socket 的方法。即使多个子进程在共享一个端口上的连接,其中一个堵塞了,也不会影响其他工作进程的新连接。除了共享socket 外, 我们还能用cluster 做更多事情, 因为它是基于child_process 模块的。这个模块会提供一系列属性,其中最有用的一些可以检查子进程健康状态。在上面的例子中,当子进程死亡时,主进程会用console.log() 输出死亡提醒,下面提供了一个更实用的例子,它会调用cluster.fork() 来创建一个新的子进程。

出现死亡进程后重新开启新的进程

if (cluster.isMaster) {
  // 创建工作进程
  for (var i = 0; i < numCPUs; i++) {
    cluster.fork();
  }
  cluster.on('death', function (worker) {
    console.log('worker ' + worker.pid + ' died');
    cluster.fork();
  });
}

这个简单的改造让主进程会不停地把死掉的进程重启,从而保证所有的CPU 都有我们的服务器在运行。然而,这只是对运行状态的基本检查,我们还能用更多花哨的技巧。因为工作进程可以传消息给主进程,所以可以让每个工作进程报告自己的状态,如内存使用量。这让主进程可以觉察哪些工作进程变得不稳定,确认哪些工作进程没有冻结,或者被长时间运行的事件堵塞

通过消息传递来监控工作进程状态

var cluster = require('cluster');
var http = require('http');
var numCPUs = require('os').cpus().length;
var rssWarn = (12 * 1024 * 1024),
heapWarn = (10 * 1024 * 1024);
if (cluster.isMaster) {
  for (var i = 0; i < numCPUs; i++) {
    var worker = cluster.fork();
    worker.on('message', function (m) {
      if (m.memory) {
        if (m.memory.rss > rssWarn) {
          console.log('Worker ' + m.process + ' using too much memory.')
        }
      }
    })
  }
} else {
  // 服务器
  http.Server(function (req, res) {
    res.writeHead(200);
    res.end('hello world\n')
  }).listen(8000)
  // 每秒报告一次状态
  setInterval(function report() {
    process.send({
      memory: process.memoryUsage(),
      process: process.pid
    });
  }, 1000)
}

在这个例子里,工作进程报告自己的内存使用量,当 子进程使用了过多内存时,主进程会发送一条警告到日志中去。这是运维团队常用的检测系统健康状态的功能。这让Node 主进程有控制的能力,也带来了好处。这个消息传递的接口也允许主进程把消息发回给工作进程,这意味着你可以把主进程当成工作进程的一个轻量级控制接口。我 们还能用消息传递做更多的事情,而这些事情无法在Node 之外实现。因为Node依赖事件循环来工作,所以有个风险是其中一个事件回调函数运行了很长的时间,这会导致该进程的其他用户需要等待很长时间才能得到服 务。主进程与每个工作进程有一个连接,所以我们可以告诉它定时发送“all OK”消息,这样我们就能够验证事件循环在以合适的速度周转着,并没有被某个回调函数堵塞。可悲的是,即使识别了一个长时间运行的回调函数,我们也无法主 动关闭它。因为我们发送给该进程的任何通知都会加到事件队列里,所以它需要等待已经在长时间运行的回调函数结束后才会被处理。因此,虽然我们能够让主进程识别僵尸进程,但唯一的补救方法就是杀掉工作进程,而这会丢失它正在执行的工作。

做些准备工作就能让你有能力杀掉某个威胁到系统资源的工作进程,

杀死僵尸进程

var cluster = require('cluster');
var http = require('http');
var numCPUs = require('os').cpus().length;
var rssWarn = (50 * 1024 * 1024),
heapWarn = (50 * 1024 * 1024)
var workers = {
}
if (cluster.isMaster) {
  for (var i = 0; i < numCPUs; i++) {
    createWorker()
  }
  setInterval(function () {
    var time = new Date().getTime()
    for (pid in workers) {
      if (workers.hasOwnProperty(pid) &&
      workers[pid].lastCb + 5000 < time) {
        console.log('Long running worker ' + pid + ' killed')
        workers[pid].worker.kill()
        delete workers[pid]
        createWorker()
      }
    }
  }, 1000)
} else {
  // 服务器
  http.Server(function (req, res) {
    // 打乱200 个请求中的1 个
    if (Math.floor(Math.random() * 200) === 4) {
      console.log('Stopped ' + process.pid + ' from ever finishing')
      while (true) {
        continue
      }
    }
    res.writeHead(200);
    res.end('hello world from ' + process.pid + '\n')
  }).listen(8000)
  // 每秒钟报告一次状态
  setInterval(function report() {
    process.send({
      cmd: 'reportMem',
      memory: process.memoryUsage(),
      process: process.pid
    })
  }, 1000)
}
function createWorker() {
  var worker = cluster.fork()
  console.log('Created worker: ' + worker.pid)
  // 允许开机时间
  workers[worker.pid] = {
    worker: worker,
    lastCb: new Date().getTime() - 1000
  }
  worker.on('message', function (m) {
    if (m.cmd === 'reportMem') {
      workers[m.process].lastCb = new Date().getTime()
      if (m.memory.rss > rssWarn) {
        console.log('Worker ' + m.process + ' using too much memory.')
      }
    }
  })
}

在这个脚本中,我们给主进程也添加了类似工作进程的定时器。现在,每当一个工作进程向主进程发送报告时,主进程都会记录报告的时间。大约每隔一秒,主进程就会检查所有的工作进 程,看看是否有某个进程已经超过5 秒未更新状态(因为超时是以微秒为单位,所以我们用的是>5000)。如果发现这样的进程,主进程将把阻塞的工作进程杀掉并重启。为了让这个流程更 加高效,我们把创建工作进程的代码放到一个小程序里,这样就能在同一个地方为不同情景提供启动工作,无论是创建新的工作进程还是重启死亡进程。我们也对 HTTP 服务器做了一个小改动,让每个请求有1/200 的概率会出错。你可以运行一下脚本,看看出现错误的可能。如果你同时从多个地方发起并行请求,就能看到整个代码是如何运行的。这些彻底分隔的Node 程序通过消息传递来进行交互。因为主进程是简单的小程序,不会卡住,所以它在任何情况下都能够一直检查其他进程

 

23 2015-06

 

我要 分享

 

 

本文 作者

 

相关 文章