这是一个利用条件变量实现的消息队列,基于双缓冲;虽然性能与三缓冲的实现相比差距不小,但仍然值得拿来学习条件变量的用法。
/*
 * BufQueue.h
 *
 *  Created on: May 30, 2013
 *      Author: archy_yu
 */

#ifndef BUFQUEUE_H_
#define BUFQUEUE_H_

#include <list>
#include <pthread.h>

//#include "CommonStruct.h"

// Opaque message handle stored in the queue. The queue stores and returns
// these pointers as-is.
typedef void* CommonItem;

// append() raises the read event once the write list holds more than this
// many items.
#define BATPROCESS_NUM 5

// Double-buffered message queue built on pthread mutexes and a condition
// variable: producers append to a write list, and a consumer swaps the
// buffers with serial_read() to take everything written so far.
class BufQueue
{
public:
    BufQueue();
    virtual ~BufQueue();

    // Removes the front item of the current read list and stores it in
    // `item`; `item` is set to NULL when that list is empty.
    // Always returns 0.
    int peek(CommonItem &item);

    // Appends `item` to the write list (a NULL item is ignored) and raises
    // the read event once more than BATPROCESS_NUM items are pending.
    // Always returns 0.
    int append(CommonItem item);

    // Rotates the buffers: returns the list producers have been filling,
    // makes the previous read list the new write list, and installs `inlist`
    // as the new read list. The queue keeps the `inlist` pointer; the caller
    // receives the returned list.
    std::list<CommonItem> * serial_read(std::list<CommonItem> * inlist);

    // Signals the read condition variable. Always returns 0.
    int set_read_event();

    // Performs a timed wait on the read condition variable and reports in
    // `isReadable` whether a read event was observed.
    void WaitReadEventByTimeOut(bool& isReadable);

private:
    // Not copyable: a copy would duplicate the raw list pointers and the
    // pthread objects. Declared but intentionally left undefined.
    BufQueue(const BufQueue&);
    BufQueue& operator=(const BufQueue&);

    std::list<CommonItem> * _write_list;   // list that append() pushes to
    std::list<CommonItem> * _read_list;    // list that peek() pops from
    pthread_mutex_t _write_mutex;          // taken by peek/append/serial_read
    pthread_mutex_t _read_mutex;           // mutex paired with _read_cond
    pthread_cond_t _read_cond;             // the "read event"
};

#endif /* BUFQUEUE_H_ */
/*
 * BufQueue.cpp
 *
 *  Created on: May 30, 2013
 *      Author: archy_yu
 */

#include "BufQueue.h"

#include <sys/time.h>
#include <time.h>

// Fills *tsp with an absolute deadline of "now + offset" based on
// gettimeofday().
//
// NOTE: despite its name, `ns` is multiplied by 1000 before being added to
// tv_nsec, so the offset is effectively expressed in microseconds.
//
// The result is normalized so that 0 <= tv_nsec < 1e9; pthread_cond_timedwait
// rejects an out-of-range tv_nsec with EINVAL.
void maketimeout(struct timespec* tsp, long ns = 1)
{
    const long kNanosPerSecond = 1000000000L;

    struct timeval now;
    gettimeofday(&now, 0);                 // current wall-clock time

    tsp->tv_sec = now.tv_sec;
    tsp->tv_nsec = now.tv_usec * 1000;     // usec to nsec
    tsp->tv_nsec += 1000 * ns;

    // Carry whole seconds out of tv_nsec.
    tsp->tv_sec += tsp->tv_nsec / kNanosPerSecond;
    tsp->tv_nsec %= kNanosPerSecond;
    if (tsp->tv_nsec < 0)                  // a negative offset borrowed past zero
    {
        tsp->tv_nsec += kNanosPerSecond;
        tsp->tv_sec -= 1;
    }
}

// Removes and returns the front element of *pList.
// Returns NULL when pList is NULL or the list is empty.
CommonItem PopMsgToPutFromList(std::list<CommonItem> * pList)
{
    if (pList == NULL || pList->empty())
    {
        return NULL;
    }

    CommonItem item = pList->front();
    pList->pop_front();
    return item;
}

// Reports whether the write list currently holds more than BATPROCESS_NUM
// items, i.e. the same condition under which append() raises the read event.
// The list pointer is read under writeMutex because serial_read() swaps it.
static bool BatchPending(pthread_mutex_t& writeMutex,
                         std::list<CommonItem> *& writeList)
{
    pthread_mutex_lock(&writeMutex);
    const bool pending =
        (writeList != NULL) && (writeList->size() > BATPROCESS_NUM);
    pthread_mutex_unlock(&writeMutex);
    return pending;
}

// Initializes the synchronization objects and allocates both buffers.
BufQueue::BufQueue()
{
    pthread_mutex_init(&this->_write_mutex, NULL);
    pthread_mutex_init(&this->_read_mutex, NULL);
    pthread_cond_init(&this->_read_cond, NULL);
    this->_read_list = new std::list<CommonItem>();
    this->_write_list = new std::list<CommonItem>();
}

// Destroys the synchronization objects and releases both buffers.
// The stored items are opaque void* values and are NOT freed here; whoever
// allocated them remains responsible for them.
BufQueue::~BufQueue()
{
    pthread_mutex_destroy(&this->_write_mutex);
    pthread_mutex_destroy(&this->_read_mutex);
    pthread_cond_destroy(&this->_read_cond);

    // The lists were allocated with new (constructor / serial_read), so they
    // must be deleted, not merely cleared.
    delete this->_read_list;
    this->_read_list = NULL;
    delete this->_write_list;
    this->_write_list = NULL;
}

// Pops the front item of the read list into `item` (NULL when the list is
// empty). Always returns 0, so callers must check `item` for NULL.
int BufQueue::peek(CommonItem &item)
{
    // _write_mutex is the lock serial_read() holds while re-pointing
    // _read_list, so it is also the one that protects this access.
    pthread_mutex_lock(&this->_write_mutex);
    item = PopMsgToPutFromList(this->_read_list);
    pthread_mutex_unlock(&this->_write_mutex);
    return 0;
}

// Appends `item` to the write list; NULL items are ignored.
// Raises the read event once more than BATPROCESS_NUM items are pending.
// Always returns 0.
int BufQueue::append(CommonItem item)
{
    if (item == NULL)
    {
        return 0;
    }

    bool issetread = false;

    pthread_mutex_lock(&this->_write_mutex);
    this->_write_list->push_back(item);
    issetread = (this->_write_list->size() > BATPROCESS_NUM);
    pthread_mutex_unlock(&this->_write_mutex);

    // Signal outside _write_mutex so the two mutexes are never held together
    // on the producer side.
    if (issetread)
    {
        this->set_read_event();
    }
    return 0;
}

// Rotates the buffers and hands the filled one to the caller:
//   returned list   <- the current write list (everything appended so far)
//   new write list  <- the previous read list
//   new read list   <- inQueue
// The queue keeps `inQueue`; the caller receives the returned list.
// A NULL `inQueue` is replaced by a freshly allocated empty list so that the
// queue never ends up holding a NULL buffer.
std::list<CommonItem> * BufQueue::serial_read(std::list<CommonItem> * inQueue)
{
    if (inQueue == NULL)
    {
        inQueue = new std::list<CommonItem>();
    }

    pthread_mutex_lock(&this->_write_mutex);
    std::list<CommonItem> * filled = this->_write_list;
    this->_write_list = this->_read_list;
    this->_read_list = inQueue;
    pthread_mutex_unlock(&this->_write_mutex);

    return filled;
}

// Signals the read condition variable. Always returns 0.
int BufQueue::set_read_event()
{
    pthread_mutex_lock(&this->_read_mutex);
    pthread_cond_signal(&this->_read_cond);
    pthread_mutex_unlock(&this->_read_mutex);
    return 0;
}

// Reports in `isReadable` whether there is something for the reader to fetch.
//
// A condition-variable signal is lost if nobody is waiting when it is sent,
// so the wait is paired with a predicate: the write list already holding a
// full batch (more than BATPROCESS_NUM items). The predicate is checked while
// _read_mutex is held, and set_read_event() needs that same mutex to signal,
// so a batch completed after the check still wakes the wait below.
//
// The deadline is "now" (offset 0), so the timed wait is effectively a poll.
void BufQueue::WaitReadEventByTimeOut(bool& isReadable)
{
    pthread_mutex_lock(&this->_read_mutex);

    isReadable = BatchPending(this->_write_mutex, this->_write_list);
    if (!isReadable)
    {
        struct timespec ts;
        maketimeout(&ts, 0);

        const int rc = pthread_cond_timedwait(&this->_read_cond,
                                              &this->_read_mutex, &ts);

        // rc == 0 keeps explicit set_read_event() calls effective; the
        // re-check covers a batch that filled up while the wait timed out.
        isReadable = (rc == 0) ||
                     BatchPending(this->_write_mutex, this->_write_list);
    }

    pthread_mutex_unlock(&this->_read_mutex);
}
下面给出测试代码和用法:
BufQueue _queue;void* process(void* arg){ int i=0; while(true) { int *j = new int(); *j = i; _queue.append((void *)j); i ++; } return NULL;}int main(int argc,char* argv[]){ pthread_t pid; pthread_create(&pid,0,process,0); long long int start = TimeKit::get_tick(); while(true) { list* queue_to_read = new list (); bool read = false; _queue.wait_readevent_by_timeout(read); if(read) { queue_to_read = _queue.serial_read(queue_to_read); list ::iterator iter; for(iter = queue_to_read->begin();iter != queue_to_read->end();iter ++) { int* j = (int*)(*iter); if(100000 <= (*j)) { long long int end = TimeKit::get_tick(); printf("%ld",(end - start)); return 0; } printf("%d\n",(*j)); } } } /* _recv_net_msg_queue = new DuplexList(); _send_net_msg_queue = new DuplexList(); InputMonitor _recv_thread(_recv_net_msg_queue); OutPutMonitor _send_thread(_send_net_msg_queue); _recv_thread.open("192.168.9.221",6000); _recv_thread.start(); _send_thread.start(); int count = 0; while(true) { MessageBlock* mb = NULL; if(_recv_net_msg_queue->peek((CommonItem &)mb) == 0) { if(count % 1000 == 0) {// printf("process %d msg\n",count); } mb->msg_type(NORMAL_MSG_TYPE); _send_net_msg_queue->append(mb); count ++; } else { usleep(2); } }*/ return 0;}
有兴趣的读者可以自行测试一下,有什么问题可以联系我!