当前位置:Linux教程 - Linux资讯 - 在Linux上处理共享对象的同步事件

在Linux上处理共享对象的同步事件

  在面向对象的系统中,当一个对象接收到一条消息时,可能会发生一系列的事件。通常,这些事件是以 同步(synchronous) 模式处理的:调用进程或向这个对象发送消息的线程在发送消息调用完成之前都会接收并处理一系列事件。然而,如果产生这些事件的对象是由多个进程进行共享并且保存在共享内存中时,情况就稍微有些不同了。

  本文将使用两种 C++ 设计模式来详细介绍这种情况,并使用一些样例代码来展示这种解决方案(这些样例代码可以从本文 下载 一节中获得):

我们将简要介绍不使用共享内存的样例代码。 使用第一种设计模式来修改这些代码,让其使用共享内存。 然后阐述如何使用第二种设计模式来实现进程间通信(IPC)。

  您可以在任何机器体系架构、操作系统和编译器上应用这两种设计模式中的概念。对于本文来说,我们使用的是 RedHat Linux 7.1 for 32-bit x86 Intel® 体系架构的发行版;使用 GNU C++ compiler version 3.2.3 编译器及其相关工具来编译并测试样例程序。

  不使用共享内存

  下面让我们开始介绍这个样例程序,首先是不使用共享内存的程序:

清单 1. common.h

#ifndef __COMMON_H__ #define __COMMON_H__ class IObjectWithEvents { public: class IEventSink { public: virtual void OnEvent(pid_t pid, const char * msg) = 0; }; static IObjectWithEvents * getInstance(); virtual bool AddEventHandler(IEventSink * pEI) = 0; virtual void SendMessage() = 0; }; #endif //__COMMON_H__

  接口类 IObjectWithEvents 包含了一个内嵌的接口类 IEventSink,它定义了 OnEvent() 方法。这个事件处理程序接收一个发送者的 id 和一个字符串消息。getInstance() 方法返回对共享内存中对象的引用,AddEventHandler() 注册一个事件处理程序,SendMessage() 向这个对象发送一条消息。由于不需要引用共享内存,所以可以像清单 2 中那样来使用 IObjectWithEvents:

清单 2. shm-client1.cpp

#include <iostream> #include <sys/types.h> #include <unistd.h> #include "common.h" #define HERE __FILE__ << ":" << __LINE__ << " " using namespace std; class EventSink : public IObjectWithEvents::IEventSink { public: void OnEvent(pid_t pid, const char * msg) { cout << HERE << "Message from pid(" << pid << ")\t : " << msg << endl; } }; int main() { IObjectWithEvents * powe = IObjectWithEvents::getInstance(); EventSink sink; powe->AddEventHandler(&sink); powe->SendMessage(); return 0; }

类 EventSink 提供了这个事件处理程序的实现。主函数中给出了用于发送消息和处理事件的标准序列。

ObjectWithEvents 的典型实现如清单 3、4 所示:

清单 3. ObjectWithEvents.h

#include "common.h" class ObjectWithEvents : public IObjectWithEvents { public: // We assume singleton design pattern for illustration static ObjectWithEvents * ms_pObjectWithEvents; ObjectWithEvents(); //the implementation for IObjectWithEvents void FireEvent(); virtual bool AddEventHandler(IEventSink * pEI); virtual void SendMessage(); //Collection for maintaining events enum { MAX_EVENT_HANDLERS = 16, }; long m_npEI; IEventSink * m_apEI[MAX_EVENT_HANDLERS]; pid_t m_alPID[MAX_EVENT_HANDLERS]; };

 

清单 4. ObjectWithEvents.cpp

#include <iostream> #include <sys/types.h> #include <sys/shm.h> #include <unistd.h> #include <pthread.h> #include "ObjectWithEvents.h" using namespace std; ObjectWithEvents * ObjectWithEvents::ms_pObjectWithEvents = NULL; IObjectWithEvents * IObjectWithEvents::getInstance() { // the following commented code is for illustration only. /* if (NULL == ObjectWithEvents::ms_pObjectWithEvents) { ObjectWithEvents::ms_pObjectWithEvents = new ObjectWithEvents(); } */ return ObjectWithEvents::ms_pObjectWithEvents; } ObjectWithEvents::ObjectWithEvents() : m_npEI(0) { } void ObjectWithEvents::FireEvent() { // iterate through the collection for (long i = 0; i < m_npEI; i++) { //Recheck for NULL if (0 != m_apEI[i]) { // Fire the event m_apEI[i]->OnEvent(m_alPID[i], ""); } } return; } bool ObjectWithEvents::AddEventHandler(IEventSink * pEI) { // NULL check if (NULL == pEI) { return false; } // check if there is space for this event handler if (MAX_EVENT_HANDLERS == m_npEI) { return false; } // Add this event handler to the collection m_alPID[m_npEI] = getpid(); m_apEI[m_npEI++] = pEI; return true; } void ObjectWithEvents::SendMessage() { //Some processing //And then fire the event FireEvent(); return; }

清单 4 中的代码可以使用下面的脚本来编译:

g++ -g -o shm_client shm_client1.cpp ObjectWithEvents.cpp

在运行 shm_client 时,应该可以看到下面的输出:

$ ./shm_client shm_client1.cpp:16 Message from pid(3920) :

使用共享内存:没有事件缓存

现在,为了在共享内存中对 ObjectWithEvents 进行实例化,我们需要修改 ObjectWithEvents 的实现。

清单 5. 修改 ObjectWithEvents.cpp

// To add a declaration for the "new" operator: class ObjectWithEvents : public IObjectWithEvents { public: void * operator new(unsigned int); }; // To include an additional header for the Initializer class: #include "Initializer.h" // To overload the operator "new": void * ObjectWithEvents::operator new(unsigned int) { return ms_pObjectWithEvents; } // Then, FireEvent is completely changed: void ObjectWithEvents::FireEvent() { // We need to serialize all Access to the collection by more than one process int iRetVal = Initializer::LockMutex(); if (0 != iRetVal) { return; } pid_t pid = getpid(); // iterate through the collection and fire only events belonging to the current process for (long i = 0; i < m_npEI; i++) { // Check whether the handler belongs to the current process. if (pid != m_alPID[i]) { continue; } //Recheck for NULL if (0 != m_apEI[i]) { m_apEI[i]->OnEvent(pid, ""); } } // release the mutex if ((0 == iRetVal) && (0 != Initializer::UnlockMutex())) { // Deal with error. } return; } // The following are changes to ObjectWithEvents::AddEventHandler(): // 1. Before accessing the collection, we lock the mutex: int bRetVal = Initializer::LockMutex(); if (0 != bRetVal) { return false; } // 2. After accessing the collection, we release the mutex: if ((0 == bRetVal) && (0 != Initializer::UnlockMutex())) { // Deal with error. }

要对共享内存中的对象进行实例化,请定义另外一个类 Initializer。

清单 6. Initializer.h

#ifndef __Initializer_H__ #define __Initializer_H__ class Initializer { public : int m_shmid; static Initializer ms_Initializer; Initializer(); static pthread_mutex_t ms_mutex; static int LockMutex(); static int UnlockMutex(); }; #endif // __Initializer_H__

Initializer 定义了共享内存的 id m_shmid 和一个用来处理同步事件的信号量 ms_mutex。

函数 LockMutex() 负责对这个互斥体进行加锁,UnlockMutex() 对这个互斥体进行解锁。

Initializer 的实现如清单 7 所示:

清单 7. Initializer.cpp

#include <iostream> #include <sys/types.h> #include <sys/shm.h> #include <unistd.h> #include <pthread.h> #include "Initializer.h" #include "ObjectWithEvents.h" using namespace std; Initializer Initializer::ms_Initializer; pthread_mutex_t Initializer::ms_mutex = PTHREAD_ERRORCHECK_MUTEX_INITIALIZER_NP; Initializer::Initializer() : m_shmid(-1) { bool bCreated = false; key_t key = 0x1234; m_shmid = shmget(key,sizeof(ObjectWithEvents), 0666); if (-1 == m_shmid) { if(ENOENT != errno) { cerr<<"Critical Error"<<endl; return; } m_shmid = shmget(key, sizeof(ObjectWithEvents), IPC_CREAT0666); if (-1 == m_shmid ) { cout << " Critical Error " << errno<< endl; return; } bCreated = true; } ObjectWithEvents::ms_pObjectWithEvents = (ObjectWithEvents*)shmat(m_shmid,NULL,0); if (NULL == ObjectWithEvents::ms_pObjectWithEvents) { cout << " Critical Error " << errno << endl; return; } if (true == bCreated) { ObjectWithEvents * p = new ObjectWithEvents(); } // Create a mutex with no initial owner. pthread_mutex_init(&ms_mutex, NULL); } int Initializer::LockMutex() { // Request ownership of mutex. pthread_mutex_lock(&ms_mutex); if(EDEADLK == errno) { cout << "DeadLock" << endl; return -1; } return 0; } int Initializer::UnlockMutex() { return pthread_mutex_unlock(&ms_mutex); }

 

  如果共享内存尚不存在,就创建共享内存;并在其中创建对象。如果共享内存已经存在了,就跳过对象的构建。Initializer::m_shmid 记录了这个标识符,ObjectWithEvents::ms_pObjectWithEvents 记录了对这个共享对象的引用。

  即使在所有进程都与之脱离之后,这个共享内存也不会被销毁。这样您就可以使用 ipcrm 显式地删除,或者使用 ipcs 命令进行查看。测试程序的编译方式如下:

g++ -g -o shm_client shm_client1.cpp ObjectWithEvents.cpp Initializer.cpp

  控制台上运行这个程序的结果如下所示:

清单 8. 控制台结果

$ ./shm_client shm_client1.cpp:16 Message from pid(4332) : $ ipcs ------ Shared Memory Segments -------- key shmid owner perms bytes nattch status 0x00001234 327686 sachin 666 136 0 $ ./shm_client shm_client1.cpp:16 Message from pid(4333) : $ ipcrm -m 327686

ObjectWithEvents 实例中汇集了来自各个进程的事件。它可以只释放出当前进程所注册的事件。这种设计模式阐述了两点:

任何对一组事件的访问都由一个互斥对象来保护。 在事件发出前,使用进程 ID 进行过滤。

用于 IPC 的共享内存和事件缓存

现在让我们来看一下如何使用共享内存和事件的缓存进行进程间通信。如果事件是在共享对象中进行缓存的,那么它们可能会稍后才发出。接收进程必须要查询这个共享对象到底发生了什么事件。因此,通过采用一个同步模型,就可以实现进程间通信。这就是开发下面这种设计模式的动机。

给 IObjectWithEvents 添加两个方法,如下所示:

清单 9. 给 IObjectWithEvents 添加方法

class IObjectWithEvents { public: virtual bool EnqueueEvent(const char * msg) = 0; virtual bool PollForEvents() = 0; };

EnqueueEvent() 简单地在共享对象中添加事件的缓存,PollForEvents() 则会对这些缓存数据进行检索。

shm_client1 将使用 EnqueueEvent() 方法,如下所示:

powe->EnqueueEvent("Message from shm_client1");

shm_client2(实际上是 shm_client1 的一个拷贝)会使用 PollForEvents() 方法,如下所示:

powe->EnqueueEvent("Message from shm_client2"); powe->PollForEvents();

另外,我们给 ObjectWithEvents 增加了点东西,如下所示:

清单 10. 对 ObjectWithEvents 的改动

class ObjectWithEvents : public IObjectWithEvents { public: virtual bool EnqueueEvent(const char * msg); virtual bool PollForEvents(); //The event cache enum { MAX_EVENTS = 16, MAX_EVENT_MSG = 256, }; long m_nEvents; pid_t m_alPIDEvents[MAX_EVENTS]; char m_aaMsgs[MAX_EVENTS][MAX_EVENT_MSG]; };

这些生成了新的构造函数:

ObjectWithEvents::ObjectWithEvents() : m_npEI(0), m_nEvents(0) { }

EnqueueEvent() 将事件(例如每个发出的事件的消息和进程 id)加入一个队列中。 PollForEvents() 负责遍历这个队列,并逐一对队列中的事件调用 OnEvent()。

清单 11. EnqueueEvent

bool ObjectWithEvents::EnqueueEvent(const char * msg) { if (NULL == msg) { return false; } if (MAX_EVENTS == m_nEvents) { //IEventSink collection full return false; } int bRetVal = Initializer::LockMutex(); if (0 != bRetVal) { return false; } m_alPIDEvents[m_nEvents] = getpid(); strncpy(m_aaMsgs[m_nEvents++], msg, MAX_EVENT_MSG - 1); if ((0 == bRetVal) && (0 != Initializer::UnlockMutex())) { // Deal with error. } return true; } bool ObjectWithEvents::PollForEvents() { if (0 == m_nEvents) { return true; } int bRetVal = Initializer::LockMutex(); if (0 != bRetVal) { return false; } pid_t pid = getpid(); for (long i = 0; i < m_npEI; i++) { // Does the handler belongs to current process ? if (pid != m_alPID[i]) { continue; } //Recheck for NULL if (0 == m_apEI[i]) { continue; } for (long j = 0; j < m_nEvents; j++) { m_apEI[i]->OnEvent(m_alPIDEvents[j], m_aaMsgs[j]); } } if ((0 == bRetVal) && (0 != Initializer::UnlockMutex())) { // Deal with error. } return true; }

现在试着运行一下编译脚本:

g++ -g -o shm_client1 shm_client1.cpp ObjectWithEvents.cpp Initializer.cpp g++ -g -o shm_client2 shm_client2.cpp ObjectWithEvents.cpp Initializer.cpp

控制台上的输出应该如下所示:

清单 12. shm_client1 和 shm_client2 的输出

$ ./shm_client1 $ ./ipcs ------ Shared Memory Segments -------- key shmid owner perms bytes nattch status 0x00001234 360454 sachin 666 4300 0 $ ./shm_client2 shm_client2.cpp:16 Message from pid(4454) : Message from shm_client1 shm_client2.cpp:16 Message from pid(4456) : Message from shm_client2

(出处:http://www.sheup.com)