source: src/router/freeradius/src/main/threads.c @ 14669

Last change on this file since 14669 was 14669, checked in by BrainSlayer, 3 years ago

update to 2.1.9

File size: 26.3 KB
Line 
1/*
2 * threads.c    request threading support
3 *
4 * Version:     $Id$
5 *
6 *   This program is free software; you can redistribute it and/or modify
7 *   it under the terms of the GNU General Public License as published by
8 *   the Free Software Foundation; either version 2 of the License, or
9 *   (at your option) any later version.
10 *
11 *   This program is distributed in the hope that it will be useful,
12 *   but WITHOUT ANY WARRANTY; without even the implied warranty of
13 *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 *   GNU General Public License for more details.
15 *
16 *   You should have received a copy of the GNU General Public License
17 *   along with this program; if not, write to the Free Software
18 *   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
19 *
20 * Copyright 2000,2006  The FreeRADIUS server project
21 * Copyright 2000  Alan DeKok <aland@ox.org>
22 */
23
24#include <freeradius-devel/ident.h>
25RCSID("$Id$")
26
27#include <freeradius-devel/radiusd.h>
28#include <freeradius-devel/rad_assert.h>
29
30/*
31 *      Other OS's have sem_init, OS X doesn't.
32 */
33#ifdef HAVE_SEMAPHORE_H
34#include <semaphore.h>
35#endif
36
37#ifdef DARWIN
38#include <mach/task.h>
39#include <mach/semaphore.h>
40
41#undef sem_t
42#define sem_t semaphore_t
43#undef sem_init
44#define sem_init(s,p,c) semaphore_create(mach_task_self(),s,SYNC_POLICY_FIFO,c)
45#undef sem_wait
46#define sem_wait(s) semaphore_wait(*s)
47#undef sem_post
48#define sem_post(s) semaphore_signal(*s)
49#endif
50
51#ifdef HAVE_SYS_WAIT_H
52#include <sys/wait.h>
53#endif
54
55#ifdef HAVE_PTHREAD_H
56
57#ifdef HAVE_OPENSSL_CRYPTO_H
58#include <openssl/crypto.h>
59#endif
60#ifdef HAVE_OPENSSL_ERR_H
61#include <openssl/err.h>
62#endif
63#ifdef HAVE_OPENSSL_EVP_H
64#include <openssl/evp.h>
65#endif
66
/*
 *	Initial semaphore values; the pool semaphore is created
 *	SEMAPHORE_LOCKED by thread_pool_init().
 */
#define SEMAPHORE_LOCKED	(0)
#define SEMAPHORE_UNLOCKED	(1)

/*
 *	Lifecycle states stored in THREAD_HANDLE.status.
 */
#define THREAD_RUNNING		(1)
#define THREAD_CANCELLED	(2)
#define THREAD_EXITED		(3)

/*
 *	One request fifo per listener type; request->priority selects it.
 */
#define NUM_FIFOS		RAD_LISTEN_MAX
75
76
77/*
78 *  A data structure which contains the information about
79 *  the current thread.
80 *
81 *  pthread_id     pthread id
82 *  thread_num     server thread number, 1...number of threads
83 *  semaphore     used to block the thread until a request comes in
84 *  status        is the thread running or exited?
85 *  request_count the number of requests that this thread has handled
86 *  timestamp     when the thread started executing.
87 */
88typedef struct THREAD_HANDLE {
89        struct THREAD_HANDLE *prev;
90        struct THREAD_HANDLE *next;
91        pthread_t            pthread_id;
92        int                  thread_num;
93        int                  status;
94        unsigned int         request_count;
95        time_t               timestamp;
96        REQUEST              *request;
97} THREAD_HANDLE;
98
99/*
100 *      For the request queue.
101 */
102typedef struct request_queue_t {
103        REQUEST           *request;
104        RAD_REQUEST_FUNP  fun;
105} request_queue_t;
106
107typedef struct thread_fork_t {
108        pid_t           pid;
109        int             status;
110        int             exited;
111} thread_fork_t;
112
113
114/*
115 *      A data structure to manage the thread pool.  There's no real
116 *      need for a data structure, but it makes things conceptually
117 *      easier.
118 */
119typedef struct THREAD_POOL {
120        THREAD_HANDLE *head;
121        THREAD_HANDLE *tail;
122
123        int total_threads;
124        int active_threads;     /* protected by queue_mutex */
125        int max_thread_num;
126        int start_threads;
127        int max_threads;
128        int min_spare_threads;
129        int max_spare_threads;
130        unsigned int max_requests_per_thread;
131        unsigned long request_count;
132        time_t time_last_spawned;
133        int cleanup_delay;
134        int spawn_flag;
135
136#ifdef WNOHANG
137        pthread_mutex_t wait_mutex;
138        fr_hash_table_t *waiters;
139#endif
140
141        /*
142         *      All threads wait on this semaphore, for requests
143         *      to enter the queue.
144         */
145        sem_t           semaphore;
146
147        /*
148         *      To ensure only one thread at a time touches the queue.
149         */
150        pthread_mutex_t queue_mutex;
151
152        int             max_queue_size;
153        int             num_queued;
154        fr_fifo_t       *fifo[NUM_FIFOS];
155} THREAD_POOL;
156
157static THREAD_POOL thread_pool;
158static int pool_initialized = FALSE;
159static time_t last_cleaned = 0;
160
161static void thread_pool_manage(time_t now);
162
163/*
164 *      A mapping of configuration file names to internal integers
165 */
166static const CONF_PARSER thread_config[] = {
167        { "start_servers",           PW_TYPE_INTEGER, 0, &thread_pool.start_threads,           "5" },
168        { "max_servers",             PW_TYPE_INTEGER, 0, &thread_pool.max_threads,             "32" },
169        { "min_spare_servers",       PW_TYPE_INTEGER, 0, &thread_pool.min_spare_threads,       "3" },
170        { "max_spare_servers",       PW_TYPE_INTEGER, 0, &thread_pool.max_spare_threads,       "10" },
171        { "max_requests_per_server", PW_TYPE_INTEGER, 0, &thread_pool.max_requests_per_thread, "0" },
172        { "cleanup_delay",           PW_TYPE_INTEGER, 0, &thread_pool.cleanup_delay,           "5" },
173        { "max_queue_size",          PW_TYPE_INTEGER, 0, &thread_pool.max_queue_size,           "65536" },
174        { NULL, -1, 0, NULL, NULL }
175};
176
177
178#ifdef HAVE_OPENSSL_CRYPTO_H
179
/*
 *	If we're linking against OpenSSL, then it is the duty of a
 *	multithreaded application to give OpenSSL a thread-id
 *	function and mutex locking functions.
 *
 *	Note: this only implements the static callbacks.  OpenSSL
 *	does not use the dynamic locking callbacks right now, but may
 *	in the future, so we will have to add them at some point.
 */

/*
 *	One mutex per OpenSSL static lock, allocated in setup_ssl_mutexes().
 */
static pthread_mutex_t *ssl_mutexes = NULL;

/*
 *	Thread-id callback registered with CRYPTO_set_id_callback():
 *	identifies the calling thread by its pthread_self() value.
 */
static unsigned long ssl_id_function(void)
{
	pthread_t me = pthread_self();

	return (unsigned long) me;
}
198
199static void ssl_locking_function(int mode, int n, const char *file, int line)
200{
201        file = file;            /* -Wunused */
202        line = line;            /* -Wunused */
203
204        if (mode & CRYPTO_LOCK) {
205                pthread_mutex_lock(&(ssl_mutexes[n]));
206        } else {
207                pthread_mutex_unlock(&(ssl_mutexes[n]));
208        }
209}
210
211static int setup_ssl_mutexes(void)
212{
213        int i;
214
215#ifdef HAVE_OPENSSL_EVP_H
216        /*
217         *      Enable all ciphers and digests.
218         */
219        OpenSSL_add_all_algorithms();
220#endif
221
222        ssl_mutexes = rad_malloc(CRYPTO_num_locks() * sizeof(pthread_mutex_t));
223        if (!ssl_mutexes) {
224                radlog(L_ERR, "Error allocating memory for SSL mutexes!");
225                return 0;
226        }
227
228        for (i = 0; i < CRYPTO_num_locks(); i++) {
229                pthread_mutex_init(&(ssl_mutexes[i]), NULL);
230        }
231
232        CRYPTO_set_id_callback(ssl_id_function);
233        CRYPTO_set_locking_callback(ssl_locking_function);
234
235        return 1;
236}
237#endif
238
239#ifdef WNOHANG
240/*
241 *      We don't want to catch SIGCHLD for a host of reasons.
242 *
243 *      - exec_wait means that someone, somewhere, somewhen, will
244 *      call waitpid(), and catch the child.
245 *
246 *      - SIGCHLD is delivered to a random thread, not the one that
247 *      forked.
248 *
249 *      - if another thread catches the child, we have to coordinate
250 *      with the thread doing the waiting.
251 *
252 *      - if we don't waitpid() for non-wait children, they'll be zombies,
253 *      and will hang around forever.
254 *
255 */
256static void reap_children(void)
257{
258        pid_t pid;
259        int status;
260        thread_fork_t mytf, *tf;
261
262
263        pthread_mutex_lock(&thread_pool.wait_mutex);
264
265        do {
266        retry:
267                pid = waitpid(0, &status, WNOHANG);
268                if (pid <= 0) break;
269
270                mytf.pid = pid;
271                tf = fr_hash_table_finddata(thread_pool.waiters, &mytf);
272                if (!tf) goto retry;
273
274                tf->status = status;
275                tf->exited = 1;
276        } while (fr_hash_table_num_elements(thread_pool.waiters) > 0);
277
278        pthread_mutex_unlock(&thread_pool.wait_mutex);
279}
280#else
281#define reap_children()
282#endif /* WNOHANG */
283
284/*
285 *      Add a request to the list of waiting requests.
286 *      This function gets called ONLY from the main handler thread...
287 *
288 *      This function should never fail.
289 */
290static int request_enqueue(REQUEST *request, RAD_REQUEST_FUNP fun)
291{
292        request_queue_t *entry;
293
294        pthread_mutex_lock(&thread_pool.queue_mutex);
295
296        thread_pool.request_count++;
297
298        if (thread_pool.num_queued >= thread_pool.max_queue_size) {
299                pthread_mutex_unlock(&thread_pool.queue_mutex);
300
301                /*
302                 *      Mark the request as done.
303                 */
304                radlog(L_ERR, "!!! ERROR !!! The server is blocked: discarding new request %d", request->number);
305                request->child_state = REQUEST_DONE;
306                return 0;
307        }
308
309        entry = rad_malloc(sizeof(*entry));
310        entry->request = request;
311        entry->fun = fun;
312
313        /*
314         *      Push the request onto the appropriate fifo for that
315         */
316        if (!fr_fifo_push(thread_pool.fifo[request->priority],
317                            entry)) {
318                pthread_mutex_unlock(&thread_pool.queue_mutex);
319                radlog(L_ERR, "!!! ERROR !!! Failed inserting request %d into the queue", request->number);
320                request->child_state = REQUEST_DONE;
321                return 0;
322        }
323
324        thread_pool.num_queued++;
325
326        pthread_mutex_unlock(&thread_pool.queue_mutex);
327
328        /*
329         *      There's one more request in the queue.
330         *
331         *      Note that we're not touching the queue any more, so
332         *      the semaphore post is outside of the mutex.  This also
333         *      means that when the thread wakes up and tries to lock
334         *      the mutex, it will be unlocked, and there won't be
335         *      contention.
336         */
337        sem_post(&thread_pool.semaphore);
338
339        return 1;
340}
341
342/*
343 *      Remove a request from the queue.
344 */
345static int request_dequeue(REQUEST **request, RAD_REQUEST_FUNP *fun)
346{
347        RAD_LISTEN_TYPE i, start;
348        request_queue_t *entry;
349
350        reap_children();
351
352        pthread_mutex_lock(&thread_pool.queue_mutex);
353
354        /*
355         *      Clear old requests from all queues.
356         *
357         *      We only do one pass over the queue, in order to
358         *      amortize the work across the child threads.  Since we
359         *      do N checks for one request de-queued, the old
360         *      requests will be quickly cleared.
361         */
362        for (i = 0; i < RAD_LISTEN_MAX; i++) {
363                entry = fr_fifo_peek(thread_pool.fifo[i]);
364                if (!entry ||
365                    (entry->request->master_state != REQUEST_STOP_PROCESSING)) {
366                        continue;
367}
368                /*
369                 *      This entry was marked to be stopped.  Acknowledge it.
370                 */
371                entry = fr_fifo_pop(thread_pool.fifo[i]);
372                rad_assert(entry != NULL);
373                entry->request->child_state = REQUEST_DONE;
374                thread_pool.num_queued--;
375                free(entry);
376                entry = NULL;
377        }
378
379        start = 0;
380 retry:
381        /*
382         *      Pop results from the top of the queue
383         */
384        for (i = start; i < RAD_LISTEN_MAX; i++) {
385                entry = fr_fifo_pop(thread_pool.fifo[i]);
386                if (entry) {
387                        start = i;
388                        break;
389                }
390        }
391
392        if (!entry) {
393                pthread_mutex_unlock(&thread_pool.queue_mutex);
394                *request = NULL;
395                *fun = NULL;
396                return 0;
397        }
398
399        rad_assert(thread_pool.num_queued > 0);
400        thread_pool.num_queued--;
401        *request = entry->request;
402        *fun = entry->fun;
403        free(entry);
404        entry = NULL;
405
406        rad_assert(*request != NULL);
407        rad_assert((*request)->magic == REQUEST_MAGIC);
408        rad_assert(*fun != NULL);
409
410        /*
411         *      If the request has sat in the queue for too long,
412         *      kill it.
413         *
414         *      The main clean-up code can't delete the request from
415         *      the queue, and therefore won't clean it up until we
416         *      have acknowledged it as "done".
417         */
418        if ((*request)->master_state == REQUEST_STOP_PROCESSING) {
419                (*request)->child_state = REQUEST_DONE;
420                goto retry;
421        }
422
423        /*
424         *      The thread is currently processing a request.
425         */
426        thread_pool.active_threads++;
427
428        pthread_mutex_unlock(&thread_pool.queue_mutex);
429
430        return 1;
431}
432
433
434/*
435 *      The main thread handler for requests.
436 *
437 *      Wait on the semaphore until we have it, and process the request.
438 */
439static void *request_handler_thread(void *arg)
440{
441        RAD_REQUEST_FUNP  fun;
442        THREAD_HANDLE     *self = (THREAD_HANDLE *) arg;
443
444        /*
445         *      Loop forever, until told to exit.
446         */
447        do {
448                /*
449                 *      Wait to be signalled.
450                 */
451                DEBUG2("Thread %d waiting to be assigned a request",
452                       self->thread_num);
453        re_wait:
454                if (sem_wait(&thread_pool.semaphore) != 0) {
455                        /*
456                         *      Interrupted system call.  Go back to
457                         *      waiting, but DON'T print out any more
458                         *      text.
459                         */
460                        if (errno == EINTR) {
461                                DEBUG2("Re-wait %d", self->thread_num);
462                                goto re_wait;
463                        }
464                        radlog(L_ERR, "Thread %d failed waiting for semaphore: %s: Exiting\n",
465                               self->thread_num, strerror(errno));
466                        break;
467                }
468
469                DEBUG2("Thread %d got semaphore", self->thread_num);
470
471#ifdef HAVE_OPENSSL_ERR_H
472                /*
473                 *      Clear the error queue for the current thread.
474                 */
475                ERR_clear_error ();
476#endif
477
478                /*
479                 *      Try to grab a request from the queue.
480                 *
481                 *      It may be empty, in which case we fail
482                 *      gracefully.
483                 */
484                if (!request_dequeue(&self->request, &fun)) continue;
485
486                self->request->child_pid = self->pthread_id;
487                self->request_count++;
488
489                DEBUG2("Thread %d handling request %d, (%d handled so far)",
490                       self->thread_num, self->request->number,
491                       self->request_count);
492
493                radius_handle_request(self->request, fun);
494
495                /*
496                 *      Update the active threads.
497                 */
498                pthread_mutex_lock(&thread_pool.queue_mutex);
499                rad_assert(thread_pool.active_threads > 0);
500                thread_pool.active_threads--;
501                pthread_mutex_unlock(&thread_pool.queue_mutex);
502        } while (self->status != THREAD_CANCELLED);
503
504        DEBUG2("Thread %d exiting...", self->thread_num);
505
506#ifdef HAVE_OPENSSL_ERR_H
507        /*
508         *      If we linked with OpenSSL, the application
509         *      must remove the thread's error queue before
510         *      exiting to prevent memory leaks.
511         */
512        ERR_remove_state(0);
513#endif
514
515        /*
516         *  Do this as the LAST thing before exiting.
517         */
518        self->request = NULL;
519        self->status = THREAD_EXITED;
520
521        return NULL;
522}
523
524/*
525 *      Take a THREAD_HANDLE, delete it from the thread pool and
526 *      free its resources.
527 *
528 *      This function is called ONLY from the main server thread,
529 *      ONLY after the thread has exited.
530 */
531static void delete_thread(THREAD_HANDLE *handle)
532{
533        THREAD_HANDLE *prev;
534        THREAD_HANDLE *next;
535
536        rad_assert(handle->request == NULL);
537
538        DEBUG2("Deleting thread %d", handle->thread_num);
539
540        prev = handle->prev;
541        next = handle->next;
542        rad_assert(thread_pool.total_threads > 0);
543        thread_pool.total_threads--;
544
545        /*
546         *      Remove the handle from the list.
547         */
548        if (prev == NULL) {
549                rad_assert(thread_pool.head == handle);
550                thread_pool.head = next;
551        } else {
552                prev->next = next;
553        }
554
555        if (next == NULL) {
556                rad_assert(thread_pool.tail == handle);
557                thread_pool.tail = prev;
558        } else {
559                next->prev = prev;
560        }
561
562        /*
563         *      Free the handle, now that it's no longer referencable.
564         */
565        free(handle);
566}
567
568
569/*
570 *      Spawn a new thread, and place it in the thread pool.
571 *
572 *      The thread is started initially in the blocked state, waiting
573 *      for the semaphore.
574 */
575static THREAD_HANDLE *spawn_thread(time_t now)
576{
577        int rcode;
578        THREAD_HANDLE *handle;
579        pthread_attr_t attr;
580
581        /*
582         *      Ensure that we don't spawn too many threads.
583         */
584        if (thread_pool.total_threads >= thread_pool.max_threads) {
585                DEBUG2("Thread spawn failed.  Maximum number of threads (%d) already running.", thread_pool.max_threads);
586                return NULL;
587        }
588
589        /*
590         *      Allocate a new thread handle.
591         */
592        handle = (THREAD_HANDLE *) rad_malloc(sizeof(THREAD_HANDLE));
593        memset(handle, 0, sizeof(THREAD_HANDLE));
594        handle->prev = NULL;
595        handle->next = NULL;
596        handle->thread_num = thread_pool.max_thread_num++;
597        handle->request_count = 0;
598        handle->status = THREAD_RUNNING;
599        handle->timestamp = time(NULL);
600
601        /*
602         *      Initialize the thread's attributes to detached.
603         *
604         *      We could call pthread_detach() later, but if the thread
605         *      exits between the create & detach calls, it will need to
606         *      be joined, which will never happen.
607         */
608        pthread_attr_init(&attr);
609        pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
610
611        /*
612         *      Create the thread detached, so that it cleans up it's
613         *      own memory when it exits.
614         *
615         *      Note that the function returns non-zero on error, NOT
616         *      -1.  The return code is the error, and errno isn't set.
617         */
618        rcode = pthread_create(&handle->pthread_id, &attr,
619                        request_handler_thread, handle);
620        if (rcode != 0) {
621                radlog(L_ERR, "Thread create failed: %s",
622                       strerror(rcode));
623                return NULL;
624        }
625        pthread_attr_destroy(&attr);
626
627        /*
628         *      One more thread to go into the list.
629         */
630        thread_pool.total_threads++;
631        DEBUG2("Thread spawned new child %d. Total threads in pool: %d",
632                        handle->thread_num, thread_pool.total_threads);
633
634        /*
635         *      Add the thread handle to the tail of the thread pool list.
636         */
637        if (thread_pool.tail) {
638                thread_pool.tail->next = handle;
639                handle->prev = thread_pool.tail;
640                thread_pool.tail = handle;
641        } else {
642                rad_assert(thread_pool.head == NULL);
643                thread_pool.head = thread_pool.tail = handle;
644        }
645
646        /*
647         *      Update the time we last spawned a thread.
648         */
649        thread_pool.time_last_spawned = now;
650
651        /*
652         *      And return the new handle to the caller.
653         */
654        return handle;
655}
656
657/*
658 *      Temporary function to prevent server from executing a SIGHUP
659 *      until all threads are finished handling requests.  This returns
660 *      the number of active threads to 'radiusd.c'.
661 */
662int total_active_threads(void)
663{
664        /*
665         *      We don't acquire the mutex, so this is just an estimate.
666         *      We can't return with the lock held, so there's no point
667         *      in getting the guaranteed correct value; by the time
668         *      the caller sees it, it can be wrong again.
669         */
670        return thread_pool.active_threads;
671}
672
673
674#ifdef WNOHANG
675static uint32_t pid_hash(const void *data)
676{
677        const thread_fork_t *tf = data;
678
679        return fr_hash(&tf->pid, sizeof(tf->pid));
680}
681
682static int pid_cmp(const void *one, const void *two)
683{
684        const thread_fork_t *a = one;
685        const thread_fork_t *b = two;
686
687        return (a->pid - b->pid);
688}
689#endif
690
691/*
692 *      Allocate the thread pool, and seed it with an initial number
693 *      of threads.
694 *
695 *      FIXME: What to do on a SIGHUP???
696 */
697int thread_pool_init(CONF_SECTION *cs, int *spawn_flag)
698{
699        int             i, rcode;
700        CONF_SECTION    *pool_cf;
701        time_t          now;
702
703        now = time(NULL);
704
705        rad_assert(spawn_flag != NULL);
706        rad_assert(*spawn_flag == TRUE);
707        rad_assert(pool_initialized == FALSE); /* not called on HUP */
708
709        pool_cf = cf_subsection_find_next(cs, NULL, "thread");
710        if (!pool_cf) *spawn_flag = FALSE;
711
712        /*
713         *      Initialize the thread pool to some reasonable values.
714         */
715        memset(&thread_pool, 0, sizeof(THREAD_POOL));
716        thread_pool.head = NULL;
717        thread_pool.tail = NULL;
718        thread_pool.total_threads = 0;
719        thread_pool.max_thread_num = 1;
720        thread_pool.cleanup_delay = 5;
721        thread_pool.spawn_flag = *spawn_flag;
722       
723        /*
724         *      Don't bother initializing the mutexes or
725         *      creating the hash tables.  They won't be used.
726         */
727        if (!*spawn_flag) return 0;
728       
729#ifdef WNOHANG
730        if ((pthread_mutex_init(&thread_pool.wait_mutex,NULL) != 0)) {
731                radlog(L_ERR, "FATAL: Failed to initialize wait mutex: %s",
732                       strerror(errno));
733                return -1;
734        }
735       
736        /*
737         *      Create the hash table of child PID's
738         */
739        thread_pool.waiters = fr_hash_table_create(pid_hash,
740                                                   pid_cmp,
741                                                   free);
742        if (!thread_pool.waiters) {
743                radlog(L_ERR, "FATAL: Failed to set up wait hash");
744                return -1;
745        }
746#endif
747
748        if (cf_section_parse(pool_cf, NULL, thread_config) < 0) {
749                return -1;
750        }
751
752        /*
753         *      Catch corner cases.
754         */
755        if (thread_pool.min_spare_threads < 1)
756                thread_pool.min_spare_threads = 1;
757        if (thread_pool.max_spare_threads < 1)
758                thread_pool.max_spare_threads = 1;
759        if (thread_pool.max_spare_threads < thread_pool.min_spare_threads)
760                thread_pool.max_spare_threads = thread_pool.min_spare_threads;
761
762        /*
763         *      The pool has already been initialized.  Don't spawn
764         *      new threads, and don't forget about forked children,
765         */
766        if (pool_initialized) {
767                return 0;
768        }
769
770        /*
771         *      Initialize the queue of requests.
772         */
773        memset(&thread_pool.semaphore, 0, sizeof(thread_pool.semaphore));
774        rcode = sem_init(&thread_pool.semaphore, 0, SEMAPHORE_LOCKED);
775        if (rcode != 0) {
776                radlog(L_ERR, "FATAL: Failed to initialize semaphore: %s",
777                       strerror(errno));
778                return -1;
779        }
780
781        rcode = pthread_mutex_init(&thread_pool.queue_mutex,NULL);
782        if (rcode != 0) {
783                radlog(L_ERR, "FATAL: Failed to initialize queue mutex: %s",
784                       strerror(errno));
785                return -1;
786        }
787
788        /*
789         *      Allocate multiple fifos.
790         */
791        for (i = 0; i < RAD_LISTEN_MAX; i++) {
792                thread_pool.fifo[i] = fr_fifo_create(65536, NULL);
793                if (!thread_pool.fifo[i]) {
794                        radlog(L_ERR, "FATAL: Failed to set up request fifo");
795                        return -1;
796                }
797        }
798
799#ifdef HAVE_OPENSSL_CRYPTO_H
800        /*
801         *      If we're linking with OpenSSL too, then we need
802         *      to set up the mutexes and enable the thread callbacks.
803         */
804        if (!setup_ssl_mutexes()) {
805                radlog(L_ERR, "FATAL: Failed to set up SSL mutexes");
806                return -1;
807        }
808#endif
809
810
811        /*
812         *      Create a number of waiting threads.
813         *
814         *      If we fail while creating them, do something intelligent.
815         */
816        for (i = 0; i < thread_pool.start_threads; i++) {
817                if (spawn_thread(now) == NULL) {
818                        return -1;
819                }
820        }
821
822        DEBUG2("Thread pool initialized");
823        pool_initialized = TRUE;
824        return 0;
825}
826
827
828/*
829 *      Assign a new request to a free thread.
830 *
831 *      If there isn't a free thread, then try to create a new one,
832 *      up to the configured limits.
833 */
834int thread_pool_addrequest(REQUEST *request, RAD_REQUEST_FUNP fun)
835{
836        time_t now = request->timestamp;
837
838        /*
839         *      We've been told not to spawn threads, so don't.
840         */
841        if (!thread_pool.spawn_flag) {
842                radius_handle_request(request, fun);
843
844#ifdef WNOHANG
845                /*
846                 *      Requests that care about child process exit
847                 *      codes have already either called
848                 *      rad_waitpid(), or they've given up.
849                 */
850                wait(NULL);
851#endif
852                return 1;
853        }
854
855        /*
856         *      Add the new request to the queue.
857         */
858        if (!request_enqueue(request, fun)) return 0;
859
860        /*
861         *      If we haven't checked the number of child threads
862         *      in a while, OR if the thread pool appears to be full,
863         *      go manage it.
864         */
865        if ((last_cleaned < now) ||
866            (thread_pool.active_threads == thread_pool.total_threads)) {
867                thread_pool_manage(now);
868        }
869
870        return 1;
871}
872
873/*
874 *      Check the min_spare_threads and max_spare_threads.
875 *
876 *      If there are too many or too few threads waiting, then we
877 *      either create some more, or delete some.
878 */
879static void thread_pool_manage(time_t now)
880{
881        int spare;
882        int i, total;
883        THREAD_HANDLE *handle, *next;
884        int active_threads;
885
886        /*
887         *      We don't need a mutex lock here, as we're reading
888         *      active_threads, and not modifying it.  We want a close
889         *      approximation of the number of active threads, and this
890         *      is good enough.
891         */
892        active_threads = thread_pool.active_threads;
893        spare = thread_pool.total_threads - active_threads;
894        if (debug_flag) {
895                static int old_total = -1;
896                static int old_active = -1;
897
898                if ((old_total != thread_pool.total_threads) ||
899                                (old_active != active_threads)) {
900                        DEBUG2("Threads: total/active/spare threads = %d/%d/%d",
901                                        thread_pool.total_threads, active_threads, spare);
902                        old_total = thread_pool.total_threads;
903                        old_active = active_threads;
904                }
905        }
906
907        /*
908         *      If there are too few spare threads.  Go create some more.
909         */
910        if (spare < thread_pool.min_spare_threads) {
911                total = thread_pool.min_spare_threads - spare;
912
913                DEBUG2("Threads: Spawning %d spares", total);
914
915                /*
916                 *      Create a number of spare threads.
917                 */
918                for (i = 0; i < total; i++) {
919                        handle = spawn_thread(now);
920                        if (handle == NULL) {
921                                return;
922                        }
923                }
924
925                return;         /* there aren't too many spare threads */
926        }
927
928        /*
929         *      Only delete spare threads if we haven't already done
930         *      so this second.
931         */
932        if (now == last_cleaned) {
933                return;
934        }
935        last_cleaned = now;
936
937        /*
938         *      Loop over the thread pool, deleting exited threads.
939         */
940        for (handle = thread_pool.head; handle; handle = next) {
941                next = handle->next;
942
943                /*
944                 *      Maybe we've asked the thread to exit, and it
945                 *      has agreed.
946                 */
947                if (handle->status == THREAD_EXITED) {
948                        delete_thread(handle);
949                }
950        }
951
952        /*
953         *      Only delete the spare threads if sufficient time has
954         *      passed since we last created one.  This helps to minimize
955         *      the amount of create/delete cycles.
956         */
957        if ((now - thread_pool.time_last_spawned) < thread_pool.cleanup_delay) {
958                return;
959        }
960
961        /*
962         *      If there are too many spare threads, delete one.
963         *
964         *      Note that we only delete ONE at a time, instead of
965         *      wiping out many.  This allows the excess servers to
966         *      be slowly reaped, just in case the load spike comes again.
967         */
968        if (spare > thread_pool.max_spare_threads) {
969
970                spare -= thread_pool.max_spare_threads;
971
972                DEBUG2("Threads: deleting 1 spare out of %d spares", spare);
973
974                /*
975                 *      Walk through the thread pool, deleting the
976                 *      first idle thread we come across.
977                 */
978                for (handle = thread_pool.head; (handle != NULL) && (spare > 0) ; handle = next) {
979                        next = handle->next;
980
981                        /*
982                         *      If the thread is not handling a
983                         *      request, but still live, then tell it
984                         *      to exit.
985                         *
986                         *      It will eventually wake up, and realize
987                         *      it's been told to commit suicide.
988                         */
989                        if ((handle->request == NULL) &&
990                            (handle->status == THREAD_RUNNING)) {
991                                handle->status = THREAD_CANCELLED;
992                                /*
993                                 *      Post an extra semaphore, as a
994                                 *      signal to wake up, and exit.
995                                 */
996                                sem_post(&thread_pool.semaphore);
997                                spare--;
998                                break;
999                        }
1000                }
1001        }
1002
1003        /*
1004         *      If the thread has handled too many requests, then make it
1005         *      exit.
1006         */
1007        if (thread_pool.max_requests_per_thread > 0) {
1008                for (handle = thread_pool.head; handle; handle = next) {
1009                        next = handle->next;
1010
1011                        /*
1012                         *      Not handling a request, but otherwise
1013                         *      live, we can kill it.
1014                         */
1015                        if ((handle->request == NULL) &&
1016                            (handle->status == THREAD_RUNNING) &&
1017                            (handle->request_count > thread_pool.max_requests_per_thread)) {
1018                                handle->status = THREAD_CANCELLED;
1019                                sem_post(&thread_pool.semaphore);
1020                        }
1021                }
1022        }
1023
1024        /*
1025         *      Otherwise everything's kosher.  There are not too few,
1026         *      or too many spare threads.  Exit happily.
1027         */
1028        return;
1029}
1030
1031
1032#ifdef WNOHANG
1033/*
1034 *      Thread wrapper for fork().
1035 */
1036pid_t rad_fork(void)
1037{
1038        pid_t child_pid;
1039
1040        if (!pool_initialized) return fork();
1041
1042        reap_children();        /* be nice to non-wait thingies */
1043
1044        if (fr_hash_table_num_elements(thread_pool.waiters) >= 1024) {
1045                return -1;
1046        }
1047
1048        /*
1049         *      Fork & save the PID for later reaping.
1050         */
1051        child_pid = fork();
1052        if (child_pid > 0) {
1053                int rcode;
1054                thread_fork_t *tf;
1055
1056                tf = rad_malloc(sizeof(*tf));
1057                memset(tf, 0, sizeof(*tf));
1058
1059                tf->pid = child_pid;
1060
1061                pthread_mutex_lock(&thread_pool.wait_mutex);
1062                rcode = fr_hash_table_insert(thread_pool.waiters, tf);
1063                pthread_mutex_unlock(&thread_pool.wait_mutex);
1064
1065                if (!rcode) {
1066                        radlog(L_ERR, "Failed to store PID, creating what will be a zombie process %d",
1067                               (int) child_pid);
1068                        free(tf);
1069                }
1070        }
1071
1072        /*
1073         *      Return whatever we were told.
1074         */
1075        return child_pid;
1076}
1077
1078
1079/*
1080 *      Wait 10 seconds at most for a child to exit, then give up.
1081 */
1082pid_t rad_waitpid(pid_t pid, int *status)
1083{
1084        int i;
1085        thread_fork_t mytf, *tf;
1086
1087        if (!pool_initialized) return waitpid(pid, status, 0);
1088
1089        if (pid <= 0) return -1;
1090
1091        mytf.pid = pid;
1092
1093        pthread_mutex_lock(&thread_pool.wait_mutex);
1094        tf = fr_hash_table_finddata(thread_pool.waiters, &mytf);
1095        pthread_mutex_unlock(&thread_pool.wait_mutex);
1096
1097        if (!tf) return -1;
1098
1099        for (i = 0; i < 100; i++) {
1100                reap_children();
1101
1102                if (tf->exited) {
1103                        *status = tf->status;
1104
1105                        pthread_mutex_lock(&thread_pool.wait_mutex);
1106                        fr_hash_table_delete(thread_pool.waiters, &mytf);
1107                        pthread_mutex_unlock(&thread_pool.wait_mutex);
1108                        return pid;
1109                }
1110                usleep(100000); /* sleep for 1/10 of a second */
1111        }
1112
1113        /*
1114         *      10 seconds have passed, give up on the child.
1115         */
1116        pthread_mutex_lock(&thread_pool.wait_mutex);
1117        fr_hash_table_delete(thread_pool.waiters, &mytf);
1118        pthread_mutex_unlock(&thread_pool.wait_mutex);
1119
1120        return 0;
1121}
1122#else
1123/*
1124 *      No rad_fork or rad_waitpid
1125 */
1126#endif
1127
1128void thread_pool_lock(void)
1129{
1130        pthread_mutex_lock(&thread_pool.queue_mutex);
1131}
1132
1133void thread_pool_unlock(void)
1134{
1135        pthread_mutex_unlock(&thread_pool.queue_mutex);
1136}
1137
1138void thread_pool_queue_stats(int *array)
1139{
1140        int i;
1141
1142        if (pool_initialized) {
1143                for (i = 0; i < RAD_LISTEN_MAX; i++) {
1144                        array[i] = fr_fifo_num_elements(thread_pool.fifo[i]);
1145                }
1146        } else {
1147                for (i = 0; i < RAD_LISTEN_MAX; i++) {
1148                        array[i] = 0;
1149                }
1150        }
1151}
1152#endif /* HAVE_PTHREAD_H */
Note: See TracBrowser for help on using the repository browser.