g+
g+ Communities
Argonne National Laboratory

Experimental Physics and
Industrial Control System

2002  2003  2004  2005  2006  2007  2008  2009  2010  2011  2012  <2013>  2014  Index  2002  2003  2004  2005  2006  2007  2008  2009  2010  2011  2012  <2013>  2014
<== Date ==> <== Thread ==>

Subject: [Merge] lp:~epics-core/epics-base/parallel-cbthreads into lp:epics-base
From: Ralph Lange <Ralph.Lange@gmx.de>
To: mp+164448@code.launchpad.net
Date: Fri, 17 May 2013 16:26:28 -0000
Ralph Lange has proposed merging lp:~epics-core/epics-base/parallel-cbthreads into lp:epics-base with lp:~epics-core/epics-base/get-cpus as a prerequisite.

Requested reviews:
  EPICS Core Developers (epics-core)

For more details, see:
https://code.launchpad.net/~epics-core/epics-base/parallel-cbthreads/+merge/164448

Implements parallel callback threads on multi-core (SMP) systems. 
-- 
https://code.launchpad.net/~epics-core/epics-base/parallel-cbthreads/+merge/164448
Your team EPICS Core Developers is requested to review the proposed merge of lp:~epics-core/epics-base/parallel-cbthreads into lp:epics-base.
=== modified file 'src/ioc/db/callback.c'
--- src/ioc/db/callback.c	2012-05-04 22:34:48 +0000
+++ src/ioc/db/callback.c	2013-05-17 16:25:35 +0000
@@ -3,8 +3,9 @@
 *     National Laboratory.
 * Copyright (c) 2002 The Regents of the University of California, as
 *     Operator of Los Alamos National Laboratory.
+* Copyright (c) 2013 ITER Organization.
 * EPICS BASE is distributed subject to a Software License Agreement found
-* in file LICENSE that is included with this distribution. 
+* in file LICENSE that is included with this distribution.
 \*************************************************************************/
 /* callback.c */
 
@@ -25,6 +26,7 @@
 #include "epicsThread.h"
 #include "epicsExit.h"
 #include "epicsInterrupt.h"
+#include "epicsString.h"
 #include "epicsTimer.h"
 #include "epicsRingPointer.h"
 #include "errlog.h"
@@ -36,6 +38,7 @@
 #include "taskwd.h"
 #include "errMdef.h"
 #include "dbCommon.h"
+#include "epicsExport.h"
 #define epicsExportSharedSymbols
 #include "dbAddr.h"
 #include "dbAccessDefs.h"
@@ -49,6 +52,12 @@
 static epicsRingPointerId callbackQ[NUM_CALLBACK_PRIORITIES];
 static volatile int ringOverflow[NUM_CALLBACK_PRIORITIES];
 
+/* Parallel callback threads (configured and actual counts) */
+static int callbackThreadCount[NUM_CALLBACK_PRIORITIES] = { 1, 1, 1 };
+static int callbackThreadsRunning[NUM_CALLBACK_PRIORITIES];
+int callbackParallelThreadsDefault = 2;
+epicsExportAddress(int,callbackParallelThreadsDefault);
+
 /* Timer for Delayed Requests */
 static epicsTimerQueueId timerQueue;
 
@@ -57,7 +66,7 @@
 static void *exitCallback;
 
 /* Static data */
-static char *threadName[NUM_CALLBACK_PRIORITIES] = {
+static char *threadNamePrefix[NUM_CALLBACK_PRIORITIES] = {
     "cbLow", "cbMedium", "cbHigh"
 };
 static unsigned int threadPriority[NUM_CALLBACK_PRIORITIES] = {
@@ -78,6 +87,54 @@
     return 0;
 }
 
+int callbackParallelThreads(int count, const char *prio)
+{
+    int i;
+    dbMenu	*pdbMenu;
+    int		gotMatch;
+
+    if (callbackOnceFlag != EPICS_THREAD_ONCE_INIT) {
+        errlogPrintf("Callback system already initialized\n");
+        return -1;
+    }
+
+    if (count < 0)
+        count = epicsThreadGetCPUs() + count;
+    else if (count == 0)
+        count = callbackParallelThreadsDefault;
+
+    if (!prio || strcmp(prio, "") == 0 || strcmp(prio, "*") == 0) {
+        for (i = 0; i < NUM_CALLBACK_PRIORITIES; i++) {
+            callbackThreadCount[i] = count;
+        }
+    } else {
+        if (!pdbbase) {
+            errlogPrintf("pdbbase not specified\n");
+            return -1;
+        }
+        /* Find prio in menuPriority */
+        pdbMenu = (dbMenu *)ellFirst(&pdbbase->menuList);
+        while (pdbMenu) {
+            gotMatch = (strcmp("menuPriority", pdbMenu->name)==0) ? TRUE : FALSE;
+            if (gotMatch) {
+                for (i = 0; i < pdbMenu->nChoice; i++) {
+                    gotMatch = (epicsStrCaseCmp(prio, pdbMenu->papChoiceValue[i])==0) ? TRUE : FALSE;
+                    if (gotMatch) break;
+                }
+                if (gotMatch) {
+                    callbackThreadCount[i] = count;
+                    break;
+                } else {
+                    errlogPrintf("Unknown priority \"%s\"\n", prio);
+                    return -1;
+                }
+            }
+            pdbMenu = (dbMenu *)ellNext(&pdbMenu->node);
+        }
+    }
+    return 0;
+}
+
 static void callbackTask(void *arg)
 {
     int priority = *(int *)arg;
@@ -106,36 +163,48 @@
     int i;
 
     for (i = 0; i < NUM_CALLBACK_PRIORITIES; i++) {
-        int lockKey = epicsInterruptLock();
-        int ok = epicsRingPointerPush(callbackQ[i], &exitCallback);
-        epicsInterruptUnlock(lockKey);
-        epicsEventSignal(callbackSem[i]);
-        if (ok) epicsEventWait(startStopEvent);
+        while (callbackThreadsRunning[i]--) {
+            int ok = epicsRingPointerPush(callbackQ[i], &exitCallback);
+            epicsEventSignal(callbackSem[i]);
+            if (ok) epicsEventWait(startStopEvent);
+        }
     }
 }
 
 static void callbackInitOnce(void *arg)
 {
     int i;
+    int j;
+    char threadName[32];
 
     startStopEvent = epicsEventMustCreate(epicsEventEmpty);
     timerQueue = epicsTimerQueueAllocate(0,epicsThreadPriorityScanHigh);
+
     for (i = 0; i < NUM_CALLBACK_PRIORITIES; i++) {
         epicsThreadId tid;
 
         callbackSem[i] = epicsEventMustCreate(epicsEventEmpty);
-        callbackQ[i] = epicsRingPointerCreate(callbackQueueSize);
+        callbackQ[i] = epicsRingPointerLockedCreate(callbackQueueSize);
         if (callbackQ[i] == 0)
-            cantProceed("epicsRingPointerCreate failed for %s\n",
-                threadName[i]);
+            cantProceed("epicsRingPointerLockedCreate failed for %s\n",
+                threadNamePrefix[i]);
         ringOverflow[i] = FALSE;
-        tid = epicsThreadCreate(threadName[i], threadPriority[i],
-            epicsThreadGetStackSize(epicsThreadStackBig),
-            (EPICSTHREADFUNC)callbackTask, &priorityValue[i]);
-        if (tid == 0)
-            cantProceed("Failed to spawn callback task %s\n", threadName[i]);
-        else
-            epicsEventWait(startStopEvent);
+
+        for (j = 0; j < callbackThreadCount[i]; j++) {
+            if (callbackThreadCount[i] > 1 )
+                sprintf(threadName, "%s-%d", threadNamePrefix[i], j);
+            else
+                strcpy(threadName, threadNamePrefix[i]);
+            tid = epicsThreadCreate(threadName, threadPriority[i],
+                epicsThreadGetStackSize(epicsThreadStackBig),
+                (EPICSTHREADFUNC)callbackTask, &priorityValue[i]);
+            if (tid == 0) {
+                cantProceed("Failed to spawn callback thread %s\n", threadName);
+            } else {
+                epicsEventWait(startStopEvent);
+                callbackThreadsRunning[i]++;
+            }
+        }
     }
     epicsAtExit(callbackShutdown, NULL);
 }
@@ -150,7 +219,6 @@
 {
     int priority;
     int pushOK;
-    int lockKey;
 
     if (!pcallback) {
         epicsInterruptContextMessage("callbackRequest: pcallback was NULL\n");
@@ -163,14 +231,12 @@
     }
     if (ringOverflow[priority]) return;
 
-    lockKey = epicsInterruptLock();
     pushOK = epicsRingPointerPush(callbackQ[priority], pcallback);
-    epicsInterruptUnlock(lockKey);
 
     if (!pushOK) {
         char msg[48] = "callbackRequest: ";
 
-        strcat(msg, threadName[priority]);
+        strcat(msg, threadNamePrefix[priority]);
         strcat(msg, " ring buffer full\n");
         epicsInterruptContextMessage(msg);
         ringOverflow[priority] = TRUE;

=== modified file 'src/ioc/db/callback.h'
--- src/ioc/db/callback.h	2010-10-05 19:27:37 +0000
+++ src/ioc/db/callback.h	2013-05-17 16:25:35 +0000
@@ -3,6 +3,7 @@
 *     National Laboratory.
 * Copyright (c) 2002 The Regents of the University of California, as
 *     Operator of Los Alamos National Laboratory.
+* Copyright (c) 2013 ITER Organization.
 * EPICS BASE is distributed subject to a Software License Agreement found
 * in file LICENSE that is included with this distribution. 
 \*************************************************************************/
@@ -67,6 +68,7 @@
 epicsShareFunc void callbackRequestProcessCallbackDelayed(
     CALLBACK *pCallback, int Priority, void *pRec, double seconds);
 epicsShareFunc int callbackSetQueueSize(int size);
+epicsShareFunc int callbackParallelThreads(int count, const char *prio);
 
 #ifdef __cplusplus
 }

=== modified file 'src/ioc/db/dbIocRegister.c'
--- src/ioc/db/dbIocRegister.c	2012-07-15 21:08:50 +0000
+++ src/ioc/db/dbIocRegister.c	2013-05-17 16:25:35 +0000
@@ -23,6 +23,8 @@
 #include "dbIocRegister.h"
 #include "dbState.h"
 
+epicsShareDef int callbackParallelThreadsDefault;
+
 /* dbLoadDatabase */
 static const iocshArg dbLoadDatabaseArg0 = { "file name",iocshArgString};
 static const iocshArg dbLoadDatabaseArg1 = { "path",iocshArgString};
@@ -298,6 +300,18 @@
     callbackSetQueueSize(args[0].ival);
 }
 
+/* callbackParallelThreads */
+static const iocshArg callbackParallelThreadsArg0 = { "no of threads", iocshArgInt};
+static const iocshArg callbackParallelThreadsArg1 = { "priority", iocshArgString};
+static const iocshArg * const callbackParallelThreadsArgs[2] =
+    {&callbackParallelThreadsArg0,&callbackParallelThreadsArg1};
+static const iocshFuncDef callbackParallelThreadsFuncDef =
+    {"callbackParallelThreads",2,callbackParallelThreadsArgs};
+static void callbackParallelThreadsCallFunc(const iocshArgBuf *args)
+{
+    callbackParallelThreads(args[0].ival, args[1].sval);
+}
+
 /* dbStateCreate */
 static const iocshArg dbStateArgName = { "name", iocshArgString };
 static const iocshArg * const dbStateCreateArgs[] = { &dbStateArgName };
@@ -394,6 +408,10 @@
     iocshRegister(&scanpiolFuncDef,scanpiolCallFunc);
 
     iocshRegister(&callbackSetQueueSizeFuncDef,callbackSetQueueSizeCallFunc);
+    iocshRegister(&callbackParallelThreadsFuncDef,callbackParallelThreadsCallFunc);
+
+    /* Needed before callback system is initialized */
+    callbackParallelThreadsDefault = epicsThreadGetCPUs();
 
     iocshRegister(&dbStateCreateFuncDef, dbStateCreateCallFunc);
     iocshRegister(&dbStateSetFuncDef, dbStateSetCallFunc);

=== modified file 'src/ioc/db/test/Makefile'
--- src/ioc/db/test/Makefile	2012-07-17 19:33:31 +0000
+++ src/ioc/db/test/Makefile	2013-05-17 16:25:35 +0000
@@ -21,6 +21,11 @@
 testHarness_SRCS += callbackTest.c
 TESTS += callbackTest
 
+TESTPROD_HOST += callbackParallelTest
+callbackParallelTest_SRCS += callbackParallelTest.c
+testHarness_SRCS += callbackParallelTest.c
+TESTS += callbackParallelTest
+
 TESTPROD_HOST += dbStateTest
 dbStateTest_SRCS += dbStateTest.c
 testHarness_SRCS += dbStateTest.c

=== added file 'src/ioc/db/test/callbackParallelTest.c'
--- src/ioc/db/test/callbackParallelTest.c	1970-01-01 00:00:00 +0000
+++ src/ioc/db/test/callbackParallelTest.c	2013-05-17 16:25:35 +0000
@@ -0,0 +1,184 @@
+/*************************************************************************\
+* Copyright (c) 2008 UChicago Argonne LLC, as Operator of Argonne
+*     National Laboratory.
+* Copyright (c) 2002 The Regents of the University of California, as
+*     Operator of Los Alamos National Laboratory.
+* Copyright (c) 2013 ITER Organization.
+* EPICS BASE is distributed subject to a Software License Agreement found
+* in file LICENSE that is included with this distribution. 
+\*************************************************************************/
+/* $Revision-Id$ */
+
+/* Author:  Marty Kraimer Date:    26JAN2000 */
+
+#include <stddef.h>
+#include <stdlib.h>
+#include <stddef.h>
+#include <string.h>
+#include <stdio.h>
+#include <math.h>
+
+#include "callback.h"
+#include "cantProceed.h"
+#include "epicsThread.h"
+#include "epicsEvent.h"
+#include "epicsTime.h"
+#include "epicsUnitTest.h"
+#include "testMain.h"
+
+/*
+ * This test checks both immediate and delayed callbacks in two steps.
+ * In the first step (pass1) NCALLBACKS immediate callbacks are queued.
+ * As each is run it starts a second delayed callback (pass2).
+ * The last delayed callback which runs signals an epicsEvent
+ * to the main thread.
+ *
+ * Two time intervals are measured.  The time to queue and run each of
+ * the immediate callbacks, and the actual delay of the delayed callback.
+ */
+
+#define NCALLBACKS 169
+#define DELAY_QUANTUM 0.25
+
+#define TEST_DELAY(i) ((i / NUM_CALLBACK_PRIORITIES) * DELAY_QUANTUM)
+
+typedef struct myPvt {
+    CALLBACK cb1;
+    CALLBACK cb2;
+    epicsTimeStamp pass1Time;
+    epicsTimeStamp pass2Time;
+    double delay;
+    int pass;
+    int resultFail;
+} myPvt;
+
+epicsEventId finished;
+
+static void myCallback(CALLBACK *pCallback)
+{
+    myPvt *pmyPvt;
+
+    callbackGetUser(pmyPvt, pCallback);
+
+    pmyPvt->pass++;
+
+    if (pmyPvt->pass == 1) {
+        epicsTimeGetCurrent(&pmyPvt->pass1Time);
+        callbackRequestDelayed(&pmyPvt->cb2, pmyPvt->delay);
+    } else if (pmyPvt->pass == 2) {
+        epicsTimeGetCurrent(&pmyPvt->pass2Time);
+    } else {
+        pmyPvt->resultFail = 1;
+        return;
+    }
+}
+
+static void finalCallback(CALLBACK *pCallback)
+{
+    myCallback(pCallback);
+    epicsEventSignal(finished);
+}
+
+static void updateStats(double *stats, double val)
+{
+    if (stats[0] > val) stats[0] = val;
+    if (stats[1] < val) stats[1] = val;
+    stats[2] += val;
+    stats[3] += pow(val, 2.0);
+    stats[4] += 1.;
+}
+
+static void printStats(double *stats, const char* tag) {
+    testDiag("Priority %4s  min/avg/max/sigma = %f / %f / %f / %f",
+            tag, stats[0], stats[2]/stats[4], stats[1],
+            sqrt(stats[4]*stats[3]-pow(stats[2], 2.0))/stats[4]);
+}
+
+MAIN(callbackParallelTest)
+{
+    myPvt *pcbt[NCALLBACKS];
+    epicsTimeStamp start;
+    int noCpus = epicsThreadGetCPUs();
+    int i, j;
+    /* Statistics: min/max/sum/sum^2/n for each priority */
+    double setupError[NUM_CALLBACK_PRIORITIES][5];
+    double timeError[NUM_CALLBACK_PRIORITIES][5];
+    double defaultError[5] = {1,-1,0,0,0};
+
+    for (i = 0; i < NUM_CALLBACK_PRIORITIES; i++)
+        for (j = 0; j < 5; j++)
+            setupError[i][j] = timeError[i][j] = defaultError[j];
+
+    testPlan(NCALLBACKS * 2 + 1);
+
+    testDiag("Starting %d parallel callback threads", noCpus);
+
+    callbackParallelThreads(noCpus, "");
+    callbackInit();
+    epicsThreadSleep(1.0);
+
+    finished = epicsEventMustCreate(epicsEventEmpty);
+
+    for (i = 0; i < NCALLBACKS ; i++) {
+        pcbt[i] = callocMustSucceed(1, sizeof(myPvt), "pcbt");
+        callbackSetCallback(myCallback, &pcbt[i]->cb1);
+        callbackSetCallback(myCallback, &pcbt[i]->cb2);
+        callbackSetUser(pcbt[i], &pcbt[i]->cb1);
+        callbackSetUser(pcbt[i], &pcbt[i]->cb2);
+        callbackSetPriority(i % NUM_CALLBACK_PRIORITIES, &pcbt[i]->cb1);
+        callbackSetPriority(i % NUM_CALLBACK_PRIORITIES, &pcbt[i]->cb2);
+        pcbt[i]->delay = TEST_DELAY(i);
+        pcbt[i]->pass = 0;
+    }
+
+    /* Last callback is special */
+    callbackSetCallback(finalCallback, &pcbt[NCALLBACKS-1]->cb2);
+    callbackSetPriority(0, &pcbt[NCALLBACKS-1]->cb1);
+    callbackSetPriority(0, &pcbt[NCALLBACKS-1]->cb2);
+    pcbt[NCALLBACKS-1]->delay = TEST_DELAY(NCALLBACKS) + 1.0;
+    pcbt[NCALLBACKS-1]->pass = 0;
+
+    testOk1(epicsTimeGetCurrent(&start)==epicsTimeOK);
+
+    for (i = 0; i < NCALLBACKS ; i++) {
+        callbackRequest(&pcbt[i]->cb1);
+    }
+
+    testDiag("Waiting %.02f sec", pcbt[NCALLBACKS-1]->delay);
+
+    epicsEventWait(finished);
+    
+    for (i = 0; i < NCALLBACKS ; i++) {
+        if(pcbt[i]->resultFail || pcbt[i]->pass!=2)
+            testFail("pass = %d for delay = %f", pcbt[i]->pass, pcbt[i]->delay);
+        else {
+            double delta = epicsTimeDiffInSeconds(&pcbt[i]->pass1Time, &start);
+            testOk(fabs(delta) < 0.05, "callback %.02f setup time |%f| < 0.05",
+                    pcbt[i]->delay, delta);
+            updateStats(setupError[i%NUM_CALLBACK_PRIORITIES], delta);
+        }
+    }
+
+    for (i = 0; i < NCALLBACKS ; i++) {
+        double delta, error;
+        if(pcbt[i]->resultFail || pcbt[i]->pass!=2)
+            continue;
+        delta = epicsTimeDiffInSeconds(&pcbt[i]->pass2Time, &pcbt[i]->pass1Time);
+        error = delta - pcbt[i]->delay;
+        testOk(fabs(error) < 0.05, "delay %.02f seconds, callback time error |%.04f| < 0.05",
+                pcbt[i]->delay, error);
+        updateStats(timeError[i%NUM_CALLBACK_PRIORITIES], error);
+    }
+
+    testDiag("Setup time statistics");
+    printStats(setupError[0], "LOW");
+    printStats(setupError[1], "MID");
+    printStats(setupError[2], "HIGH");
+
+    testDiag("Delay time statistics");
+    printStats(timeError[0], "LOW");
+    printStats(timeError[1], "MID");
+    printStats(timeError[2], "HIGH");
+
+    return testDone();
+}

=== modified file 'src/ioc/db/test/callbackTest.c'
--- src/ioc/db/test/callbackTest.c	2012-07-06 17:35:26 +0000
+++ src/ioc/db/test/callbackTest.c	2013-05-17 16:25:35 +0000
@@ -3,6 +3,7 @@
 *     National Laboratory.
 * Copyright (c) 2002 The Regents of the University of California, as
 *     Operator of Los Alamos National Laboratory.
+* Copyright (c) 2013 ITER Organization.
 * EPICS BASE is distributed subject to a Software License Agreement found
 * in file LICENSE that is included with this distribution. 
 \*************************************************************************/
@@ -79,11 +80,34 @@
     epicsEventSignal(finished);
 }
 
+static void updateStats(double *stats, double val)
+{
+    if (stats[0] > val) stats[0] = val;
+    if (stats[1] < val) stats[1] = val;
+    stats[2] += val;
+    stats[3] += pow(val, 2.0);
+    stats[4] += 1.;
+}
+
+static void printStats(double *stats, const char* tag) {
+    testDiag("Priority %4s  min/avg/max/sigma = %f / %f / %f / %f",
+            tag, stats[0], stats[2]/stats[4], stats[1],
+            sqrt(stats[4]*stats[3]-pow(stats[2], 2.0))/stats[4]);
+}
+
 MAIN(callbackTest)
 {
     myPvt *pcbt[NCALLBACKS];
     epicsTimeStamp start;
-    int i;
+    int i, j;
+    /* Statistics: min/max/sum/sum^2/n for each priority */
+    double setupError[NUM_CALLBACK_PRIORITIES][5];
+    double timeError[NUM_CALLBACK_PRIORITIES][5];
+    double defaultError[5] = {1,-1,0,0,0};
+
+    for (i = 0; i < NUM_CALLBACK_PRIORITIES; i++)
+        for (j = 0; j < 5; j++)
+            setupError[i][j] = timeError[i][j] = defaultError[j];
 
     testPlan(NCALLBACKS * 2 + 1);
 
@@ -128,8 +152,8 @@
             double delta = epicsTimeDiffInSeconds(&pcbt[i]->pass1Time, &start);
             testOk(fabs(delta) < 0.05, "callback %.02f setup time |%f| < 0.05",
                     pcbt[i]->delay, delta);
+            updateStats(setupError[i%NUM_CALLBACK_PRIORITIES], delta);
         }
-        
     }
 
     for (i = 0; i < NCALLBACKS ; i++) {
@@ -140,7 +164,18 @@
         error = delta - pcbt[i]->delay;
         testOk(fabs(error) < 0.05, "delay %.02f seconds, callback time error |%.04f| < 0.05",
                 pcbt[i]->delay, error);
+        updateStats(timeError[i%NUM_CALLBACK_PRIORITIES], error);
     }
 
+    testDiag("Setup time statistics");
+    printStats(setupError[0], "LOW");
+    printStats(setupError[1], "MID");
+    printStats(setupError[2], "HIGH");
+
+    testDiag("Delay time statistics");
+    printStats(timeError[0], "LOW");
+    printStats(timeError[1], "MID");
+    printStats(timeError[2], "HIGH");
+
     return testDone();
 }

=== modified file 'src/ioc/misc/dbCore.dbd'
--- src/ioc/misc/dbCore.dbd	2011-02-26 05:56:51 +0000
+++ src/ioc/misc/dbCore.dbd	2013-05-17 16:25:35 +0000
@@ -12,3 +12,5 @@
 variable(dbRecordsOnceOnly,int)
 variable(dbBptNotMonotonic,int)
 
+# Default number of parallel callback threads
+variable(callbackParallelThreadsDefault,int)

=== modified file 'src/libCom/ring/epicsRingBytes.c'
--- src/libCom/ring/epicsRingBytes.c	2012-05-24 18:31:28 +0000
+++ src/libCom/ring/epicsRingBytes.c	2013-05-17 16:25:35 +0000
@@ -3,13 +3,16 @@
 *     National Laboratory.
 * Copyright (c) 2002 The Regents of the University of California, as
 *     Operator of Los Alamos National Laboratory.
-* EPICS BASE Versions 3.13.7
-* and higher are distributed subject to a Software License Agreement found
-* in file LICENSE that is included with this distribution. 
+* Copyright (c) 2012 ITER Organization.
+* EPICS BASE is distributed subject to a Software License Agreement found
+* in file LICENSE that is included with this distribution.
 \*************************************************************************/
-/* epicsRingBytes.cd */
 
-/* Author:  Eric Norum & Marty Kraimer Date:    15JUL99 */
+/*
+ * Author:  Marty Kraimer Date:    15JUL99
+ *          Eric Norum
+ *          Ralph Lange <Ralph.Lange@gmx.de>
+ */
 
 #include <stddef.h>
 #include <string.h>
@@ -18,6 +21,7 @@
 #include <stdio.h>
 
 #define epicsExportSharedSymbols
+#include "epicsSpin.h"
 #include "dbDefs.h"
 #include "epicsRingBytes.h"
 
@@ -30,6 +34,7 @@
 #define SLOP    16
 
 typedef struct ringPvt {
+    epicsSpinId    lock;
     volatile int   nextPut;
     volatile int   nextGet;
     int            size;
@@ -44,12 +49,23 @@
     pring->size = size + SLOP;
     pring->nextGet = 0;
     pring->nextPut = 0;
+    pring->lock    = 0;
+    return((void *)pring);
+}
+
+epicsShareFunc epicsRingBytesId  epicsShareAPI epicsRingBytesLockedCreate(int size)
+{
+    ringPvt *pring = (ringPvt *)epicsRingBytesCreate(size);
+    if(!pring)
+        return NULL;
+    pring->lock = epicsSpinCreate();
     return((void *)pring);
 }
 
 epicsShareFunc void epicsShareAPI epicsRingBytesDelete(epicsRingBytesId id)
 {
     ringPvt *pring = (ringPvt *)id;
+    if (pring->lock) epicsSpinDestroy(pring->lock);
     free((void *)pring);
 }
 
@@ -57,11 +73,14 @@
     epicsRingBytesId id, char *value,int nbytes)
 {
     ringPvt *pring = (ringPvt *)id;
-    int nextGet = pring->nextGet;
-    int nextPut = pring->nextPut;
-    int size = pring->size;
+    int nextGet, nextPut, size;
     int count;
 
+    if (pring->lock) epicsSpinLock(pring->lock);
+    nextGet = pring->nextGet;
+    nextPut = pring->nextPut;
+    size = pring->size;
+
     if (nextGet <= nextPut) {
         count = nextPut - nextGet;
         if (count < nbytes)
@@ -89,6 +108,8 @@
         }
     }
     pring->nextGet = nextGet;
+
+    if (pring->lock) epicsSpinUnlock(pring->lock);
     return nbytes;
 }
 
@@ -96,23 +117,30 @@
     epicsRingBytesId id, char *value,int nbytes)
 {
     ringPvt *pring = (ringPvt *)id;
-    int nextGet = pring->nextGet;
-    int nextPut = pring->nextPut;
-    int size = pring->size;
+    int nextGet, nextPut, size;
     int freeCount, copyCount, topCount;
 
+    if (pring->lock) epicsSpinLock(pring->lock);
+    nextGet = pring->nextGet;
+    nextPut = pring->nextPut;
+    size = pring->size;
+
     if (nextPut < nextGet) {
         freeCount = nextGet - nextPut - SLOP;
-        if (nbytes > freeCount)
+        if (nbytes > freeCount) {
+            if (pring->lock) epicsSpinUnlock(pring->lock);
             return 0;
+        }
         if (nbytes)
             memcpy ((void *)&pring->buffer[nextPut], value, nbytes);
         nextPut += nbytes;
     }
     else {
         freeCount = size - nextPut + nextGet - SLOP;
-        if (nbytes > freeCount)
+        if (nbytes > freeCount) {
+            if (pring->lock) epicsSpinUnlock(pring->lock);
             return 0;
+        }
         topCount = size - nextPut;
         copyCount = (nbytes > topCount) ?  topCount : nbytes;
         if (copyCount)
@@ -126,6 +154,8 @@
         }
     }
     pring->nextPut = nextPut;
+
+    if (pring->lock) epicsSpinUnlock(pring->lock);
     return nbytes;
 }
 
@@ -133,14 +163,20 @@
 {
     ringPvt *pring = (ringPvt *)id;
 
+    if (pring->lock) epicsSpinLock(pring->lock);
     pring->nextGet = pring->nextPut;
+    if (pring->lock) epicsSpinUnlock(pring->lock);
 }
 
 epicsShareFunc int epicsShareAPI epicsRingBytesFreeBytes(epicsRingBytesId id)
 {
     ringPvt *pring = (ringPvt *)id;
-    int nextGet = pring->nextGet;
-    int nextPut = pring->nextPut;
+    int nextGet, nextPut;
+
+    if (pring->lock) epicsSpinLock(pring->lock);
+    nextGet = pring->nextGet;
+    nextPut = pring->nextPut;
+    if (pring->lock) epicsSpinUnlock(pring->lock);
 
     if (nextPut < nextGet)
         return nextGet - nextPut - SLOP;
@@ -151,8 +187,18 @@
 epicsShareFunc int epicsShareAPI epicsRingBytesUsedBytes(epicsRingBytesId id)
 {
     ringPvt *pring = (ringPvt *)id;
-
-    return pring->size - epicsRingBytesFreeBytes(id) - SLOP;
+    int nextGet, nextPut;
+    int used;
+
+    if (pring->lock) epicsSpinLock(pring->lock);
+    nextGet = pring->nextGet;
+    nextPut = pring->nextPut;
+    if (pring->lock) epicsSpinUnlock(pring->lock);
+
+    used = nextPut - nextGet;
+    if (used < 0) used += pring->size;
+
+    return used;
 }
 
 epicsShareFunc int epicsShareAPI epicsRingBytesSize(epicsRingBytesId id)
@@ -165,8 +211,13 @@
 epicsShareFunc int epicsShareAPI epicsRingBytesIsEmpty(epicsRingBytesId id)
 {
     ringPvt *pring = (ringPvt *)id;
-
-    return (pring->nextPut == pring->nextGet);
+    int isEmpty;
+
+    if (pring->lock) epicsSpinLock(pring->lock);
+    isEmpty = (pring->nextPut == pring->nextGet);
+    if (pring->lock) epicsSpinUnlock(pring->lock);
+
+    return isEmpty;
 }
 
 epicsShareFunc int epicsShareAPI epicsRingBytesIsFull(epicsRingBytesId id)

=== modified file 'src/libCom/ring/epicsRingBytes.h'
--- src/libCom/ring/epicsRingBytes.h	2002-07-12 21:35:43 +0000
+++ src/libCom/ring/epicsRingBytes.h	2013-05-17 16:25:35 +0000
@@ -3,13 +3,16 @@
 *     National Laboratory.
 * Copyright (c) 2002 The Regents of the University of California, as
 *     Operator of Los Alamos National Laboratory.
-* EPICS BASE Versions 3.13.7
-* and higher are distributed subject to a Software License Agreement found
-* in file LICENSE that is included with this distribution. 
+* Copyright (c) 2012 ITER Organization.
+* EPICS BASE is distributed subject to a Software License Agreement found
+* in file LICENSE that is included with this distribution.
 \*************************************************************************/
-/*epicsRingBytes.h */
 
-/* Author:  Eric Norum & Marty Kraimer Date:    15JUL99 */
+/*
+ * Author:  Marty Kraimer Date:    15JUL99
+ *          Eric Norum
+ *          Ralph Lange <Ralph.Lange@gmx.de>
+ */
 
 #ifndef INCepicsRingBytesh
 #define INCepicsRingBytesh
@@ -23,6 +26,8 @@
 typedef void *epicsRingBytesId;
 
 epicsShareFunc epicsRingBytesId  epicsShareAPI epicsRingBytesCreate(int nbytes);
+/* Same, but secured by a spinlock */
+epicsShareFunc epicsRingBytesId  epicsShareAPI epicsRingBytesLockedCreate(int nbytes);
 epicsShareFunc void epicsShareAPI epicsRingBytesDelete(epicsRingBytesId id);
 epicsShareFunc int  epicsShareAPI epicsRingBytesGet(
     epicsRingBytesId id, char *value,int nbytes);
@@ -42,6 +47,8 @@
 /* NOTES
     If there is only one writer it is not necessary to lock for put
     If there is a single reader it is not necessary to lock for puts
+
+    epicsRingBytesLocked uses a spinlock.
 */
 
 #endif /* INCepicsRingBytesh */

=== modified file 'src/libCom/ring/epicsRingPointer.cpp'
--- src/libCom/ring/epicsRingPointer.cpp	2013-01-30 00:02:51 +0000
+++ src/libCom/ring/epicsRingPointer.cpp	2013-05-17 16:25:35 +0000
@@ -3,11 +3,16 @@
 *     National Laboratory.
 * Copyright (c) 2002 The Regents of the University of California, as
 *     Operator of Los Alamos National Laboratory.
+* Copyright (c) 2012 ITER Organization.
 * EPICS BASE is distributed subject to a Software License Agreement found
 * in file LICENSE that is included with this distribution.
 \*************************************************************************/
-/*epicsRingPointer.cpp*/
-/* Author:  Marty Kraimer Date:    13OCT2000 */
+
+/*
+ * Author:  Marty Kraimer Date:    13OCT2000
+ *          Ralph Lange <Ralph.Lange@gmx.de>
+ */
+
 
 #include <stddef.h>
 #include <string.h>
@@ -22,7 +27,13 @@
 
 epicsShareFunc epicsRingPointerId  epicsShareAPI epicsRingPointerCreate(int size)
 {
-    voidPointer *pvoidPointer = new voidPointer(size);
+    voidPointer *pvoidPointer = new voidPointer(size, false);
+    return(reinterpret_cast<void *>(pvoidPointer));
+}
+
+epicsShareFunc epicsRingPointerId  epicsShareAPI epicsRingPointerLockedCreate(int size)
+{
+    voidPointer *pvoidPointer = new voidPointer(size, true);
     return(reinterpret_cast<void *>(pvoidPointer));
 }
 

=== modified file 'src/libCom/ring/epicsRingPointer.h'
--- src/libCom/ring/epicsRingPointer.h	2013-01-30 00:02:51 +0000
+++ src/libCom/ring/epicsRingPointer.h	2013-05-17 16:25:35 +0000
@@ -3,12 +3,15 @@
 *     National Laboratory.
 * Copyright (c) 2002 The Regents of the University of California, as
 *     Operator of Los Alamos National Laboratory.
+* Copyright (c) 2012 ITER Organization.
 * EPICS BASE is distributed subject to a Software License Agreement found
 * in file LICENSE that is included with this distribution.
 \*************************************************************************/
-/*epicsRingPointer.h */
 
-/* Author:  Marty Kraimer Date:    15JUL99 */
+/*
+ * Author:  Marty Kraimer Date:    15JUL99
+ *          Ralph Lange <Ralph.Lange@gmx.de>
+ */
 
 #ifndef INCepicsRingPointerh
 #define INCepicsRingPointerh
@@ -16,15 +19,18 @@
 /* NOTES
  *   If there is only one writer it is not necessary to lock push
  *   If there is a single reader it is not necessary to lock pop
+ *
+ *   epicsRingPointerLocked uses a spinlock.
  */
 
+#include "epicsSpin.h"
 #include "shareLib.h"
 
 #ifdef __cplusplus
 template <class T>
 class epicsRingPointer {
 public: /* Functions */
-    epicsRingPointer(int size);
+    epicsRingPointer(int size, bool locked);
     ~epicsRingPointer();
     bool push(T *p);
     T* pop();
@@ -42,6 +48,7 @@
     epicsRingPointer& operator=(const epicsRingPointer &);
 
 private: /* Data */
+    epicsSpinId lock;
     volatile int nextPush;
     volatile int nextPop;
     int size;
@@ -54,6 +61,8 @@
 typedef void *epicsRingPointerId;
 
 epicsShareFunc epicsRingPointerId  epicsShareAPI epicsRingPointerCreate(int size);
+/* Same, but secured by a spinlock */
+epicsShareFunc epicsRingPointerId  epicsShareAPI epicsRingPointerLockedCreate(int size);
 epicsShareFunc void epicsShareAPI epicsRingPointerDelete(epicsRingPointerId id);
 /*ringPointerPush returns (0,1) if p (was not, was) put on ring*/
 epicsShareFunc int  epicsShareAPI epicsRingPointerPush(epicsRingPointerId id,void *p);
@@ -85,72 +94,105 @@
 #ifdef __cplusplus
 
 template <class T>
-inline epicsRingPointer<T>::epicsRingPointer(int sz) :
-    nextPush(0), nextPop(0), size(sz+1), buffer(new T* [sz+1]) {}
+inline epicsRingPointer<T>::epicsRingPointer(int sz, bool locked) :
+    lock(0), nextPush(0), nextPop(0), size(sz+1), buffer(new T* [sz+1])
+{
+    if (locked)
+        lock = epicsSpinCreate();
+}
 
 template <class T>
 inline epicsRingPointer<T>::~epicsRingPointer()
-{   delete [] buffer;}
+{
+    if (lock) epicsSpinDestroy(lock);
+    delete [] buffer;
+}
 
 template <class T>
 inline bool epicsRingPointer<T>::push(T *p)
 {
+    if (lock) epicsSpinLock(lock);
     int next = nextPush;
     int newNext = next + 1;
     if(newNext>=size) newNext=0;
-    if(newNext==nextPop) return(false);
+    if (newNext == nextPop) {
+        if (lock) epicsSpinUnlock(lock);
+        return(false);
+    }
     buffer[next] = p;
     nextPush = newNext;
+    if (lock) epicsSpinUnlock(lock);
     return(true);
 }
 
 template <class T>
 inline T* epicsRingPointer<T>::pop()
 {
+    if (lock) epicsSpinLock(lock);
     int next = nextPop;
-    if(next == nextPush) return(0);
+    if (next == nextPush) {
+        if (lock) epicsSpinUnlock(lock);
+        return(0);
+    }
     T*p  = buffer[next];
     ++next;
     if(next >=size) next = 0;
     nextPop = next;
+    if (lock) epicsSpinUnlock(lock);
     return(p);
 }
 
 template <class T>
 inline void epicsRingPointer<T>::flush()
 {
+    if (lock) epicsSpinLock(lock);
     nextPop = 0;
     nextPush = 0;
+    if (lock) epicsSpinUnlock(lock);
 }
 
 template <class T>
 inline int epicsRingPointer<T>::getFree() const
 {
+    if (lock) epicsSpinLock(lock);
     int n = nextPop - nextPush - 1;
     if (n < 0) n += size;
+    if (lock) epicsSpinUnlock(lock);
     return n;
 }
 
 template <class T>
 inline int epicsRingPointer<T>::getUsed() const
 {
+    if (lock) epicsSpinLock(lock);
     int n = nextPush - nextPop;
     if (n < 0) n += size;
+    if (lock) epicsSpinUnlock(lock);
     return n;
 }
 
 template <class T>
 inline int epicsRingPointer<T>::getSize() const
-{   return(size-1);}
+{
+    return(size-1);
+}
 
 template <class T>
 inline bool epicsRingPointer<T>::isEmpty() const
-{   return(nextPush==nextPop);}
+{
+    bool isEmpty;
+    if (lock) epicsSpinLock(lock);
+    isEmpty = (nextPush == nextPop);
+    if (lock) epicsSpinUnlock(lock);
+    return isEmpty;
+}
 
 template <class T>
 inline bool epicsRingPointer<T>::isFull() const
 {
+    if (lock) epicsSpinLock(lock);
     int count = nextPush - nextPop +1;
+    if (lock) epicsSpinUnlock(lock);
     return((count == 0) || (count == size));
 }
 

=== modified file 'src/libCom/test/ringPointerTest.c'
--- src/libCom/test/ringPointerTest.c	2009-04-23 22:28:44 +0000
+++ src/libCom/test/ringPointerTest.c	2013-05-17 16:25:35 +0000
@@ -3,6 +3,7 @@
 *     National Laboratory.
 * Copyright (c) 2002 The Regents of the University of California, as
 *     Operator of Los Alamos National Laboratory.
+* Copyright (c) 2013 ITER Organization.
 * EPICS BASE is distributed subject to a Software License Agreement found
 * in file LICENSE that is included with this distribution.
 \*************************************************************************/
@@ -26,12 +27,17 @@
 #include "testMain.h"
 
 #define ringSize 10
+#define consumerCount 4
+#define producerCount 4
 
 static volatile int testExit = 0;
+int value[ringSize*2];
 
 typedef struct info {
     epicsEventId consumerEvent;
     epicsRingPointerId	ring;
+    int checkOrder;
+    int value[ringSize*2];
 }info;
 
 static void consumer(void *arg)
@@ -39,16 +45,46 @@
     info *pinfo = (info *)arg;
     static int expectedValue=0;
     int *newvalue;
+    char myname[20];
 
-    testDiag("Consumer starting");
+    epicsThreadGetName(epicsThreadGetIdSelf(), myname, sizeof(myname));
+    testDiag("%s starting", myname);
     while(1) {
         epicsEventMustWait(pinfo->consumerEvent);
         if (testExit) return;
-        while((newvalue = (int *)epicsRingPointerPop(pinfo->ring))) {
-            testOk(expectedValue == *newvalue, 
-                "Consumer: %d == %d", expectedValue, *newvalue);
-            expectedValue = *newvalue + 1;
-        }  
+        while ((newvalue = (int *)epicsRingPointerPop(pinfo->ring))) {
+            if (pinfo->checkOrder) {
+                testOk(expectedValue == *newvalue,
+                    "%s: (got) %d == %d (expected)", myname, *newvalue, expectedValue);
+                expectedValue = *newvalue + 1;
+            } else {
+                testOk(pinfo->value[*newvalue] <= producerCount, "%s: got a %d (%d times seen before)",
+                        myname, *newvalue, pinfo->value[*newvalue]);
+            }
+            /* This must be atomic... */
+            pinfo->value[*newvalue]++;
+            epicsThreadSleep(0.05);
+        }
+    }
+}
+
+static void producer(void *arg)
+{
+    info *pinfo = (info *)arg;
+    char myname[20];
+    int i;
+
+    epicsThreadGetName(epicsThreadGetIdSelf(), myname, sizeof(myname));
+    testDiag("%s starting", myname);
+    for (i=0; i<ringSize*2; i++) {
+        while (epicsRingPointerIsFull(pinfo->ring)) {
+            epicsThreadSleep(0.2);
+            if (testExit) return;
+        }
+        testOk(epicsRingPointerPush(pinfo->ring, (void *)&value[i]),
+               "%s: Pushing %d, ring not full", myname, i);
+        epicsEventSignal(pinfo->consumerEvent);
+        if (testExit) return;
     }
 }
 
@@ -57,21 +93,27 @@
     int i;
     info *pinfo;
     epicsEventId consumerEvent;
-    int value[ringSize*2];
     int *pgetValue;
     epicsRingPointerId ring;
     epicsThreadId tid;
+    char threadName[20];
 
-    testPlan(54);
+    testPlan(256);
 
     for (i=0; i<ringSize*2; i++) value[i] = i;
+
     pinfo = calloc(1,sizeof(info));
     if(!pinfo) testAbort("calloc failed");
+
     pinfo->consumerEvent = consumerEvent = epicsEventMustCreate(epicsEventEmpty);
     if (!consumerEvent) {
         testAbort("epicsEventMustCreate failed");
     }
 
+    testDiag("******************************************************");
+    testDiag("** Test 1: local ring pointer, check size and order **");
+    testDiag("******************************************************");
+
     pinfo->ring = ring = epicsRingPointerCreate(ringSize);
     if (!ring) {
         testAbort("epicsRingPointerCreate failed");
@@ -86,22 +128,74 @@
     }
     testOk(epicsRingPointerIsEmpty(ring), "Ring empty");
 
+    testDiag("**************************************************************");
+    testDiag("** Test 2: unlocked ring pointer, one consumer, check order **");
+    testDiag("**************************************************************");
+
+    pinfo->checkOrder = 1;
     tid=epicsThreadCreate("consumer", 50, 
         epicsThreadGetStackSize(epicsThreadStackSmall), consumer, pinfo);
     if(!tid) testAbort("epicsThreadCreate failed");
-    epicsThreadSleep(0.1);
+    epicsThreadSleep(0.2);
 
     for (i=0; i<ringSize*2; i++) {
         if (epicsRingPointerIsFull(ring)) {
-            epicsEventSignal(consumerEvent);
             epicsThreadSleep(0.2);
         }
-        testOk(epicsRingPointerPush(ring, (void *)&value[i]), "Ring not full");
-    }
+        testOk(epicsRingPointerPush(ring, (void *)&value[i]), "Pushing %d, ring not full", i);
+        epicsEventSignal(consumerEvent);
+    }
+    epicsThreadSleep(1.0);
+    testOk(epicsRingPointerIsEmpty(ring), "Ring empty");
+
+    for (i=0; i<ringSize*2; i++) {
+        testOk(pinfo->value[i] == 1, "Value test: %d was processed", i);
+    }
+
+    testExit = 1;
     epicsEventSignal(consumerEvent);
+    epicsThreadSleep(1.0);
+
+    epicsRingPointerDelete(pinfo->ring);
+
+    testDiag("*************************************************************************************");
+    testDiag("** Test 3: locked ring pointer, many consumers, many producers, check no of copies **");
+    testDiag("*************************************************************************************");
+
+    pinfo->ring = ring = epicsRingPointerLockedCreate(ringSize);
+    if (!ring) {
+        testAbort("epicsRingPointerLockedCreate failed");
+    }
+    testOk(epicsRingPointerIsEmpty(ring), "Ring empty");
+
+    for (i=0; i<ringSize*2; i++) pinfo->value[i] = 0;
+    testExit = 0;
+    pinfo->checkOrder = 0;
+    for (i=0; i<consumerCount; i++) {
+        sprintf(threadName, "consumer%d", i);
+        tid=epicsThreadCreate(threadName, 50,
+                              epicsThreadGetStackSize(epicsThreadStackSmall), consumer, pinfo);
+        if(!tid) testAbort("epicsThreadCreate failed");
+    }
     epicsThreadSleep(0.2);
+
+    for (i=0; i<producerCount; i++) {
+        sprintf(threadName, "producer%d", i);
+        tid=epicsThreadCreate(threadName, 50,
+                              epicsThreadGetStackSize(epicsThreadStackSmall), producer, pinfo);
+        if(!tid) testAbort("epicsThreadCreate failed");
+    }
+
+    epicsThreadSleep(0.5);
+    epicsEventSignal(consumerEvent);
+    epicsThreadSleep(1.0);
+
     testOk(epicsRingPointerIsEmpty(ring), "Ring empty");
 
+    for (i=0; i<ringSize*2; i++) {
+        testOk(pinfo->value[i] == producerCount, "Value test: %d was processed %d times", i, producerCount);
+    }
+
     testExit = 1;
     epicsEventSignal(consumerEvent);
     epicsThreadSleep(1.0);


Replies:
Re: [Merge] lp:~epics-core/epics-base/parallel-cbthreads into lp:epics-base Ralph Lange
Re: [Merge] lp:~epics-core/epics-base/parallel-cbthreads into lp:epics-base mdavidsaver
[Merge] lp:~epics-core/epics-base/parallel-cbthreads into lp:epics-base Ralph Lange
Re: [Merge] lp:~epics-core/epics-base/parallel-cbthreads into lp:epics-base Ralph Lange

Navigate by Date:
Prev: [Bug 697413] Re: Index is not bookmarked Ralph Lange
Next: Re: [Merge] lp:~epics-core/epics-base/parallel-cbthreads into lp:epics-base Ralph Lange
Index: 2002  2003  2004  2005  2006  2007  2008  2009  2010  2011  2012  <2013>  2014
Navigate by Thread:
Prev: [Bug 697413] Re: Index is not bookmarked Ralph Lange
Next: Re: [Merge] lp:~epics-core/epics-base/parallel-cbthreads into lp:epics-base Ralph Lange
Index: 2002  2003  2004  2005  2006  2007  2008  2009  2010  2011  2012  <2013>  2014
ANJ, 18 Nov 2013 Valid HTML 4.01! · Home · News · About · Base · Modules · Extensions · Distributions · Download ·
· EPICSv4 · IRMIS · Talk · Bugs · Documents · Links · Licensing ·