Subject: [Merge] lp:~epics-core/epics-base/thread-pool into lp:epics-base
From: mdavidsaver <[email protected]>
To: [email protected]
Date: Fri, 01 Jun 2012 17:31:20 -0000
mdavidsaver has proposed merging lp:~epics-core/epics-base/thread-pool into lp:epics-base.
Requested reviews:
EPICS Core Developers (epics-core)
For more details, see:
https://code.launchpad.net/~epics-core/epics-base/thread-pool/+merge/108385
General purpose thread pool.
Remaining tasks
* Include in RTEMS/vxWorks test harness
* For RTEMS/vxWorks, replace mutex with interrupt disable
* Avoid excessive wakeups from worker and observer Events.
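A minimal usage sketch of the proposed API (illustrative only; the job function and names here are hypothetical, the calls follow epicsThreadPool.h in the diff below, and error handling is omitted for brevity):

    #include "epicsThreadPool.h"

    static void myjob(void *arg, unsigned int mode)
    {
        if (mode != EPICSJOB_RUN)
            return;                 /* EPICSJOB_CLEANUP: pool being destroyed */
        /* ... do the work with arg ... */
    }

    void example(void)
    {
        epicsThreadPoolConfig conf;
        epicsThreadPool *pool;
        epicsJob *job;

        epicsThreadPoolConfigDefaults(&conf);
        conf.maxThreads = 4;

        pool = epicsThreadPoolCreate(&conf);  /* NULL would use defaults */
        job  = epicsJobCreate(pool, &myjob, NULL);

        epicsJobQueue(job);                   /* runs on a worker thread */
        epicsThreadPoolWait(pool, -1.0);      /* block until queue drains */

        epicsJobDestroy(job);
        epicsThreadPoolDestroy(pool);
    }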
--
https://code.launchpad.net/~epics-core/epics-base/thread-pool/+merge/108385
Your team EPICS Core Developers is requested to review the proposed merge of lp:~epics-core/epics-base/thread-pool into lp:epics-base.
=== modified file 'src/libCom/Makefile'
--- src/libCom/Makefile 2012-03-01 17:31:32 +0000
+++ src/libCom/Makefile 2012-06-01 17:30:28 +0000
@@ -31,6 +31,7 @@
include $(LIBCOM)/macLib/Makefile
include $(LIBCOM)/misc/Makefile
include $(LIBCOM)/osi/Makefile
+include $(LIBCOM)/pool/Makefile
include $(LIBCOM)/ring/Makefile
include $(LIBCOM)/taskwd/Makefile
include $(LIBCOM)/timer/Makefile
=== added directory 'src/libCom/pool'
=== added file 'src/libCom/pool/Makefile'
--- src/libCom/pool/Makefile 1970-01-01 00:00:00 +0000
+++ src/libCom/pool/Makefile 2012-06-01 17:30:28 +0000
@@ -0,0 +1,16 @@
+#*************************************************************************
+# Copyright (c) 2010 UChicago Argonne LLC, as Operator of Argonne
+# National Laboratory.
+# EPICS BASE is distributed subject to a Software License Agreement found
+# in file LICENSE that is included with this distribution.
+#*************************************************************************
+
+# This is a Makefile fragment, see src/libCom/Makefile.
+
+SRC_DIRS += $(LIBCOM)/pool
+
+INC += epicsThreadPool.h
+
+Com_SRCS += poolJob.c
+Com_SRCS += threadPool.c
+
=== added file 'src/libCom/pool/epicsThreadPool.h'
--- src/libCom/pool/epicsThreadPool.h 1970-01-01 00:00:00 +0000
+++ src/libCom/pool/epicsThreadPool.h 2012-06-01 17:30:28 +0000
@@ -0,0 +1,113 @@
+/*************************************************************************\
+* Copyright (c) 2011 Brookhaven Science Associates, as Operator of
+* Brookhaven National Laboratory.
+* EPICS BASE is distributed subject to a Software License Agreement found
+* in file LICENSE that is included with this distribution.
+\*************************************************************************/
+/* General purpose worker thread pool manager
+ * [email protected]
+ */
+#ifndef EPICSTHREADPOOL_H
+#define EPICSTHREADPOOL_H
+
+#include <stdlib.h>
+#include <stdio.h>
+
+#include "shareLib.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+typedef struct {
+ size_t initialThreads;
+ size_t maxThreads;
+ unsigned int workerStack;
+ unsigned int workerPriority;
+} epicsThreadPoolConfig;
+
+typedef struct epicsThreadPool epicsThreadPool;
+
+/* Job function call modes */
+/* Normal run of job */
+#define EPICSJOB_RUN 1
+/* Thread pool is being destroyed. A chance to clean up the job */
+#define EPICSJOB_CLEANUP 2
+
+typedef void (*epicsJobFunction)(void* arg, unsigned int mode);
+
+typedef struct epicsJob epicsJob;
+
+/* Pool operations */
+
+/* Initialize a pool config with default values */
+epicsShareFunc void epicsThreadPoolConfigDefaults(epicsThreadPoolConfig *);
+
+/* If opts is NULL then defaults are used.
+ * The opts pointer is not stored by this call and may be freed by the caller
+ */
+epicsShareFunc epicsThreadPool* epicsThreadPoolCreate(epicsThreadPoolConfig *opts);
+
+/* Blocks until all jobs have run and all worker threads have stopped */
+epicsShareFunc void epicsThreadPoolDestroy(epicsThreadPool *);
+
+/* pool control options */
+#define EPICSPOOL_QUEUEADD 1 /* val==0 causes epicsJobQueue to fail, 1 is default */
+#define EPICSPOOL_QUEUERUN 2 /* val==0 prevents workers from running jobs, 1 is default */
+epicsShareFunc void epicsThreadPoolControl(epicsThreadPool* pool, unsigned int opt, unsigned int val);
+
+/* Block until job queue is emptied and no jobs are running.
+ * timeout<0 waits forever, timeout==0 polls, timeout>0 waits for a fixed time
+ * Returns 1 for timeout, 0 for success, -1 on errors
+ */
+epicsShareFunc int epicsThreadPoolWait(epicsThreadPool* pool, double timeout);
+
+
+/* Per job operations */
+
+/* creates, but does not add, a new job */
+epicsShareFunc epicsJob* epicsJobCreate(epicsThreadPool*,
+ epicsJobFunction,
+ void*);
+/* Override the job function and argument set at creation.
+ * Not safe to call on a queued or running job.
+ */
+epicsShareFunc void epicsJobSet(epicsJob*,
+ epicsJobFunction,
+ void*);
+/* Cancel and free a job structure. Does not block.
+ * The job may not be freed immediately.
+ * Safe to call from a running job function.
+ */
+epicsShareFunc void epicsJobDestroy(epicsJob*);
+
+/* Move the job to a different pool.
+ * Job must not be running or queued.
+ */
+epicsShareFunc long epicsJobMove(epicsJob*, epicsThreadPool*);
+
+/* Adds the job to the run queue.
+ * Safe to call from a running job function.
+ * returns !0 on error
+ */
+epicsShareFunc long epicsJobQueue(epicsJob*);
+
+/* Remove a job from the run queue before it starts.
+ * Safe to call from a running job function.
+ * returns 0 if job already ran or was not queued,
+ * 1 if job was queued, didn't run, and was cancelled
+ */
+epicsShareFunc int epicsJobCancel(epicsJob*);
+
+
+/* Mostly useful for debugging */
+
+epicsShareFunc void epicsThreadPoolReport(epicsThreadPool *pool, FILE *fd);
+
+epicsShareFunc size_t epicsThreadPoolNThreads(epicsThreadPool *);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* EPICSTHREADPOOL_H */
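Every job function receives one of the two modes declared above; a sketch of the expected pattern (hypothetical names, mirroring the tests at the end of this diff):

    static void jobfn(void *arg, unsigned int mode)
    {
        if (mode == EPICSJOB_CLEANUP) {
            /* Pool is being destroyed: release per-job resources here.
             * epicsJobDestroy() is safe in this branch. */
            return;
        }
        /* mode == EPICSJOB_RUN: perform the actual work with arg */
    }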
=== added file 'src/libCom/pool/poolJob.c'
--- src/libCom/pool/poolJob.c 1970-01-01 00:00:00 +0000
+++ src/libCom/pool/poolJob.c 2012-06-01 17:30:28 +0000
@@ -0,0 +1,285 @@
+/*************************************************************************\
+* Copyright (c) 2011 Brookhaven Science Associates, as Operator of
+* Brookhaven National Laboratory.
+* EPICS BASE is distributed subject to a Software License Agreement found
+* in file LICENSE that is included with this distribution.
+\*************************************************************************/
+
+#include <stdlib.h>
+#include <string.h>
+
+#include <dbDefs.h>
+#include <errlog.h>
+#include <ellLib.h>
+#include <epicsThread.h>
+#include <epicsMutex.h>
+#include <epicsEvent.h>
+#include <epicsInterrupt.h>
+
+#define epicsExportSharedSymbols
+#include "epicsThreadPool.h"
+#include "poolPriv.h"
+
+
+static
+void workerMain(void* arg)
+{
+ epicsThreadPool *pool=arg;
+
+ epicsMutexMustLock(pool->guard);
+ pool->threadsAreAwake++;
+
+ while(1)
+ {
+ ELLNODE *cur;
+ epicsJob *job;
+
+ pool->threadsAreAwake--;
+ epicsMutexUnlock(pool->guard);
+
+ epicsEventMustWait(pool->workerWakeup);
+
+ epicsMutexMustLock(pool->guard);
+ pool->threadsAreAwake++;
+
+ if(pool->shutdown)
+ break;
+
+ if(pool->pauserun)
+ continue;
+
+ /* If there are (other) jobs and idle workers, wake one up */
+ if(ellCount(&pool->jobs) > 1 &&
+ pool->threadsAreAwake < pool->threadsRunning)
+ {
+ epicsEventSignal(pool->workerWakeup);
+ }
+
+ if(ellCount(&pool->jobs)==0)
+ errlogPrintf("============= pool worker wakeup with no work\n");
+
+ while ((cur=ellGet(&pool->jobs)) !=NULL)
+ {
+ job=CONTAINER(cur, epicsJob, jobnode);
+ job->queued=0;
+ job->running=1;
+
+ epicsMutexUnlock(pool->guard);
+ (*job->func)(job->arg, EPICSJOB_RUN);
+ epicsMutexMustLock(pool->guard);
+
+ if(job->freewhendone)
+ free(job);
+ else {
+ job->running=0;
+ /* job may be re-queued from within callback */
+ if(!job->queued)
+ ellAdd(&pool->owned, &job->jobnode);
+ }
+
+ }
+
+ epicsEventSignal(pool->observerWakeup);
+ }
+
+ pool->threadsAreAwake--;
+ pool->threadsRunning--;
+
+ epicsEventSignal(pool->observerWakeup);
+ epicsMutexUnlock(pool->guard);
+
+ if(pool->threadsRunning)
+ epicsEventSignal(pool->workerWakeup); /* pass along */
+ else
+ epicsEventSignal(pool->shutdownEvent);
+
+ return;
+}
+
+void createPoolThread(epicsThreadPool *pool)
+{
+ epicsThreadId tid;
+
+ tid = epicsThreadCreate("PoolWorker",
+ pool->conf.workerPriority,
+ pool->conf.workerStack,
+ &workerMain,
+ pool);
+ if(!tid)
+ return;
+
+ pool->threadsRunning++;
+}
+
+epicsJob* epicsJobCreate(epicsThreadPool* pool,
+ epicsJobFunction func,
+ void* arg)
+{
+ epicsJob *job=calloc(1, sizeof(*job));
+
+ if(!job)
+ return NULL;
+
+ job->pool=pool;
+ job->func=func;
+ job->arg=arg;
+
+ if(pool) {
+ if(epicsMutexLock(pool->guard)!=epicsMutexLockOK) {
+ errlogMessage("epicsJobDestroy: lock error");
+ free(job);
+ return NULL;
+ }
+
+ ellAdd(&pool->owned, &job->jobnode);
+
+ epicsMutexUnlock(pool->guard);
+ }
+
+ return job;
+}
+
+void epicsJobSet(epicsJob* job,
+ epicsJobFunction func,
+ void* arg)
+{
+ job->func=func;
+ job->arg=arg;
+}
+
+void epicsJobDestroy(epicsJob* job)
+{
+ if(!job->pool){
+ free(job);
+ return;
+ }
+
+ if(epicsMutexLock(job->pool->guard)!=epicsMutexLockOK) {
+ errlogMessage("epicsJobDestroy: lock error");
+ return;
+ }
+
+ epicsJobCancel(job);
+
+ if(job->running)
+ job->freewhendone=1;
+ else
+ ellDelete(&job->pool->owned, &job->jobnode);
+
+ epicsMutexUnlock(job->pool->guard);
+
+ if(!job->running)
+ free(job);
+}
+
+long epicsJobMove(epicsJob* job, epicsThreadPool* newpool)
+{
+ epicsThreadPool *pool=job->pool;
+
+ /* remove from current pool */
+ if(pool) {
+ if(epicsMutexLock(pool->guard)!=epicsMutexLockOK) {
+ errlogMessage("epicsJobQueue: lock error");
+ return -1;
+ }
+
+ if(job->queued || job->running) {
+ epicsMutexUnlock(pool->guard);
+ return -1;
+ }
+
+ ellDelete(&pool->owned, &job->jobnode);
+
+ epicsMutexUnlock(pool->guard);
+ }
+
+ pool = job->pool = newpool;
+
+ /* add to new pool */
+ if(pool) {
+ if(epicsMutexLock(pool->guard)!=epicsMutexLockOK) {
+ errlogMessage("epicsJobQueue: lock error");
+ return -1;
+ }
+
+ ellAdd(&pool->owned, &job->jobnode);
+
+ epicsMutexUnlock(pool->guard);
+ }
+
+ return 0;
+}
+
+long epicsJobQueue(epicsJob* job)
+{
+ epicsThreadPool *pool=job->pool;
+ if(!job->pool)
+ return -1;
+
+ if(epicsMutexLock(pool->guard)!=epicsMutexLockOK) {
+ errlogMessage("epicsJobQueue: lock error");
+ return -1;
+ }
+
+ if(pool->pauseadd) {
+ epicsMutexUnlock(pool->guard);
+ return -1;
+ } else if(job->queued) {
+ epicsMutexUnlock(pool->guard);
+ return 0;
+ }
+
+ job->queued=1;
+ /* Job may be queued from within a callback */
+ if(!job->running)
+ ellDelete(&pool->owned, &job->jobnode);
+ ellAdd(&pool->jobs, &job->jobnode);
+
+ if(job->running) {
+ /* some worker will find it again before sleeping */
+ epicsMutexUnlock(pool->guard);
+ return 0;
+ }
+
+ /* Since we hold the lock, we can be certain that all awake workers are
+ * executing work functions. The current thread may be a worker.
+ */
+
+ epicsEventSignal(pool->workerWakeup);
+
+ if(ellCount(&pool->jobs) > (pool->threadsRunning - pool->threadsAreAwake)
+ && pool->threadsRunning < pool->conf.maxThreads)
+ {
+ createPoolThread(pool);
+ }
+ /* else wait for a running worker to finish its current job */
+
+ epicsMutexUnlock(pool->guard);
+ return 0;
+}
+
+int epicsJobCancel(epicsJob* job)
+{
+ int ret=0;
+ epicsThreadPool *pool=job->pool;
+
+ if(!pool)
+ return 0;
+
+ if(epicsMutexLock(pool->guard)!=epicsMutexLockOK) {
+ errlogMessage("epicsJobCancel: lock error");
+ return 0;
+ }
+
+ if(job->queued) {
+ ellDelete(&pool->jobs, &job->jobnode);
+ ellAdd(&pool->owned, &job->jobnode);
+ job->queued=0;
+ ret=1;
+ }
+
+ epicsMutexUnlock(pool->guard);
+
+ return ret;
+}
+
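One consequence of the lazy-free path above (freewhendone): a running job may destroy itself. A hypothetical one-shot job, assuming the job pointer was installed with epicsJobSet(job, &oneshot, job):

    static void oneshot(void *arg, unsigned int mode)
    {
        epicsJob *self = arg;
        if (mode != EPICSJOB_RUN)
            return;
        /* ... do the work once ... */
        epicsJobDestroy(self);   /* running: free is deferred to the worker */
    }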
=== added file 'src/libCom/pool/poolPriv.h'
--- src/libCom/pool/poolPriv.h 1970-01-01 00:00:00 +0000
+++ src/libCom/pool/poolPriv.h 2012-06-01 17:30:28 +0000
@@ -0,0 +1,71 @@
+/*************************************************************************\
+* Copyright (c) 2011 Brookhaven Science Associates, as Operator of
+* Brookhaven National Laboratory.
+* EPICS BASE is distributed subject to a Software License Agreement found
+* in file LICENSE that is included with this distribution.
+\*************************************************************************/
+
+#ifndef POOLPRIV_H
+#define POOLPRIV_H
+
+#include "epicsThreadPool.h"
+#include "ellLib.h"
+#include "epicsThread.h"
+
+
+struct epicsThreadPool {
+ ELLLIST jobs; /* run queue */
+ ELLLIST owned; /* unqueued jobs. */
+
+ /* # of running workers which are not waiting for a wakeup event */
+ size_t threadsAreAwake;
+ /* # of threads started and not stopped */
+ size_t threadsRunning;
+
+ epicsEventId workerWakeup;
+ epicsEventId shutdownEvent;
+
+ epicsEventId observerWakeup;
+
+ /* Disallow epicsJobQueue */
+ unsigned int pauseadd:1;
+ /* Prevent workers from running new jobs */
+ unsigned int pauserun:1;
+ /* tell workers to exit */
+ unsigned int shutdown:1;
+
+ epicsMutexId guard;
+
+ /* copy of config passed when created */
+ epicsThreadPoolConfig conf;
+};
+
+/* When created a job is idle. queued and running are false
+ * and jobnode is in the thread pool's owned list.
+ *
+ * When the job is added, the queued flag is set and jobnode
+ * is in the jobs list.
+ *
+ * When the job starts running the queued flag is cleared and
+ * the running flag is set. jobnode is not in any list
+ * (held locally by worker).
+ *
+ * When the job has finished running the running flag is cleared.
+ * The queued flag may be set if the job re-added itself.
+ * Based on the queued flag jobnode is added to the appropriate
+ * list.
+ */
+struct epicsJob {
+ ELLNODE jobnode;
+ epicsJobFunction func;
+ void *arg;
+ epicsThreadPool *pool;
+
+ unsigned int queued:1;
+ unsigned int running:1;
+ unsigned int freewhendone:1; /* lazy delete of running job */
+};
+
+void createPoolThread(epicsThreadPool *pool);
+
+#endif /* POOLPRIV_H */
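The lifecycle comment above permits a job to re-queue itself while running; a hypothetical periodic job exercising that transition:

    typedef struct {
        epicsJob *job;
        int remaining;
    } periodicPriv;

    static void periodic(void *arg, unsigned int mode)
    {
        periodicPriv *priv = arg;
        if (mode != EPICSJOB_RUN)
            return;
        if (--priv->remaining > 0)
            epicsJobQueue(priv->job);  /* queued is set while running; a
                                        * worker re-fetches the job before
                                        * going back to sleep */
    }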
=== added file 'src/libCom/pool/threadPool.c'
--- src/libCom/pool/threadPool.c 1970-01-01 00:00:00 +0000
+++ src/libCom/pool/threadPool.c 2012-06-01 17:30:28 +0000
@@ -0,0 +1,265 @@
+/*************************************************************************\
+* Copyright (c) 2011 Brookhaven Science Associates, as Operator of
+* Brookhaven National Laboratory.
+* EPICS BASE is distributed subject to a Software License Agreement found
+* in file LICENSE that is included with this distribution.
+\*************************************************************************/
+
+#include <stdlib.h>
+#include <string.h>
+
+#include <dbDefs.h>
+#include <errlog.h>
+#include <ellLib.h>
+#include <epicsThread.h>
+#include <epicsMutex.h>
+#include <epicsEvent.h>
+#include <epicsInterrupt.h>
+
+#define epicsExportSharedSymbols
+#include "epicsThreadPool.h"
+#include "poolPriv.h"
+
+
+void epicsThreadPoolConfigDefaults(epicsThreadPoolConfig *opts)
+{
+ memset(opts, 0, sizeof(*opts));
+ opts->maxThreads=1;
+ opts->workerPriority=epicsThreadPriorityMedium;
+ opts->workerStack=epicsThreadGetStackSize(epicsThreadStackSmall);
+}
+
+epicsThreadPool* epicsThreadPoolCreate(epicsThreadPoolConfig *opts)
+{
+ size_t i;
+ epicsThreadPool *pool;
+
+ pool=calloc(1, sizeof(*pool));
+ if(!pool)
+ return 0;
+
+ if(opts)
+ memcpy(&pool->conf, opts, sizeof(*opts));
+ else
+ epicsThreadPoolConfigDefaults(&pool->conf);
+
+ if(pool->conf.initialThreads > pool->conf.maxThreads)
+ pool->conf.initialThreads = pool->conf.maxThreads;
+
+ pool->workerWakeup=epicsEventCreate(epicsEventEmpty);
+ pool->shutdownEvent=epicsEventCreate(epicsEventEmpty);
+ pool->observerWakeup=epicsEventCreate(epicsEventEmpty);
+ pool->guard=epicsMutexCreate();
+
+ if(!pool->workerWakeup || !pool->guard || !pool->shutdownEvent || !pool->observerWakeup)
+ goto cleanup;
+
+ ellInit(&pool->jobs);
+ ellInit(&pool->owned);
+
+ if(epicsMutexLock(pool->guard)!=epicsMutexLockOK)
+ goto cleanup;
+
+ for(i=0; i<pool->conf.initialThreads; i++)
+ createPoolThread(pool);
+
+ epicsMutexUnlock(pool->guard);
+
+ if(pool->threadsRunning==0 && pool->conf.initialThreads!=0) {
+ errlogPrintf("Unable to create any threads for thread pool\n");
+ goto cleanup;
+
+ }else if(pool->threadsRunning < pool->conf.initialThreads) {
+ errlogPrintf("Warning: Unable to create all threads for thread pool (%d/%d)\n",
+ pool->threadsRunning, pool->conf.initialThreads);
+ }
+
+ return pool;
+
+cleanup:
+ if(pool->workerWakeup) epicsEventDestroy(pool->workerWakeup);
+ if(pool->shutdownEvent) epicsEventDestroy(pool->shutdownEvent);
+ if(pool->observerWakeup) epicsEventDestroy(pool->observerWakeup);
+ if(pool->guard) epicsMutexDestroy(pool->guard);
+
+ free(pool);
+ return 0;
+}
+
+void epicsThreadPoolControl(epicsThreadPool* pool, unsigned int opt, unsigned int val)
+{
+ if(epicsMutexLock(pool->guard)!=epicsMutexLockOK) {
+ errlogMessage("epicsThreadPoolDestroy: lock error");
+ return;
+ }
+
+ if(opt==EPICSPOOL_QUEUEADD) {
+ pool->pauseadd = !val;
+
+ } else if(opt==EPICSPOOL_QUEUERUN) {
+ if(!val && !pool->pauserun)
+ pool->pauserun=1;
+
+ else if(val && pool->pauserun) {
+ pool->pauserun=0;
+
+ if(ellCount(&pool->jobs))
+ epicsEventSignal(pool->workerWakeup);
+ }
+ }
+
+ epicsMutexUnlock(pool->guard);
+}
+
+int epicsThreadPoolWait(epicsThreadPool* pool, double timeout)
+{
+ if(epicsMutexLock(pool->guard)!=epicsMutexLockOK) {
+ errlogMessage("epicsThreadPoolDestroy: lock error");
+ return -1;
+ }
+
+ while(ellCount(&pool->jobs)>0 || pool->threadsAreAwake>0) {
+ epicsMutexUnlock(pool->guard);
+
+ if(timeout<0.0) {
+ if(epicsEventWait(pool->observerWakeup)!=epicsEventWaitOK)
+ return -1;
+
+ } else {
+ switch(epicsEventWaitWithTimeout(pool->observerWakeup, timeout)) {
+ case epicsEventWaitError:
+ return -1;
+ case epicsEventWaitTimeout:
+ return 1;
+ case epicsEventWaitOK:
+ break;
+ }
+
+ }
+
+ if(epicsMutexLock(pool->guard)!=epicsMutexLockOK) {
+ errlogMessage("epicsThreadPoolDestroy: relock error");
+ return -1;
+ }
+ }
+
+ epicsMutexUnlock(pool->guard);
+ return 0;
+}
+
+void epicsThreadPoolDestroy(epicsThreadPool *pool)
+{
+ size_t nThr;
+ ELLLIST notify;
+ ELLNODE *cur;
+
+ ellInit(¬ify);
+
+ if(epicsMutexLock(pool->guard)!=epicsMutexLockOK) {
+ errlogMessage("epicsThreadPoolDestroy: lock error");
+ return;
+ }
+
+ /* run remaining queued jobs */
+ epicsThreadPoolControl(pool, EPICSPOOL_QUEUEADD, 0);
+ epicsThreadPoolControl(pool, EPICSPOOL_QUEUERUN, 1);
+ nThr=pool->threadsRunning;
+
+ epicsMutexUnlock(pool->guard);
+
+ epicsThreadPoolWait(pool, -1.0);
+ /* At this point all queued jobs have run */
+
+ if(epicsMutexLock(pool->guard)!=epicsMutexLockOK) {
+ errlogMessage("epicsThreadPoolDestroy: relock error");
+ return;
+ }
+
+ pool->shutdown=1;
+ epicsEventSignal(pool->workerWakeup);
+
+ ellConcat(¬ify, &pool->owned);
+ if(ellCount(&pool->jobs)>0)
+ errlogMessage("Leaked jobs in thread pool cleanup!!!!!!!!!\n");
+
+ epicsMutexUnlock(pool->guard);
+
+ /* notify remaining jobs that pool is being destroyed */
+ while( (cur=ellGet(¬ify))!=NULL )
+ {
+ epicsJob *job=CONTAINER(cur, epicsJob, jobnode);
+ job->running=1;
+ (*job->func)(job->arg, EPICSJOB_CLEANUP);
+ job->running=0;
+ if(job->freewhendone)
+ free(job);
+ else
+ job->pool=NULL; /* orphan */
+ }
+
+ if(nThr && epicsEventWait(pool->shutdownEvent)!=epicsEventWaitOK){
+ errlogMessage("epicsThreadPoolDestroy: wait error");
+ return;
+ }
+
+ epicsEventDestroy(pool->workerWakeup);
+ epicsEventDestroy(pool->shutdownEvent);
+ epicsEventDestroy(pool->observerWakeup);
+ epicsMutexDestroy(pool->guard);
+
+ free(pool);
+}
+
+
+void epicsThreadPoolReport(epicsThreadPool *pool, FILE *fd)
+{
+ ELLNODE *cur;
+ if(epicsMutexLock(pool->guard)!=epicsMutexLockOK) {
+ errlogMessage("epicsThreadPoolReport Failed to lock");
+ return;
+ }
+
+ fprintf(fd, "Thread Pool with %lu/%lu threads\n"
+ " running %d jobs with %lu threads\n",
+ (unsigned long)pool->threadsRunning,
+ (unsigned long)pool->conf.maxThreads,
+ ellCount(&pool->jobs),
+ (unsigned long)pool->threadsAreAwake);
+ if(pool->pauseadd)
+ fprintf(fd, " Inhibit queueing\n");
+ if(pool->pauserun)
+ fprintf(fd, " Pause workers\n");
+ if(pool->shutdown)
+ fprintf(fd, " Shutdown in progress\n");
+
+ for(cur=ellFirst(&pool->jobs); cur; cur=ellNext(cur))
+ {
+ epicsJob *job=CONTAINER(cur, epicsJob, jobnode);
+ fprintf(fd, " job 0x%lu func: 0x%lu, arg: 0x%lu ",
+ (unsigned long)job, (unsigned long)job->func,
+ (unsigned long)job->arg);
+ if(job->queued)
+ fprintf(fd, "Queued ");
+ if(job->running)
+ fprintf(fd, "Running ");
+ if(job->freewhendone)
+ fprintf(fd, "Free ");
+ fprintf(fd, "\n");
+ }
+
+ epicsMutexUnlock(pool->guard);
+}
+
+size_t epicsThreadPoolNThreads(epicsThreadPool *pool)
+{
+ size_t ret;
+ if(epicsMutexLock(pool->guard)!=epicsMutexLockOK) {
+ errlogMessage("epicsThreadPoolReport Failed to lock");
+ return 0;
+ }
+ ret=pool->threadsRunning;
+ epicsMutexUnlock(pool->guard);
+ return ret;
+}
+
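epicsThreadPoolDestroy() above combines the control and wait calls to drain the queue; the same pattern works without destroying the pool, e.g. this hypothetical helper:

    static void drainPool(epicsThreadPool *pool)
    {
        epicsThreadPoolControl(pool, EPICSPOOL_QUEUEADD, 0); /* refuse new jobs */
        epicsThreadPoolWait(pool, -1.0);                     /* queue empties */
        epicsThreadPoolControl(pool, EPICSPOOL_QUEUEADD, 1); /* re-open */
    }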
=== modified file 'src/libCom/test/Makefile'
--- src/libCom/test/Makefile 2011-10-28 20:19:54 +0000
+++ src/libCom/test/Makefile 2012-06-01 17:30:28 +0000
@@ -12,6 +12,10 @@
PROD_LIBS += Com
+TESTPROD_HOST += epicsThreadPoolTest
+epicsThreadPoolTest_SRCS += epicsThreadPoolTest.c
+TESTS += epicsThreadPoolTest
+
TESTPROD_HOST += epicsUnitTestTest
epicsUnitTestTest_SRCS += epicsUnitTestTest.c
# Not much point running this on vxWorks or RTEMS...
=== added file 'src/libCom/test/epicsThreadPoolTest.c'
--- src/libCom/test/epicsThreadPoolTest.c 1970-01-01 00:00:00 +0000
+++ src/libCom/test/epicsThreadPoolTest.c 2012-06-01 17:30:28 +0000
@@ -0,0 +1,333 @@
+/*************************************************************************\
+* Copyright (c) 2011 Brookhaven Science Associates, as Operator of
+* Brookhaven National Laboratory.
+* EPICS BASE is distributed subject to a Software License Agreement found
+* in file LICENSE that is included with this distribution.
+\*************************************************************************/
+
+#include "epicsThreadPool.h"
+
+#include "testMain.h"
+#include "epicsUnitTest.h"
+
+#include "cantProceed.h"
+#include "epicsEvent.h"
+#include "epicsMutex.h"
+#include "epicsThread.h"
+
+/* Do nothing */
+static void nullop(void)
+{
+ epicsThreadPool *pool;
+ testDiag("nullop()");
+ {
+ epicsThreadPoolConfig conf;
+ epicsThreadPoolConfigDefaults(&conf);
+ testOk1(conf.maxThreads>0);
+
+ testOk1((pool=epicsThreadPoolCreate(&conf))!=NULL);
+ if(!pool)
+ return;
+ }
+
+ epicsThreadPoolDestroy(pool);
+}
+
+/* Just create and destroy worker threads */
+static void oneop(void)
+{
+ epicsThreadPool *pool;
+ testDiag("oneop()");
+ {
+ epicsThreadPoolConfig conf;
+ epicsThreadPoolConfigDefaults(&conf);
+ conf.initialThreads=2;
+ testOk1(conf.maxThreads>0);
+
+ testOk1((pool=epicsThreadPoolCreate(&conf))!=NULL);
+ if(!pool)
+ return;
+ }
+
+ epicsThreadPoolDestroy(pool);
+}
+
+/* Test that bursts of jobs will create enough threads to
+ * run all in parallel
+ */
+typedef struct {
+ epicsMutexId guard;
+ unsigned int count;
+ epicsEventId allrunning;
+ epicsEventId done;
+ epicsJob **job;
+} countPriv;
+
+static void countjob(void *param, unsigned int flag)
+{
+ countPriv *cnt=param;
+ testOk1(flag==EPICSJOB_RUN||flag==EPICSJOB_CLEANUP);
+ if(flag!=EPICSJOB_RUN)
+ return;
+
+ epicsMutexMustLock(cnt->guard);
+ testDiag("Job %lu", (unsigned long)cnt->count);
+ cnt->count--;
+ if(cnt->count==0) {
+ testDiag("All jobs running");
+ epicsEventSignal(cnt->allrunning);
+ }
+ epicsMutexUnlock(cnt->guard);
+
+ epicsEventMustWait(cnt->done);
+ epicsEventSignal(cnt->done); /* pass along to next thread */
+}
+
+static void postjobs(size_t icnt, size_t mcnt)
+{
+ size_t i;
+ epicsThreadPool *pool;
+ countPriv *priv=callocMustSucceed(1, sizeof(*priv), "postjobs priv alloc");
+ priv->guard=epicsMutexMustCreate();
+ priv->done=epicsEventMustCreate(epicsEventEmpty);
+ priv->allrunning=epicsEventMustCreate(epicsEventEmpty);
+ priv->count=mcnt;
+ priv->job=callocMustSucceed(mcnt, sizeof(*priv->job), "postjobs job array");
+
+ testDiag("postjobs(%lu,%lu)", (unsigned long)icnt, (unsigned long)mcnt);
+
+ {
+ epicsThreadPoolConfig conf;
+ epicsThreadPoolConfigDefaults(&conf);
+ conf.initialThreads=icnt;
+ conf.maxThreads=mcnt;
+
+ testOk1((pool=epicsThreadPoolCreate(&conf))!=NULL);
+ if(!pool)
+ return;
+ }
+
+ for(i=0; i<mcnt; i++) {
+ testDiag("i=%lu", (unsigned long)i);
+ priv->job[i] = epicsJobCreate(pool, &countjob, priv);
+ testOk1(priv->job[i]!=NULL);
+ testOk1(epicsJobQueue(priv->job[i])==0);
+ }
+
+ testDiag("Waiting for all jobs to start");
+ epicsEventMustWait(priv->allrunning);
+ testDiag("Stop all");
+ epicsEventSignal(priv->done);
+
+ for(i=0; i<mcnt; i++) {
+ testDiag("i=%lu", (unsigned long)i);
+ epicsJobDestroy(priv->job[i]);
+ }
+
+ epicsThreadPoolDestroy(pool);
+ epicsMutexDestroy(priv->guard);
+ epicsEventDestroy(priv->allrunning);
+ epicsEventDestroy(priv->done);
+ free(priv->job);
+ free(priv);
+}
+
+/* Test cancel from job (no-op)
+ * and destroy from job (lazy free)
+ */
+static void cleanupjob0(void* arg, unsigned int flags)
+{
+ epicsJob *job=arg;
+ testOk1(flags==EPICSJOB_RUN||flags==EPICSJOB_CLEANUP);
+ if(flags!=EPICSJOB_RUN)
+ return;
+
+ testOk1(epicsJobCancel(job)==0);
+ epicsJobDestroy(job); /* delete while job is running */
+}
+static void cleanupjob1(void* arg, unsigned int flags)
+{
+ epicsJob *job=arg;
+ testOk1(flags==EPICSJOB_RUN||flags==EPICSJOB_CLEANUP);
+ if(flags!=EPICSJOB_RUN)
+ return;
+
+ testOk1(epicsJobCancel(job)==0);
+ /* async delete (job may be running, or queued, or ...) */
+}
+static void cleanupjob2(void* arg, unsigned int flags)
+{
+ epicsJob *job=arg;
+ testOk1(flags==EPICSJOB_RUN||flags==EPICSJOB_CLEANUP);
+ if(flags==EPICSJOB_CLEANUP)
+ epicsJobDestroy(job); /* delete when threadpool is destroyed */
+
+ if(flags==EPICSJOB_RUN)
+ testOk1(epicsJobCancel(job)==0);
+}
+static epicsJobFunction cleanupjobs[3] = {&cleanupjob0,&cleanupjob1,&cleanupjob2};
+
+static void testcleanup(void)
+{
+ int i=0;
+ epicsThreadPool *pool;
+ epicsJob *job[3];
+
+ testDiag("testcleanup()");
+
+ testOk1((pool=epicsThreadPoolCreate(NULL))!=NULL);
+ if(!pool)
+ return;
+
+ /* unrolled so that valgrind can show which method leaks */
+ testOk1((job[0]=epicsJobCreate(pool, cleanupjobs[0], NULL))!=NULL);
+ testOk1((job[1]=epicsJobCreate(pool, cleanupjobs[1], NULL))!=NULL);
+ testOk1((job[2]=epicsJobCreate(pool, cleanupjobs[2], NULL))!=NULL);
+ for(i=0; i<3; i++) {
+ epicsJobSet(job[i], cleanupjobs[i], job[i]);
+ testOk1(epicsJobQueue(job[i])==0);
+ }
+
+ epicsThreadPoolWait(pool, -1);
+ epicsJobDestroy(job[1]);
+ epicsThreadPoolDestroy(pool);
+}
+
+/* Test re-add from inside job */
+typedef struct {
+ unsigned int count;
+ epicsEventId done;
+ epicsJob *job;
+ unsigned int inprogress;
+} readdPriv;
+
+static void readdjob(void *arg, unsigned int flags)
+{
+ readdPriv *priv=arg;
+ testOk1(flags==EPICSJOB_RUN||flags==EPICSJOB_CLEANUP);
+ if(flags!=EPICSJOB_RUN)
+ return;
+ testOk1(priv->inprogress==0);
+ testDiag("count==%u", priv->count);
+
+ if(priv->count--) {
+ priv->inprogress=1;
+ epicsJobQueue(priv->job);
+ epicsThreadSleep(0.05);
+ priv->inprogress=0;
+ }else{
+ epicsEventSignal(priv->done);
+ epicsJobDestroy(priv->job);
+ }
+}
+
+static void testreadd(void) {
+ epicsThreadPool *pool;
+ readdPriv *priv=callocMustSucceed(1, sizeof(*priv), "testcleanup priv");
+
+ testDiag("testreadd");
+
+ priv->done=epicsEventMustCreate(epicsEventEmpty);
+ priv->count=5;
+
+ testOk1((pool=epicsThreadPoolCreate(NULL))!=NULL);
+ if(!pool)
+ return;
+
+ testOk1((priv->job=epicsJobCreate(pool, &readdjob, priv))!=NULL);
+
+ testOk1(epicsJobQueue(priv->job)==0);
+ epicsEventMustWait(priv->done);
+
+ testOk1(epicsThreadPoolNThreads(pool)==1);
+
+ epicsThreadPoolDestroy(pool);
+ epicsEventDestroy(priv->done);
+ free(priv);
+
+}
+
+/* test job canceling */
+static
+void neverrun(void *arg, unsigned int flag)
+{
+ epicsJob *job=arg;
+ testOk1(flag!=EPICSJOB_RUN);
+ if(flag==EPICSJOB_CLEANUP)
+ epicsJobDestroy(job);
+}
+static epicsEventId cancel[2];
+static
+void toolate(void *arg, unsigned int flag)
+{
+ epicsJob *job=arg;
+ if(flag==EPICSJOB_CLEANUP){
+ epicsJobDestroy(job);
+ return;
+ }
+ testPass("Job runs");
+ epicsEventSignal(cancel[0]);
+ epicsEventMustWait(cancel[1]);
+}
+
+static
+void testcancel(void)
+{
+ epicsJob *job[2];
+ epicsThreadPool *pool;
+ testOk1((pool=epicsThreadPoolCreate(NULL))!=NULL);
+ if(!pool)
+ return;
+
+ cancel[0]=epicsEventCreate(epicsEventEmpty);
+ cancel[1]=epicsEventCreate(epicsEventEmpty);
+
+ testOk1((job[0]=epicsJobCreate(pool, NULL, NULL))!=NULL);
+ testOk1((job[1]=epicsJobCreate(pool, NULL, NULL))!=NULL);
+ epicsJobSet(job[0], &neverrun, job[0]);
+ epicsJobSet(job[1], &toolate, job[1]);
+
+ /* freeze */
+ epicsThreadPoolControl(pool, EPICSPOOL_QUEUERUN, 0);
+
+ testOk1(epicsJobCancel(job[0])==0);
+
+ epicsJobQueue(job[0]);
+ testOk1(epicsJobCancel(job[0])==1);
+ testOk1(epicsJobCancel(job[0])==0);
+
+ epicsThreadSleep(0.01);
+ epicsJobQueue(job[0]);
+ testOk1(epicsJobCancel(job[0])==1);
+ testOk1(epicsJobCancel(job[0])==0);
+
+ epicsThreadPoolControl(pool, EPICSPOOL_QUEUERUN, 1);
+
+ epicsJobQueue(job[1]);
+
+ epicsEventMustWait(cancel[0]);
+ testOk1(epicsJobCancel(job[0])==0);
+ epicsEventSignal(cancel[1]);
+
+ epicsThreadPoolDestroy(pool);
+ epicsEventDestroy(cancel[0]);
+ epicsEventDestroy(cancel[1]);
+}
+
+MAIN(epicsThreadPoolTest)
+{
+ testPlan(92);
+
+ nullop();
+ oneop();
+ postjobs(1,1);
+ postjobs(0,1);
+ postjobs(4,4);
+ postjobs(0,4);
+ postjobs(2,4);
+ testcleanup();
+ testreadd();
+ testcancel();
+
+ return testDone();
+}