`
ndi88ndi
  • 浏览: 18447 次
最近访客 更多访客>>
社区版块
存档分类
最新评论

qt线程(转)----这篇很专业!(五至九 部分)

 
阅读更多

qt线程(转)----这篇很专业!(五至九 部分)
2011年05月19日
  五.图像循环队列(摄像头的采集数据放到图像循环队列)
  程序通过建立带共享锁的4帧图像循环队列做为图像采集线程和图像发送线程进行数据交换的公共缓冲区(带共享锁的循环队列在这个网址下有介绍:http://www.zaoxue.com/article/tech-55122.htm)
  能够在通信中更好的对数据进行读写和存储,在程序编写过程中就把数据队列的方式改为了循环队列。通过设定数据存储地址的头指针和尾指针,以及对数据存储长度状态值的判断,从而达到循环队列的目的。当有新的数据到来时,数据的尾指针自动往后移,长度的也做相应的增加。同时判断数据的长度有没有超出BANK的地址范围,如果超出地址范围,则尾指针跳转到头指针之前,继续往后存储数据,形成了循环队列;同样,当从RAM里取走数据时,数据的头指针自动往后移,长度的也做相应的增减。在队列的出队操作中,还要对数据的长度进行判断,如果达到了最大长度,则丢弃后面所有的数据。直到缓存区能有空间继续作队列为止。
  六.视频采集数据缓冲机制的研究
  在视频采集系统中,视频数据的采集与发送需要很好的性能,所以需要一个高性能的数据流缓冲机制,在传统的生产者/消费者模型的基础上,提出一种新的数据流缓冲模型(在一份叫做视频 会议系统中数据缓冲机制的研究.pdf 中有介绍)(一个关于多线程同步的文章 :http://www.vckbase.com/document/viewdoc/?id=1080)
  当解决多线程互斥同步的问题时,经常会有如下几个问题:
  1. 在一个给定的问题中,需要多少个Mutex,多少个Semaphore?有什么规律?
  2. 在对临界区加锁和等待信号量的顺序上有什么要求和规律?
  3. 什么样操作适合放在临界区,什么样的不适合?
  下面就生产者和消费者问题来分析一些这几个问题.
  下面是一个简单的实现程序:
  生产者向数组sharedArray中写入数据,而消费者从该数组中读取数据.
  #include 
  #include 
  #include 
  #include 
  #define MAXSIZE  5               /*共享缓冲区的大小*/
  int sharedArray[MAXSIZE];        /*sharedArray是共享缓冲区*/
  int curr=-1;                     /*curr是用来指定sharedArray当前存有数据的最大位置*/
  /*注意,sharedArray和curr都属于共享数据*/
  int empty=0;            
  int full=MAXSIZE;
  pthread_mutex_t sharedMutex=PTHREAD_MUTEX_INITIALIZER; /*锁定临界区的mutex*/
  sem_t waitNonEmpty, waitNonFull; /*等待"非空资源"和等待"非满资源"的semaphor*/
  void * readData(void * whichone)
  {
  int data, position;
  while (1){
  sem_wait(&waitNonEmpty);             /*是否有"非空资源"*/
  pthread_mutex_lock(&sharedMutex);    /*进入临界区*/
  data = sharedArray[curr];
  position = curr--;
  printf ("%s read from the %dth: %d, \n", (char*)whichone, position, data);
  sem_post(&waitNonFull);              /*生成一个"非满资源"*/
  pthread_mutex_unlock(&sharedMutex);  /*离开临界区*/
  sleep(2);                            /*跟同步无关的费时操作*/
  }
  }
  void * writeData(void * whichone)
  {
  int data, position;
  while (1) {
  data=(int)(10.0*random()/RAND_MAX);    /*生成一个随机数据,注意是10.0而不是10*/
  sem_wait(&waitNonFull);                /*是否有"非满资源"*/
  pthread_mutex_lock(&sharedMutex);      /*进入临界区*/
  position = ++curr;
  sharedArray[curr]=data;
  printf ("%s wrote to the %dth: %d, \n", (char*)whichone, position, data);
  sem_post(&waitNonEmpty);               /*生成一个"非空资源"*/
  pthread_mutex_unlock(&sharedMutex);    /*离开临界区*/
  sleep(1);                              /*跟同步无关的费时操作*/
  }
  }
  int main (int argc, char** argv)
  {
  pthread_t consumer1, consumer2, producer1, producer2;    /*两个生产者和两个消费者*/
  sem_init(&waitNonEmpty, 0, empty);                       /*初始化信号量*/
  sem_init(&waitNonFull, 0, full);            
  /*注意,本问题中的两种semaphore是有一定关系的,那就是它们的初始值之和应该等于共享缓冲区大小*/
  /*即empty+full等于MAXSIZE*/
  pthread_create (&consumer1, NULL, &readData, "consumer1");
  pthread_create (&consumer2, NULL, &readData, "consumer2");
  pthread_create (&producer1, NULL, &writeData, "producer1");
  pthread_create (&producer2, NULL, &writeData, "producer2");
  pthread_join (consumer1, NULL);
  pthread_join (consumer2, NULL);
  pthread_join (producer1, NULL);
  pthread_join (producer2, NULL);
  sem_destroy(&waitNonEmpty);
  sem_destroy(&waitNonFull);
  }
  分析和说明:
  1. 在一个给定的问题中,需要多少个Mutex,多少个Semaphore?有什么规律?
  在本问题中,共需要一个Mutex和两个Semaphore.
  其中,Mutex是用来锁定临界区的,以解决对共享数据的互斥访问问题(无论是对生成者还是对消费者);
  我们共需要两个Semaphore,这是因为在本问题中共有两个稀缺资源.
  第一种是"非空"这种资源,是在消费者之间进行竞争的.
  第二种是"非满"这种资源,是在生产者之间进行竞争的.
  所以,一般来说,需要锁定临界区,就需要Mutex;有几种稀缺资源就需要几个Semaphore.
  对稀缺资源的分析不能想当然.稀缺资源不一定是指被共享的资源,很多时候是指线程会被阻塞的条件(除了要进临界区被阻塞外).
  本例中,消费者会在缓冲区为空时被阻塞,所以"非空"是一种稀缺资源;
  生产者会在缓冲区为满时被阻塞,所以"非满"也是一种稀缺资源.
  2. 在对临界区加锁和等待信号量的顺序上有什么要求和规律?
  这里要说两点:
  第一,不要将等待信号量的语句放在被锁定的临界区内,这样会造成死锁.而且这也是很没有必要的.
  比如,消费者在缓冲区没有数据的时候进入临界区,这样就会把临界区锁上,由于没有数据,消费者也会被锁上.
  这时,任何生产者都会由于临界区被锁上而被block住,这样就造成了死锁.
  第二,如果有多个Semaphore需要等待,那么每个线程中,最好对这多个信号量进行等待的顺序一致,
  不然的话很容易造成死锁.
  3.  什么样操作适合放在临界区,什么样的不适合?
  一般来说,临界区中只放对共享数据进行访问的语句,这样会改善程序的性能.
  很多时候,取出共享数据的副本后,对副本进行费时的各种操作就不需要放在临界区了.
  比如,本例中的sleep语句就根本不需要放入临界区.
  八.教你如何测试循环缓冲区(代码我是参考DivX播放器源代码-playa-0.3.3src.zip这个开源软件的)
  这个缓冲区主要包括这两个文件:Queue.h,Queue.cpp(具体代码请参考程序)
  主要函数:
  RingBuff::RingBuff()
  {
  read_pos  = 0;
  write_pos = 0;
  }
  void RingBuff::ring_read(unsigned char *data, int size)
  {  
  MutexBuff.lock();
  if(write_pos = read_pos) {
  if(write_pos + size  read_pos) {
  if(write_pos + size writeCapBuff(pBuffer,320*240*4);//将视频数据放入到每一个缓冲区
  parent->readCapBuff(testbuffer,320*240*4);//从每一个缓冲区中取得视频数据
  parent->writeEnCodebuff(testbuffer,320*240*4);//把从第一个缓冲区中取得的视频数据放到第二个视频缓冲区中
  parent->readEnCodebuff(testbuffer2,320*240*4);//从每二个视频缓冲区中取出视频数据
  //add--->>把从第二个视频缓冲区中取出的视频数据显示出来
  //QImage image(testbuffer,320,240,QImage::Format_RGB32);
  //QImage image(pBuffer,320,240,QImage::Format_RGB32);
  QImage image(testbuffer2,320,240,QImage::Format_RGB32);
  QPixmap pixmap;
  pixmap=pixmap.fromImage(image);
  parent->label->setPixmap(pixmap);
  parent->label->setFixedSize(pixmap.width(),pixmap.he ight()); 
  //add
  }
  }
  //第二种情况(在两个线程中分别执行)
  void CapThread::run(){
  for(;;){
  v4l_grab_movie(&v4l_dev);
  unsigned char *pBuffer= v4l_dev.buffer;
  parent->writeCapBuff(pBuffer,320*240*4);//将视频数据放入到每一个缓冲区
  parent->readCapBuff(testbuffer,320*240*4);//从每一个缓冲区中取得视频数据
  parent->writeEnCodebuff(testbuffer,320*240*4);//把从第一个缓冲区中取得的视频数据放到第二个视频缓冲区中
  }
  }
  void CXvidDec::run()
  {for(;;){
  parent->readEnCodebuff(getEnCodeBuff,320*240*4);
  //Decode(getEnCodeBuff, 320*240*4) ; 
  //v4l_save_pnm(m_image, 320, 240, 3); 
  //add_display
  //QImage image(m_image,320,240,QImage::Format_RGB32);
  QImage image(getEnCodeBuff,320,240,QImage::Format_RGB32);
  QPixmap pixmap;
  pixmap=pixmap.fromImage(image);
  parent->label->setPixmap(pixmap);
  parent->label->setFixedSize(pixmap.width(),pixmap.he ight()); 
  //ddd_display
  }
  }
  我们要测试 ring_read()与ring_write()是否符合我们的要求,本人设计了一种简单的测试方法:就是用ring_write()把采集到的视频数据放到缓冲区中,然后用ring_read()从缓冲区中读取数据,然后将数据保存成一张图片,如果图片是输入的图像就证明了缓冲区的代码是正确的,由于我使用的队列缓冲区完全是由生产者驱动的,就是说队列的推进的速度等于生产者生产产品的速度,消费者不一定必必须消费每一个产品,在一定程序上,丢失某些数据是允许的
  循环缓冲区由一个固定大小的内存缓冲区构成,进程使用这个内存缓冲区进行日志记录。顾名思义,该缓冲区采用循环的方式进行实现。当该缓冲区填满了数据时,无需为新的数据分配更多的内存,而是从缓冲区开始的位置对其进行写操作,因此将覆盖以前的内容。
  1)对循环缓冲区进行写操作
  2)注意事项---在多线程程序中使用循环缓冲区
  这个部分介绍了在多线程应用程序中使用循环缓冲区启时需要考虑的一些重要方面。 
  在访问一个公共的资源时,同步 始终是多线程程序不可缺少的部分。因为每个线程都试图对全局空间进行写操作,所以必须确保它们同步地写入内存,否则消息就会遭到破坏。通常,每个线程在写入缓冲区之前都持有一个锁,在完成操作时释放该锁。您可以下载一个使用锁对内存进行写操作的循环缓冲区示例。
  这种方法具有以下的缺点:如果您的应用程序中包含几个线程,并且每个线程都在进行访问缓冲区,那么该进程的整体性能将会受到影响,因为这些线程将在获得和释放锁上花费了大部分的时间。 
  通过使得每个线程将数据写入到它自己的内存块,就可以完全避免同步问题。当收到来自用户的转储数据的请求时,每个线程获得一个锁,并将其转储到中心 位置。因为仅在将数据刷新到磁盘时获得锁,所以性能并不会受到很大的影响。
  九.每二种缓冲方法,先建多个buffer,然后将这些buffer构成bufferpool(本人觉得这种方法比较好)
  class CBuffer
  {
  BYTE *          m_pbBuffer ;            //  buffer pointer for data
  DWORD           m_dwBufferLength ;      //  allocated buffer length
  DWORD           m_dwPayloadLength ;     //  actual data length; m_pbBuffer != NULL)
  delete [] m_pbBuffer;
  m_pbBuffer = pBuffer;
  }
  DWORD   GetBufferLength ()              { return m_dwBufferLength ; }
  DWORD   GetPayloadLength ()             { return m_dwPayloadLength ; }
  void    SetPayloadLength (IN DWORD dw)  { ASSERT (dw  hEvent = NULL ;
  pBlockRequest -> pBuffer = NULL ;
  }
  return pBlockRequest ;
  }
  //  recycles the given block request
  void
  RecycleRequestLocked_ (
  IN  BLOCK_REQUEST * pBlockRequest
  )
  {
  InsertHeadList (& m_RequestPool, & pBlockRequest -> ListEntry) ;
  }
  public :
  CBufferPool (
  IN  DWORD       dwPoolSize,     //  number of buffers to allocate
  IN  DWORD       dwBufferLength, //  allocated length of each buffer
  OUT HRESULT *   phr             //  success/failure
  ) ;
  ~CBufferPool (
  ) ;
  DWORD GetBufferAllocatedLength ()   { return m_dwBufferAllocatedLength ; }
  void
  Recycle (
  CBuffer *
  ) ;
  CBuffer *
  GetBuffer (
  IN  HANDLE  hEvent,                 //  manual reset
  IN  DWORD   dwTimeout = INFINITE
  ) ;
  } ;
  #endif  //  __buffpool_h
  以上这些代码是我宿舍的成哥所写的,听说是从directshow下的例子所带的,我就参考了这个自己再搞一个qt下的,以下代码是我自己写的
  buffer.h文件
  #include 
  #define MAXBUFSIZE  320*240*4
  class Buffer  
  {
  public:
  Buffer();
  virtual ~Buffer();
  char *getBuf();
  int getSize();
  void setSize(int s);
  //int getTag();
  //void setTag(int stop);
  void lockBuf();
  void unlockBuf();
  int capacity();
  int getAvailableSpace();
  //unsigned char*buf;
  unsigned char *buf;
  //char buf[MAXBUFSIZE];
  int size; 
  //synchronization
  //#ifdef WIN32
  //       CCriticalSection mutex;
  //#else
  //       pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
  //#endif
  //       int stopTag; //flag to indicate I/O operation stops or fails on this buffer
  public:
  //unsigned char*   m_pbBuffer;
  int  m_dwBufferLength ;
  int  m_dwPayloadLength ;
  QMutex BufferMutex;
  };
  buffer.cpp文件
  #include "buffer.h"
  #include 
  #include  
  Buffer::Buffer():m_dwBufferLength(MAXBUFSIZE)
  {
  buf = (unsigned char*)malloc(m_dwBufferLength) ;
  }
  Buffer::~Buffer()
  {
  delete buf ;
  }
  char* Buffer::getBuf()
  {
  //return &buf[0];
  }
  int Buffer::getSize()
  {
  return size;
  }
  void Buffer::setSize(int s)
  {
  size = s;
  }
  //int Buffer::getTag()
  //
  //       return stopTag;
  //} 
  //void Buffer::setTag(int stop)
  //{
  //       stopTag = stop;
  //}
  int Buffer::capacity()
  {
  return sizeof(buf);
  }
  int Buffer::getAvailableSpace()
  {
  return sizeof(buf)-size;
  }
  void Buffer::lockBuf()
  {
  BufferMutex.lock();
  }
  void Buffer::unlockBuf()
  {
  BufferMutex.unlock();
  }
  QueueBuffer.h文件
  #include 
  #include 
  #include "buffer.h"
  #define BUFFERNUM 5
  class BufferQueue  
  {
  public:
  BufferQueue();
  virtual ~BufferQueue();
  Buffer *getReadBuffer();
  void getWriteBuffer(unsigned char *e,int len);
  int IsEmpty() const {return readPos == writePos ;}//&& tag == 0; }   //if Queue is empty
  int IsFull() const {return readPos == (writePos+1)%BUFFERNUM;}//&&tag ==1;    //if Queue is full
  //bool moveReadBuffer(bool);
  //bool moveWriteBuffer(bool);
  //void invalidate();
  QMutex QueueBufferMutex;
  QWaitCondition bufferNotEmpty;//用于信号等待
  QWaitCondition bufferNotFull;//用于信号等待
  int numUsedBytes;
  void lockAccessMutex();
  void unLockAccessMutex();
  Buffer buffers[BUFFERNUM];
  int readPos;//int front;
  int writePos;//int rear;
  int num;
  bool validFlag;
  /*Define two events to synchronize between input thread and output thread
  **hFullEvent is defined for the circumstance when the buffer queue is full 
  **and input thread is waiting for output thread to retrieve data from buffer queue
  **hEmptyEvent is defined for the circumstance when the buffer queue is empty 
  **and output thread is waiting for input thread puts data in buffer queue  
  */
  static int numOfBuffers(){return BUFFERNUM;}
  };
  QueueBuffer.cpp文件
  #include "QueueBuffer.h"
  #include 
  BufferQueue::BufferQueue()
  {
  readPos=writePos=NULL;
  numUsedBytes = 0;
  }
  BufferQueue::~BufferQueue()
  {
  }
  void BufferQueue::lockAccessMutex()
  {
  QueueBufferMutex.lock();
  }
  void BufferQueue::unLockAccessMutex()
  {
  QueueBufferMutex.unlock();
  }
  /*
  * This is used by read thread 
  */
  Buffer* BufferQueue::getReadBuffer()
  {        
  Buffer *readBuffer;
  QueueBufferMutex.lock();
  if (numUsedBytes == 0)
  bufferNotEmpty.wait(&QueueBufferMutex);
  QueueBufferMutex.unlock();
  QueueBufferMutex.lock();
  readBuffer=&buffers[readPos];
  readPos = (readPos+1)%numOfBuffers();
  --numUsedBytes;
  bufferNotFull.wakeAll();
  QueueBufferMutex.unlock(); 
  return readBuffer;
  }
  void BufferQueue::getWriteBuffer(unsigned char *e,int len)
  {        
  Buffer *writeBuffer = NULL;
  QueueBufferMutex.lock();
  if (numUsedBytes == numOfBuffers())
  bufferNotFull.wait(&QueueBufferMutex);
  QueueBufferMutex.unlock();
  QueueBufferMutex.lock();
  buffers[writePos].buf=e;
  buffers[writePos].m_dwPayloadLength=len;
  writePos=(writePos+1)%numOfBuffers();
  ++numUsedBytes;
  bufferNotEmpty.wakeAll();
  QueueBufferMutex.unlock();
  }
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics