当前位置:Linux教程 - Linux - UNIX环境下如何应用消息队列

UNIX环境下如何应用消息队列

一、 引 言

---- 进 入 九 十 年 代 后, 随 着 计 算 机 和 网 络 技 术 的 发 展, 很 多 数 据 处 理 系 统 都 采 用 开 放 系 统 结 构 的 客 户 机/ 服 务 器 网 络 模 式。 即 客 户 机 提 出 任 务 请 求, 由 服 务 器 做 相 应 处 理, 执 行 被 请 求 的 任 务, 然 后 将 结 果 返 回 给 客 户 机。 例 如: 银 行ATM 的 前 置 机 和 数 据 处 理 的 主 机 之 间 即 构 成 客 户 机/ 服 务 器 模 式; 电 话 银 行 的 前 置 机 和 银 行 数 据 处 理 主 机 之 间 也 构 成 这 种 模 式 结 构。 还 有POS 等。 这 样, 各 种 应 用 的 请 求 是 很 频 繁 的, 数 据 主 机 在 处 理 通 存 通 兑, ATM, 电 话 银 行, POS 等 各 种 请 求 时, 如 果 没 有 相 应 机 制 的 控 制, 数 据 将 出 现 混 乱, 有 可 能 产 生 透 支, 也 有 可 能 处 理 密 码 已 改 变 的 帐 户.。 数 据 的 完 整 性, 安 全 性 无 法 控 制。 而 消 息 队 列 正 是 解 决 这 一 问 题 的 有 力 工 具。 它 使 主 机 在 处 理 各 种 请 求 时, 按 照 先 后 顺 序 有 条 不 紊 地 进 行, 保 证 了 数 据 的 一 致 性 和 安 全 性。

二、 基 本 概 念

---- 1. 队 列

---- 队 列 是 信 息 的 线 性 表, 它 的 访 问 次 序 是 先 进 先 出(FIFO)。 也 就 是 说, 置 入 队 列 中 的 第 一 个 数 据 项 将 是 从 队 列 中 第 一 次 读 出 的 数 据 项, 置 入 的 第 二 项 将 是 读 出 的 第 二 项, 依 此 类 推。 这 是 队 列 允 许 的 唯 一 存 取 操 作, 其 它 随 机 访 问 是 不 允 许 的。 这 种 数 据 结 构 保 证 对 数 据 资 源 的 请 求 将 严 格 按 照 先 后 顺 序 进 行, 因 而 可 用 于 对 事 件 的 调 度 并 起 到I/O 缓 冲 的 作 用。

---- 2. 报 文

---- 发 送 进 程 和 接 收 进 程 进 行 信 息 的 交 换, 一 般 是 通 过 将 信 息 划 分 为 若 干 段 放 入 数 据 交 换 缓 冲 器 中, 进 程 间 通 过 对 该 缓 冲 器 的 存 取 来 实 现 通 信。 因 此, 数 据 是 以 不 连 续 的 形 式 在 进 程 间 传 送, 这 些 不 连 续 的 部 分 就 叫 报 文。

---- 3. 消 息 队 列

---- 将 报 文 按 队 列 的 结 构 进 行 组 织 就 叫 消 息 队 列。 该 队 列 用 于 存 放 正 被 发 送 或 接 收 的 每 一 个 报 文 的 标 题 信 息。 每 一 个 消 息 队 列 还 对 应 有 一 个 数 据 结 构, 它 含 有 消 息 队 列 的 存 取 权 限, 和 消 息 队 列 的 当 前 状 态 信 息 等 信 息。 消 息 队 列 可 进 行"" 发 送"" 和"" 接 收"" 操 作。

三、 消 息 队 列 的 编 程 要 点 及 运 作 过 程

---- 1. 消 息 队 列 的 创 建

---- 在 报 文 能 够 发 送 和 接 收 之 前, 必 须 创 建 一 个 能 够 唯 一 被 识 别 出 的 消 息 队 列 和 数 据 结 构, 这 个 被 创 建 的 唯 一 标 识 符 叫 做 消 息 队 列 描 述 符(msqid), 用 来 识 别 或 引 用 相 关 的 消 息 队 列 和 数 据 结 构。 用msgget(long key, int msgflg) 系 统 调 用 来 创 建 消 息 队 列, 其 中 key 是 一 个 长 整 型, 可 由 用 户 设 定 也 可 通 过ftok() 获 得。msgflg 的 值 是 八 进 制 的 消 息 队 列 操 作 权 和 控 制 命 令 的 组 合。 操 作 权 定 义 为:

操作允许权 八进制整数

用户可读 0400

用户可写 0200

同组可读 0040

同组可写 0020

其它可读 0004

其它可写 0002


---- 操 作 权 可 相 加 而 派 生, 如 用 户 可"" 读""、"" 写"" 的 权 限 为0400|0200=0600。 控 制 命 令 可 取IPC_CREAT 或IPC_EXCL。 如 果 要 创 建 一 个key=888 且 属 主 和 同 组 可 读 写 的 消 息 队 列, 执 行 以 下 系 统 调 用msgget(0x888,0660|IPC_CREAT)。 创 建 后 可 用ipcs 命 令 看 到 以 下 信 息:


IPC status from /dev/mem as of Sun

Jan 25 06:49:52 1970

T ID KEY MODE OWNER GROUP

Message Queues:

. q 7 0x00000888 --rw-rw----

root system

...


---- 它 的 消 息 队 列 描 述 符 是7, 属 主 是root , 同 组 是system, 存 取 权 是 属 主、 用 户 可 读 写。 如 果 执 行msgget(0x888,0660|IPC_CREAT) 时, 与0x888 对 应 的 消 息 队 列 已 存 在, 则 返 回 该 消 息 队 列 的 描 述 符msqid。
---- 2. 消 息 的 发 送

---- 消 息 队 列 一 经 创 建 即 可 用msgsnd(int msqid, void *msgp, size_t msgsz, int msgflg) 发 送 消 息。msgqid 是 经msgget 创 建 的 消 息 队 列 描 述 符,msgp 是 指 向 消 息 段 的 指 针, 该 指 针 所 指 结 构 含 有 报 文 类 型 和 要 发 送 或 接 收 的 报 文:

struct msgbuf {

long mtype;/*消息类型*/

char mtext[512];

/* 消息正文,512暂定为消息段的大小*/

}


---- msgsz 是msgp 参 量 指 向 的 数 据 结 构 中 字 符 数 组 的 长 度, 即 报 文 长 度, 最 大 值 由MSGMAX 确 定。msgflg 是 当 消 息 队 列 满 时( 队 列 中 无 空 闲 空 间), 系 统 要 采 取 的 行 动. 如 果msgflg&IPC_NOWAIT= 真, 调 用 进 程 立 即 返 回, 不 发 送 该 消 息。 如 果msgflg&IPC_NOWAIT= 假, 调 用 进 程 暂 停 执 行, 处 于"" 挂 起"" 状 态, 且 不 发 送 该 消 息。 直 到 下 列 情 况 之 一 出 现:
---- - 引 起 暂 停 的 条 件 不 再 存 在, 如 队 列 出 现 空 闲, 即 可 发 送
---- - 该 消 系 队 列 被 从 系 统 中 删 去
---- - 调 用 进 程 接 收 到 一 个 要 捕 捉 的 信 号, 如 中 断 信 号, 此 时 不 发 送 消 息, 调 用 进 程 按signal 中 描 述 的 方 式 执 行。

---- 如 果msgsnd 返 回0 则 发 送 成 功。 返 回-1 则 表 示 发 送 失 败, 错 误 类 型 可 具 体 查 看errno。

---- 3. 消 息 的 接 收

---- 用 msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp, int msgflg) 系 统 调 用 从 msqid 消 息 队 列 中 读 取 一 条 信 息 并 将 其 放 入 消 息 段 指 针msgp 指 向 的 结 构。msgsz 给 出mtext 的 字 节 数, 如 果 所 接 收 的 消 息 比msgsz 大 且msgflg&MSG_NOERROR 为 真, 则 按msgsz 的 大 小 截 断 而 不 通 知 调 用 进 程。msgtyp 指 定 要 求 的 消 息 类 型:

---- msgtyp=0 接 收 消 息 队 列 中 的 第 一 个 报 文
---- msgtyp >0 接 收 消 息 队 列 中 的 类 型 为msgtyp 的 第 一 个 报 文
---- msgtyp< 0 接 收 消 息 队 列 中 小 于 等 于msgtyp 绝 对 值 的 最 低 类 型 的 第 一 个 报 文

---- 当 队 列 上 没 有 所 期 望 类 型 的 消 息 或 消 息 队 列 为 空 时msgflg 指 出 调 用 进 程 要 采 取 的 行 动: 如 果msgflg&IPC_NOWAIT 为 真, 则 调 用 进 程 立 即 结 束 并 返 回-1。 如msgflg&IPC_NOWAIT 为 假, 则 调 用 进 程 暂 停 执 行 直 至 出 现:

---- - 队 列 中 放 入 所 需 类 型 的 消 息, 调 用 进 程 接 收 该 消 息
---- -msqid 消 息 队 列 从 系 统 中 删 除
---- - 调 用 进 程 接 收 到 捕 获 的 信 号, 此 时 不 接 收 消 息, 调 用 进 程 按signal 描 述 的 方 式 执 行。

---- 如 果msgrev 执 行 成 功, 则 返 回 放 入mtext 中 的 字 节 数, 失 败 返 回-1 , 错 误 类 型 可 查errno。

---- 4. 消 息 队 列 的 控 制 和 撤 销

---- 用 msgctl(int msqid, int cmd, struct msqid_ds *buf) 系 统 调 用 实 现 对 消 息 队 列 的 控 制。msgqid 必 须 是 用msgget 创 建 的 消 息 队 列 描 述 符。cmd 可 以 是:

---- IPC_STAT 查 看 消 息 队 列 的 状 态, 结 果 放 入buf 指 针 指 向 的 结 构
---- IPC_SET 为 消 息 队 列 设 置 属 主 标 识, 同 组 标 识, 操 作 允 许 权, 最 大 字 节 数
---- IPC_RMID 删 除 指 定 的msqid 以 及 相 关 的 消 息 队 列 和 结 构

四、 编 程 示 例

---- 下 面 给 出 一 个 运 用 消 息 队 列, 实 现 进 程 通 信 的 实 例。 以 下 程 序 在 IBM RS/6000 小 型 机(AIX 操 作 系 统) 上 和IBM PC(UNIX 操 作 系 统) 上 分 别 调 试 通 过。 该 程 序 主 要 模 拟 根 据 帐 号 查 询 余 额 的 过 程。 包 括 三 方 面:

请 求 进 程 从 标 准 输 入 读 入 帐 号, 并 将 该 帐 号 通 过 消 息 队 列 发 送 给 服 务 进 程;

服 务 进 程 接 收 该 帐 号 后, 按 照 请 求 的 先 后 顺 序 在 标 准 输 入 上 输 入 该 帐 户 的 姓 名 和 余 额, 并 将 结 果 返 回 给 客 户 进 程;

请 求 进 程 接 收 返 回 的 信 息, 并 将 结 果 输 出 在 标 准 输 出 上。
---- 服 务 进 程(msgcenter) 先 于 请 求 进 程(msgreq) 启 动. 客 户 进 程 启 动 时 要 携 带 请 求 编 号, 可 同 时 起 动 多 个 请 求 进 程。


/*请求方程序msgreq.c*/

#include

#include

#include

#include

#include

static struct msgbuf1{

long mtype;

char mtext[100];

} sndbuf, rcvbuf, *msgp ;

extern int errno;

main(int argc, char **argv)

{ int rtrn, msqid ;

char name[10];

double balance;

if (argc!=2){ fprintf(stderr,

""msgreq [01-99] ""); exit(-1); }

if ( (msqid = msgget(0x888, IPC_CREAT|0660)) == -1 ){

fprintf(stderr, ""msgget 888 failed ! ""); exit(-1);

}

msgp=&sndbuf;

sprintf(sndbuf.mtext,""%2.2s"",argv[1]);

printf(""输入4位帐号:"");

scanf(""%s"",&sndbuf.mtext[2]);

sndbuf.mtext[6]=0;

msgp->mtype=666;

rtrn=msgsnd(msqid,msgp, strlen(sndbuf.mtext), 0);

if (rtrn==-1){

perror(""msgsnd""); exit(-1);

}

msgp=&rcvbuf;

fprintf(stderr,""等待后台数据处理进程的回答...."");

rtrn=msgrcv(msqid,msgp, 100, atoi(argv[1]), 0);

if(rtrn==-1){ perror(""msgrcv""); exit(-1); }

sscanf(rcvbuf.mtext,""%[^|]|%lf"",name,&balance);

printf("" 姓名=%s "",name);

printf(""余额=%lf "",balance);

}

/*服务方程序msgcenter.c*/

#include

#include

#include

#include

#include

static struct msgbuf1{

long mtype;

char mtext[100];

} sndbuf, rcvbuf , *msgp;

extern int errno;

main()

{ int rtrn, msgqid ;

char strbuf[100];

if ( (msqid = msgget(0x888, IPC_CREAT|0600)) == -1 ){

fprintf(stderr, ""msgget 888 failed ! ""); exit(-1);

}

while(1) {

msgp=&rcvbuf;

fprintf(stderr,""等待前台进程的请求...."");

rtrn=msgrcv(msqid, msgp, 100, 666 ,MSG_NOERROR);

if(rtrn==-1){ perror(""msgrcv"");exit(-1); }

msgp=&sndbuf;

sprintf(strbuf,""%2.2s"",rcvbuf.mtext);

msgp->mtype=atoi(strbuf);

printf("" 输入帐号=%4.4s的帐户姓名:"",&rcvbuf.mtext[2]);

scanf(""%s"",sndbuf.mtext);

strcat(sndbuf.mtext,""|"");

printf(""输入该帐户余额:"");

scanf(""%s"",strbuf);

strcat(sndbuf.mtext,strbuf);

rtrn=msgsnd(msqid,msgp, strlen(sndbuf.mtext), 0);

if (rtrn==-1){ perror(""msgsnd""); exit(-1); }

}

}