EPICS Controls Argonne National Laboratory

Experimental Physics and
Industrial Control System

2002  2003  2004  2005  2006  2007  2008  2009  2010  2011  <20122013  2014  2015  2016  2017  2018  2019  2020  2021  2022  2023  2024  Index 2002  2003  2004  2005  2006  2007  2008  2009  2010  2011  <20122013  2014  2015  2016  2017  2018  2019  2020  2021  2022  2023  2024 
<== Date ==> <== Thread ==>

Subject: [Merge] lp:~epics-core/epics-base/thread-pool into lp:epics-base
From: mdavidsaver <[email protected]>
To: [email protected]
Date: Fri, 01 Jun 2012 17:31:20 -0000
mdavidsaver has proposed merging lp:~epics-core/epics-base/thread-pool into lp:epics-base.

Requested reviews:
  EPICS Core Developers (epics-core)

For more details, see:
https://code.launchpad.net/~epics-core/epics-base/thread-pool/+merge/108385

General purpose thread pool.

Remaining tasks

* Include in RTEMS/vxworks test harness
* For RTEMS/vxworks, replace mutex with interrupt disable
* Avoid excessive wakeups from worker and observer Events.
-- 
https://code.launchpad.net/~epics-core/epics-base/thread-pool/+merge/108385
Your team EPICS Core Developers is requested to review the proposed merge of lp:~epics-core/epics-base/thread-pool into lp:epics-base.
=== modified file 'src/libCom/Makefile'
--- src/libCom/Makefile	2012-03-01 17:31:32 +0000
+++ src/libCom/Makefile	2012-06-01 17:30:28 +0000
@@ -31,6 +31,7 @@
 include $(LIBCOM)/macLib/Makefile
 include $(LIBCOM)/misc/Makefile
 include $(LIBCOM)/osi/Makefile
+include $(LIBCOM)/pool/Makefile
 include $(LIBCOM)/ring/Makefile
 include $(LIBCOM)/taskwd/Makefile
 include $(LIBCOM)/timer/Makefile

=== added directory 'src/libCom/pool'
=== added file 'src/libCom/pool/Makefile'
--- src/libCom/pool/Makefile	1970-01-01 00:00:00 +0000
+++ src/libCom/pool/Makefile	2012-06-01 17:30:28 +0000
@@ -0,0 +1,16 @@
+#*************************************************************************
+# Copyright (c) 2010 UChicago Argonne LLC, as Operator of Argonne
+#     National Laboratory.
+# EPICS BASE is distributed subject to a Software License Agreement found
+# in file LICENSE that is included with this distribution. 
+#*************************************************************************
+
+# This is a Makefile fragment, see src/libCom/Makefile.
+
+SRC_DIRS += $(LIBCOM)/pool
+
+INC += epicsThreadPool.h
+
+Com_SRCS += poolJob.c
+Com_SRCS += threadPool.c
+

=== added file 'src/libCom/pool/epicsThreadPool.h'
--- src/libCom/pool/epicsThreadPool.h	1970-01-01 00:00:00 +0000
+++ src/libCom/pool/epicsThreadPool.h	2012-06-01 17:30:28 +0000
@@ -0,0 +1,113 @@
+/*************************************************************************\
+* Copyright (c) 2011 Brookhaven Science Associates, as Operator of
+*     Brookhaven National Laboratory.
+* EPICS BASE is distributed subject to a Software License Agreement found
+* in file LICENSE that is included with this distribution.
+\*************************************************************************/
+/* General purpose worker thread pool manager
+ * [email protected]
+ */
+#ifndef EPICSTHREADPOOL_H
+#define EPICSTHREADPOOL_H
+
+#include <stdlib.h>
+#include <stdio.h>
+
+#include "shareLib.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+typedef struct {
+    size_t initialThreads;
+    size_t maxThreads;
+    unsigned int workerStack;
+    unsigned int workerPriority;
+} epicsThreadPoolConfig;
+
+typedef struct epicsThreadPool epicsThreadPool;
+
+/* Job function call modes */
+/* Normal run of job */
+#define EPICSJOB_RUN 1
+/* Thread pool is being destroyed.  A chance to cleanup the job */
+#define EPICSJOB_CLEANUP 2
+
+typedef void (*epicsJobFunction)(void* arg, unsigned int mode);
+
+typedef struct epicsJob epicsJob;
+
+/* Pool operations */
+
+/* Initialize a pool config with default values */
+epicsShareFunc void epicsThreadPoolConfigDefaults(epicsThreadPoolConfig *);
+
+/* If opts is NULL then defaults are used.
+ * The opts pointer is not stored by this call and may be freed by the caller
+ */
+epicsShareFunc epicsThreadPool* epicsThreadPoolCreate(epicsThreadPoolConfig *opts);
+
+/* Blocks until all jobs have ran and all worker threads have stopped */
+epicsShareFunc void epicsThreadPoolDestroy(epicsThreadPool *);
+
+/* pool control options */
+#define EPICSPOOL_QUEUEADD 1 /* val==0 causes epicsJobQueue to fail, 1 is default */
+#define EPICSPOOL_QUEUERUN 2 /* val==0 prevents workers from running jobs, 1 is default */
+epicsShareFunc void epicsThreadPoolControl(epicsThreadPool* pool, unsigned int opt, unsigned int val);
+
+/* Block until job queue is emptied and no jobs are running.
+ * timeout<0 waits forever, timeout==0 polls, timeout>0 waits for a fixed time
+ * Returns 1 for timeout, 0 for success, -1 on errors
+ */
+epicsShareFunc int epicsThreadPoolWait(epicsThreadPool* pool, double timeout);
+
+
+/* Per job operations */
+
+/* creates, but does not add, a new job */
+epicsShareFunc epicsJob* epicsJobCreate(epicsThreadPool*,
+                                        epicsJobFunction,
+                                        void*);
+/* override the job function and argument set on creation.
+ * Not safe to call on queued or running job.
+ */
+epicsShareFunc void epicsJobSet(epicsJob*,
+                                epicsJobFunction,
+                                void*);
+/* Cancel and free a job structure.  Does not block.
+ * job may not be immediately free'd.
+ * Safe to call from a running job function.
+ */
+epicsShareFunc void epicsJobDestroy(epicsJob*);
+
+/* Move the job to a different pool.
+ * Job must not be running or queued.
+ */
+epicsShareFunc long epicsJobMove(epicsJob*, epicsThreadPool*);
+
+/* Adds the job to the run queue
+ * Safe to call from a running job function.
+ * returns !0 on error
+ */
+epicsShareFunc long epicsJobQueue(epicsJob*);
+
+/* Remove a job from the run queue before it starts.
+ * Safe to call from a running job function.
+ * returns 0 if job already ran or was not queued,
+ *         1 if job was queued, didn't run, and was cancelled
+ */
+epicsShareFunc int epicsJobCancel(epicsJob*);
+
+
+/* Mostly useful for debugging */
+
+epicsShareFunc void epicsThreadPoolReport(epicsThreadPool *pool, FILE *fd);
+
+epicsShareFunc size_t epicsThreadPoolNThreads(epicsThreadPool *);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif // EPICSTHREADPOOL_H

=== added file 'src/libCom/pool/poolJob.c'
--- src/libCom/pool/poolJob.c	1970-01-01 00:00:00 +0000
+++ src/libCom/pool/poolJob.c	2012-06-01 17:30:28 +0000
@@ -0,0 +1,285 @@
+/*************************************************************************\
+* Copyright (c) 2011 Brookhaven Science Associates, as Operator of
+*     Brookhaven National Laboratory.
+* EPICS BASE is distributed subject to a Software License Agreement found
+* in file LICENSE that is included with this distribution.
+\*************************************************************************/
+
+#include <stdlib.h>
+#include <string.h>
+
+#include <dbDefs.h>
+#include <errlog.h>
+#include <ellLib.h>
+#include <epicsThread.h>
+#include <epicsMutex.h>
+#include <epicsEvent.h>
+#include <epicsInterrupt.h>
+
+#define epicsExportSharedSymbols
+#include "epicsThreadPool.h"
+#include "poolPriv.h"
+
+
+static
+void workerMain(void* arg)
+{
+    epicsThreadPool *pool=arg;
+
+    epicsMutexMustLock(pool->guard);
+    pool->threadsAreAwake++;
+
+    while(1)
+    {
+        ELLNODE *cur;
+        epicsJob *job;
+
+        pool->threadsAreAwake--;
+        epicsMutexUnlock(pool->guard);
+
+        epicsEventMustWait(pool->workerWakeup);
+
+        epicsMutexMustLock(pool->guard);
+        pool->threadsAreAwake++;
+
+        if(pool->shutdown)
+            break;
+
+        if(pool->pauserun)
+            continue;
+
+        /* If there are (other) jobs and idle workers, wake one up */
+        if(ellCount(&pool->jobs) > 1 &&
+           pool->threadsAreAwake < pool->threadsRunning)
+        {
+            epicsEventSignal(pool->workerWakeup);
+        }
+
+        if(ellCount(&pool->jobs)==0)
+            errlogPrintf("============= pool worker wakeup with no work\n");
+
+        while ((cur=ellGet(&pool->jobs)) !=NULL)
+        {
+            job=CONTAINER(cur, epicsJob, jobnode);
+            job->queued=0;
+            job->running=1;
+
+            epicsMutexUnlock(pool->guard);
+            (*job->func)(job->arg, EPICSJOB_RUN);
+            epicsMutexMustLock(pool->guard);
+
+            if(job->freewhendone)
+                free(job);
+            else {
+                job->running=0;
+                /* job may be re-queued from within callback */
+                if(!job->queued)
+                    ellAdd(&pool->owned, &job->jobnode);
+            }
+
+        }
+
+        epicsEventSignal(pool->observerWakeup);
+    }
+
+    pool->threadsAreAwake--;
+    pool->threadsRunning--;
+
+    epicsEventSignal(pool->observerWakeup);
+    epicsMutexUnlock(pool->guard);
+
+    if(pool->threadsRunning)
+        epicsEventSignal(pool->workerWakeup); /* pass along */
+    else
+        epicsEventSignal(pool->shutdownEvent);
+
+    return;
+}
+
+void createPoolThread(epicsThreadPool *pool)
+{
+    epicsThreadId tid;
+
+    tid = epicsThreadCreate("PoolWorker",
+                            pool->conf.workerPriority,
+                            pool->conf.workerStack,
+                            &workerMain,
+                            pool);
+    if(!tid)
+        return;
+
+    pool->threadsRunning++;
+}
+
+epicsJob* epicsJobCreate(epicsThreadPool* pool,
+                         epicsJobFunction func,
+                         void* arg)
+{
+    epicsJob *job=calloc(1, sizeof(*job));
+
+    if(!job)
+        return NULL;
+
+    job->pool=pool;
+    job->func=func;
+    job->arg=arg;
+
+    if(pool) {
+        if(epicsMutexLock(pool->guard)!=epicsMutexLockOK) {
+            errlogMessage("epicsJobDestroy: lock error");
+            free(job);
+            return NULL;
+        }
+
+        ellAdd(&pool->owned, &job->jobnode);
+
+        epicsMutexUnlock(pool->guard);
+    }
+
+    return job;
+}
+
+void epicsJobSet(epicsJob* job,
+                 epicsJobFunction func,
+                 void* arg)
+{
+    job->func=func;
+    job->arg=arg;
+}
+
+void epicsJobDestroy(epicsJob* job)
+{
+    if(!job->pool){
+        free(job);
+        return;
+    }
+
+    if(epicsMutexLock(job->pool->guard)!=epicsMutexLockOK) {
+        errlogMessage("epicsJobDestroy: lock error");
+        return;
+    }
+
+    epicsJobCancel(job);
+
+    if(job->running)
+        job->freewhendone=1;
+    else
+        ellDelete(&job->pool->owned, &job->jobnode);
+
+    epicsMutexUnlock(job->pool->guard);
+
+    if(!job->running)
+        free(job);
+}
+
+long epicsJobMove(epicsJob* job, epicsThreadPool* newpool)
+{
+    epicsThreadPool *pool=job->pool;
+
+    /* remove from current pool */
+    if(pool) {
+        if(epicsMutexLock(pool->guard)!=epicsMutexLockOK) {
+            errlogMessage("epicsJobQueue: lock error");
+            return -1;
+        }
+
+        if(job->queued || job->running) {
+            epicsMutexUnlock(pool->guard);
+            return -1;
+        }
+
+        ellDelete(&pool->owned, &job->jobnode);
+
+        epicsMutexUnlock(pool->guard);
+    }
+
+    pool = job->pool = newpool;
+
+    /* add to new pool */
+    if(pool) {
+        if(epicsMutexLock(pool->guard)!=epicsMutexLockOK) {
+            errlogMessage("epicsJobQueue: lock error");
+            return -1;
+        }
+
+        ellAdd(&pool->owned, &job->jobnode);
+
+        epicsMutexUnlock(pool->guard);
+    }
+
+    return 0;
+}
+
+long epicsJobQueue(epicsJob* job)
+{
+    epicsThreadPool *pool=job->pool;
+    if(!job->pool)
+        return -1;
+
+    if(epicsMutexLock(pool->guard)!=epicsMutexLockOK) {
+        errlogMessage("epicsJobQueue: lock error");
+        return -1;
+    }
+
+    if(job->queued || pool->pauseadd) {
+        epicsMutexUnlock(pool->guard);
+        return -1;
+    } else if(job->queued) {
+        epicsMutexUnlock(pool->guard);
+        return 0;
+    }
+
+    job->queued=1;
+    /* Job may be queued from within a callback */
+    if(!job->running)
+        ellDelete(&pool->owned, &job->jobnode);
+    ellAdd(&pool->jobs, &job->jobnode);
+
+    if(job->running) {
+        /* some worker will find it again before sleeping */
+        epicsMutexUnlock(pool->guard);
+        return 0;
+    }
+
+    /* Since we hold the lock, we can be certain that all awake worker are
+     * executing work functions.  The current thread may be a worker.
+     */
+
+    epicsEventSignal(pool->workerWakeup);
+
+    if(ellCount(&pool->jobs) > (pool->threadsRunning - pool->threadsAreAwake)
+        && pool->threadsRunning < pool->conf.maxThreads)
+    {
+        createPoolThread(pool);
+    }
+    /* else wait for a running worker to finish its current job */
+
+    epicsMutexUnlock(pool->guard);
+    return 0;
+}
+
+int epicsJobCancel(epicsJob* job)
+{
+    int ret=0;
+    epicsThreadPool *pool=job->pool;
+
+    if(!pool)
+        return 0;
+
+    if(epicsMutexLock(pool->guard)!=epicsMutexLockOK) {
+        errlogMessage("epicsJobCancel: lock error");
+        return 0;
+    }
+
+    if(job->queued) {
+        ellDelete(&pool->jobs, &job->jobnode);
+        ellAdd(&pool->owned, &job->jobnode);
+        job->queued=0;
+        ret=1;
+    }
+
+    epicsMutexUnlock(pool->guard);
+
+    return ret;
+}
+

=== added file 'src/libCom/pool/poolPriv.h'
--- src/libCom/pool/poolPriv.h	1970-01-01 00:00:00 +0000
+++ src/libCom/pool/poolPriv.h	2012-06-01 17:30:28 +0000
@@ -0,0 +1,71 @@
+/*************************************************************************\
+* Copyright (c) 2011 Brookhaven Science Associates, as Operator of
+*     Brookhaven National Laboratory.
+* EPICS BASE is distributed subject to a Software License Agreement found
+* in file LICENSE that is included with this distribution.
+\*************************************************************************/
+
+#ifndef POOLPRIV_H
+#define POOLPRIV_H
+
+#include "epicsThreadPool.h"
+#include "ellLib.h"
+#include "epicsThread.h"
+
+
+struct epicsThreadPool {
+    ELLLIST jobs; /* run queue */
+    ELLLIST owned; /* unqueued jobs. */
+
+    /* # of running workers which are not waiting for a wakeup event */
+    size_t threadsAreAwake;
+    /* # of threads started and not stopped */
+    size_t threadsRunning;
+
+    epicsEventId workerWakeup;
+    epicsEventId shutdownEvent;
+
+    epicsEventId observerWakeup;
+
+    /* Disallow epicsJobQueue */
+    unsigned int pauseadd:1;
+    /* Prevent workers from running new jobs */
+    unsigned int pauserun:1;
+    /* tell workers to exit */
+    unsigned int shutdown:1;
+
+    epicsMutexId guard;
+
+    /* copy of config passed when created */
+    epicsThreadPoolConfig conf;
+};
+
+/* When created a job is idle.  queued and running are false
+ * and jobnode is in the thread pool's owned list.
+ *
+ * When the job is added, the queued flag is set and jobnode
+ * is in the jobs list.
+ *
+ * When the job starts running the queued flag is cleared and
+ * the running flag is set.  jobnode is not in any list
+ * (held locally by worker).
+ *
+ * When the job has finished running the running flag is cleared.
+ * The queued flag may be set if the job re-added itself.
+ * Based on the queued flag jobnode is added to the appropriate
+ * list.
+ */
+struct epicsJob {
+    ELLNODE jobnode;
+    epicsJobFunction func;
+    void *arg;
+    epicsThreadPool *pool;
+
+    unsigned int queued:1;
+    unsigned int running:1;
+    unsigned int freewhendone:1; /* lazy delete of running job */
+};
+
+void createPoolThread(epicsThreadPool *pool);
+
+#endif // POOLPRIV_H

=== added file 'src/libCom/pool/threadPool.c'
--- src/libCom/pool/threadPool.c	1970-01-01 00:00:00 +0000
+++ src/libCom/pool/threadPool.c	2012-06-01 17:30:28 +0000
@@ -0,0 +1,265 @@
+/*************************************************************************\
+* Copyright (c) 2011 Brookhaven Science Associates, as Operator of
+*     Brookhaven National Laboratory.
+* EPICS BASE is distributed subject to a Software License Agreement found
+* in file LICENSE that is included with this distribution.
+\*************************************************************************/
+
+#include <stdlib.h>
+#include <string.h>
+
+#include <dbDefs.h>
+#include <errlog.h>
+#include <ellLib.h>
+#include <epicsThread.h>
+#include <epicsMutex.h>
+#include <epicsEvent.h>
+#include <epicsInterrupt.h>
+
+#define epicsExportSharedSymbols
+#include "epicsThreadPool.h"
+#include "poolPriv.h"
+
+
+void epicsThreadPoolConfigDefaults(epicsThreadPoolConfig *opts)
+{
+    memset(opts, 0, sizeof(*opts));
+    opts->maxThreads=1;
+    opts->workerPriority=epicsThreadPriorityMedium;
+    opts->workerStack=epicsThreadGetStackSize(epicsThreadStackSmall);
+}
+
+epicsThreadPool* epicsThreadPoolCreate(epicsThreadPoolConfig *opts)
+{
+    size_t i;
+    epicsThreadPool *pool;
+
+    pool=calloc(1, sizeof(*pool));
+    if(!pool)
+        return 0;
+
+    if(opts)
+        memcpy(&pool->conf, opts, sizeof(*opts));
+    else
+        epicsThreadPoolConfigDefaults(&pool->conf);
+
+    if(pool->conf.initialThreads > pool->conf.maxThreads)
+        pool->conf.initialThreads = pool->conf.maxThreads;
+
+    pool->workerWakeup=epicsEventCreate(epicsEventEmpty);
+    pool->shutdownEvent=epicsEventCreate(epicsEventEmpty);
+    pool->observerWakeup=epicsEventCreate(epicsEventEmpty);
+    pool->guard=epicsMutexCreate();
+
+    if(!pool->workerWakeup || !pool->guard || !pool->shutdownEvent || !pool->observerWakeup)
+        goto cleanup;
+
+    ellInit(&pool->jobs);
+    ellInit(&pool->owned);
+
+    if(epicsMutexLock(pool->guard)!=epicsMutexLockOK)
+        goto cleanup;
+
+    for(i=0; i<pool->conf.initialThreads; i++)
+        createPoolThread(pool);
+
+    epicsMutexUnlock(pool->guard);
+
+    if(pool->threadsRunning==0 && pool->conf.initialThreads!=0) {
+        errlogPrintf("Unable to create any threads for thread pool\n");
+        goto cleanup;
+
+    }else if(pool->threadsRunning < pool->conf.initialThreads) {
+        errlogPrintf("Warning: Unable to create all threads for thread pool (%d/%d)\n",
+                     pool->threadsRunning, pool->conf.initialThreads);
+    }
+
+    return pool;
+
+cleanup:
+    if(pool->workerWakeup) epicsEventDestroy(pool->workerWakeup);
+    if(pool->shutdownEvent) epicsEventDestroy(pool->shutdownEvent);
+    if(pool->observerWakeup) epicsEventDestroy(pool->observerWakeup);
+    if(pool->guard) epicsMutexDestroy(pool->guard);
+
+    free(pool);
+    return 0;
+}
+
+void epicsThreadPoolControl(epicsThreadPool* pool, unsigned int opt, unsigned int val)
+{
+    if(epicsMutexLock(pool->guard)!=epicsMutexLockOK) {
+        errlogMessage("epicsThreadPoolDestroy: lock error");
+        return;
+    }
+
+    if(opt==EPICSPOOL_QUEUEADD) {
+        pool->pauseadd = !val;
+
+    } else if(opt==EPICSPOOL_QUEUERUN) {
+        if(!val && !pool->pauserun)
+            pool->pauserun=1;
+
+        else if(val && pool->pauserun) {
+            pool->pauserun=0;
+
+            if(ellCount(&pool->jobs))
+                epicsEventSignal(pool->workerWakeup);
+        }
+    }
+
+    epicsMutexUnlock(pool->guard);
+}
+
+int epicsThreadPoolWait(epicsThreadPool* pool, double timeout)
+{
+    if(epicsMutexLock(pool->guard)!=epicsMutexLockOK) {
+        errlogMessage("epicsThreadPoolDestroy: lock error");
+        return -1;
+    }
+
+    while(ellCount(&pool->jobs)>0 || pool->threadsAreAwake>0) {
+        epicsMutexUnlock(pool->guard);
+
+        if(timeout<0.0) {
+            if(epicsEventWait(pool->observerWakeup)!=epicsEventWaitOK)
+                return -1;
+
+        } else {
+            switch(epicsEventWaitWithTimeout(pool->observerWakeup, timeout)) {
+            case epicsEventWaitError:
+                return -1;
+            case epicsEventWaitTimeout:
+                return 1;
+            case epicsEventWaitOK:
+                break;
+            }
+
+        }
+
+        if(epicsMutexLock(pool->guard)!=epicsMutexLockOK) {
+            errlogMessage("epicsThreadPoolDestroy: relock error");
+            return -1;
+        }
+    }
+
+    epicsMutexUnlock(pool->guard);
+    return 0;
+}
+
+void epicsThreadPoolDestroy(epicsThreadPool *pool)
+{
+    size_t nThr;
+    ELLLIST notify;
+    ELLNODE *cur;
+
+    ellInit(&notify);
+
+    if(epicsMutexLock(pool->guard)!=epicsMutexLockOK) {
+        errlogMessage("epicsThreadPoolDestroy: lock error");
+        return;
+    }
+
+    /* run remaining queued jobs */
+    epicsThreadPoolControl(pool, EPICSPOOL_QUEUEADD, 0);
+    epicsThreadPoolControl(pool, EPICSPOOL_QUEUERUN, 1);
+    nThr=pool->threadsRunning;
+
+    epicsMutexUnlock(pool->guard);
+
+    epicsThreadPoolWait(pool, -1.0);
+    /* At this point all queued jobs have run */
+
+    if(epicsMutexLock(pool->guard)!=epicsMutexLockOK) {
+        errlogMessage("epicsThreadPoolDestroy: relock error");
+        return;
+    }
+
+    pool->shutdown=1;
+    epicsEventSignal(pool->workerWakeup);
+
+    ellConcat(&notify, &pool->owned);
+    if(ellCount(&pool->jobs)>0)
+        errlogMessage("Leaked jobs in thread pool cleanup!!!!!!!!!\n");
+
+    epicsMutexUnlock(pool->guard);
+
+    /* notify remaining jobs that pool is being destroyed */
+    while( (cur=ellGet(&notify))!=NULL )
+    {
+        epicsJob *job=CONTAINER(cur, epicsJob, jobnode);
+        job->running=1;
+        (*job->func)(job->arg, EPICSJOB_CLEANUP);
+        job->running=0;
+        if(job->freewhendone)
+            free(job);
+        else
+            job->pool=NULL; /* orphan */
+    }
+
+    if(nThr && epicsEventWait(pool->shutdownEvent)!=epicsEventWaitOK){
+        epicsMutexUnlock(pool->guard);
+        errlogMessage("epicsThreadPoolDestroy: wait error");
+        return;
+    }
+
+    epicsEventDestroy(pool->workerWakeup);
+    epicsEventDestroy(pool->shutdownEvent);
+    epicsEventDestroy(pool->observerWakeup);
+    epicsMutexDestroy(pool->guard);
+
+    free(pool);
+}
+
+
+void epicsThreadPoolReport(epicsThreadPool *pool, FILE *fd)
+{
+    ELLNODE *cur;
+    if(epicsMutexLock(pool->guard)!=epicsMutexLockOK) {
+        errlogMessage("epicsThreadPoolReport Failed to lock");
+        return;
+    }
+
+    fprintf(fd, "Thread Pool with %lu/%lu threads\n"
+            " running %d jobs with %lu threads\n",
+            (unsigned long)pool->threadsRunning,
+            (unsigned long)pool->conf.maxThreads,
+            ellCount(&pool->jobs),
+            (unsigned long)pool->threadsAreAwake);
+    if(pool->pauseadd)
+        fprintf(fd, "  Inhibit queueing\n");
+    if(pool->pauserun)
+        fprintf(fd, "  Pause workers\n");
+    if(pool->shutdown)
+        fprintf(fd, "  Shutdown in progress\n");
+
+    for(cur=ellFirst(&pool->jobs); cur; cur=ellNext(cur))
+    {
+        epicsJob *job=CONTAINER(cur, epicsJob, jobnode);
+        fprintf(fd, "  job 0x%lu func: 0x%lu, arg: 0x%lu ",
+                (unsigned long)job, (unsigned long)job->func,
+                (unsigned long)job->arg);
+        if(job->queued)
+            fprintf(fd, "Queued ");
+        if(job->running)
+            fprintf(fd, "Running ");
+        if(job->freewhendone)
+            fprintf(fd, "Free ");
+        fprintf(fd, "\n");
+    }
+
+    epicsMutexUnlock(pool->guard);
+}
+
+size_t epicsThreadPoolNThreads(epicsThreadPool *pool)
+{
+    size_t ret;
+    if(epicsMutexLock(pool->guard)!=epicsMutexLockOK) {
+        errlogMessage("epicsThreadPoolReport Failed to lock");
+        return 0;
+    }
+    ret=pool->threadsRunning;
+    epicsMutexUnlock(pool->guard);
+    return ret;
+}
+

=== modified file 'src/libCom/test/Makefile'
--- src/libCom/test/Makefile	2011-10-28 20:19:54 +0000
+++ src/libCom/test/Makefile	2012-06-01 17:30:28 +0000
@@ -12,6 +12,10 @@
 
 PROD_LIBS += Com
 
+TESTPROD_HOST += epicsThreadPoolTest
+epicsThreadPoolTest_SRCS += epicsThreadPoolTest.c
+TESTS += epicsThreadPoolTest
+
 TESTPROD_HOST += epicsUnitTestTest
 epicsUnitTestTest_SRCS += epicsUnitTestTest.c
 # Not much point running this on vxWorks or RTEMS...

=== added file 'src/libCom/test/epicsThreadPoolTest.c'
--- src/libCom/test/epicsThreadPoolTest.c	1970-01-01 00:00:00 +0000
+++ src/libCom/test/epicsThreadPoolTest.c	2012-06-01 17:30:28 +0000
@@ -0,0 +1,333 @@
+/*************************************************************************\
+* Copyright (c) 2011 Brookhaven Science Associates, as Operator of
+*     Brookhaven National Laboratory.
+* EPICS BASE is distributed subject to a Software License Agreement found
+* in file LICENSE that is included with this distribution.
+\*************************************************************************/
+
+#include "epicsThreadPool.h"
+
+#include "testMain.h"
+#include "epicsUnitTest.h"
+
+#include "cantProceed.h"
+#include "epicsEvent.h"
+#include "epicsMutex.h"
+#include "epicsThread.h"
+
+/* Do nothing */
+static void nullop(void)
+{
+    epicsThreadPool *pool;
+    testDiag("nullop()");
+    {
+        epicsThreadPoolConfig conf;
+        epicsThreadPoolConfigDefaults(&conf);
+        testOk1(conf.maxThreads>0);
+
+        testOk1((pool=epicsThreadPoolCreate(&conf))!=NULL);
+        if(!pool)
+            return;
+    }
+
+    epicsThreadPoolDestroy(pool);
+}
+
+/* Just create and destroy worker threads */
+static void oneop(void)
+{
+    epicsThreadPool *pool;
+    testDiag("oneop()");
+    {
+        epicsThreadPoolConfig conf;
+        epicsThreadPoolConfigDefaults(&conf);
+        conf.initialThreads=2;
+        testOk1(conf.maxThreads>0);
+
+        testOk1((pool=epicsThreadPoolCreate(&conf))!=NULL);
+        if(!pool)
+            return;
+    }
+
+    epicsThreadPoolDestroy(pool);
+}
+
+/* Test that Bursts of jobs will create enough threads to
+ * run all in parallel
+ */
+typedef struct {
+    epicsMutexId guard;
+    unsigned int count;
+    epicsEventId allrunning;
+    epicsEventId done;
+    epicsJob **job;
+} countPriv;
+
+static void countjob(void *param, unsigned int flag)
+{
+    countPriv *cnt=param;
+    testOk1(flag==EPICSJOB_RUN||flag==EPICSJOB_CLEANUP);
+    if(flag!=EPICSJOB_RUN)
+        return;
+
+    epicsMutexMustLock(cnt->guard);
+    testDiag("Job %lu", (unsigned long)cnt->count);
+    cnt->count--;
+    if(cnt->count==0) {
+        testDiag("All jobs running");
+        epicsEventSignal(cnt->allrunning);
+    }
+    epicsMutexUnlock(cnt->guard);
+
+    epicsEventMustWait(cnt->done);
+    epicsEventSignal(cnt->done); /* pass along to next thread */
+}
+
+static void postjobs(size_t icnt, size_t mcnt)
+{
+    size_t i;
+    epicsThreadPool *pool;
+    countPriv *priv=callocMustSucceed(1, sizeof(*priv), "postjobs priv alloc");
+    priv->guard=epicsMutexMustCreate();
+    priv->done=epicsEventMustCreate(epicsEventEmpty);
+    priv->allrunning=epicsEventMustCreate(epicsEventEmpty);
+    priv->count=mcnt;
+    priv->job=callocMustSucceed(mcnt, sizeof(*priv->job), "postjobs job array");
+
+    testDiag("postjobs(%lu,%lu)", (unsigned long)icnt, (unsigned long)mcnt);
+
+    {
+        epicsThreadPoolConfig conf;
+        epicsThreadPoolConfigDefaults(&conf);
+        conf.initialThreads=icnt;
+        conf.maxThreads=mcnt;
+
+        testOk1((pool=epicsThreadPoolCreate(&conf))!=NULL);
+        if(!pool)
+            return;
+    }
+
+    for(i=0; i<mcnt; i++) {
+        testDiag("i=%lu", (unsigned long)i);
+        priv->job[i] = epicsJobCreate(pool, &countjob, priv);
+        testOk1(priv->job[i]!=NULL);
+        testOk1(epicsJobQueue(priv->job[i])==0);
+    }
+
+    testDiag("Waiting for all jobs to start");
+    epicsEventMustWait(priv->allrunning);
+    testDiag("Stop all");
+    epicsEventSignal(priv->done);
+
+    for(i=0; i<mcnt; i++) {
+        testDiag("i=%lu", (unsigned long)i);
+        epicsJobDestroy(priv->job[i]);
+    }
+
+    epicsThreadPoolDestroy(pool);
+    epicsMutexDestroy(priv->guard);
+    epicsEventDestroy(priv->allrunning);
+    epicsEventDestroy(priv->done);
+    free(priv->job);
+    free(priv);
+}
+
+/* Test cancel from job (no-op)
+ * and destroy from job (lazy free)
+ */
+static void cleanupjob0(void* arg, unsigned int flags)
+{
+    epicsJob *job=arg;
+    testOk1(flags==EPICSJOB_RUN||flags==EPICSJOB_CLEANUP);
+    if(flags!=EPICSJOB_RUN)
+        return;
+
+    testOk1(epicsJobCancel(job)==0);
+    epicsJobDestroy(job); /* delete while job is running */
+}
+static void cleanupjob1(void* arg, unsigned int flags)
+{
+    epicsJob *job=arg;
+    testOk1(flags==EPICSJOB_RUN||flags==EPICSJOB_CLEANUP);
+    if(flags!=EPICSJOB_RUN)
+        return;
+
+    testOk1(epicsJobCancel(job)==0);
+    /* async delete (job may be running, or queued, or ...) */
+}
+static void cleanupjob2(void* arg, unsigned int flags)
+{
+    epicsJob *job=arg;
+    testOk1(flags==EPICSJOB_RUN||flags==EPICSJOB_CLEANUP);
+    if(flags==EPICSJOB_CLEANUP)
+        epicsJobDestroy(job); /* delete when threadpool is destroyed */
+
+    if(flags==EPICSJOB_RUN)
+        testOk1(epicsJobCancel(job)==0);
+}
+static epicsJobFunction cleanupjobs[3] = {&cleanupjob0,&cleanupjob1,&cleanupjob2};
+
+static void testcleanup(void)
+{
+    int i=0;
+    epicsThreadPool *pool;
+    epicsJob *job[3];
+
+    testDiag("testcleanup()");
+
+    testOk1((pool=epicsThreadPoolCreate(NULL))!=NULL);
+    if(!pool)
+        return;
+
+    /* unrolled so that valgrind can show which methods leaks */
+    testOk1((job[0]=epicsJobCreate(pool, cleanupjobs[0], NULL))!=NULL);
+    testOk1((job[1]=epicsJobCreate(pool, cleanupjobs[1], NULL))!=NULL);
+    testOk1((job[2]=epicsJobCreate(pool, cleanupjobs[2], NULL))!=NULL);
+    for(i=0; i<3; i++) {
+        epicsJobSet(job[i], cleanupjobs[i], job[i]);
+        testOk1(epicsJobQueue(job[i])==0);
+    }
+
+    epicsThreadPoolWait(pool, -1);
+    epicsJobDestroy(job[1]);
+    epicsThreadPoolDestroy(pool);
+}
+
+/* Test re-add from inside job */
+typedef struct {
+    unsigned int count;
+    epicsEventId done;
+    epicsJob *job;
+    unsigned int inprogress;
+} readdPriv;
+
+static void readdjob(void *arg, unsigned int flags)
+{
+    readdPriv *priv=arg;
+    testOk1(flags==EPICSJOB_RUN||flags==EPICSJOB_CLEANUP);
+    if(flags!=EPICSJOB_RUN)
+        return;
+    testOk1(priv->inprogress==0);
+    testDiag("count==%u", priv->count);
+
+    if(priv->count--) {
+        priv->inprogress=1;
+        epicsJobQueue(priv->job);
+        epicsThreadSleep(0.05);
+        priv->inprogress=0;
+    }else{
+        epicsEventSignal(priv->done);
+        epicsJobDestroy(priv->job);
+    }
+}
+
+static void testreadd(void) {
+    epicsThreadPool *pool;
+    readdPriv *priv=callocMustSucceed(1, sizeof(*priv), "testcleanup priv");
+
+    testDiag("testreadd");
+
+    priv->done=epicsEventMustCreate(epicsEventEmpty);
+    priv->count=5;
+
+    testOk1((pool=epicsThreadPoolCreate(NULL))!=NULL);
+    if(!pool)
+        return;
+
+    testOk1((priv->job=epicsJobCreate(pool, &readdjob, priv))!=NULL);
+
+    testOk1(epicsJobQueue(priv->job)==0);
+    epicsEventMustWait(priv->done);
+
+    testOk1(epicsThreadPoolNThreads(pool)==1);
+
+    epicsThreadPoolDestroy(pool);
+    epicsEventDestroy(priv->done);
+    free(priv);
+
+}
+
+/* test job canceling */
+static
+void neverrun(void *arg, unsigned int flag)
+{
+    epicsJob *job=arg;
+    testOk1(flag!=EPICSJOB_RUN);
+    if(flag==EPICSJOB_CLEANUP)
+        epicsJobDestroy(job);
+}
+static epicsEventId cancel[2];
+static
+void toolate(void *arg, unsigned int flag)
+{
+    epicsJob *job=arg;
+    if(flag==EPICSJOB_CLEANUP){
+        epicsJobDestroy(job);
+        return;
+    }
+    testPass("Job runs");
+    epicsEventSignal(cancel[0]);
+    epicsEventMustWait(cancel[1]);
+}
+
+static
+void testcancel(void)
+{
+    epicsJob *job[2];
+    epicsThreadPool *pool;
+    testOk1((pool=epicsThreadPoolCreate(NULL))!=NULL);
+    if(!pool)
+        return;
+
+    cancel[0]=epicsEventCreate(epicsEventEmpty);
+    cancel[1]=epicsEventCreate(epicsEventEmpty);
+
+    testOk1((job[0]=epicsJobCreate(pool, NULL, NULL))!=NULL);
+    testOk1((job[1]=epicsJobCreate(pool, NULL, NULL))!=NULL);
+    epicsJobSet(job[0], &neverrun, job[0]);
+    epicsJobSet(job[1], &toolate, job[1]);
+
+    /* freeze */
+    epicsThreadPoolControl(pool, EPICSPOOL_QUEUERUN, 0);
+
+    testOk1(epicsJobCancel(job[0])==0);
+
+    epicsJobQueue(job[0]);
+    testOk1(epicsJobCancel(job[0])==1);
+    testOk1(epicsJobCancel(job[0])==0);
+
+    epicsThreadSleep(0.01);
+    epicsJobQueue(job[0]);
+    testOk1(epicsJobCancel(job[0])==1);
+    testOk1(epicsJobCancel(job[0])==0);
+
+    epicsThreadPoolControl(pool, EPICSPOOL_QUEUERUN, 1);
+
+    epicsJobQueue(job[1]);
+
+    epicsEventMustWait(cancel[0]);
+    testOk1(epicsJobCancel(job[0])==0);
+    epicsEventSignal(cancel[1]);
+
+    epicsThreadPoolDestroy(pool);
+    epicsEventDestroy(cancel[0]);
+    epicsEventDestroy(cancel[1]);
+}
+
+MAIN(epicsThreadPoolTest)
+{
+    testPlan(92);
+
+    nullop();
+    oneop();
+    postjobs(1,1);
+    postjobs(0,1);
+    postjobs(4,4);
+    postjobs(0,4);
+    postjobs(2,4);
+    testcleanup();
+    testreadd();
+    testcancel();
+
+    return testDone();
+}


Replies:
Re: [Merge] lp:~epics-core/epics-base/thread-pool into lp:epics-base mdavidsaver
Re: [Merge] lp:~epics-core/epics-base/thread-pool into lp:epics-base mdavidsaver

Navigate by Date:
Prev: [Merge] lp:~epics-core/epics-base/server-side-plugins into lp:epics-base mdavidsaver
Next: [Merge] lp:~epics-core/epics-base/msi-join into lp:epics-base Andrew Johnson
Index: 2002  2003  2004  2005  2006  2007  2008  2009  2010  2011  <20122013  2014  2015  2016  2017  2018  2019  2020  2021  2022  2023  2024 
Navigate by Thread:
Prev: [Merge] lp:~epics-core/epics-base/server-side-plugins into lp:epics-base noreply
Next: Re: [Merge] lp:~epics-core/epics-base/thread-pool into lp:epics-base mdavidsaver
Index: 2002  2003  2004  2005  2006  2007  2008  2009  2010  2011  <20122013  2014  2015  2016  2017  2018  2019  2020  2021  2022  2023  2024 
ANJ, 26 Nov 2012 Valid HTML 4.01! · Home · News · About · Base · Modules · Extensions · Distributions · Download ·
· Search · EPICS V4 · IRMIS · Talk · Bugs · Documents · Links · Licensing ·