shm_queue.h

Go to the documentation of this file.
00001 
00005 #ifndef SHM_QUEUE_H
00006 #define SHM_QUEUE_H 1
00007 
00008 #include <pthread.h>
00009 
00010 #include <string>
00011 #include <iostream>
00012 #include <sstream>
00013 
00014 
00030 template<typename T>
00031 class shm_queue_node
00032 {
00033  public:
00034   void* operator new(size_t s, shm_allocator& a)
00035     {
00036       return a.alloc(s);
00037     }
00038   void operator delete(void* p, shm_allocator& a)
00039     {
00040       a.free(p);
00041     }
00042   shm_ptr<T> data;
00043   shm_ptr<shm_queue_node> prev;
00044   shm_ptr<shm_queue_node> next;
00045 };
00046 
00053 template<typename T>
00054 class shm_queue_header
00055 {
00056  public:
00066   shm_queue_header( size_t s = 0 ) : size(s)
00067   {
00068     pthread_mutexattr_t shared_attr; 
00069     pthread_mutexattr_t fast_attr; 
00070     pthread_condattr_t cattr; 
00071     int rc; 
00072     head.set(0);
00073     tail.set(0);
00074     /* initialize a mutex and it's condition variables */
00075     rc = pthread_mutexattr_init(&shared_attr); 
00076     rc = pthread_mutexattr_init(&fast_attr); 
00077     rc = pthread_mutexattr_setpshared(&shared_attr, PTHREAD_PROCESS_SHARED);
00078     std::cout 
00079       << "pthread_mutexattr_setpshared(&shared_attr, PTHREAD_PROCESS_SHARED) => "
00080       << rc
00081       << std::endl
00082       << std::flush;
00083     /*
00084     rc = pthread_mutexattr_settype(&fast_attr, PTHREAD_MUTEX_FAST_NP );
00085     std::cout 
00086       << "pthread_mutexattr_settype(&fast_attr, PTHREAD_MUTEX_FAST_NP ) => "
00087       << rc
00088       << std::endl
00089       << std::flush;
00090     */
00091     rc = pthread_mutex_init( &head_lock, &shared_attr );
00092     //    rc = pthread_mutex_init( &tail_lock, &mattr );
00093     rc = pthread_condattr_init(&cattr); 
00094     rc = pthread_condattr_setpshared(&cattr, PTHREAD_PROCESS_SHARED);
00095     std::cout 
00096       << "pthread_condattr_setpshared(&cattr, PTHREAD_PROCESS_SHARED) => "
00097       << rc
00098       << std::endl
00099       << std::flush;
00100 
00101     rc = pthread_cond_init( &ready_for_push, &cattr ); 
00102     rc = pthread_cond_init( &ready_for_pop, &cattr );     
00103     return ;
00104   }
00105   void* operator new(size_t s, shm_allocator& a)
00106   {
00107       return a.alloc(s);
00108   }
00109   void operator delete(void* p, shm_allocator& a)
00110   {
00111     a.free(p);
00112   }
00113 
00114   size_t size;
00115   shm_ptr<shm_queue_node<T> > head;
00116   shm_ptr<shm_queue_node<T> > tail;
00117   pthread_mutex_t lock;
00118   pthread_mutex_t head_lock;
00119   pthread_mutex_t tail_lock;
00120   pthread_cond_t ready_for_push;
00121   pthread_cond_t ready_for_pop;
00122 };
00123  
00134 template<typename T>
00135 class shm_queue
00136 {
00137  public:
00145   shm_queue( shm_allocator& allocator, const size_t maxsize ) :
00146     a(allocator),
00147     MaxQueueSize( maxsize ),
00148     seg( a.get_segment() ) 
00149   {
00150     shm_header.reset( new (a) shm_queue_header<T>, seg );
00151     return; 
00152   };
00153 
00159   shm_queue( shm_allocator& allocator ) :
00160     a(allocator),
00161     seg( a.get_segment() ) 
00162   {
00163     shm_queue_header<T>* queue_head = static_cast<shm_queue_header<T>* >
00164                                       ((shm_queue_header<T>*)
00165                                       ((long)(seg.get_base())+a.get_overhead()));
00166     
00167     shm_header.reset( queue_head, seg );
00168     return; 
00169   };
00170 
00171   ~shm_queue() { return ; };
00172 
00173   void* operator new(size_t s, shm_allocator& a)
00174   {
00175     return a.alloc(s);
00176   }
00177   void operator delete(void* p, shm_allocator& a)
00178   {
00179     a.free(p);
00180   }
00181 
00182   void push(shm_ptr<T> obj);
00183   shm_ptr<T> pop();
00184 
00187   void lock_head() 
00188   {
00189     shm_queue_header<T>* header = ((shm_queue_header<T> *)(shm_header.get(seg)));
00190     pthread_mutex_lock( &(header->head_lock) );
00191   }
00192 
00195   void unlock_head() 
00196   {    
00197     shm_queue_header<T>* header = ((shm_queue_header<T> *)(shm_header.get(seg)));
00198     pthread_mutex_unlock( &(header->head_lock) );
00199   }
00200 
00203   void lock_tail() 
00204   {
00205     shm_queue_header<T>* header = ((shm_queue_header<T> *)(shm_header.get(seg)));
00206     pthread_mutex_lock( &(header->tail_lock) );
00207   }
00208 
00211   void unlock_tail() 
00212   {    
00213     shm_queue_header<T>* header = ((shm_queue_header<T> *)(shm_header.get(seg)));
00214     pthread_mutex_unlock( &(header->tail_lock) );
00215   }
00216 
00219   size_t size() const 
00220   {
00221     shm_queue_header<T>* header = ((shm_queue_header<T> *)(shm_header.get(seg)));
00222     return header->size;
00223   }
00224  private:
00225   size_t MaxQueueSize;
00226   //  shm_queue_header<T>* header;
00227   shm_ptr< shm_queue_header<T> > shm_header;
00228 
00229   shm_allocator& a;
00230   const shm_segment& seg;
00231  
00232   //disallowed (not implemented)
00233   shm_queue(const shm_queue& copy);
00234   shm_queue& operator=(const shm_queue& rhs);
00235 };
00236  
00237  
00248 template<typename T>
00249 void shm_queue<T>::push(shm_ptr<T> obj)
00250 {
00251   shm_queue_header<T>* header = static_cast<shm_queue_header<T>* >
00252                                       ((shm_queue_header<T>*)
00253                                       ((long)(seg.get_base())+a.get_overhead()));
00254   scoped_lock guard(&(header->head_lock)); 
00255   shm_queue_node<T>* node = new (a) shm_queue_node<T>;
00256   node->data = obj;
00257   node->prev.set(0);
00258 
00259   // queue head is not empty
00260   if ( header->head.off() != 0) {
00261     shm_queue_node<T>* cart = header->head.get( seg );
00262 
00263     std::cout 
00264       << "previous queue head : "
00265       << cart
00266       << std::endl
00267       << std::flush;
00268 
00269     cart->prev.reset( node, seg );
00270     node->next.reset( cart, seg );
00271   } 
00272 
00273   header->head.reset( node, seg );
00274 
00275   // queue tail is empty
00276   if ( header->tail.off() == 0) {
00277     header->tail.reset( node, seg );
00278   }
00279 
00280   header->size += 1;
00281 
00282   std::cout 
00283     << "queue size : "
00284     << header->size
00285     << std::endl
00286     << std::flush;
00287 
00288   // signal any waiting processes
00289   pthread_cond_signal( &header->ready_for_pop );
00290 }
00291 
00301 template<typename T>
00302 shm_ptr<T> shm_queue<T>::pop() {
00303   shm_queue_header<T>* header = static_cast<shm_queue_header<T>* >
00304                                       ((shm_queue_header<T>*)
00305                                       ((long)(seg.get_base())+a.get_overhead()));
00306   scoped_lock guard( &(header->tail_lock) );
00307   if ( 1 > header->size ) {
00308     std::cout 
00309       << "Queue empty : wait until ready for pop."
00310       << header->size
00311       << std::endl
00312       << std::flush;
00313     guard.wait( &(header->ready_for_pop) );
00314   }
00315   // get object at tail of queue
00316   shm_ptr<shm_queue_node<T> > obj = header->tail;
00317   shm_queue_node<T>* node = obj.get(seg);
00318   // head and tail are the same (single queue node)
00319   if ( header->tail.off() == header->head.off() ) {
00320     std::cout 
00321       << "Queue head and tail offsets are identical."
00322       << std::endl
00323       << std::flush;
00324     header->head.set(0);
00325     header->tail.set(0);
00326     header->size = 0;
00327 
00328   } else { 
00329     // set tail to the prior queue node
00330     header->tail = node->prev;
00331     header->size -= 1;
00332   }
00333   if ( header->tail.off() == 0) {
00334 
00335     std::cout 
00336       << "Queue is now empty."
00337       << std::endl
00338       << std::flush;
00339 
00340   }    
00341   pthread_cond_signal( &header->ready_for_push );
00342   return node->data;
00343 }
00344 
00345 #endif
00346 
00347 /*
00348  * $Log: shm_queue.h,v $
00349  * Revision 1.2  2006/09/02 18:47:28  oommoo
00350  * documentation enhancement
00351  *
00352  * Revision 1.1.1.1  2006/08/30 12:14:59  oommoo
00353  * Initial checkin of project library
00354  *
00355  * SharedMemory notes from :
00356  *
00357  * http://home.earthlink.net/~joshwalker1/writing/SharedMemory.html
00358  *
00359  */

Project shmq hosted by   SourceForge.net
Documentation generated on Sat Sep 2 10:07:40 2006 for shmq by   doxygen.org 1.4.6