ViewVC Help
View File | Revision Log | Show Annotations | View Changeset | Root Listing
root/gclib/gclib/GYarn.cpp
Revision: 107
Committed: Tue Oct 11 12:31:50 2011 UTC (12 years, 10 months ago) by gpertea
File size: 10703 byte(s)
Log Message:
wip GYarn (threads)

Line User Rev File contents
1 gpertea 107 /* cpp-ification of yarn.c -- generic thread operations implemented using pthread functions
2     * Copyright (C) 2008 Mark Adler
3     * Version 1.1 26 Oct 2008 Mark Adler
4     * For conditions of distribution and use, see copyright notice in yarn.h
5     */
6    
7     /* Basic thread operations implemented using the POSIX pthread library. All
8     pthread references are isolated within this module to allow alternate
9     implementations with other thread libraries. See yarn.h for the description
10     of these operations. */
11    
12     /* Version history:
13     1.0 19 Oct 2008 First version
14     1.1 26 Oct 2008 No need to set the stack size -- remove
15     Add yarn_abort() function for clean-up on error exit
16     */
17    
18     /* for thread portability */
19     #define _POSIX_PTHREAD_SEMANTICS
20     #define _REENTRANT
21    
22     /* external libraries and entities referenced */
23     #include <stdio.h> /* fprintf(), stderr */
24     #include <stdlib.h> /* exit(), malloc(), free(), NULL */
25     #include <pthread.h> /* pthread_t, pthread_create(), pthread_join(), */
26     /* pthread_attr_t, pthread_attr_init(), pthread_attr_destroy(),
27     PTHREAD_CREATE_JOINABLE, pthread_attr_setdetachstate(),
28     pthread_self(), pthread_equal(),
29     pthread_mutex_t, PTHREAD_MUTEX_INITIALIZER, pthread_mutex_init(),
30     pthread_mutex_lock(), pthread_mutex_unlock(), pthread_mutex_destroy(),
31     pthread_cond_t, PTHREAD_COND_INITIALIZER, pthread_cond_init(),
32     pthread_cond_broadcast(), pthread_cond_wait(), pthread_cond_destroy() */
33     #include <errno.h> /* ENOMEM, EAGAIN, EINVAL */
34    
35     /* interface definition */
36     #include "GYarn.h"
37    
38     /* constants */
39     #define local static /* for non-exported functions and globals */
40    
41     /* error handling external globals, resettable by application */
42     char *yarn_prefix = "yarn";
43     void (*yarn_abort)(int) = NULL;
44    
45    
46     /* immediately exit -- use for errors that shouldn't ever happen */
47     local void fail(int err)
48     {
49     fprintf(stderr, "%s: %s (%d) -- aborting\n", yarn_prefix,
50     err == ENOMEM ? "out of memory" : "internal pthread error", err);
51     if (yarn_abort != NULL)
52     yarn_abort(err);
53     exit(err == ENOMEM || err == EAGAIN ? err : EINVAL);
54     }
55    
56     /* memory handling routines provided by user -- if none are provided, malloc()
57     and free() are used, which are therefore assumed to be thread-safe */
58     typedef void *(*malloc_t)(size_t);
59     typedef void (*free_t)(void *);
60     local malloc_t my_malloc_f = malloc;
61     local free_t my_free = free;
62    
63     /* use user-supplied allocation routines instead of malloc() and free() */
64     void yarn_mem(malloc_t lease, free_t vacate)
65     {
66     my_malloc_f = lease;
67     my_free = vacate;
68     }
69    
70     /* memory allocation that cannot fail (from the point of view of the caller) */
71     local void *my_malloc(size_t size)
72     {
73     void *block;
74    
75     if ((block = my_malloc_f(size)) == NULL)
76     fail(ENOMEM);
77     return block;
78     }
79    
80     /* -- lock functions -- */
81    
82     struct lock_s {
83     pthread_mutex_t mutex;
84     pthread_cond_t cond;
85     long value;
86     };
87    
88     lock *new_lock(long initial)
89     {
90     int ret;
91     lock *bolt;
92    
93     bolt = my_malloc(sizeof(struct lock_s));
94     if ((ret = pthread_mutex_init(&(bolt->mutex), NULL)) ||
95     (ret = pthread_cond_init(&(bolt->cond), NULL)))
96     fail(ret);
97     bolt->value = initial;
98     return bolt;
99     }
100    
101     void possess(lock *bolt)
102     {
103     int ret;
104    
105     if ((ret = pthread_mutex_lock(&(bolt->mutex))) != 0)
106     fail(ret);
107     }
108    
109     void release(lock *bolt)
110     {
111     int ret;
112    
113     if ((ret = pthread_mutex_unlock(&(bolt->mutex))) != 0)
114     fail(ret);
115     }
116    
117     void twist(lock *bolt, enum twist_op op, long val)
118     {
119     int ret;
120    
121     if (op == TO)
122     bolt->value = val;
123     else if (op == BY)
124     bolt->value += val;
125     if ((ret = pthread_cond_broadcast(&(bolt->cond))) ||
126     (ret = pthread_mutex_unlock(&(bolt->mutex))))
127     fail(ret);
128     }
129    
130     #define until(a) while(!(a))
131    
132     void wait_for(lock *bolt, enum wait_op op, long val)
133     {
134     int ret;
135    
136     switch (op) {
137     case TO_BE:
138     until (bolt->value == val)
139     if ((ret = pthread_cond_wait(&(bolt->cond), &(bolt->mutex))) != 0)
140     fail(ret);
141     break;
142     case NOT_TO_BE:
143     until (bolt->value != val)
144     if ((ret = pthread_cond_wait(&(bolt->cond), &(bolt->mutex))) != 0)
145     fail(ret);
146     break;
147     case TO_BE_MORE_THAN:
148     until (bolt->value > val)
149     if ((ret = pthread_cond_wait(&(bolt->cond), &(bolt->mutex))) != 0)
150     fail(ret);
151     break;
152     case TO_BE_LESS_THAN:
153     until (bolt->value < val)
154     if ((ret = pthread_cond_wait(&(bolt->cond), &(bolt->mutex))) != 0)
155     fail(ret);
156     }
157     }
158    
159     long peek_lock(lock *bolt)
160     {
161     return bolt->value;
162     }
163    
164     void free_lock(lock *bolt)
165     {
166     int ret;
167     if ((ret = pthread_cond_destroy(&(bolt->cond))) ||
168     (ret = pthread_mutex_destroy(&(bolt->mutex))))
169     fail(ret);
170     my_free(bolt);
171     }
172    
173     /* -- thread functions (uses lock functions above) -- */
174    
175     struct thread_s {
176     pthread_t id;
177     int done; /* true if this thread has exited */
178     thread *next; /* for list of all launched threads */
179     };
180    
181     /* list of threads launched but not joined, count of threads exited but not
182     joined (incremented by ignition() just before exiting) */
183     local lock threads_lock = {
184     PTHREAD_MUTEX_INITIALIZER,
185     PTHREAD_COND_INITIALIZER,
186     0 /* number of threads exited but not joined */
187     };
188     local thread *threads = NULL; /* list of extant threads */
189    
190     /* structure in which to pass the probe and its payload to ignition() */
191     struct capsule {
192     void (*probe)(void *);
193     void *payload;
194     };
195    
196     /* mark the calling thread as done and alert join_all() */
197     local void reenter(void *dummy)
198     {
199     thread *match, **prior;
200     pthread_t me;
201    
202     /* find this thread in the threads list by matching the thread id */
203     me = pthread_self();
204     possess(&(threads_lock));
205     prior = &(threads);
206     while ((match = *prior) != NULL) {
207     if (pthread_equal(match->id, me))
208     break;
209     prior = &(match->next);
210     }
211     if (match == NULL)
212     fail(EINVAL);
213    
214     /* mark this thread as done and move it to the head of the list */
215     match->done = 1;
216     if (threads != match) {
217     *prior = match->next;
218     match->next = threads;
219     threads = match;
220     }
221    
222     /* update the count of threads to be joined and alert join_all() */
223     twist(&(threads_lock), BY, +1);
224     }
225    
226     /* all threads go through this routine so that just before the thread exits,
227     it marks itself as done in the threads list and alerts join_all() so that
228     the thread resources can be released -- use cleanup stack so that the
229     marking occurs even if the thread is cancelled */
230     local void *ignition(void *arg)
231     {
232     struct capsule *capsule = arg;
233    
234     /* run reenter() before leaving */
235     pthread_cleanup_push(reenter, NULL);
236    
237     /* execute the requested function with argument */
238     capsule->probe(capsule->payload);
239     my_free(capsule);
240    
241     /* mark this thread as done and let join_all() know */
242     pthread_cleanup_pop(1);
243    
244     /* exit thread */
245     return NULL;
246     }
247    
248     /* not all POSIX implementations create threads as joinable by default, so that
249     is made explicit here */
250     thread *launch(void (*probe)(void *), void *payload)
251     {
252     int ret;
253     thread *th;
254     struct capsule *capsule;
255     pthread_attr_t attr;
256    
257     /* construct the requested call and argument for the ignition() routine
258     (allocated instead of automatic so that we're sure this will still be
259     there when ignition() actually starts up -- ignition() will free this
260     allocation) */
261     capsule = my_malloc(sizeof(struct capsule));
262     capsule->probe = probe;
263     capsule->payload = payload;
264    
265     /* assure this thread is in the list before join_all() or ignition() looks
266     for it */
267     possess(&(threads_lock));
268    
269     /* create the thread and call ignition() from that thread */
270     th = my_malloc(sizeof(struct thread_s));
271     if ((ret = pthread_attr_init(&attr)) ||
272     (ret = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE)) ||
273     (ret = pthread_create(&(th->id), &attr, ignition, capsule)) ||
274     (ret = pthread_attr_destroy(&attr)))
275     fail(ret);
276    
277     /* put the thread in the threads list for join_all() */
278     th->done = 0;
279     th->next = threads;
280     threads = th;
281     release(&(threads_lock));
282     return th;
283     }
284    
285     void join(thread *ally)
286     {
287     int ret;
288     thread *match, **prior;
289    
290     /* wait for thread to exit and return its resources */
291     if ((ret = pthread_join(ally->id, NULL)) != 0)
292     fail(ret);
293    
294     /* find the thread in the threads list */
295     possess(&(threads_lock));
296     prior = &(threads);
297     while ((match = *prior) != NULL) {
298     if (match == ally)
299     break;
300     prior = &(match->next);
301     }
302     if (match == NULL)
303     fail(EINVAL);
304    
305     /* remove thread from list and update exited count, free thread */
306     if (match->done)
307     threads_lock.value--;
308     *prior = match->next;
309     release(&(threads_lock));
310     my_free(ally);
311     }
312    
313     /* This implementation of join_all() only attempts to join threads that have
314     announced that they have exited (see ignition()). When there are many
315     threads, this is faster than waiting for some random thread to exit while a
316     bunch of other threads have already exited. */
317     int join_all(void)
318     {
319     int ret, count;
320     thread *match, **prior;
321    
322     /* grab the threads list and initialize the joined count */
323     count = 0;
324     possess(&(threads_lock));
325    
326     /* do until threads list is empty */
327     while (threads != NULL) {
328     /* wait until at least one thread has reentered */
329     wait_for(&(threads_lock), NOT_TO_BE, 0);
330    
331     /* find the first thread marked done (should be at or near the top) */
332     prior = &(threads);
333     while ((match = *prior) != NULL) {
334     if (match->done)
335     break;
336     prior = &(match->next);
337     }
338     if (match == NULL)
339     fail(EINVAL);
340    
341     /* join the thread (will be almost immediate), remove from the threads
342     list, update the reenter count, and free the thread */
343     if ((ret = pthread_join(match->id, NULL)) != 0)
344     fail(ret);
345     threads_lock.value--;
346     *prior = match->next;
347     my_free(match);
348     count++;
349     }
350    
351     /* let go of the threads list and return the number of threads joined */
352     release(&(threads_lock));
353     return count;
354     }
355    
356     /* cancel and join the thread -- the thread will cancel when it gets to a file
357     operation, a sleep or pause, or a condition wait */
358     void destruct(thread *off_course)
359     {
360     int ret;
361    
362     if ((ret = pthread_cancel(off_course->id)) != 0)
363     fail(ret);
364     join(off_course);
365     }