00001
00005 #ifndef SHM_QUEUE_H
00006 #define SHM_QUEUE_H 1
00007
00008 #include <pthread.h>
00009
00010 #include <string>
00011 #include <iostream>
00012 #include <sstream>
00013
00014
00030 template<typename T>
00031 class shm_queue_node
00032 {
00033 public:
00034 void* operator new(size_t s, shm_allocator& a)
00035 {
00036 return a.alloc(s);
00037 }
00038 void operator delete(void* p, shm_allocator& a)
00039 {
00040 a.free(p);
00041 }
00042 shm_ptr<T> data;
00043 shm_ptr<shm_queue_node> prev;
00044 shm_ptr<shm_queue_node> next;
00045 };
00046
00053 template<typename T>
00054 class shm_queue_header
00055 {
00056 public:
00066 shm_queue_header( size_t s = 0 ) : size(s)
00067 {
00068 pthread_mutexattr_t shared_attr;
00069 pthread_mutexattr_t fast_attr;
00070 pthread_condattr_t cattr;
00071 int rc;
00072 head.set(0);
00073 tail.set(0);
00074
00075 rc = pthread_mutexattr_init(&shared_attr);
00076 rc = pthread_mutexattr_init(&fast_attr);
00077 rc = pthread_mutexattr_setpshared(&shared_attr, PTHREAD_PROCESS_SHARED);
00078 std::cout
00079 << "pthread_mutexattr_setpshared(&shared_attr, PTHREAD_PROCESS_SHARED) => "
00080 << rc
00081 << std::endl
00082 << std::flush;
00083
00084
00085
00086
00087
00088
00089
00090
00091 rc = pthread_mutex_init( &head_lock, &shared_attr );
00092
00093 rc = pthread_condattr_init(&cattr);
00094 rc = pthread_condattr_setpshared(&cattr, PTHREAD_PROCESS_SHARED);
00095 std::cout
00096 << "pthread_condattr_setpshared(&cattr, PTHREAD_PROCESS_SHARED) => "
00097 << rc
00098 << std::endl
00099 << std::flush;
00100
00101 rc = pthread_cond_init( &ready_for_push, &cattr );
00102 rc = pthread_cond_init( &ready_for_pop, &cattr );
00103 return ;
00104 }
00105 void* operator new(size_t s, shm_allocator& a)
00106 {
00107 return a.alloc(s);
00108 }
00109 void operator delete(void* p, shm_allocator& a)
00110 {
00111 a.free(p);
00112 }
00113
00114 size_t size;
00115 shm_ptr<shm_queue_node<T> > head;
00116 shm_ptr<shm_queue_node<T> > tail;
00117 pthread_mutex_t lock;
00118 pthread_mutex_t head_lock;
00119 pthread_mutex_t tail_lock;
00120 pthread_cond_t ready_for_push;
00121 pthread_cond_t ready_for_pop;
00122 };
00123
00134 template<typename T>
00135 class shm_queue
00136 {
00137 public:
00145 shm_queue( shm_allocator& allocator, const size_t maxsize ) :
00146 a(allocator),
00147 MaxQueueSize( maxsize ),
00148 seg( a.get_segment() )
00149 {
00150 shm_header.reset( new (a) shm_queue_header<T>, seg );
00151 return;
00152 };
00153
00159 shm_queue( shm_allocator& allocator ) :
00160 a(allocator),
00161 seg( a.get_segment() )
00162 {
00163 shm_queue_header<T>* queue_head = static_cast<shm_queue_header<T>* >
00164 ((shm_queue_header<T>*)
00165 ((long)(seg.get_base())+a.get_overhead()));
00166
00167 shm_header.reset( queue_head, seg );
00168 return;
00169 };
00170
00171 ~shm_queue() { return ; };
00172
00173 void* operator new(size_t s, shm_allocator& a)
00174 {
00175 return a.alloc(s);
00176 }
00177 void operator delete(void* p, shm_allocator& a)
00178 {
00179 a.free(p);
00180 }
00181
00182 void push(shm_ptr<T> obj);
00183 shm_ptr<T> pop();
00184
00187 void lock_head()
00188 {
00189 shm_queue_header<T>* header = ((shm_queue_header<T> *)(shm_header.get(seg)));
00190 pthread_mutex_lock( &(header->head_lock) );
00191 }
00192
00195 void unlock_head()
00196 {
00197 shm_queue_header<T>* header = ((shm_queue_header<T> *)(shm_header.get(seg)));
00198 pthread_mutex_unlock( &(header->head_lock) );
00199 }
00200
00203 void lock_tail()
00204 {
00205 shm_queue_header<T>* header = ((shm_queue_header<T> *)(shm_header.get(seg)));
00206 pthread_mutex_lock( &(header->tail_lock) );
00207 }
00208
00211 void unlock_tail()
00212 {
00213 shm_queue_header<T>* header = ((shm_queue_header<T> *)(shm_header.get(seg)));
00214 pthread_mutex_unlock( &(header->tail_lock) );
00215 }
00216
00219 size_t size() const
00220 {
00221 shm_queue_header<T>* header = ((shm_queue_header<T> *)(shm_header.get(seg)));
00222 return header->size;
00223 }
00224 private:
00225 size_t MaxQueueSize;
00226
00227 shm_ptr< shm_queue_header<T> > shm_header;
00228
00229 shm_allocator& a;
00230 const shm_segment& seg;
00231
00232
00233 shm_queue(const shm_queue& copy);
00234 shm_queue& operator=(const shm_queue& rhs);
00235 };
00236
00237
00248 template<typename T>
00249 void shm_queue<T>::push(shm_ptr<T> obj)
00250 {
00251 shm_queue_header<T>* header = static_cast<shm_queue_header<T>* >
00252 ((shm_queue_header<T>*)
00253 ((long)(seg.get_base())+a.get_overhead()));
00254 scoped_lock guard(&(header->head_lock));
00255 shm_queue_node<T>* node = new (a) shm_queue_node<T>;
00256 node->data = obj;
00257 node->prev.set(0);
00258
00259
00260 if ( header->head.off() != 0) {
00261 shm_queue_node<T>* cart = header->head.get( seg );
00262
00263 std::cout
00264 << "previous queue head : "
00265 << cart
00266 << std::endl
00267 << std::flush;
00268
00269 cart->prev.reset( node, seg );
00270 node->next.reset( cart, seg );
00271 }
00272
00273 header->head.reset( node, seg );
00274
00275
00276 if ( header->tail.off() == 0) {
00277 header->tail.reset( node, seg );
00278 }
00279
00280 header->size += 1;
00281
00282 std::cout
00283 << "queue size : "
00284 << header->size
00285 << std::endl
00286 << std::flush;
00287
00288
00289 pthread_cond_signal( &header->ready_for_pop );
00290 }
00291
00301 template<typename T>
00302 shm_ptr<T> shm_queue<T>::pop() {
00303 shm_queue_header<T>* header = static_cast<shm_queue_header<T>* >
00304 ((shm_queue_header<T>*)
00305 ((long)(seg.get_base())+a.get_overhead()));
00306 scoped_lock guard( &(header->tail_lock) );
00307 if ( 1 > header->size ) {
00308 std::cout
00309 << "Queue empty : wait until ready for pop."
00310 << header->size
00311 << std::endl
00312 << std::flush;
00313 guard.wait( &(header->ready_for_pop) );
00314 }
00315
00316 shm_ptr<shm_queue_node<T> > obj = header->tail;
00317 shm_queue_node<T>* node = obj.get(seg);
00318
00319 if ( header->tail.off() == header->head.off() ) {
00320 std::cout
00321 << "Queue head and tail offsets are identical."
00322 << std::endl
00323 << std::flush;
00324 header->head.set(0);
00325 header->tail.set(0);
00326 header->size = 0;
00327
00328 } else {
00329
00330 header->tail = node->prev;
00331 header->size -= 1;
00332 }
00333 if ( header->tail.off() == 0) {
00334
00335 std::cout
00336 << "Queue is now empty."
00337 << std::endl
00338 << std::flush;
00339
00340 }
00341 pthread_cond_signal( &header->ready_for_push );
00342 return node->data;
00343 }
00344
00345 #endif
00346
00347
00348
00349
00350
00351
00352
00353
00354
00355
00356
00357
00358
00359