From e27740688ab7435b2c86a3b0f3895f2ca2d914f0 Mon Sep 17 00:00:00 2001 From: c9s Date: Sat, 24 May 2014 10:17:54 +0800 Subject: [PATCH] Add a basic pthread mutex queue --- include/r3_queue.h | 43 +++++++++++++++ include/r3_workers.h | 14 +++++ src/Makefile.am | 2 +- src/queue.c | 121 +++++++++++++++++++++++++++++++++++++++++++ src/workers.c | 50 ++++++++---------- tests/Makefile.am | 2 + tests/check_queue.c | 117 +++++++++++++++++++++++++++++++++++++++++ tests/check_worker.c | 11 ++++ 8 files changed, 331 insertions(+), 29 deletions(-) create mode 100644 include/r3_queue.h create mode 100644 src/queue.c create mode 100644 tests/check_queue.c diff --git a/include/r3_queue.h b/include/r3_queue.h new file mode 100644 index 0000000..b5d4d93 --- /dev/null +++ b/include/r3_queue.h @@ -0,0 +1,43 @@ +/* + * r3_queue.h + * Copyright (C) 2014 c9s + * + * Distributed under terms of the MIT license. + */ + +#ifndef R3_QUEUE_H +#define R3_QUEUE_H + +#include +#include +#include +#include + +struct _queue_node { + void *data; + struct _queue_node * next; +}; +typedef struct _queue_node queue_node; + +typedef struct { + queue_node * first; + queue_node * last; +} queue; + +// create and return the queue +queue * queue_factory(void); + +// destory the queue (free all the memory associate with the que even the data) +void queue_destroy(queue * que); + + +// enque the data into queue +// data is expected to a pointer to a heap allocated memory +int enque(queue * que, void * data); + +// return the data from the que (FIFO) +// and free up all the internally allocated memory +// but the user have to free the returning data pointer +void * deque(queue * que); + +#endif /* !R3_QUEUE_H */ diff --git a/include/r3_workers.h b/include/r3_workers.h index 7468977..cf1bba1 100644 --- a/include/r3_workers.h +++ b/include/r3_workers.h @@ -8,5 +8,19 @@ #ifndef R3_WORKERS_H #define R3_WORKERS_H +#include +#include +#include + +typedef struct { + int thread_id; + node * matched_node; +} feedback_payload; + +void r3_feedback_worker_init(pthread_t * t, feedback_payload * data); + +void r3_worker_cancel(pthread_t t); + +void r3_worker_exit(); #endif /* !R3_WORKERS_H */ diff --git a/src/Makefile.am b/src/Makefile.am index 826792f..63ba53b 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -1,6 +1,6 @@ lib_LTLIBRARIES = libr3.la # lib_LIBRARIES = libr3.a -libr3_la_SOURCES = node.c edge.c str.c token.c zmalloc.c workers.c +libr3_la_SOURCES = node.c edge.c str.c token.c zmalloc.c workers.c queue.c # libr3_la_LDFLAGS = -export-symbols-regex '^r3_|^match_' libr3_la_LIBADD=$(DEPS_LIBS) diff --git a/src/queue.c b/src/queue.c new file mode 100644 index 0000000..2f8f785 --- /dev/null +++ b/src/queue.c @@ -0,0 +1,121 @@ +/* + * queue.c + * Copyright (C) 2014 c9s + * + * Distributed under terms of the MIT license. + */ +#include "r3_queue.h" + +pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; + +/** + * create and return a new queue + **/ +queue * queue_factory() +{ + queue * new_queue = malloc(sizeof(queue)); + if(new_queue == NULL) { + fprintf(stderr, "Malloc failed creating the que\n"); + return NULL; + } + new_queue->first = NULL; + new_queue->last = NULL; + return new_queue; +} + +void queue_destroy(queue * que) +{ + if(que == NULL) { + return; + } + + pthread_mutex_lock(&mutex); + if(que->first == NULL) { + // ("que->first == NULL .... \n"); + free(que); + pthread_mutex_unlock(&mutex); + return; + } + + // ("que is there lets try to free it...\n"); + + queue_node * _node = que->first; + + while(_node != NULL) { + // freeing the data coz it's on the heap and no one to free it + // except for this one + // ("freeing : %s\n", (char *)_node->data); + free(_node->data); + queue_node *tmp = _node->next; + free(_node); + _node = tmp; + } + + free(que); + + pthread_mutex_unlock(&mutex); +} + +/** + * que is a queue pointer + * data is a heap allocated memory pointer + */ +int enque(queue * que, void * data) +{ + queue_node * new_node = malloc(sizeof(queue_node)); + if(new_node == NULL) { + fprintf(stderr, "Malloc failed creating a queue_node\n"); + return -1; + } + // assumming data is in heap + new_node->data = data; + new_node->next = NULL; + + pthread_mutex_lock(&mutex); + if (que->first == NULL) { + // new que + que->first = new_node; + que->last = new_node; + } else { + que->last->next = new_node; + que->last = new_node; + } + pthread_mutex_unlock(&mutex); + + return 0; +} + +void * deque(queue * que) +{ + // print("Entered to deque\n"); + if (que == NULL) { + // print("que is null exiting...\n"); + return NULL; + } + + + pthread_mutex_lock(&mutex); + if (que->first == NULL) { + pthread_mutex_unlock(&mutex); + // print("que->first is null exiting...\n"); + return NULL; + } + + void * data; + queue_node * _node = que->first; + if (que->first == que->last) { + que->first = NULL; + que->last = NULL; + } else { + que->first = _node->next; + } + + data = _node->data; + + // print("Freeing _node@ %p", _node); + free(_node); + pthread_mutex_unlock(&mutex); + // print("Exiting deque\n"); + return data; +} + diff --git a/src/workers.c b/src/workers.c index 20035ef..e63512f 100644 --- a/src/workers.c +++ b/src/workers.c @@ -11,37 +11,31 @@ #include #include - -void *PrintHello(void *threadid) -{ - long tid; - tid = (long)threadid; - printf("Hello World! It's me, thread #%ld!\n", tid); - pthread_exit(NULL); -} - -typedef struct { - int thread_id; - node * matched_node; -} feedback_payload; - void *r3_feedback_worker(void * data) { feedback_payload * payload = (feedback_payload*) data; // pointer cast - return NULL; -} - -void r3_launch_feedback_worker(feedback_payload * data) { - int rc; - pthread_t worker_thread; - pthread_attr_t attr; - pthread_attr_init(&attr); - pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); - rc = pthread_create(&worker_thread, &attr, r3_feedback_worker, (void*) data); -} - - -void r3_worker_stop() { + pthread_exit(NULL); +} + +void r3_feedback_worker_init(pthread_t * t, feedback_payload * data) { + int rc; + // pthread_t worker_thread; + pthread_attr_t attr; + pthread_attr_init(&attr); + pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); + rc = pthread_create(t, &attr, r3_feedback_worker, (void*) data); + + // if the pthread is created, we may free the attr + pthread_attr_destroy(&attr); + + // rc = pthread_join(t, &status); +} + +void r3_worker_cancel(pthread_t t) { + pthread_cancel(t); +} + +void r3_worker_exit() { pthread_exit(NULL); } diff --git a/tests/Makefile.am b/tests/Makefile.am index 0d71368..73f14b8 100644 --- a/tests/Makefile.am +++ b/tests/Makefile.am @@ -46,6 +46,8 @@ check_slug_SOURCES = check_slug.c TESTS += check_worker check_worker_SOURCES = check_worker.c +TESTS += check_queue +check_queue_SOURCES = check_queue.c check_PROGRAMS = $(TESTS) diff --git a/tests/check_queue.c b/tests/check_queue.c new file mode 100644 index 0000000..10244fa --- /dev/null +++ b/tests/check_queue.c @@ -0,0 +1,117 @@ +/* + * check_queue.c + * Copyright (C) 2014 c9s + * + * Distributed under terms of the MIT license. + */ +#include "r3_queue.h" + +#include +#include +#include +#include + + +#define number_of_threads 6 +#define number_of_threads_d 5 +#define number_of_iters 1000 + +struct _stru { + int number; + int thread_no; + queue * q; +}; + +void * func(void * arg) +{ + struct _stru * args = (struct _stru *) arg; + int number = args->number; + //int th_number = args->thread_no; + queue * q = args->q; + + int i; + srand(time(NULL)); + for(i = 0; i < number; i++) { + char * message = malloc(16); + snprintf(message, 15, "rand: %d", rand()); + enque(q, (void *)message); + } + + return NULL; +} + + +void * func_d(void * args) +{ + queue * q = (queue *) args; + + void * data; + while((data = deque(q)) != NULL) { + char * string = (char *)data; + free(data); + } + return NULL; +} + +START_TEST (test_queue) +{ + queue * q = queue_factory(); + enque(q, (void*) 1); + int i = (int) deque(q); + ck_assert_int_eq(i, 1); +} +END_TEST + +START_TEST (test_queue_threads) +{ + queue * q = queue_factory(); + pthread_t threads[number_of_threads]; + pthread_t thread_d[number_of_threads_d]; + + int i; + struct _stru arg[number_of_threads]; + for(i = 0; i < number_of_threads; i++) { + arg[i].number = number_of_iters; + arg[i].thread_no = i; + arg[i].q = q; + pthread_create(threads+i, NULL, func, (void *)&arg[i]); + } + + for(i = 0; i < number_of_threads_d; i++) { + pthread_create(thread_d+i, NULL, func_d, (void *)q); + } + + for(i = 0; i < number_of_threads; i++) { + pthread_join(*(threads+i), NULL); + } + + for(i = 0; i < number_of_threads_d; i++) { + pthread_join(*(thread_d+i), NULL); + } + queue_destroy(q); +} +END_TEST + + + + + +Suite* r3_suite (void) { + Suite *suite = suite_create("queue"); + TCase *tcase = tcase_create("queue_test"); + tcase_add_test(tcase, test_queue_threads); + tcase_add_test(tcase, test_queue); + tcase_set_timeout(tcase, 30); + suite_add_tcase(suite, tcase); + return suite; +} + +int main (int argc, char *argv[]) { + int number_failed; + Suite *suite = r3_suite(); + SRunner *runner = srunner_create(suite); + srunner_run_all(runner, CK_NORMAL); + number_failed = srunner_ntests_failed(runner); + srunner_free(runner); + return number_failed; +} diff --git a/tests/check_worker.c b/tests/check_worker.c index e1c2dfa..34056e9 100644 --- a/tests/check_worker.c +++ b/tests/check_worker.c @@ -27,6 +27,17 @@ START_TEST (test_feedback_worker) r3_tree_insert_path(n, "/garply/grault/bar", NULL); r3_tree_compile(n); + node *matched; + matched = r3_tree_matchl(n, "/garply/grault/foo", strlen("/garply/grault/foo"), NULL); + + /* + feedback_payload payload; + payload.matched_node = matched; + r3_feedback_worker_init(&payload); + */ + + + r3_tree_free(n); } END_TEST