作者:杨海平 姚洪利 本文选自:中国计算机报 2001年12月18日
在UNIX中,一个进程让另外实体进行某项事务而采取的操作为fork的一个子进程,子进程只是将父进程的数据区拷贝一份到自己的数据区。在符合POSIX标准的UNIX操作系统下,同一个进程的线程之间共享进程指令、大多数数据(线程私有数据除外)、信号处理方式、进程运行环境等。由于线程共享进程的全局变量,因此可以采用用户自己编写的消息队列来实现数据的共享。
建立多任务模型,并用线程来实现
符合POSIX标准的UNIX操作系统提供了线程的控制函数,如:线程的创建和终止、线程之间的互斥、线程之间的同步等。利用这些系统函数可以成功地模拟消息队列,来实现线程间数据共享和同步,以完成多任务的实时性。为成功地描述线程间数据共享和同步,以下列任务模型为例。
首先建立消息队列属性数据结构
#define MAXQUEUE 30
typedef struct mq_attrib {
char name[20];
pthread_mutex_t mutex_buff;
pthread_mutex_t mutex_cond;
pthread cond_t cond;
int maxElements;
int elementLength;
int curElementNum;
caddr_t buff;
}mq_attrib,mq_attribstruct,mq_attrib_t;
mq_attrib_t msqueue[MAXQUEUE];
数据结构定义了消息队列的名字name,最大消息个数maxElements,单个消息长度elementLength,当前消息个数curElementNum,存放消息的缓冲区buff,保护缓冲区锁mutex_buff,线程同步条件变量cond,保护线程同步条件变量锁mutex_cond。
消息队列的创建
依据此数据结构进行消息队列的创建,函数为msqueue_create(参数解释:name消息队列名,maxnum消息的最大个数,length单个消息的长度)。
int msqueue_create( name, maxnum, length )
char name;
int maxnum,length;
{
int i;
for ( i=0; i
if ( msqueue[i]==NULL )break;
//如果消息队列全部被分配,返回错
if ( i==MAXQUEUE ) return MQERROR;
msqueue[i]=malloc(sizeof(mq_attribstruct));
sprintf( msqueue[i]->name, ""%s"", name);
msqueue[i]->maxElements = maxnum;
msqueue[i]->elementLength = length;
msqueue[i]->curElementNum = 0;
msqueue[i]->buff=malloc(maxnum?length);
//对保护锁进行初始化
pthread_mutex_init(&&msqueue[i]
->mutex_buff, NULL);
pthread_mutex_init(&&msqueue[i]
->mutex_cond, NULL);
//对线程同步条件变量初始化
pthread_cond_init(&&msqueue[i]->cond, NULL);
return i;
}
应用消息队列进行消息的发送和接收
发送消息到消息队列:
消息队列的发送和接收是在不同的线程中进行的。首先介绍发送消息到消息队列的函数:
int msqueue_send ( id, buff, length )
int id, length;
caddr_t buff;
{
int pos;
//消息队列id错,返回错
if ( id<0 || id >= MAXQUEU ) return MQERROR;
//消息长度与创建时的长度不符,返回错
if ( length != msqueue[id]->elementLength ) return MQERROR;
//消息队列满,不能发送
if ( msqueue[id]->curElementNum >= msqueue[id]->maxElements )
return MQERROR;
//在对消息队列缓冲区操作前,锁住缓冲区,以免其他线程操作
pthread_mutex_lock ( &&msqueue[id]->mutex_buff );
pos = msqueue[id]->curElementNum * msqueue[id]->elementLength;
bcopy ( buff, &&msqueue[id]->buff[pos], msqueue[id]->elementLength );
msqueue[id]->curElementNum ++;
pthread_mutex_unlock ( &&msqueue[id]->mutex_buff );
//如果插入消息前,消息队列是空的,插入消息后,消息队列为非空,则通知等待从消息队列取消息的线程,条件满足,可以取出消息进行处理
if ( msqueue[id]->curElementNum == 1 ) {
pthread_mutex_lock ( &&msqueue[id]->mutex_cond );
pthread_cond_broadcast ( &&msqueue[id]->cond );
pthread_mutex_unlock ( &&msqueue[id]->mutex_cond );
}
return length;
}
从消息队列中接收消息:
消息队列的接收函数 msqueue_receive,其参数:id为消息队列数组的索引号,buff为消息内容,length为消息长度。
int msqueue_receive ( id, buff, length )
int id, length;
caddr_t buff;
{
caddr_t temp;
int pos;
if(id<0||id>=MAXQUEUE)return MQERROR;
if(length != msqueue[id]->elementLength)
return MQERROR;
//如果消息队列为空,则等待,直到消息队列为非空条件满足
if ( msqueue[id]->curElementNum == 0){
pthread_mutex_lock ( &&msqueue[id]->mutex_cond );
pthread_cond_wait ( &&msqueue[id]->cond, &&msqueue[id]->mutex_cond );
pthread_mutex_unlock ( &&msqueue[id]->mutex_cond );
}
//取消息前,锁住消息队列缓冲区,以免其他线程存放或取消息
pthread_mutex_lock ( &&msqueue[id]->mutex_buff );
//为符合消息队列FIFO特性,取出消息后,进行消息队列的调整
temp =
malloc((msqueue[id]->curElementNum-1)
msqueue[id]-elementLength );
bcopy ( &&msqueue[id]->buff[0], buff, msqueue[id]->elementLength );
msqueue[id]->curElementNum --;
bcopy ( &&msqueue[id]->buff[msqueue[id]->elementLength], temp,
msqueue[id]->elementLength
msqueue[id]->curElementNum);
bcopy ( temp, &&msqueue[id]->buff[0],
msqueue[id]->elementLength
msqueue[id]->curElementNum);
free ( temp );
//解除缓冲区锁
pthread_mutex_unlock ( &&msqueue[id]->mutex_buff );
return length;
}
多任务模型的实现
在讨论完消息队列的创建、删除、发送和接收后,下面讲述消息队列在线程中的应用以实现多任务线程间的数据共享。
首先在main主函数中创建消息队列和线程:
//定义全局变量
Int msqueue_record, msqueue_process;
Void main()
{
pthread_t pthreadID1;
//创建消息队列,用于线程间通信
msqueue_record = msqueue_create ( “record”, 200, 200);
msqueue_process = msqueue_create ( “process”, 200, 200);
//创建数据采集线程
pthread_create ( &&pthreadID1, NULL, receiveData, NULL);
//创建数据处理线程
pthread_create ( &&pthreadID2, NULL, process, NULL);
//创建数据记录线程
pthread_create ( &&pthreadID1, NULL, record, NULL);
//等待进程结束
wait_thread_end( );
}
数据采集线程:
void receiveData( )
{
int count;
unsigned char buff[200];
for(;;) {
//从数据口采集数据,并将数据放置于buff中
//wait_data_from_data_port( buff )
//将数据写入消息队列msqueue_record中
msqueue_send ( msqueue_record, buff, 200 );
//将数据写入消息队列msqueue_process中
msqueue_send ( msqueue_process, buff, 200 );
}
}
记录线程函数:
void record ( )
{
int num, count;
unsigned char buffer[200];
for ( ;; ) {
count = msqueue_receive ( msg_record, &&buffer, 200 );
if ( count < 0) {
perror ( ""msgrcv in record"");
continue;
}
//将取到的消息进行记录处理
//record_message_to_lib();
}
}
数据处理线程函数:
int process( )
{
int count;
unsigned char buffer[200];
for ( ;; ) {
count = msqueue_receive ( msg_process, &&buffer, 200 );
if ( count < 0) {
perror ( ""msgrcv in record"");
continue;
}
//将取到的消息进行处理
//process_message_data()
}
}
在实现多任务系统时,作者曾经做过以下三种实现方法的比较:进程间通信采用IPC机制,线程间通信采用进程通信方式IPC,线程间通信采用基于作者开发的消息队列。结果表明:利用用户下的数据区进行线程间通信的速度最快,效率最高,而IPC方式慢。(作者联系方式:
[email protected])
(责任编辑 吴北)