爱妃科技nodejs 关系型数据库 | | 爱妃科技
正在加载
请稍等

菜单

红楼飞雪 梦

15526773247

文章

Home nodejs nodejs 关系型数据库
Home nodejs nodejs 关系型数据库

nodejs 关系型数据库

nodejs by

我们有许多的理由依旧使用传统的SQL 数据库,而Node 对流行的开源数据库都有模块支持。

1、MySQL

MySQL 成为开源世界的主力军是有原因的:它免费提供了与大型商用数据库一样的众多功能。当前,MySQL 拥有很高的性能和丰富的功能。

(1). 使用NodeDB

node-db 模块提供了常用数据库系统的原生代码接口,包括与MySQL 的接口。它通过该模块公开的通用API 来给Node 使用。虽然node-db 不止支持MySQL 一家,但本片文章将集中讨论如何在应用代码中使用MySQL。自从Oracle 收购了Sun 公司,MySQL 及其社区的未来走向已受到很多猜测。有些组织主张迁移到直接替代品,如MariaDB,或者是彻底更换到其他的关系型数据库管理系统(RDBMS)。虽然 MySQL 不会很快消失,但你需要判断它是否是工作的最佳选择。

<1>安装。MySQL 客户端开发库是Node 数据库模块必选的依赖项。在Ubuntu,你
可以使用apt 命令安装这些库:

sudo apt-get install libmysqlclient-dev

然后使用npm 来安装名为db-mysql 的包:

npm install -g db-mysql

要运行本部分的示例代码,需要运行一个名为upandrunning 的数据库,并且提供一个账号,其用户名和密码都是dev。以下的脚本会创建基本的数据库表和结构:

DROP DATABASE IF EXISTS upandrunning;
CREATE DATABASE upandrunning;
GRANT ALL PRIVILEGES ON upandrunning.* TO 'dev'@'%' IDENTIFIED BY 'dev';
USE upandrunning;
CREATE TABLE users(
id int AUTO_INCREMENT PRIMARY KEY,
user_login varchar(25),
user_nicename varchar(75)
);

(2) 选择。下例演示了如何从WordPress 用户表中选出所有的ID 和user_name列的内容。

从MySQL 选出数据

var mysql = require('db-mysql');
var connectParams = {
  'hostname': 'localhost',
  'user': 'dev',
  'password': 'dev',
  'database': 'upandrunning'
}
var db = new mysql.Database(connectParams);
db.connect(function (error) {
  if (error) return console.log('Failed to connect');
  this.query().select(['id',
  'user_login']).from('users').execute(function (error, rows, columns) {
    if (error) {
      console.log('Error on query');
    } else {
      console.log(rows);
    }
  });
});

你也许能够猜到,这里执行的效果等价于SQL 命令SELECT id, user_loginFROM users。输出为:

{
  id: 1,
  user_login: 'mwilson'
}

<3>插入。插入数据和选择数据的方法类似,因为命令都是以同样的方式串联起来的。下例演示了如何进行INSERT INTO users ( user_login ) VALUES (‘newbie’); 的操作。

插入到MySQL 中

var mysql = require('db-mysql');
var connectParams = {
  'hostname': 'localhost',
  'user': 'dev',
  'password': 'dev',
  'database': 'upandrunning'
}
var db = new mysql.Database(connectParams);
db.connect(function (error) {
  if (error) return console.log('Failed to connect');
  this.query().insert('users', [
    'user_login'
  ], [
    'newbie'
  ]).execute(function (error, rows, columns) {
    if (error) {
      console.log('Error on query');
      console.log(error);
    } 
    else console.log(rows);
  });
});

输出是:

{
  id: 2,
  affected: 1,
  warning: 0
}

.insert 命令接受下列3 个参数。

  • 表的名称。
  • 待插入列的名称。
  • 插入每列的对应值。

数据库驱动会处理好转义,并把数据类型转换成列需要的值。所以你不需要在传给这个模块的代码上担心SQL 注入攻击。

(4) 更新。与选择和插入一样,更新操作也是依赖链式函数来生成等价的SQL 操作。下例展示的是使用查询条件来限定更新的目标,而不是对整个数据库表的所有记录都作精心修改

在MySQL 中更新数据

var mysql = require('db-mysql');
var connectParams = {
  'hostname': 'localhost',
  'user': 'dev',
  'password': 'dev',
  'database': 'unandrunning'
}
var db = new mysql.Database(connectParams);
db.connect(function (error) {
  if (error) return console.log('Failed to connect');
  this.query().update('users').set({
    'user_nicename': 'New User'
  }).where('user_login = ?', [
    'newbie'
  ]).execute(function (error, rows, columns) {
    if (error) {
      console.log('Error on query');
      console.log(error);
    } 
    else console.log(rows);
  });
});

输出是:

{
  id: 0,
  affected: 1,
  warning: 0
}

更新一行数据包含下面3 个部分。

  • .update命令,接受表名(本例为 users)作为参数。
  • .set命令,使用 key-value 对象来确定哪一列需要修改,以及它们的新值是多少。
  • .where命令,告诉 MySQK 该如何过滤需要修改的记录。

<5>删除。如下例 所示,删除与更新非常类似,唯一的不同是,在删除的时候,不需要指定哪一列有更新。如果没有指定where 条件,那么整个表的数据都将被删除干净。

从MySQL 中删除数据

var mysql = require('db-mysql');
var connectParams = {
  'hostname': 'localhost',
  'user': 'dev',
  'password': 'dev',
  'database': 'upandrunning'
}
var db = new mysql.Database(connectParams);
db.connect(function (error) {
  if (error) return console.log('Failed to connect');
  this.query().delete ().from('users').where('user_login = ?', [
    'newbie'
  ]).execute(function (error, rows, columns) {
    if (error) {
      console.log('Error on query');
      console.log(error);
    } 
    else console.log(rows);
  });
});

输出是:

{
  id: 0,
  affected: 1,
  warning: 0
}

.delete 命令和.update 命令类似,唯一的不同是它不接受任何列的名字和数据。在这个例子里,where 条件里使用了通配符:’user_login = ?’。 代码中的问号会被user_login 参数替换掉,然后再执行。第二个参数是个数组,因为如果使用了多个问号,数据库驱动就需要从这个参数中按顺序取值使用。

(2). Sequelize

Sequelize 是一个对象关系映射(ORM),它把之前部分介绍的重复工作简化了。你可以使用Sequelize 来定义数据库与程序间共享的对象,这样就不需要为每个操作写查询语句,而是直接通过操作这些对象来写入或读取数据库。当你在进行维护或者增加新数据列的时候,这绝对能节省不少时间,而且能在整个数据管理工作中减少错误。Sequelize 支持通过npm 安装:

npm install sequelize

上面的例子中已经创建好了数据库和示例用户,现在是时候在数据库里创建Author 实体了,Sequelize 替你处理了创建操作,所以此时不需要手动执行任何SQL 操作。、

用Sequelize 创建实例

var Sequelize = require('sequelize');
var db = new Sequelize('upandrunning', 'dev', 'dev', {
  host: 'localhost'
});
var Author = db.define('Author', {
  name: Sequelize.STRING,
  biography: Sequelize.TEXT
});
Author.sync().on('success', function () {
  console.log('Author table was created.');
}).on('failure', function (error) {
  console.log('Unable to create author table');
});

输出是:

Executing: CREATE TABLE IF NOT EXISTS ‘Authors‘ ( ‘name‘ VARCHAR(255),
‘biography‘
TEXT, ‘id‘ INT NOT NULL auto_increment , ‘createdAt‘ DATETIME NOT NULL,
‘updatedAt‘
DATETIME NOT NULL, PRIMARY KEY ( ‘id‘ )) ENGINE=InnoDB;
Author table was created.

在本例中,Author 被定义为一个包含了name 字段和biograph 字段的实例。在输出中能够看见,Sequelize 添加了一个自增的主键列、createdAt 列和updatedAt列。这是许多ORM 采用的典型方法,提供了标准化的接口让Sequelize 能够关联和处理你的数据。Sequelize 与之前介绍的库有所区别,它是基于监听事件驱动的架构,而不是其他地方采用的回调函数驱动的架构。这意味着,你需要在每个操作之后同时监听成功和失败事件,而不是在操作返回值中包含成功或失败的信息。下例创建了多对多关系的两个表,操作的顺序如下。

  • (1) 设置实例的结构。
  • (2) 把结构(schema)与真实的数据库进行同步。
  • (3) 创建并保存一个Book 对象。
  • (4) 创建并保存一个Author 对象。
  • (5) 建立author 和book 之间的关系。

用Sequelize 保存记录和关系

var Sequelize = require('sequelize');
var db = new Sequelize('upandrunning', 'dev', 'dev', {
  host: 'localhost'
});
var Author = db.define('Author', {
  name: Sequelize.STRING,
  biography: Sequelize.TEXT
});
var Book = db.define('Book', {
  name: Sequelize.STRING
});
Author.hasMany(Book);
Book.hasMany(Author);
db.sync().on('success', function () {
  Book.build({
    name: 'Through the Storm'
  }).save().on('success', function (book) {
    console.log('Book saved');
    Author.build({
      name: 'Lynne Spears',
      biography: 'Author and mother of Britney'
    }).save().on('success', function (record) {
      console.log('Author saved.');
      record.setBooks([book]);
      record.save().on('success', function () {
        console.log('Author & Book Relation created');
      });
    });
  }).on('failure', function (error) {
    console.log('Could not save book');
  });
}).on('failure', function (error) {
  console.log('Failed to sync database');
});

为了确保所有的实例都设置正确,需要在等待book 成功保存到数据库之后再创建author。同样,在author 成功保存到数据库之后,book 才能被添加到author上。这样确保了Sequelize 能够取到author 和book 的ID,并且建立它们之间的关系。输出如下:

Executing: CREATE TABLE IF NOT EXISTS ‘AuthorsBooks‘
( ‘BookId‘ INT , ‘AuthorId‘ INT , ‘createdAt‘ DATETIME NOT NULL,
‘updatedAt‘ DATETIME NOT NULL,
PRIMARY KEY ( ‘BookId‘ , ‘AuthorId‘ )) ENGINE=InnoDB;
Executing: CREATE TABLE IF NOT EXISTS ‘Authors‘
( ‘name‘ VARCHAR(255), ‘biography‘ TEXT,‘id‘ INT NOT NULL AUTO_INCREMENT , ‘createdAt‘ DATETIME NOT NULL,
‘updatedAt‘ DATETIME NOT NULL, PRIMARY KEY ( ‘id‘ ))
ENGINE=InnoDB;
Executing: CREATE TABLE IF NOT EXISTS ‘Books‘
( ‘name‘ VARCHAR(255), ‘id‘ INT NOT NULL AUTO_INCREMENT ,
‘createdAt‘ DATETIME NOT NULL, ‘updatedAt‘ DATETIME NOT NULL,
PRIMARY KEY ( ‘id‘ )) ENGINE=InnoDB;
Executing: CREATE TABLE IF NOT EXISTS ‘AuthorsBooks‘
( ‘BookId‘ INT , ‘AuthorId‘ INT , ‘createdAt‘ DATETIME NOT NULL,
‘updatedAt‘ DATETIME NOT NULL,
PRIMARY KEY ( ‘BookId‘ , ‘AuthorId‘ )) ENGINE=InnoDB;
Executing: INSERT INTO ‘Books‘ ( ‘name‘ ,'id‘ , ‘createdAt‘ , ‘updatedAt‘ )
VALUES ('Through the Storm',NULL,'2011-12-01 20:51:59',
'2011-12-01 20:51:59');
Book saved
Executing: INSERT INTO ‘Authors‘ ( ‘name‘ , ‘biography‘ , ‘id‘ , ‘createdAt‘ ,
‘up datedAt‘ )
VALUES ('Lynne Spears','Author AND mother of Britney',
NULL,'2011-12-01 20:51:59','2011-12-01 20:51:59');
Author saved.
Executing: UPDATE ‘Authors‘ SET ‘name‘ = ‘Lynne Spears‘ ,
‘biography‘ = ‘Author and mother of Britney‘ , ‘id‘ =3,
‘createdAt‘ ='2011-12-01 20:51:59',
‘updatedAt‘ ='2011-12-01 20:51:59' WHERE ‘id‘ =3
Author & Book Relation created
Executing: SELECT * FROM ‘AuthorsBooks‘ WHERE ‘AuthorId‘ =3;
Executing: INSERT INTO ‘AuthorsBooks‘ ( ‘AuthorId‘ , ‘BookId‘ , ‘createdAt‘ ,
‘u pdatedAt')
VALUES (3,3,'2011-12-01 20:51:59','2011-12-01 20:51:59');

2、PostgreSQL

PostgreSQL 是面向对象的RDBMS,诞生于加州大学伯克利分校。其前身是Ingres数据库,Michael Stonebraker 教授是该项目的发起者及领头人。从1985 年到1993年,Postres 团队发布了该软件的4 个版本。在项目接近尾声时,越来越多的用户开始支持此项目,同时提出了大量新功能需求,也使开发团队面临着巨大压力。在伯克利团队开发之后,开源社区的开发者接管了该项目,把原来的QUEL 语言解析器改为SQL 语言解析器,并将项目改名为PostgreSQL。自从1997 年PostgreSQL 发布了第一个版本PostgreSQL 6.0,该数据库系统就作为功能强大的发行版赢得了良好的声誉,对于有Oracle 背景的用户来说尤其方便好用。

(1). 安装

可用于产品线的PostgreSQL 版本(已经被Yammer.com 这样的大型网站使用)可以从npm 资源库下载,如下:

npm install pg

这需要先安装pg_config,你可以通过libpq-dev 包找到它。

(2). 选择

下例假设你已经创建了一个叫做upandrunning 的数据库并且授予了dev 用户(密码也是dev)权限。

从PostgreSQL 选出数据

var pg = require('pg');
var connectionString = 'pg://dev:dev@localhost:5432/upandrunning';
pg.connect(connectionString, function (err, client) {
  if (err) {
    console.log(err);
  } else {
    var sqlStmt = 'SELECT username, firstname, lastname FROM users';
    client.query(sqlStmt, null, function (err, result) {
      if (err) {
        console.log(err);
      } else {
        console.log(result);
      }
      pg.end();
    });
  }
});

输出是:

{
  rows:
  [
    {
      username: 'bshilbo',
      firstname: 'Bilbo',
      lastname: 'Shilbo'
    }
  ]
}

这与MySQL 驱动的链式调用方法有很大区别。使用PostgreSQL 的时候,它会要你直接编写SQL 查询语句。如上述例子所示,调用end() 函数会关闭连接,结束Node 事件循环。

(3). 插入、更新和删除

当手工输入SQL 查询语句时,你可能倾向于直接把数据值通过字符串连接操作扔在代码里,但聪明的程序员会寻求方法来保护自己不被SQL 注入攻击。pg 库接受参数形式的查询,这样就能够用上来自外部资源(比如网页的表单)里的值。下面的例子演示了如何插入,更新和删除数据。

插入到PostgreSQL

var pg = require('pg');
var connectionString = 'pg://dev:dev@localhost:5432/upandrunning';
pg.connect(connectionString, function (err, client) {
  if (err) {
    console.log(err);
  } else {
    var sqlStmt = 'INSERT INTO users( username, firstname, lastname ) ';
    sqlStmt += 'VALUES ( $1, $2, $3)';
    var sqlParams = [
      'jdoe',
      'John',
      'Doe'
    ];
    var query = client.query(sqlStmt, sqlParams, function (err, result) {
      if (err) {
        console.log(err);
      } else {
        console.log(result);
      }
      pg.end();
    });
  }
});

输出是:

{
  rows: [
  ],
  command: 'INSERT',
  rowCount: 1,
  oid: 0
}

query 命令以SQL 语句作为第一个参数,第二个参数是一个数据值的数组。MySQL驱动使用问号作为参数值替换标记,而PostgreSQL 使用的是序号参数。能够为参数排号有利于你更好地控制数值的组织方式。

在PostgreSQL 中更新数据

var pg = require('pg');
var connectionString = 'pg://dev:dev@localhost:5432/upandrunning';
pg.connect(connectionString, function (err, client) {
  if (err) {
    console.log(err);
  } else {
    var sqlStmt = 'UPDATE users '
    + 'SET firstname = $1 '
    + 'WHERE username = $2';
    var sqlParams = [
      'jane',
      'jdoe'
    ];
    var query = client.query(sqlStmt, sqlParams, function (err, result) {
      if (err) {
        console.log(err);
      } else {
        console.log(result);
      }
      pg.end();
    });
  }
});

从PostgreSQL 中删除数据

var pg = require('pg');
var connectionString = 'pg://dev:dev@localhost:5432/upandrunning';
pg.connect(connectionString, function (err, client) {
  if (err) {
    console.log(err);
  } else {
    var sqlStmt = 'DELETE FROM users WHERE username = $1';
    var sqlParams = [
      'jdoe'
    ];
    var query = client.query(sqlStmt, sqlParams, function (err, result) {
      if (err) {
        console.log(err);
      } else {
        console.log(result);
      }
      pg.end();
    });
  }
});

3 连接池

生产环境通常由多种资源组成:Web 服务器、缓存服务器和数据库服务器。数据库通常是部署在Web 服务器之外的独立机器上,这使得面向公众的网站不必重新配置和修改复杂的数据库集群就可以垂直增长。因此应用开发程序员需要留心访问这些资源时的性能实现情况,以及这些访问开销会如何影响网站的表现。连接池在Web 开发中是非常重要的概念,因为建立一个数据库连接的开销相对来说还是很大的。为每个请求创建一个甚至多个连接会对高流量网站造成不必要的额外负担,也会导致性能下降。解决方案是在内部缓存池里维护数据库连接,当某连接不再需要时,它会被放回连接池里,这样就能立刻为下一个进入的请求服务了。许多数据库驱动提供了连接池功能,但该模式违反了Node 的“一个模块,一个功能”的理念。所以,Node 开发者在数据层之上应使用通用的连接池(genericpool)模块来进行数据库连接服务(如下例)。generic-pool 模块会重用已有的连接,尽可能地防止因为创建新的数据库连接而带来的开销,而且这个模块可以用在任何数据库上。

使用node-db 的连接池

var mysql = require('db-mysql');
var poolModule = require('generic-pool');
var connectParams = {
  'hostname': 'localhost',
  'user': 'dev',
  'password': 'dev',
  'database': 'zborowski'
}
var pool = poolModule.Pool({
  name: 'mysql',
  create: function (callback) {
    var db = new mysql.Database(connectParams);
    db.connect(function (error) {
      callback(error, db);
    });
  },
  destroy: function (client) {
    client.disconnect();
  },
  max: 10,
  idleTimeoutMillis: 3000,
  log: true
});
pool.acquire(function (error, client) {
  if (error) return console.log('Failed to connect');
  client.query().select(['id',
  'user_login']).from('wp_users').execute(function (error, rows, columns) {
    if (error) {
      console.log('Error on query');
    } else {
      console.log(rows);
    }
    pool.release(client);
  });
});

输出为:

pool mysql - dispense() clients=1 available=0
pool mysql - dispense() - creating obj - count=1
[ { id: 1, user_login: 'mwilson' } ]
pool mysql - timeout: 1319413992199
pool mysql - dispense() clients=0 available=1
pool mysql - availableObjects.length=1
pool mysql - availableObjects.length=1
pool mysql - removeIdle() destroying obj - now:1319413992211
timeout:1319413992199
pool mysql - removeIdle() all objects removed

连接池通过神奇的创建(create)和销毁(destroy)函数来工作。当客户尝试获取一个连接时,如果没有已经打开的连接,连接池会调用创建函数。如果一个连接闲置太久了(由idleTimeoutMillis 属性来指定空闲间隔,以毫秒来计算),它会被销毁并且释放内存资源。

Node 连接池的优雅之处在于,它可以表示任何持久化的资源。选择数据库可谓得天独厚,同时你也可以轻松地写些命令来维护与外界资源(如会话缓存,甚至是硬件接口)的连接。

4 消息队列协议

之前,我们举了邮递员的例子来描述Node 的事件循环架构。如果邮递员碰到哪家关了门,他就无法继续投递信件了。想象一下,如果有一名好心的老门卫能够把门打开,让邮递员通过呢?但是门卫已经上了年纪,且因为服务多年而身体虚弱,他需要多花点时间才能清理道路,因此这段时间内邮递员暂时无法继续投递信件。

这就类似堵塞的进程,但这种状况不会一直持续。最终,门卫会把门打开,然后邮递员又能继续他的业务了。如果邮递员到达的每一个屋子都有类似的开门进程,会把整个通道都拖慢。在Node 程序里,这类堵塞将严重降低系统性能。在计算机领域,造成类似情况的原因很多,可能是因为在注册过程中需要发送用户邮件,需要对用户输入进行大量的数学运算,或者是某个任务需要花费的时间超过了用户期望的等待时间,等等。Node 的事件驱动设计可用来应对大多数情况,它采用的是异步函数和回调的方法。但是如果一个事件特别“重”的话,就不应该放在Node 内部处理。Node 应该只负责快速运算和处理返回的结果。

以一个普通的用户注册流程为例。当用户自己注册时,应用程序会在数据库中保存一条新的记录,并发送邮件给该用户。它也许还会记录下注册过程中的一些统计数据,比如整个过程包括了几个步骤、花费了多少时间。如果用户刚在你的网页上点击提交按钮,系统就马上处理那么多的操作,其实并没有太大意义。比如,发送邮件的流程也许需要花费几秒钟(如果你运气不佳,要花上几分钟)来完成,数据库调用可以等到用户受到欢迎之后再进行操作,统计数据可以从程序的主逻辑独立出去处理。这样的情况下,你可以选择生成一条消息,来通知程序的其他部分有新用户注册了,这样的程序也可能是完全运行在另外一台服务器上的。这就是我们所称的发布- 订阅模型(publish-subscribe pattern)。

再假设你有一个集群的机器运行了Node.js 程序。当一台新机器要加入到集群的时候,它发出一条信息来请求配置信息。配置服务器返回的信息包含了新机器整合到集群中所需要的配置信息列表,这称为请求- 回复模型(request-reply pattern)。消息队列允许程序员发布事件然后继续其他操作,通过进程间通信频道,提高了并
发处理的效率,并实现了更高的扩展性。

RabbitMQ

RabbitMQ 是一个消息代理,支持高级消息队列协议(AMQP)。它适用的情景有跨服务器的数据交换和同一台服务器上的跨进程通信。RabbitMQ 使用Erlang 语言编写,能够提供集群的高可用性,并且很容易安装和使用。

(1). 安装RabbitMQ

如果你使用Linux 系统,大部分发行版都提供了RabbitMQ 的安装包。你也可以从http://www.rabbitmq.com 下载此软件,然后从源代码编译。一旦安装好了RabbitMQ,并启动起来,就可以使用npm 来获得Node 的AMQP 驱动:

npm install amqp

(2). 发布与订阅

RabbitMQ 使用标准的AMQP 协议进行通信。AMQP 源于金融服务行业,在金融领域消息的可靠性关系重大。AMQP 提供了对厂商中立的抽象规范,可以提供通用的(不只针对金融行业的)消息中间件服务,并且旨在解决不同类型系统间通信的问题。AMQP 与E-mail 的概念很像:E-mail 消息有其头信息和格式的规范,但内容可以是任何格式,文本、图片或视频都可以;两个公司之间不需要运行同一款E-mail服务器就能通信。AMQP 还可以在不同平台间通信。比如,用PHP 编写的发布者可以给用JavaScript 编写的消费者发送消息。

AMQP/RabbitMQ 使用方法

var connection = require('amqp').createConnection();
connection.on('ready', function () {
  console.log('Connected to ' + connection.serverProperties.product);
  var e = connection.exchange('up-and-running');
  var q = connection.queue('up-and-running-queue');
  q.on('queueDeclareOk', function (args) {
    console.log('Queue opened');
    q.bind(e, '#');
    q.on('queueBindOk', function () {
      console.log('Queue bound');
      q.on('basicConsumeOk', function () {
        console.log('Consumer has subscribed, publishing message.');
        e.publish('routingKey', {
          hello: 'world'
        });
      });
    });
    q.subscribe(function (msg) {
      console.log('Message received:');
      console.log(msg);
      connection.end();
    });
  });
});

输出为:

Connected to RabbitMQ
Queue opened
Queue bound
Consumer has subscribed, publishing message.
Message received:
{ hello: 'world' }

createConnection 命令建立了一个到RabbitMQ 消息代理的连接,默认情况(依照AMQP 协议)是localhost 的5672 端口。如果需要,这个命令可以被重载,如:

createConnection({
  host: 'dev.mycompany.com',
  port: 5555
})

接下来定义了queue 和exchange。这一步并不是严格要求的,因为AMQP 代理会被要求提供一个默认的exchange。但是通过指定up-and-running 作为exchange 的名字,你可以让程序与运行在同一台服务器上的其他exchange 隔离开。exchange 是负责接收消息并把它们传递给绑定的队列的实体。队列自己并不会做任何操作,它必须绑定到某个exchange 之后才能进行其他操作。q.bind(e, ‘#’) 命令告诉AMQP 把名为up-and-running-queue 的队列添加到名为up-and-running 的exchange 上,并且让exchange 监听所有传给它的消息(通过’#’ 参数)。你可以很方便地把# 改为其他关键字来过滤消息。一旦声明好了queue 和exchange,我们就设置了basicConsumeOk 的事件监听,当客户端订阅了此队列之后,AMQP 库会触发这个事件。而后Node 会发布一条hello world 消息,以及用来过滤的关键routingKey 给exchange。在这个例子里,过滤的关键词是什么并没有关系,因为队列绑定了所有内容(通过bind(‘#’) 命令)。
但AMQP 的中心思想是发布者永远不知道哪些订阅者连接了,所以需要有一个作为路由的关键词备用。最后,发送了subscribe 命令。回调函数是作为参数传入的,并且每次符合条件的消息传给exchange 并通过queue 传输之后,都会调用它。在本例中,回调函数会导致程序退出,这对于演示的目的来说足够了。但在真实的应用中,你不太可能这样
做。当subscribe 命令成功执行后,AMQP 会分发basicConsumeOk 事件,这会触发hello world 消息的发布,然后中断演示程序。

(3). 工作队列

如果长时间运行的任务超出了用户的容忍度(比如等待一个网页加载时),或者是该任务会堵塞整个程序,使用队列就很合适。使用RabbitMQ,是否可以把任务分散到多个工作进程中,并确保所有任务都能完成呢?即使第一个工作进程在处理它们的时候中途死掉也行吗?

用AMQP 发布长时间运行任务

var connection = require('amqp').createConnection();
var count = 0;
connection.on('ready', function () {
  console.log('Connected to ' + connection.serverProperties.product);
  var e = connection.exchange('up-and-running');
  var q = connection.queue('up-and-running-queue');
  q.on('queueDeclareOk', function (args) {
    console.log('Queue opened');
    q.bind(e, '#');
    q.on('queueBindOk', function () {
      console.log('Queue bound');
      setInterval(function () {
        console.log('Publishing message #' + ++count);
        e.publish('routingKey', {
          count: count
        });
      }, 1000);
    });
  });
});

这个例子是的简单发布- 订阅例子的修改版。但它只是一个发布者,所以移除了订阅相关的事件监听器。代替它的是一个定时器,用来每隔1000 毫秒(即每秒)往队列发布一条消息。该消息包含了一个count 变量,表示每次发布增加的次数。这段代码可以用来实现一个简单的工作者应用。下例 示范了如何写相应的客户端。

用AMQP 处理长时间运行任务

var connection = require('amqp').createConnection();
function sleep(milliseconds)
{
  var start = new Date().getTime();
  while (new Date().getTime() < start + milliseconds);
}
connection.on('ready', function () {
  console.log('Connected to ' + connection.serverProperties.product);
  var e = connection.exchange('up-and-running');
  var q = connection.queue('up-and-running-queue');
  q.on('queueDeclareOk', function (args) {
    q.bind(e, '#');
    q.subscribe({
      ack: true
    }, function (msg) {
      console.log('Message received:');
      console.log(msg.count);
      sleep(5000);
      console.log('Processed. Waiting for next message.');
      q.shift();
    });
  });
});

客户端从队列获取消息再处理它(在这个例子里是睡眠5 秒),然后从队列获取下一条消息,并不断重复。虽然Node 里并没有sleep 函数,但你可以用死循环来代替它,如例子所示。这里有个问题。回想一下,前面的发布者是每隔1 秒发送一条消息到队列的,但因为客户端要花费5 秒才能处理一条消息,它会很快就落后于发布者。解决方法呢?打开另外一个窗口并运行第二个客户端,现在消息处理的速度已经翻倍了。但这样依然不够快,还不能跟上发布者生产的数量。通过增加客户端可以进一步分散负载,并且避免让未处理的消息落在后面,这种部署就称为工作队列。工作队列的工作原理是发布的消息在连接到队列的客户端间循环触发。subscribe命令的{ack:true} 参数是通知AMQP 等待用户确认,看该消息是否已经处理完成。此反馈确认由shift 方法提供,该方法在应答过后把消息从队列中移除,同
时将其从服务里拿掉。这样,如果一个工作进程在处理某个消息的过程中死掉了,RabbitMQ 代理会把消息发给下一个可用的客户端。这里不需要使用超时,只要客户端是连接上的,消息就会从工作流中移除。只有当客户端没有发送反馈就断开了,该消息才会被发送到下一个客户端。

一个常见的“疑难杂症”是开发者常常忘记调用q.shift() 命令。如果你
忘记这个操作,程序依然会正常运行。但当某个客户端断开的时候,服务
器会把该客户端处理过的所有消息都重新放到队列中。
另一个副作用是,RabbitMQ 使用的内存会不停地上升。这是因为,即使消
息已经从队列的活跃状态中移除,它们依然会保存在内存里,直到等来客
户端的反馈结果并被客户端移除。

 

30 2015-06

 

我要 分享

 

 

本文 作者

 

相关 文章