博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
消息队列
阅读量:6680 次
发布时间:2019-06-25

本文共 5174 字,大约阅读时间需要 17 分钟。

一个利用条件变量写的消息队列,基于双缓冲的,虽然相比三缓冲的差距不小,但是还是值得拿来学习一下条件变量。

/* * BufQueue.h * *  Created on: May 30, 2013 *      Author: archy_yu */#ifndef BUFQUEUE_H_#define BUFQUEUE_H_#include 
#include
//#include "CommonStruct.h"typedef void* CommonItem;#define BATPROCESS_NUM 5class BufQueue{public: BufQueue(); virtual ~BufQueue(); int peek(CommonItem &item); int append(CommonItem item); std::list
* serial_read(std::list
* inlist); int set_read_event(); void WaitReadEventByTimeOut(bool& isReadable);private: std::list
* _write_list; std::list
* _read_list; pthread_mutex_t _write_mutex; pthread_mutex_t _read_mutex; pthread_cond_t _read_cond;};#endif /* BUFQUEUE_H_ */
/* * BufQueue.cpp * *  Created on: May 30, 2013 *      Author: archy_yu */#include "BufQueue.h"#include 
#include
void maketimeout(struct timespec* tsp,long ns = 1){ struct timeval now; //get the current time gettimeofday(&now,0); tsp->tv_sec = now.tv_sec; tsp->tv_nsec = now.tv_usec * 1000; //usec to nsec tsp->tv_nsec += 1000*ns;}CommonItem PopMsgToPutFromList( std::list
* pList ){ if(pList == NULL) { return NULL; } if(pList->empty()) { return NULL; } else { CommonItem item = pList->front(); pList->pop_front(); return item; }}BufQueue::BufQueue(){ pthread_mutex_init(&this->_write_mutex,NULL); pthread_mutex_init(&this->_read_mutex,NULL); pthread_cond_init(&this->_read_cond,NULL); this->_read_list = new std::list
(); this->_write_list = new std::list
();}BufQueue::~BufQueue(){ pthread_mutex_destroy(&this->_write_mutex); pthread_mutex_destroy(&this->_read_mutex); pthread_cond_destroy(&this->_read_cond); this->_read_list->clear(); this->_write_list->clear();}int BufQueue::peek(CommonItem &item){ pthread_mutex_lock(&this->_write_mutex); item = PopMsgToPutFromList(this->_read_list); pthread_mutex_unlock(&this->_write_mutex); return 0;}int BufQueue::append(CommonItem item){ if(item == NULL) { return 0; } bool issetread = false; pthread_mutex_lock(&this->_write_mutex); this->_write_list->push_back(item); issetread = (this->_write_list->size() > BATPROCESS_NUM); pthread_mutex_unlock(&this->_write_mutex); if(issetread) { this->set_read_event(); } return 0;}std::list
* BufQueue::serial_read(std::list
* inQueue){ pthread_mutex_lock(&this->_write_mutex); std::list
* tmplist = this->_write_list; this->_write_list = this->_read_list; this->_read_list = tmplist; tmplist = this->_read_list; this->_read_list = inQueue; pthread_mutex_unlock(&this->_write_mutex); return tmplist;}int BufQueue::set_read_event(){ pthread_mutex_lock(&this->_read_mutex); pthread_cond_signal(&this->_read_cond); pthread_mutex_unlock(&this->_read_mutex); return 0;}void BufQueue::WaitReadEventByTimeOut(bool& isReadable){ pthread_mutex_lock(&this->_read_mutex); struct timespec ts; maketimeout(&ts,0); isReadable = (0 == pthread_cond_timedwait(&this->_read_cond,&this->_read_mutex, &ts)); pthread_mutex_unlock(&this->_read_mutex);}

 给出测试代码和用法

BufQueue _queue;void* process(void* arg){    int i=0;    while(true)    {        int *j = new int();        *j = i;        _queue.append((void *)j);        i ++;    }    return NULL;}int main(int argc,char* argv[]){    pthread_t pid;    pthread_create(&pid,0,process,0);    long long int start = TimeKit::get_tick();    while(true)    {        list
* queue_to_read = new list
(); bool read = false; _queue.wait_readevent_by_timeout(read); if(read) { queue_to_read = _queue.serial_read(queue_to_read); list
::iterator iter; for(iter = queue_to_read->begin();iter != queue_to_read->end();iter ++) { int* j = (int*)(*iter); if(100000 <= (*j)) { long long int end = TimeKit::get_tick(); printf("%ld",(end - start)); return 0; } printf("%d\n",(*j)); } } } /* _recv_net_msg_queue = new DuplexList(); _send_net_msg_queue = new DuplexList(); InputMonitor _recv_thread(_recv_net_msg_queue); OutPutMonitor _send_thread(_send_net_msg_queue); _recv_thread.open("192.168.9.221",6000); _recv_thread.start(); _send_thread.start(); int count = 0; while(true) { MessageBlock* mb = NULL; if(_recv_net_msg_queue->peek((CommonItem &)mb) == 0) { if(count % 1000 == 0) {// printf("process %d msg\n",count); } mb->msg_type(NORMAL_MSG_TYPE); _send_net_msg_queue->append(mb); count ++; } else { usleep(2); } }*/ return 0;}

有兴趣的可以测试下,有什么问题可以联系我!

转载于:https://www.cnblogs.com/archy_yu/archive/2013/06/05/3119571.html

你可能感兴趣的文章
Maven多模块项目使用Jenkins分析代码的配置
查看>>
jQery Ajax 执行顺序
查看>>
一篇文章教你看懂Photoshop和Sketch
查看>>
【多图软文】使用Team@OSC进行团队协作
查看>>
阻止文字选中
查看>>
Spring Cloud搭建微服务架构----使用Spring boot开发web项目
查看>>
python 时间格式转化成毫秒
查看>>
java一些需要掌握的知识点
查看>>
CentOS 6.2 yum安装配置lnmp服务器(Nginx+PHP+MySQL)
查看>>
Redis学习手册 比较全面
查看>>
SpringLDAP-Reference (中文文档四)
查看>>
JQuery上传插件Uploadify使用详解
查看>>
(二)线程同步_6---修改锁的竞争原则
查看>>
Intent跳转时,activity的生命周期
查看>>
Java基础数据结构和堆栈
查看>>
我的友情链接
查看>>
我的友情链接
查看>>
ubuntu建立和删除用户
查看>>
Html5本地图片读取及裁剪
查看>>
MySQL数据库操作个人手记
查看>>