Markus Rechberger / C / Thread Queues
 
StartSeite | MarkusRechberger/ C/ | Neues | TestSeite | ForumSeite | Teilnehmer | Kategorien | Index | Hilfe | Einstellungen | Ändern

Queues/Threads

Diese Lösung hat mir recht gut gefallen :-)

threadqueue.h
#ifndef _THREADQUEUE_H_
#define _THREADQUEUE_H_ 1

#include <pthread.h>
/**
 * @defgroup ThreadQueue ThreadQueue
 *
 * Little API for waitable queues, typically used for passing messages
 * between threads.
 *
 * @author Nils O. Selåsdal <NOS@Utel.no>
 */

/**
 * @mainpage
 *  @htmlonly
 *  
 *  Copyright (c) 2002-2003 Nils O. Selåsdal <NOS@Utel.no>.
 *  All rights reserved, all wrongs reversed.
 *
 *  Redistribution and use in source and binary forms, with or without
 *  modification, are permitted provided that the following conditions
 *  are met:
 *
 *  1. Redistributions of source code must retain the above copyright
 *     notice, this list of conditions and the following disclaimer.
 *  2. Redistributions in binary form must reproduce the above copyright
 *     notice, this list of conditions and the following disclaimer in the
 *     documentation and/or other materials provided with the distribution.
 *
 *  THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
 *  IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
 *  OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
 *  IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
 *  INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
 *  NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *  DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *  THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *  (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
 *  THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *  
 * @endhtmlonly
 */

/**
 * A thread message.
 *
 * @ingroup ThreadQueue
 *
 * This is passed to #thread_queue_get to retrieve messages.
 * The data is stored in the #data member, the message type in #msgtype.
 *
 * Typical:
 * @code
 * struct threadmsg message;
 * struct myfoo *foo;
 * while(1) {
 *     ret = thread_queue_get(&queue,NULL,&message);
 *     ..
 *     foo = message.data;
 *     switch(message.msgtype){
 *     ...
 *     }
 * }
 * @endcode
 *
 */
struct threadmsg{
        /**
         * Holds the data.
         */
        void *data;
        /**
         * Holds the message type.
         */
        long msgtype;
};

/**
 * A ThreadQueue
 *
 * @ingroup ThreadQueue
 *
 * You should treat this struct as opaque, never ever set/get any
 * of the variables. You have been warned.
 */
struct threadqueue {
        /**
         * Length of the queue, never set this, never read this.
         * Use #thread_queue_length to read it.
         */
        long length;
        /**
         * Mutex for the queue, never touch.
         */
        pthread_mutex_t mutex;
        /**
         * Condition variable for the queue, never touch.
         */
        pthread_cond_t cond;
        /**
         * Internal pointers for the queue, never touch.
         */
        struct msglist *first,*last;
};

/**
 * Initializes a queue.
 *
 * @ingroup ThreadQueue
 *
 * thread_queue_init initializes a new threadqueue. A new queue must always
 * be initialized before it is used.
 *
 * @param queue Pointer to the queue that should be initialized
 * @return 0 on success, see pthread_mutex_init
 */
int thread_queue_init(struct threadqueue *queue);

/**
 * Adds a message to a queue
 *
 * @ingroup ThreadQueue
 *
 * thread_queue_add adds a "message" to the specified queue; a message
 * is just a pointer to anything of the user's choice. Nothing is copied,
 * so the user must keep track of (de)allocation of the data.
 * A message type is also specified; it is not used for anything other than
 * being given back when the message is retrieved from the queue.
 *
 * @param queue Pointer to the queue to which the message should be added.
 * @param data the "message".
 * @param msgtype a long specifying the message type, choice of the user.
 * @return 0 on success, ENOMEM if out of memory, EINVAL if queue is NULL
 */
int thread_queue_add(struct threadqueue *queue, void *data, long msgtype);

/**
 * Gets a message from a queue
 *
 * @ingroup ThreadQueue
 *
 * thread_queue_get gets a message from the specified queue. It will block
 * the calling thread until a message arrives, or the (optional) timeout
 * occurs. If timeout is NULL, there will be no timeout, and thread_queue_get
 * will wait until a message arrives. The timeout is a duration relative to
 * the time of the call, not an absolute point in time.
 *
 * struct timespec is defined as:
 * @code
 * struct timespec {
 *     long tv_sec;   // seconds
 *     long tv_nsec;  // nanoseconds
 * };
 * @endcode
 *
 * @param queue Pointer to the queue to wait on for a message.
 * @param timeout timeout on how long to wait for a message
 * @param msg pointer that is filled in with message type and data
 *
 * @return 0 on success, EINVAL if queue is NULL, ETIMEDOUT if timeout occurs
 */
int thread_queue_get(struct threadqueue *queue, const struct timespec *timeout, struct threadmsg *msg);

/**
 * Gets the length of a queue
 *
 * @ingroup ThreadQueue
 *
 * thread_queue_length returns the number of messages waiting in the queue
 *
 * @param queue Pointer to the queue for which to get the length
 * @return the length (number of pending messages) of the queue
 */
long thread_queue_length( struct threadqueue *queue );

/**
 * @ingroup ThreadQueue
 * Cleans up the queue.
 *
 * thread_queue_cleanup cleans up and destroys the queue.
 * This will remove all messages from a queue, and reset it. If
 * freedata is != 0, free(3) will be called on the data of all pending
 * messages in the queue.
 * You cannot call this while someone is currently adding messages to or
 * getting messages from the queue.
 * After a queue has been cleaned, it cannot be used again until
 * #thread_queue_init has been called on the queue.
 *
 * @param queue Pointer to the queue that should be cleaned
 * @param freedata set to nonzero if free(3) should be called on remaining
 * messages
 * @return 0 on success, EINVAL if queue is NULL, EBUSY if someone is holding
 * any locks on the queue
 */
int thread_queue_cleanup(struct threadqueue *queue, int freedata);

#endif

threadqueue.c
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <pthread.h>
#include <sys/time.h>
#include "threadqueue.h"

/*
 * One node of the queue's singly linked list. Nodes are allocated in
 * thread_queue_add and released in thread_queue_get / thread_queue_cleanup.
 */
struct msglist {
        struct threadmsg msg;   /* data pointer and message type given by the producer */
        struct msglist *next;   /* following node towards the tail; NULL on the last node */
};

/*
 * Bring a queue into its initial, empty state and create its
 * synchronisation objects.
 *
 * Returns 0 on success, EINVAL when queue is NULL, otherwise the error
 * code reported by pthread_cond_init or pthread_mutex_init.
 */
int thread_queue_init(struct threadqueue *queue)
{
        int err;

        if (!queue) {
                return EINVAL;
        }

        /* Zero everything first so the list pointers and length start out empty. */
        memset(queue, 0, sizeof *queue);

        err = pthread_cond_init(&queue->cond, NULL);
        if (err == 0) {
                err = pthread_mutex_init(&queue->mutex, NULL);
                if (err != 0) {
                        /* Roll back the condition variable so nothing is left half set up. */
                        pthread_cond_destroy(&queue->cond);
                }
        }

        return err;
}

/*
 * Append a message to the tail of the queue and wake waiting consumers.
 *
 * Only the pointer is stored; the data it points to is not copied, so the
 * caller keeps responsibility for its lifetime.
 *
 * Returns 0 on success, EINVAL if queue is NULL, ENOMEM if the list node
 * cannot be allocated, or the error code from pthread_mutex_lock.
 */
int thread_queue_add(struct threadqueue *queue, void *data, long msgtype)
{
        struct msglist *newmsg;
        int ret;

        if (queue == NULL) {
                return EINVAL;
        }

        /* Build the node before taking the lock to keep the critical section short. */
        newmsg = malloc(sizeof *newmsg);
        if (newmsg == NULL) {
                return ENOMEM;
        }
        newmsg->msg.data = data;
        newmsg->msg.msgtype = msgtype;
        newmsg->next = NULL;

        ret = pthread_mutex_lock(&queue->mutex);
        if (ret != 0) {
                /* Never touch the list without holding the lock. */
                free(newmsg);
                return ret;
        }

        if (queue->last == NULL) {
                /* Empty queue: the new node is both head and tail. */
                queue->last = newmsg;
                queue->first = newmsg;
        } else {
                queue->last->next = newmsg;
                queue->last = newmsg;
        }

        queue->length++;
        /* Wake every thread blocked in thread_queue_get. */
        pthread_cond_broadcast(&queue->cond);
        pthread_mutex_unlock(&queue->mutex);

        return 0;
}

/*
 * Remove the oldest message from the queue, blocking until one is available.
 *
 * timeout, when non-NULL, is a duration relative to the moment of the call.
 * It is converted to an absolute deadline exactly once, so wakeups that find
 * the queue still empty do not extend the total waiting time.
 *
 * Returns 0 and fills *msg on success, EINVAL if queue or msg is NULL,
 * ETIMEDOUT if the deadline passed with the queue still empty, or the error
 * code reported by the pthread locking/waiting calls.
 */
int thread_queue_get(struct threadqueue *queue, const struct timespec *timeout, struct threadmsg *msg)
{
        struct msglist *firstrec;
        struct timespec abstimeout = {0, 0};
        int ret;

        if (queue == NULL || msg == NULL) {
                return EINVAL;
        }

        if (timeout) {
                struct timeval now;

                /* pthread_cond_timedwait expects an absolute wall-clock deadline. */
                gettimeofday(&now, NULL);
                abstimeout.tv_sec = now.tv_sec + timeout->tv_sec;
                abstimeout.tv_nsec = (now.tv_usec * 1000) + timeout->tv_nsec;
                if (abstimeout.tv_nsec >= 1000000000) {
                        abstimeout.tv_sec++;
                        abstimeout.tv_nsec -= 1000000000;
                }
        }

        ret = pthread_mutex_lock(&queue->mutex);
        if (ret != 0) {
                return ret;
        }

        /*
         * The loop is required: thread_queue_add broadcasts, so several
         * consumers may wake up for a single message, and condition waits
         * may also return spuriously. Stop on any error (including
         * ETIMEDOUT) instead of spinning.
         */
        while (queue->first == NULL && ret == 0) {
                if (timeout) {
                        ret = pthread_cond_timedwait(&queue->cond, &queue->mutex, &abstimeout);
                } else {
                        ret = pthread_cond_wait(&queue->cond, &queue->mutex);
                }
        }

        if (queue->first == NULL) {
                /* The loop only ends with an empty queue when the wait failed. */
                pthread_mutex_unlock(&queue->mutex);
                return ret;
        }

        /* A message is available, even if the deadline expired at the same moment. */
        firstrec = queue->first;
        queue->first = firstrec->next;
        queue->length--;

        if (queue->first == NULL) {
                /* The list is now empty, so there is no tail either. */
                queue->last = NULL;
        }
        pthread_mutex_unlock(&queue->mutex);

        /* The node is now private to this thread; no lock needed to read and free it. */
        msg->data = firstrec->msg.data;
        msg->msgtype = firstrec->msg.msgtype;
        free(firstrec);

        return 0;
}
/*
 * Discard all pending messages and destroy the queue's mutex and condition
 * variable. The queue must be re-initialised with thread_queue_init before
 * it is used again.
 *
 * When freedata is nonzero, free() is called on the data pointer of every
 * pending message. (Possible extension: let the caller supply a callback
 * for releasing the elements instead.)
 *
 * Returns 0 on success, EINVAL if queue is NULL, otherwise the first error
 * reported by pthread_mutex_lock, pthread_mutex_destroy or
 * pthread_cond_destroy.
 */
int thread_queue_cleanup(struct threadqueue *queue, int freedata)
{
        struct msglist *rec;
        struct msglist *next;
        int ret;
        int cond_ret;

        if (queue == NULL) {
                return EINVAL;
        }

        ret = pthread_mutex_lock(&queue->mutex);
        if (ret != 0) {
                return ret;
        }

        rec = queue->first;
        while (rec) {
                next = rec->next;
                if (freedata) {
                        free(rec->msg.data);
                }
                free(rec);
                rec = next;
        }

        /* Reset the bookkeeping so no pointers to freed nodes remain in the queue. */
        queue->first = NULL;
        queue->last = NULL;
        queue->length = 0;

        pthread_mutex_unlock(&queue->mutex);

        /* Destroy both objects even if the first call fails; report the first error. */
        ret = pthread_mutex_destroy(&queue->mutex);
        cond_ret = pthread_cond_destroy(&queue->cond);
        if (ret == 0) {
                ret = cond_ret;
        }

        return ret;
}
/*
 * Report how many messages are currently pending in the queue.
 * The counter is read while holding the queue mutex.
 */
long thread_queue_length(struct threadqueue *queue)
{
        long pending;

        pthread_mutex_lock(&queue->mutex);
        pending = queue->length;
        pthread_mutex_unlock(&queue->mutex);

        return pending;
}

Codebeispiel:
#include <stdio.h>
#include <pthread.h>
#include <unistd.h>
#include "threadqueue.h"

/*
 * Consumer thread: prints every message type received from the queue and
 * stops when a message of type 0 arrives or thread_queue_get fails.
 *
 * test is the struct threadqueue to read from. The function has the
 * signature pthread_create expects and always returns NULL.
 */
void *runquery(void *test){
    struct threadqueue *queue = test;
    struct threadmsg msg;
    int ret=0;

    while(1){
            printf("threaded queue\n");
            ret = thread_queue_get(queue,NULL,&msg);
            if(ret != 0){
                    /* msg was not filled in; do not read it. */
                    fprintf(stderr,"thread_queue_get failed: %d\n",ret);
                    break;
            }
            printf("done!\n");
            /* msgtype is a long, so it needs %ld. If the producer used strdup
               for the data, this is the place to free it. */
            printf("MSG type: %ld\n",msg.msgtype);
            if(msg.msgtype==0){
                    printf("exiting thread!\n");
                    break;
            }
    }
    return NULL;
}


/*
 * Example driver: starts one consumer thread, queues 1001 messages of
 * type 1, then sends a type-0 message to make the consumer exit, joins it
 * and cleans up the queue.
 *
 * Returns 0 on success and 1 if the queue or the thread cannot be set up
 * or the terminating message cannot be queued.
 */
int main(void){
        struct threadqueue thqueue;
        pthread_t pg_thread;
        int i=0;
        int ret;

        ret = thread_queue_init(&thqueue);
        if(ret != 0){
                fprintf(stderr,"thread_queue_init failed: %d\n",ret);
                return(1);
        }

        /* Cast to the exact start-routine type pthread_create expects. */
        ret = pthread_create(&pg_thread, NULL,(void *(*)(void *))runquery,&thqueue);
        if(ret != 0){
                /* No thread exists, so there is nothing to join later. */
                fprintf(stderr,"pthread_create failed: %d\n",ret);
                thread_queue_cleanup(&thqueue,0);
                return(1);
        }

        for(i=0;i<=1000;i++){
                /* A strdup of the string could be used here instead of the literal. */
                ret = thread_queue_add(&thqueue,(void*)"testing",1);
                if(ret != 0){
                        fprintf(stderr,"thread_queue_add failed: %d\n",ret);
                        break;
                }
        }
        sleep(3);
        printf("exiting!\n");

        /* Message type 0 tells the consumer to stop. */
        ret = thread_queue_add(&thqueue,NULL,0);
        if(ret != 0){
                /* The consumer would never exit, so joining it would hang. */
                fprintf(stderr,"thread_queue_add failed: %d\n",ret);
                return(1);
        }
        pthread_join(pg_thread,NULL);
        thread_queue_cleanup(&thqueue,0);

        return(0);
}

Valgrind Ausgabe:
 ==29065== warning: Valgrind's pthread_cond_destroy is incomplete
 ==29065==          (it doesn't check if the cond is waited on)
 ==29065==          your program may misbehave as a result
 ==29065== 
 ==29065== ERROR SUMMARY: 0 errors from 0 contexts (suppressed: 15 from 1)
 --29065-- 
 --29065-- supp:   15 Ugly strchr error in /lib/ld-2.3.2.so
 ==29065== malloc/free: in use at exit: 216 bytes in 2 blocks.
 ==29065== malloc/free: 1009 allocs, 1007 frees, 13860 bytes allocated.
 ==29065== 
 ==29065== searching for pointers to 2 not-freed blocks.
 ==29065== checked 1620932 bytes.
 ==29065== 
 ==29065== 16 bytes in 1 blocks are still reachable in loss record 1 of 2
 ==29065==    at 0x1B905901: calloc (vg_replace_malloc.c:176)
 ==29065==    by 0x1BA7C4F1: (within /lib/tls/libdl-2.3.2.so)
 ==29065==    by 0x1BA7C0F3: dlsym (in /lib/tls/libdl-2.3.2.so)
 ==29065==    by 0x1B910BF9: pthread_create (vg_libpthread.c:1168)
 ==29065== 
 ==29065== LEAK SUMMARY:
 ==29065==    definitely lost: 0 bytes in 0 blocks.
 ==29065==    possibly lost:   0 bytes in 0 blocks.
 ==29065==    still reachable: 16 bytes in 1 blocks.
 ==29065==         suppressed: 200 bytes in 1 blocks.
 --29065--     TT/TC: 0 tc sectors discarded.
 --29065--            2779 tt_fast misses.
 --29065-- translate: new     2767 (47949 -> 611117; ratio 127:10)
 --29065--            discard 0 (0 -> 0; ratio 0:10).
 --29065-- chainings: 1832 chainings, 0 unchainings.
 --29065--  dispatch: 50000 jumps (bb entries); of them 112986 (225%) unchained.
 --29065--            3078/14321 major/minor sched events.
 --29065-- reg-alloc: 536 t-req-spill, 113567+3917 orig+spill uis,
 --29065--            14669 total-reg-rank
 --29065--    sanity: 3077 cheap, 124 expensive checks.
 --29065--    ccalls: 10475 C calls, 54% saves+restores avoided (33382 bytes)
 --29065--            14152 args, avg 0.87 setup instrs each (3640 bytes)
 --29065--            0% clear the stack (31329 bytes)
 --29065--            4307 retvals, 29% of reg-reg movs avoided (2494 bytes)


StartSeite | MarkusRechberger/ C/ | Neues | TestSeite | ForumSeite | Teilnehmer | Kategorien | Index | Hilfe | Einstellungen | Ändern
Text dieser Seite ändern (zuletzt geändert: 21. Februar 2005 22:19 (diff))
Suchbegriff: gesucht wird
im Titel
im Text