grcm: Only Assert on Received Message Checks if Shutdown is False
[ipc/ipcdev.git] / packages / ti / grcm / RcmServer.c
1 /*
2  * Copyright (c) 2011-2014, Texas Instruments Incorporated
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions
7  * are met:
8  *
9  * *  Redistributions of source code must retain the above copyright
10  *    notice, this list of conditions and the following disclaimer.
11  *
12  * *  Redistributions in binary form must reproduce the above copyright
13  *    notice, this list of conditions and the following disclaimer in the
14  *    documentation and/or other materials provided with the distribution.
15  *
16  * *  Neither the name of Texas Instruments Incorporated nor the names of
17  *    its contributors may be used to endorse or promote products derived
18  *    from this software without specific prior written permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
21  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
22  * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
23  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
24  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
25  * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
26  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
27  * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
28  * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
29  * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
30  * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  */
33 /*
34  *  ======== RcmServer.c ========
35  *
36  */
38 /* this define must precede inclusion of any xdc header file */
39 #define Registry_CURDESC ti_grcm_RcmServer__Desc
40 #define MODULE_NAME "ti.grcm.RcmServer"
42 #define xdc_runtime_Memory__nolocalnames  /* short name clashes with SysLink */
44 /* rtsc header files */
45 #include <xdc/std.h>
46 #include <xdc/runtime/Assert.h>
47 #include <xdc/runtime/Diags.h>
48 #include <xdc/runtime/Error.h>
49 #include <xdc/runtime/IHeap.h>
50 #include <xdc/runtime/Log.h>
51 #include <xdc/runtime/Memory.h>
52 #include <xdc/runtime/Registry.h>
53 #include <xdc/runtime/Startup.h>
54 #include <xdc/runtime/knl/GateThread.h>
55 #include <xdc/runtime/knl/ISemaphore.h>
56 #include <xdc/runtime/knl/Semaphore.h>
57 #include <xdc/runtime/knl/SemThread.h>
58 #include <xdc/runtime/knl/Thread.h>
59 #include <xdc/runtime/System.h>
61 #include <ti/sysbios/knl/Task.h>
63 #define MSGBUFFERSIZE    512   /* Make global and move to RPMessage.h */
65 #if defined(RCM_ti_ipc)
66 #include <ti/sdo/utils/List.h>
67 #include <ti/ipc/MultiProc.h>
69 #elif defined(RCM_ti_syslink)
70 #include <ti/syslink/utils/List.h>
71 #define List_Struct List_Object
72 #define List_handle(exp) (exp)
74 #else
75     #error "undefined ipc binding";
76 #endif
78 /* local header files */
79 #include "RcmClient.h"
80 #include "RcmTypes.h"
81 #include "RcmServer.h"
83 #if USE_RPMESSAGE
84 #include <ti/srvmgr/rpmsg_omx.h>
85 #endif
/* function-index key encoding: mask 0x7FF00000 selects bits 20..30 */
#define _RCM_KeyResetValue 0x07FF       /* key reset value */
#define _RCM_KeyMask 0x7FF00000         /* key mask in function index */
#define _RCM_KeyShift 20                /* key bit position in function index */

/* table dimensions */
#define RcmServer_MAX_TABLES 9          /* max number of function tables */
#define RcmServer_POOL_MAP_LEN 4        /* pool map length */

/* additional error codes defined by this file */
#define RcmServer_E_InvalidFxnIdx       (-101)
#define RcmServer_E_JobIdNotFound       (-102)
#define RcmServer_E_PoolIdNotFound      (-103)
98 typedef struct {                        /* function table element*/
99     String                      name;
100 #if USE_RPMESSAGE
101     union  {
102        RcmServer_MsgFxn         fxn;
103        RcmServer_MsgCreateFxn   createFxn;
104     }addr;
105 #else
106     RcmServer_MsgFxn            addr;
107 #endif
108     UInt16                      key;
109 } RcmServer_FxnTabElem;
111 typedef struct {
112     Int                         length;
113     RcmServer_FxnTabElem *      elem;
114 } RcmServer_FxnTabElemAry;
116 typedef struct {
117     String                      name;       /* pool name*/
118     Int                         count;      /* thread count (at create time)*/
119     Thread_Priority             priority;   /* thread priority*/
120     Int                         osPriority;
121     SizeT                       stackSize;  /* thread stack size*/
122     String                      stackSeg;   /* thread stack placement*/
123     ISemaphore_Handle           sem;        /* message semaphore (counting)*/
124     List_Struct                 threadList; /* list of worker threads*/
125     List_Struct                 readyQueue; /* queue of messages*/
126 } RcmServer_ThreadPool;
128 typedef struct RcmServer_Object_tag {
129     GateThread_Struct           gate;       /* instance gate*/
130     Ptr                         run;        /* run semaphore for the server*/
131 #if USE_RPMESSAGE
132     RPMessage_Handle         serverQue;  /* inbound message queue */
133     UInt32                      localAddr;  /* inbound message queue address */
134     UInt32                      replyAddr;  /* Reply address (same per inst.) */
135     UInt32                      dstProc;    /* Reply processor. */
136 #else
137     MessageQ_Handle             serverQue;  /* inbound message queue*/
138 #endif
139     Thread_Handle               serverThread; /* server thread object*/
140     RcmServer_FxnTabElemAry     fxnTabStatic; /* static function table*/
141     RcmServer_FxnTabElem *      fxnTab[RcmServer_MAX_TABLES]; /* base pointers*/
142     UInt16                      key;        /* function index key*/
143     UInt16                      jobId;      /* job id tracker*/
144     Bool                        shutdown;   /* server shutdown flag*/
145     Int                         poolMap0Len;/* length of static table*/
146     RcmServer_ThreadPool *      poolMap[RcmServer_POOL_MAP_LEN];
147     List_Handle                 jobList;    /* list of job stream queues*/
148 } RcmServer_Object;
150 typedef struct {
151     List_Elem                   elem;
152     UInt16                      jobId;      /* current job stream id*/
153     Thread_Handle               thread;     /* server thread object*/
154     Bool                        terminate;  /* thread terminate flag*/
155     RcmServer_ThreadPool*       pool;       /* worker pool*/
156     RcmServer_Object *          server;     /* server instance*/
157 } RcmServer_WorkerThread;
159 typedef struct {
160     List_Elem                   elem;
161     UInt16                      jobId;      /* job stream id*/
162     Bool                        empty;      /* true if no messages on server*/
163     List_Struct                 msgQue;     /* queue of messages*/
164 } RcmServer_JobStream;
166 typedef struct RcmServer_Module_tag {
167     String              name;
168     IHeap_Handle        heap;
169 } RcmServer_Module;
172 /* private functions */
173 static
174 Int RcmServer_Instance_init_P(
175         RcmServer_Object *              obj,
176         String                          name,
177         const RcmServer_Params *        params
178     );
180 static
181 Int RcmServer_Instance_finalize_P(
182         RcmServer_Object *              obj
183     );
185 static
186 Int RcmServer_acqJobId_P(
187         RcmServer_Object *              obj,
188         UInt16 *                        jobIdPtr
189     );
191 static
192 Int RcmServer_dispatch_P(
193         RcmServer_Object *              obj,
194         RcmClient_Packet *              packet
195     );
197 static
198 Int RcmServer_execMsg_I(
199         RcmServer_Object *              obj,
200         RcmClient_Message *             msg
201     );
203 static
204 Int RcmServer_getFxnAddr_P(
205         RcmServer_Object *              obj,
206         UInt32                          fxnIdx,
207         RcmServer_MsgFxn *              addrPtr,
208         RcmServer_MsgCreateFxn *        createPtr
209     );
211 static
212 UInt16 RcmServer_getNextKey_P(
213         RcmServer_Object *              obj
214     );
216 static
217 Int RcmServer_getSymIdx_P(
218         RcmServer_Object *              obj,
219         String                          name,
220         UInt32 *                        index
221     );
223 static
224 Int RcmServer_getPool_P(
225         RcmServer_Object *              obj,
226         RcmClient_Packet *              packet,
227         RcmServer_ThreadPool **         poolP
228     );
230 static
231 Void RcmServer_process_P(
232         RcmServer_Object *              obj,
233         RcmClient_Packet *              packet
234     );
236 static
237 Int RcmServer_relJobId_P(
238         RcmServer_Object *              obj,
239         UInt16                          jobId
240     );
242 static
243 Void RcmServer_serverThrFxn_P(
244         IArg                            arg
245     );
247 static inline
248 Void RcmServer_setStatusCode_I(
249         RcmClient_Packet *              packet,
250         UInt16                          code
251     );
253 static
254 Void RcmServer_workerThrFxn_P(
255         IArg                            arg
256     );
259 #define RcmServer_Module_heap() (RcmServer_Mod.heap)
262 /* static objects */
263 static Int curInit = 0;
264 static Char ti_grcm_RcmServer_Name[] = {
265         't','i','.','s','d','o','.','r','c','m','.',
266         'R','c','m','S','e','r','v','e','r','\0'
267     };
269 static RcmServer_Module RcmServer_Mod = {
270     MODULE_NAME,        /* name */
271     (IHeap_Handle)NULL  /* heap */
272 };
274 /* module diags mask */
275 Registry_Desc Registry_CURDESC;
278 /*
279  *  ======== RcmServer_init ========
280  *
281  *  This function must be serialized by the caller
282  */
283 Void RcmServer_init(Void)
285     Registry_Result result;
288     if (curInit++ != 0) {
289         return; /* module already initialized */
290     }
292     /* register with xdc.runtime to get a diags mask */
293 /*  result = Registry_addModule(&Registry_CURDESC, MODULE_NAME);*/
294     result = Registry_addModule(&Registry_CURDESC, ti_grcm_RcmServer_Name);
295     Assert_isTrue(result == Registry_SUCCESS, (Assert_Id)NULL);
297     /* the size of object and struct must be the same */
298     Assert_isTrue(sizeof(RcmServer_Object) == sizeof(RcmServer_Struct), NULL);
302 /*
303  *  ======== RcmServer_exit ========
304  *
305  *  This function must be serialized by the caller
306  */
307 Void RcmServer_exit(Void)
309 /*  Registry_Result result;*/
312     if (--curInit != 0) {
313         return; /* module still in use */
314     }
316     /* unregister from xdc.runtime */
317 /*  result = Registry_removeModule(MODULE_NAME);*/
318 /*  Assert_isTrue(result == Registry_SUCCESS, (Assert_Id)NULL);*/
322 /*
323  *  ======== RcmServer_Params_init ========
324  */
325 Void RcmServer_Params_init(RcmServer_Params *params)
327     /* server thread */
328     params->priority = Thread_Priority_HIGHEST;
329     params->osPriority = Thread_INVALID_OS_PRIORITY;
330     params->stackSize = 0;  /* use system default*/
331     params->stackSeg = "";
333     /* default pool */
334     params->defaultPool.name = NULL;
335     params->defaultPool.count = 0;
336     params->defaultPool.priority = Thread_Priority_NORMAL;
337     params->defaultPool.osPriority = Thread_INVALID_OS_PRIORITY;
338     params->defaultPool.stackSize = 0;  /* use system default*/
339     params->defaultPool.stackSeg = "";
341     /* worker pools */
342     params->workerPools.length = 0;
343     params->workerPools.elem = NULL;
345     /* function table */
346     params->fxns.length = 0;
347     params->fxns.elem = NULL;
351 /*
352  *  ======== RcmServer_create ========
353  */
354 #define FXNN "RcmServer_create"
355 Int RcmServer_create(String name, RcmServer_Params *params,
356         RcmServer_Handle *handle)
358     RcmServer_Object *obj;
359     Error_Block eb;
360     Int status = RcmServer_S_SUCCESS;
363     Log_print3(Diags_ENTRY, "--> "FXNN": (name=0x%x, params=0x%x, hP=0x%x)",
364         (IArg)name, (IArg)params, (IArg)handle);
366     /* initialize the error block */
367     Error_init(&eb);
369     if (NULL == handle) {
370         Log_error0(FXNN": Invalid pointer");
371         status = RcmServer_E_FAIL;
372         goto leave;
373     }
375     *handle = (RcmServer_Handle)NULL;
377     /* check for valid params */
378     if (NULL == params) {
379         Log_error0(FXNN": params ptr must not be NULL");
380         status = RcmServer_E_FAIL;
381         goto leave;
382     }
383     if (NULL == name) {
384         Log_error0(FXNN": name passed is NULL!");
385         status = RcmServer_E_FAIL;
386         goto leave;
387     }
389     /* allocate the object */
390     obj = (RcmServer_Handle)xdc_runtime_Memory_calloc(RcmServer_Module_heap(),
391         sizeof(RcmServer_Object), sizeof(Int), &eb);
393     if (NULL == obj) {
394         Log_error2(FXNN": out of memory: heap=0x%x, size=%u",
395             (IArg)RcmServer_Module_heap(), sizeof(RcmServer_Object));
396         status = RcmServer_E_NOMEMORY;
397         goto leave;
398     }
400     Log_print1(Diags_LIFECYCLE, FXNN": instance create: 0x%x", (IArg)obj);
402     /* object-specific initialization */
403     status = RcmServer_Instance_init_P(obj, name, params);
405     if (status < 0) {
406         RcmServer_Instance_finalize_P(obj);
407         xdc_runtime_Memory_free(RcmServer_Module_heap(),
408             (Ptr)obj, sizeof(RcmServer_Object));
409         goto leave;
410     }
412     /* success, return opaque pointer */
413     *handle = (RcmServer_Handle)obj;
416 leave:
417     Log_print1(Diags_EXIT, "<-- "FXNN": %d", (IArg)status);
418     return(status);
420 #undef FXNN
423 /*
424  *  ======== RcmServer_construct ========
425  */
426 #define FXNN "RcmServer_construct"
427 Int RcmServer_construct(RcmServer_Struct *structPtr, String name,
428     const RcmServer_Params *params)
430     RcmServer_Object *obj = (RcmServer_Object*)structPtr;
431     Int status = RcmServer_S_SUCCESS;
434     Log_print1(Diags_ENTRY, "--> "FXNN": (structPtr=0x%x)", (IArg)structPtr);
435     Log_print1(Diags_LIFECYCLE, FXNN": instance construct: 0x%x", (IArg)obj);
437     /* ensure the constructed object is zeroed */
438     _memset((Void *)obj, 0, sizeof(RcmServer_Object));
440     /* object-specific initialization */
441     status = RcmServer_Instance_init_P(obj, name, params);
443     if (status < 0) {
444         RcmServer_Instance_finalize_P(obj);
445         goto leave;
446     }
449 leave:
450     Log_print1(Diags_EXIT, "<-- "FXNN": %d", (IArg)status);
451     return(status);
453 #undef FXNN
456 /*
457  *  ======== RcmServer_Instance_init_P ========
458  */
459 #define FXNN "RcmServer_Instance_init_P"
460 Int RcmServer_Instance_init_P(RcmServer_Object *obj, String name,
461         const RcmServer_Params *params)
463     Error_Block eb;
464     List_Params listP;
465 #if USE_RPMESSAGE == 0
466     MessageQ_Params mqParams;
467 #endif
468     Thread_Params threadP;
469     SemThread_Params semThreadP;
470     SemThread_Handle semThreadH;
471     Int i, j;
472     SizeT size;
473     Char *cp;
474     RcmServer_ThreadPool *poolAry;
475     RcmServer_WorkerThread *worker;
476     List_Handle listH;
477     Int status = RcmServer_S_SUCCESS;
480     Log_print1(Diags_ENTRY, "--> "FXNN": (obj=0x%x)", (IArg)obj);
482     Error_init(&eb);
484     /* initialize instance state */
485     obj->shutdown = FALSE;
486     obj->key = 0;
487     obj->jobId = 0xFFFF;
488     obj->run = NULL;
489     obj->serverQue = NULL;
490     obj->serverThread = NULL;
491     obj->fxnTabStatic.length = 0;
492     obj->fxnTabStatic.elem = NULL;
493     obj->poolMap0Len = 0;
494     obj->jobList = NULL;
497     /* initialize the function table */
498     for (i = 0; i < RcmServer_MAX_TABLES; i++) {
499         obj->fxnTab[i] = NULL;
500     }
502     /* initialize the worker pool map */
503     for (i = 0; i < RcmServer_POOL_MAP_LEN; i++) {
504         obj->poolMap[i] = NULL;
505     }
507     /* create the instance gate */
508     GateThread_construct(&obj->gate, NULL, &eb);
510     if (Error_check(&eb)) {
511         Log_error0(FXNN": could not create gate object");
512         status = RcmServer_E_FAIL;
513         goto leave;
514     }
516     /* create list for job objects */
517 #if defined(RCM_ti_ipc)
518     List_Params_init(&listP);
519     obj->jobList = List_create(&listP, &eb);
521     if (Error_check(&eb)) {
522         Log_error0(FXNN": could not create list object");
523         status = RcmServer_E_FAIL;
524         goto leave;
525     }
526 #elif defined(RCM_ti_syslink)
527     List_Params_init(&listP);
528     obj->jobList = List_create(&listP, NULL);
530     if (obj->jobList == NULL) {
531         Log_error0(FXNN": could not create list object");
532         status = RcmServer_E_FAIL;
533         goto leave;
534     }
535 #endif
537     /* create the static function table */
538     if (params->fxns.length > 0) {
539         obj->fxnTabStatic.length = params->fxns.length;
541         /* allocate static function table */
542         size = params->fxns.length * sizeof(RcmServer_FxnTabElem);
543         obj->fxnTabStatic.elem = xdc_runtime_Memory_alloc(
544             RcmServer_Module_heap(), size, sizeof(Ptr), &eb);
546         if (Error_check(&eb)) {
547             Log_error2(FXNN": out of memory: heap=0x%x, size=%u",
548                 (IArg)RcmServer_Module_heap(), size);
549             status = RcmServer_E_NOMEMORY;
550             goto leave;
551         }
552         obj->fxnTabStatic.elem[0].name = NULL;
554         /* allocate a single block to store all name strings */
555         for (size = 0, i = 0; i < params->fxns.length; i++) {
556             size += _strlen(params->fxns.elem[i].name) + 1;
557         }
558         cp = xdc_runtime_Memory_alloc(
559             RcmServer_Module_heap(), size, sizeof(Ptr), &eb);
561         if (Error_check(&eb)) {
562             Log_error2(FXNN": out of memory: heap=0x%x, size=%u",
563                 (IArg)RcmServer_Module_heap(), size);
564             status = RcmServer_E_NOMEMORY;
565             goto leave;
566         }
568         /* copy function table data into allocated memory blocks */
569         for (i = 0; i < params->fxns.length; i++) {
570             _strcpy(cp, params->fxns.elem[i].name);
571             obj->fxnTabStatic.elem[i].name = cp;
572             cp += (_strlen(params->fxns.elem[i].name) + 1);
573             obj->fxnTabStatic.elem[i].addr.fxn = params->fxns.elem[i].addr.fxn;
574             obj->fxnTabStatic.elem[i].key = 0;
575         }
577         /* hook up the static function table */
578         obj->fxnTab[0] = obj->fxnTabStatic.elem;
579     }
581     /* create static worker pools */
582     if ((params->workerPools.length + 1) > RcmServer_POOL_MAP_LEN) {
583         Log_error1(FXNN": Exceeded maximum number of worker pools =%d",
584             (IArg) (params->workerPools.length) );
585         status = RcmServer_E_NOMEMORY;
586         goto leave;
587     }
588     obj->poolMap0Len = params->workerPools.length + 1; /* workers + default */
590     /* allocate the static worker pool table */
591     size = obj->poolMap0Len * sizeof(RcmServer_ThreadPool);
592     obj->poolMap[0] = (RcmServer_ThreadPool *)xdc_runtime_Memory_alloc(
593         RcmServer_Module_heap(), size, sizeof(Ptr), &eb);
595     if (Error_check(&eb)) {
596         Log_error2(FXNN": out of memory: heap=0x%x, size=%u",
597             (IArg)RcmServer_Module_heap(), size);
598         status = RcmServer_E_NOMEMORY;
599         goto leave;
600     }
602     /* convenience alias */
603     poolAry = obj->poolMap[0];
605     /* allocate a single block to store all name strings         */
606     /* Buffer format is: [SIZE][DPN][\0][WPN1][\0][WPN2][\0].... */
607     /* DPN = Default Pool Name, WPN = Worker Pool Name           */
608     /* In case, DPN is NULL, format is: [SIZE][\0][WPN1][\0].... */
609     /* In case, WPN is NULL, format is: [SIZE][DPN][\0]          */
610     size = sizeof(SizeT) /* block size */
611         + (params->defaultPool.name == NULL ? 1 :
612         _strlen(params->defaultPool.name) + 1);
614     for (i = 0; i < params->workerPools.length; i++) {
615         size += (params->workerPools.elem[i].name == NULL ? 0 :
616             _strlen(params->workerPools.elem[i].name) + 1);
617     }
618     cp = xdc_runtime_Memory_alloc(
619         RcmServer_Module_heap(), size, sizeof(Ptr), &eb);
621     if (Error_check(&eb)) {
622         Log_error2(FXNN": out of memory: heap=0x%x, size=%u",
623             (IArg)RcmServer_Module_heap(), size);
624         status = RcmServer_E_NOMEMORY;
625         goto leave;
626     }
628     *(SizeT *)cp = size;
629     cp += sizeof(SizeT);
631     /* initialize the default worker pool, poolAry[0] */
632     if (params->defaultPool.name != NULL) {
633         _strcpy(cp, params->defaultPool.name);
634         poolAry[0].name = cp;
635         cp += (_strlen(params->defaultPool.name) + 1);
636     }
637     else {
638         poolAry[0].name = cp;
639         *cp++ = '\0';
640     }
642     poolAry[0].count = params->defaultPool.count;
643     poolAry[0].priority = params->defaultPool.priority;
644     poolAry[0].osPriority = params->defaultPool.osPriority;
645     poolAry[0].stackSize = params->defaultPool.stackSize;
646     poolAry[0].stackSeg = NULL;
647     poolAry[0].sem = NULL;
649     List_construct(&(poolAry[0].threadList), NULL);
650     List_construct(&(poolAry[0].readyQueue), NULL);
652     SemThread_Params_init(&semThreadP);
653     semThreadP.mode = SemThread_Mode_COUNTING;
655     semThreadH = SemThread_create(0, &semThreadP, &eb);
657     if (Error_check(&eb)) {
658         Log_error0(FXNN": could not create semaphore");
659         status = RcmServer_E_FAIL;
660         goto leave;
661     }
663     poolAry[0].sem = SemThread_Handle_upCast(semThreadH);
665     /* initialize the static worker pools, poolAry[1..(n-1)] */
666     for (i = 0; i < params->workerPools.length; i++) {
667         if (params->workerPools.elem[i].name != NULL) {
668             _strcpy(cp, params->workerPools.elem[i].name);
669             poolAry[i+1].name = cp;
670             cp += (_strlen(params->workerPools.elem[i].name) + 1);
671         }
672         else {
673             poolAry[i+1].name = NULL;
674         }
676         poolAry[i+1].count = params->workerPools.elem[i].count;
677         poolAry[i+1].priority = params->workerPools.elem[i].priority;
678         poolAry[i+1].osPriority =params->workerPools.elem[i].osPriority;
679         poolAry[i+1].stackSize = params->workerPools.elem[i].stackSize;
680         poolAry[i+1].stackSeg = NULL;
682         List_construct(&(poolAry[i+1].threadList), NULL);
683         List_construct(&(poolAry[i+1].readyQueue), NULL);
685         SemThread_Params_init(&semThreadP);
686         semThreadP.mode = SemThread_Mode_COUNTING;
688         semThreadH = SemThread_create(0, &semThreadP, &eb);
690         if (Error_check(&eb)) {
691             Log_error0(FXNN": could not create semaphore");
692             status = RcmServer_E_FAIL;
693             goto leave;
694         }
696         poolAry[i+1].sem = SemThread_Handle_upCast(semThreadH);
697     }
699     /* create the worker threads in each static pool */
700     for (i = 0; i < obj->poolMap0Len; i++) {
701         for (j = 0; j < poolAry[i].count; j++) {
703             /* allocate worker thread object */
704             size = sizeof(RcmServer_WorkerThread);
705             worker = (RcmServer_WorkerThread *)xdc_runtime_Memory_alloc(
706                 RcmServer_Module_heap(), size, sizeof(Ptr), &eb);
708             if (Error_check(&eb)) {
709                 Log_error2(FXNN": out of memory: heap=0x%x, size=%u",
710                     (IArg)RcmServer_Module_heap(), size);
711                 status = RcmServer_E_NOMEMORY;
712                 goto leave;
713             }
715             /* initialize worker thread object */
716             worker->jobId = RcmClient_DISCRETEJOBID;
717             worker->thread = NULL;
718             worker->terminate = FALSE;
719             worker->pool = &(poolAry[i]);
720             worker->server = obj;
722             /* add worker thread to worker pool */
723             listH = List_handle(&(poolAry[i].threadList));
724             List_putHead(listH, &(worker->elem));
726             /* create worker thread */
727             Thread_Params_init(&threadP);
728             threadP.arg = (IArg)worker;
729             threadP.priority = poolAry[i].priority;
730             threadP.osPriority = poolAry[i].osPriority;
731             threadP.stackSize = poolAry[i].stackSize;
732             threadP.instance->name = "RcmServer_workerThr";
734             worker->thread = Thread_create(
735                 (Thread_RunFxn)(RcmServer_workerThrFxn_P), &threadP, &eb);
737             if (Error_check(&eb)) {
738                 Log_error2(FXNN": could not create worker thread, "
739                     "pool=%d, thread=%d", (IArg)i, (IArg)j);
740                 status = RcmServer_E_FAIL;
741                 goto leave;
742             }
743         }
744     }
746     /* create the semaphore used to release the server thread */
747     SemThread_Params_init(&semThreadP);
748     semThreadP.mode = SemThread_Mode_COUNTING;
750     obj->run = SemThread_create(0, &semThreadP, &eb);
752     if (Error_check(&eb)) {
753         Log_error0(FXNN": could not create semaphore");
754         status = RcmServer_E_FAIL;
755         goto leave;
756     }
758     /* create the message queue for inbound messages */
759 #if USE_RPMESSAGE
760     obj->serverQue = RPMessage_create(RPMessage_ASSIGN_ANY, NULL, NULL,
761                                          &obj->localAddr);
762 #ifdef BIOS_ONLY_TEST
763     obj->dstProc = MultiProc_self();
764 #else
765     obj->dstProc = MultiProc_getId("HOST");
766 #endif
768 #else
769     MessageQ_Params_init(&mqParams);
770     obj->serverQue = MessageQ_create(name, &mqParams);
771 #endif
773     if (NULL == obj->serverQue) {
774         Log_error0(FXNN": could not create server message queue");
775         status = RcmServer_E_FAIL;
776         goto leave;
777     }
779     /* create the server thread */
780     Thread_Params_init(&threadP);
781     threadP.arg = (IArg)obj;
782     threadP.priority = params->priority;
783     threadP.osPriority = params->osPriority;
784     threadP.stackSize = params->stackSize;
785     threadP.instance->name = "RcmServer_serverThr";
787     obj->serverThread = Thread_create(
788         (Thread_RunFxn)(RcmServer_serverThrFxn_P), &threadP, &eb);
790     if (Error_check(&eb)) {
791         Log_error0(FXNN": could not create server thread");
792         status = RcmServer_E_FAIL;
793         goto leave;
794     }
797 leave:
798     Log_print1(Diags_EXIT, "<-- "FXNN": %d", (IArg)status);
799     return(status);
801 #undef FXNN
804 /*
805  *  ======== RcmServer_delete ========
806  */
807 #define FXNN "RcmServer_delete"
808 Int RcmServer_delete(RcmServer_Handle *handlePtr)
810     RcmServer_Object *obj = (RcmServer_Object *)(*handlePtr);
811     Int status = RcmClient_S_SUCCESS;
814     Log_print1(Diags_ENTRY, "--> "FXNN": (handlePtr=0x%x)", (IArg)handlePtr);
816     /* finalize the object */
817     status = RcmServer_Instance_finalize_P(obj);
819     /* free the object memory */
820     Log_print1(Diags_LIFECYCLE, FXNN": instance delete: 0x%x", (IArg)obj);
822     xdc_runtime_Memory_free(RcmServer_Module_heap(),
823         (Ptr)obj, sizeof(RcmServer_Object));
824     *handlePtr = NULL;
826     Log_print1(Diags_EXIT, "<-- "FXNN": %d", (IArg)status);
827     return(status);
829 #undef FXNN
832 /*
833  *  ======== RcmServer_destruct ========
834  */
835 #define FXNN "RcmServer_destruct"
836 Int RcmServer_destruct(RcmServer_Struct *structPtr)
838     RcmServer_Object *obj = (RcmServer_Object *)(structPtr);
839     Int status = RcmClient_S_SUCCESS;
842     Log_print1(Diags_ENTRY, "--> "FXNN": (structPtr=0x%x)", (IArg)structPtr);
843     Log_print1(Diags_LIFECYCLE, FXNN": instance destruct: 0x%x", (IArg)obj);
845     /* finalize the object */
846     status = RcmServer_Instance_finalize_P(obj);
848     Log_print1(Diags_EXIT, "<-- "FXNN": %d", (IArg)status);
849     return(status);
851 #undef FXNN
854 /*
855  *  ======== RcmServer_Instance_finalize_P ========
856  */
857 #define FXNN "RcmServer_Instance_finalize_P"
858 Int RcmServer_Instance_finalize_P(RcmServer_Object *obj)
860     Int i, j;
861     Int size;
862     Char *cp;
863     UInt tabCount;
864     RcmServer_FxnTabElem *fdp;
865     Error_Block eb;
866     RcmServer_ThreadPool *poolAry;
867     RcmServer_WorkerThread *worker;
868     List_Elem *elem;
869     List_Handle listH;
870     List_Handle msgQueH;
871     RcmClient_Packet *packet;
872 #if USE_RPMESSAGE == 0
873     MessageQ_Msg msgqMsg;
874 #endif
875     SemThread_Handle semThreadH;
876     RcmServer_JobStream *job;
877     Int rval;
878     Int status = RcmClient_S_SUCCESS;
880     Log_print1(Diags_ENTRY, "--> "FXNN": (obj=0x%x)", (IArg)obj);
882     /* must initialize the error block before using it */
883     Error_init(&eb);
885     /* block until server thread exits */
886     obj->shutdown = TRUE;
888     if (obj->serverThread != NULL) {
889 #if USE_RPMESSAGE
890         RPMessage_unblock(obj->serverQue);
891 #else
892         MessageQ_unblock(obj->serverQue);
893 #endif
894         Thread_join(obj->serverThread, &eb);
896         if (Error_check(&eb)) {
897             Log_error0(FXNN": server thread did not exit properly");
898             status = RcmServer_E_FAIL;
899             goto leave;
900         }
901     }
903     /* delete any remaining job objects (there should not be any) */
904     while ((elem = List_get(obj->jobList)) != NULL) {
905         job = (RcmServer_JobStream *)elem;
907         /* return any remaining messages (there should not be any) */
908         msgQueH = List_handle(&job->msgQue);
910         while ((elem = List_get(msgQueH)) != NULL) {
911             packet = (RcmClient_Packet *)elem;
912             Log_warning2(
913                 FXNN": returning unprocessed message, jobId=0x%x, packet=0x%x",
914                 (IArg)job->jobId, (IArg)packet);
916             RcmServer_setStatusCode_I(packet, RcmServer_Status_Unprocessed);
917 #if USE_RPMESSAGE
918             packet->hdr.type = OMX_RAW_MSG;
919             packet->hdr.len = PACKET_DATA_SIZE + packet->message.dataSize;
920             rval = RPMessage_send(obj->dstProc, obj->replyAddr,
921                                  obj->localAddr, (Ptr)&packet->hdr,
922                                  PACKET_HDR_SIZE + packet->message.dataSize);
923 #else
924             msgqMsg = &packet->msgqHeader;
925             rval = MessageQ_put(MessageQ_getReplyQueue(msgqMsg), msgqMsg);
926 #endif
927             if (rval < 0) {
928                 Log_error1(FXNN": unknown ipc error, 0x%x", (IArg)rval);
929             }
930         }
932         /* finalize the job stream object */
933         List_destruct(&job->msgQue);
935         xdc_runtime_Memory_free(RcmServer_Module_heap(),
936             (Ptr)job, sizeof(RcmServer_JobStream));
937     }
938     List_delete(&(obj->jobList));
940     /* convenience alias */
941     poolAry = obj->poolMap[0];
943     /* free all the static pool resources */
944     for (i = 0; i < obj->poolMap0Len; i++) {
946         /* free all the worker thread objects */
947         listH = List_handle(&(poolAry[i].threadList));
949         /* mark each worker thread for termination */
950         elem = NULL;
951         while ((elem = List_next(listH, elem)) != NULL) {
952             worker = (RcmServer_WorkerThread *)elem;
953             worker->terminate = TRUE;
954         }
956         /* unblock each worker thread so it can terminate */
957         elem = NULL;
958         while ((elem = List_next(listH, elem)) != NULL) {
959             Semaphore_post(poolAry[i].sem, &eb);
961             if (Error_check(&eb)) {
962                 Log_error0(FXNN": post failed on thread");
963                 status = RcmServer_E_FAIL;
964                 goto leave;
965             }
966         }
968         /* wait for each worker thread to terminate */
969         elem = NULL;
970         while ((elem = List_get(listH)) != NULL) {
971             worker = (RcmServer_WorkerThread *)elem;
973             Thread_join(worker->thread, &eb);
975             if (Error_check(&eb)) {
976                 Log_error1(
977                     FXNN": worker thread did not exit properly, thread=0x%x",
978                     (IArg)worker->thread);
979                 status = RcmServer_E_FAIL;
980                 goto leave;
981             }
983             Thread_delete(&worker->thread);
985             /* free the worker thread object */
986             xdc_runtime_Memory_free(RcmServer_Module_heap(), (Ptr)worker,
987                 sizeof(RcmServer_WorkerThread));
988         }
990         /* free up pool resources */
991         semThreadH = SemThread_Handle_downCast(poolAry[i].sem);
992         SemThread_delete(&semThreadH);
993         List_destruct(&(poolAry[i].threadList));
995         /* return any remaining messages on the readyQueue */
996         msgQueH = List_handle(&poolAry[i].readyQueue);
998         while ((elem = List_get(msgQueH)) != NULL) {
999             packet = (RcmClient_Packet *)elem;
1000             Log_warning2(
1001                 FXNN": returning unprocessed message, msgId=0x%x, packet=0x%x",
1002                 (IArg)packet->msgId, (IArg)packet);
1004             RcmServer_setStatusCode_I(packet, RcmServer_Status_Unprocessed);
1005 #if USE_RPMESSAGE
1006             packet->hdr.type = OMX_RAW_MSG;
1007             packet->hdr.len = PACKET_DATA_SIZE + packet->message.dataSize;
1008             rval = RPMessage_send(obj->dstProc, obj->replyAddr,
1009                                  obj->localAddr, (Ptr)&packet->hdr,
1010                                  PACKET_HDR_SIZE + packet->message.dataSize);
1011 #else
1012             msgqMsg = &packet->msgqHeader;
1013             rval = MessageQ_put(MessageQ_getReplyQueue(msgqMsg), msgqMsg);
1014 #endif
1015             if (rval < 0) {
1016                 Log_error1(FXNN": unknown ipc error, 0x%x", (IArg)rval);
1017             }
1018         }
1020         List_destruct(&(poolAry[i].readyQueue));
1021     }
1023     /* free the name block for the static pools */
1024     if ((obj->poolMap[0] != NULL) && (obj->poolMap[0]->name != NULL)) {
1025         cp = obj->poolMap[0]->name;
1026         cp -= sizeof(SizeT);
1027         xdc_runtime_Memory_free(RcmServer_Module_heap(), (Ptr)cp, *(SizeT *)cp);
1028     }
1030     /* free the static worker pool table */
1031     if (obj->poolMap[0] != NULL) {
1032         xdc_runtime_Memory_free(RcmServer_Module_heap(), (Ptr)(obj->poolMap[0]),
1033             obj->poolMap0Len * sizeof(RcmServer_ThreadPool));
1034         obj->poolMap[0] = NULL;
1035     }
1037 #if 0
1038     /* free all dynamic worker pools */
1039     for (p = 1; p < RcmServer_POOL_MAP_LEN; p++) {
1040         if ((poolAry = obj->poolMap[p]) == NULL) {
1041             continue;
1042         }
1043     }
1044 #endif
1046     /* free up the dynamic function tables and any leftover name strings */
1047     for (i = 1; i < RcmServer_MAX_TABLES; i++) {
1048         if (obj->fxnTab[i] != NULL) {
1049             tabCount = (1 << (i + 4));
1050             for (j = 0; j < tabCount; j++) {
1051                 if (((obj->fxnTab[i])+j)->name != NULL) {
1052                     cp = ((obj->fxnTab[i])+j)->name;
1053                     size = _strlen(cp) + 1;
1054                     xdc_runtime_Memory_free(RcmServer_Module_heap(), cp, size);
1055                 }
1056             }
1057             fdp = obj->fxnTab[i];
1058             size = tabCount * sizeof(RcmServer_FxnTabElem);
1059             xdc_runtime_Memory_free(RcmServer_Module_heap(), fdp, size);
1060             obj->fxnTab[i] = NULL;
1061         }
1062     }
1064     if (NULL != obj->serverThread) {
1065         Thread_delete(&obj->serverThread);
1066     }
1068     if (NULL != obj->serverQue) {
1069 #if USE_RPMESSAGE
1070         RPMessage_delete(&obj->serverQue);
1071 #else
1072         MessageQ_delete(&obj->serverQue);
1073 #endif
1074     }
1076     if (NULL != obj->run) {
1077         SemThread_delete((SemThread_Handle *)(&obj->run));
1078     }
1080     /* free the name block for the static function table */
1081     if ((NULL != obj->fxnTabStatic.elem) &&
1082         (NULL != obj->fxnTabStatic.elem[0].name)) {
1083         for (size = 0, i = 0; i < obj->fxnTabStatic.length; i++) {
1084             size += _strlen(obj->fxnTabStatic.elem[i].name) + 1;
1085         }
1086         xdc_runtime_Memory_free(
1087             RcmServer_Module_heap(),
1088             obj->fxnTabStatic.elem[0].name, size);
1089     }
1091     /* free the static function table */
1092     if (NULL != obj->fxnTabStatic.elem) {
1093         xdc_runtime_Memory_free(RcmServer_Module_heap(),
1094             obj->fxnTabStatic.elem,
1095             obj->fxnTabStatic.length * sizeof(RcmServer_FxnTabElem));
1096     }
1098     /* destruct the instance gate */
1099     GateThread_destruct(&obj->gate);
1102 leave:
1103     Log_print1(Diags_EXIT, "<-- "FXNN": %d", (IArg)status);
1104     return(status);
1106 #undef FXNN
1109 /*
1110  *  ======== RcmServer_addSymbol ========
1111  */
1112 #define FXNN "RcmServer_addSymbol"
1113 Int RcmServer_addSymbol(RcmServer_Object *obj, String funcName,
1114         RcmServer_MsgFxn addr, UInt32 *index)
1116     GateThread_Handle gateH;
1117     IArg key;
1118     Int len;
1119     UInt i, j;
1120     UInt tabCount;
1121     SizeT tabSize;
1122     UInt32 fxnIdx = 0xFFFF;
1123     RcmServer_FxnTabElem *slot = NULL;
1124     Error_Block eb;
1125     Int status = RcmServer_S_SUCCESS;
1128     Log_print3(Diags_ENTRY,
1129         "--> "FXNN": (obj=0x%x, name=0x%x, addr=0x%x)",
1130         (IArg)obj, (IArg)funcName, (IArg)addr);
1132     Error_init(&eb);
1134     /* protect the symbol table while changing it */
1135     gateH = GateThread_handle(&obj->gate);
1136     key = GateThread_enter(gateH);
1138     /* look for an empty slot to use */
1139     for (i = 1; i < RcmServer_MAX_TABLES; i++) {
1140         if (obj->fxnTab[i] != NULL) {
1141             for (j = 0; j < (1 << (i + 4)); j++) {
1142                 if (((obj->fxnTab[i])+j)->addr.fxn == 0) {
1143                     slot = (obj->fxnTab[i]) + j;  /* found empty slot*/
1144                     break;
1145                 }
1146             }
1147         }
1148         else {
1149             /* all previous tables are full, allocate a new table */
1150             tabCount = (1 << (i + 4));
1151             tabSize = tabCount * sizeof(RcmServer_FxnTabElem);
1152             obj->fxnTab[i] = (RcmServer_FxnTabElem *)xdc_runtime_Memory_alloc(
1153                 RcmServer_Module_heap(), tabSize, sizeof(Ptr), &eb);
1155             if (Error_check(&eb)) {
1156                 Log_error0(FXNN": unable to allocate new function table");
1157                 obj->fxnTab[i] = NULL;
1158                 status = RcmServer_E_NOMEMORY;
1159                 goto leave;
1160             }
1162             /* initialize the new table */
1163             for (j = 0; j < tabCount; j++) {
1164                 ((obj->fxnTab[i])+j)->addr.fxn = 0;
1165                 ((obj->fxnTab[i])+j)->name = NULL;
1166                 ((obj->fxnTab[i])+j)->key = 0;
1167             }
1169             /* use first slot in new table */
1170             j = 0;
1171             slot = (obj->fxnTab[i])+j;
1172         }
1174         /* if new slot found, break out of loop */
1175         if (slot != NULL) {
1176             break;
1177         }
1178     }
1180     /* insert new symbol into slot */
1181     if (slot != NULL) {
1182         slot->addr.fxn = addr;
1183         len = _strlen(funcName) + 1;
1184         slot->name = (String)xdc_runtime_Memory_alloc(
1185             RcmServer_Module_heap(), len, 0, &eb);
1187         if (Error_check(&eb)) {
1188             Log_error0(FXNN": unable to allocate function name");
1189             slot->name = NULL;
1190             status = RcmServer_E_NOMEMORY;
1191             goto leave;
1192         }
1194         _strcpy(slot->name, funcName);
1195         slot->key = RcmServer_getNextKey_P(obj);
1196         fxnIdx = ((UInt32)(slot->key) << _RCM_KeyShift) | (i << 12) | j;
1197     }
1199     /* error, no more room to add new symbol */
1200     else {
1201         Log_error0(FXNN": cannot add symbol, table is full");
1202         status = RcmServer_E_SYMBOLTABLEFULL;
1203         goto leave;
1204     }
1207 leave:
1208     GateThread_leave(gateH, key);
1210     /* on success, return new function index */
1211     if (status >= 0) {
1212         *index = fxnIdx;
1213     }
1214     Log_print1(Diags_EXIT, "<-- "FXNN": %d", (IArg)status);
1215     return(status);
1217 #undef FXNN
1220 /*
1221  *  ======== RcmServer_removeSymbol ========
1222  */
1223 #define FXNN "RcmServer_removeSymbol"
1224 Int RcmServer_removeSymbol(RcmServer_Object *obj, String name)
1226     GateThread_Handle gateH;
1227     IArg key;
1228     UInt32 fxnIdx;
1229     UInt tabIdx, tabOff;
1230     RcmServer_FxnTabElem *slot;
1231     Int status = RcmServer_S_SUCCESS;
1234     Log_print2(Diags_ENTRY,
1235         "--> "FXNN": (obj=0x%x, name=0x%x)", (IArg)obj, (IArg)name);
1237     /* protect the symbol table while changing it */
1238     gateH = GateThread_handle(&obj->gate);
1239     key = GateThread_enter(gateH);
1241     /* find the symbol in the table */
1242     status = RcmServer_getSymIdx_P(obj, name, &fxnIdx);
1244     if (status < 0) {
1245         Log_error0(FXNN": given symbol not found");
1246         goto leave;
1247     }
1249     /* static symbols have bit-31 set, cannot remove these symbols */
1250     if (fxnIdx & 0x80000000) {
1251         Log_error0(FXNN": cannot remove static symbol");
1252         status = RcmServer_E_SYMBOLSTATIC;
1253         goto leave;
1254     }
1256     /* get slot pointer */
1257     tabIdx = (fxnIdx & 0xF000) >> 12;
1258     tabOff = (fxnIdx & 0xFFF);
1259     slot = (obj->fxnTab[tabIdx]) + tabOff;
1261     /* clear the table index */
1262     slot->addr.fxn = 0;
1263     if (slot->name != NULL) {
1264         xdc_runtime_Memory_free(
1265             RcmServer_Module_heap(), slot->name, _strlen(slot->name) + 1);
1266         slot->name = NULL;
1267     }
1268     slot->key = 0;
1270 leave:
1271     GateThread_leave(gateH, key);
1272     Log_print1(Diags_EXIT, "<-- "FXNN": %d", (IArg)status);
1273     return(status);
1275 #undef FXNN
1278 /*
1279  *  ======== RcmServer_start ========
1280  */
1281 #define FXNN "RcmServer_start"
1282 Int RcmServer_start(RcmServer_Object *obj)
1284     Error_Block eb;
1285     Int status = RcmServer_S_SUCCESS;
1288     Log_print1(Diags_ENTRY, "--> "FXNN": (obj=0x%x)", (IArg)obj);
1290     Error_init(&eb);
1292     /* unblock the server thread */
1293     Semaphore_post(obj->run, &eb);
1295     if (Error_check(&eb)) {
1296         Log_error0(FXNN": semaphore post failed");
1297         status = RcmServer_E_FAIL;
1298     }
1300     Log_print1(Diags_EXIT, "<-- "FXNN": %d", (IArg)status);
1301     return(status);
1303 #undef FXNN
1306 /*
1307  *  ======== RcmServer_acqJobId_P ========
1308  */
1309 #define FXNN "RcmServer_acqJobId_P"
1310 Int RcmServer_acqJobId_P(RcmServer_Object *obj, UInt16 *jobIdPtr)
1312     Error_Block eb;
1313     GateThread_Handle gateH;
1314     IArg key;
1315     Int count;
1316     UInt16 jobId;
1317     List_Elem *elem;
1318     RcmServer_JobStream *job;
1319     Int status = RcmServer_S_SUCCESS;
1322     Log_print2(Diags_ENTRY,
1323         "--> "FXNN": (obj=0x%x, jobIdPtr=0x%x)", (IArg)obj, (IArg)jobIdPtr);
1325     Error_init(&eb);
1326     gateH = GateThread_handle(&obj->gate);
1328     /* enter critical section */
1329     key = GateThread_enter(gateH);
1331     /* compute new job id */
1332     for (count = 0xFFFF; count > 0; count--) {
1334         /* generate a new job id */
1335         jobId = (obj->jobId == 0xFFFF ? obj->jobId = 1 : ++(obj->jobId));
1337         /* verify job id is not in use */
1338         elem = NULL;
1339         while ((elem = List_next(obj->jobList, elem)) != NULL) {
1340             job = (RcmServer_JobStream *)elem;
1341             if (jobId == job->jobId) {
1342                 jobId = RcmClient_DISCRETEJOBID;
1343                 break;
1344             }
1345         }
1347         if (jobId != RcmClient_DISCRETEJOBID) {
1348             break;
1349         }
1350     }
1352     /* check if job id was acquired */
1353     if (jobId == RcmClient_DISCRETEJOBID) {
1354         *jobIdPtr = RcmClient_DISCRETEJOBID;
1355         Log_error0(FXNN": no job id available");
1356         status = RcmServer_E_FAIL;
1357         GateThread_leave(gateH, key);
1358         goto leave;
1359     }
1361     /* create a new job steam object */
1362     job = xdc_runtime_Memory_alloc(RcmServer_Module_heap(),
1363         sizeof(RcmServer_JobStream), sizeof(Ptr), &eb);
1365     if (Error_check(&eb)) {
1366         Log_error2(FXNN": out of memory: heap=0x%x, size=%u",
1367             (IArg)RcmServer_Module_heap(), sizeof(RcmServer_JobStream));
1368         status = RcmServer_E_NOMEMORY;
1369         GateThread_leave(gateH, key);
1370         goto leave;
1371     }
1373     /* initialize new job stream object */
1374     job->jobId = jobId;
1375     job->empty = TRUE;
1376     List_construct(&(job->msgQue), NULL);
1378     /* put new job stream object at end of server list */
1379     List_put(obj->jobList, (List_Elem *)job);
1381     /* leave critical section */
1382     GateThread_leave(gateH, key);
1384     /* return new job id */
1385     *jobIdPtr = jobId;
1388 leave:
1389     Log_print1(Diags_EXIT, "<-- "FXNN": %d", (IArg)status);
1390     return(status);
1392 #undef FXNN
1395 /*
1396  *  ======== RcmServer_dispatch_P ========
1397  *
1398  *  Return Value
1399  *      < 0: error
1400  *        0: success, job stream queue
1401  *      > 0: success, ready queue
1402  *
1403  *  Pool id description
1404  *
1405  *  Static Worker Pools
1406  *  --------------------------------------------------------------------
1407  *  15      1 = static pool
1408  *  14:8    reserved
1409  *  7:0     offset: 0 - 255
1410  *
1411  *  Dynamic Worker Pools
1412  *  --------------------------------------------------------------------
1413  *  15      0 = dynamic pool
1414  *  14:7    key: 0 - 255
1415  *  6:5     index: 1 - 3
1416  *  4:0     offset: 0 - [7, 15, 31]
1417  */
1418 #define FXNN "RcmServer_dispatch_P"
1419 Int RcmServer_dispatch_P(RcmServer_Object *obj, RcmClient_Packet *packet)
1421     GateThread_Handle gateH;
1422     IArg key;
1423     List_Elem *elem;
1424     List_Handle listH;
1425     RcmServer_ThreadPool *pool;
1426     UInt16 jobId;
1427     RcmServer_JobStream *job;
1428     Error_Block eb;
1429     Int status = RcmServer_S_SUCCESS;
1432     Log_print2(Diags_ENTRY,
1433         "--> "FXNN": (obj=0x%x, packet=0x%x)", (IArg)obj, (IArg)packet);
1435     Error_init(&eb);
1437     /* get the target pool id from the message */
1438     status = RcmServer_getPool_P(obj, packet, &pool);
1440     if (status < 0) {
1441         goto leave;
1442     }
1444     System_printf("Rcm dispatch: p:%d j:%d f:%d l:%d\n",
1445                   packet->message.poolId, packet->message.jobId,
1446                   packet->message.fxnIdx, packet->message.dataSize);
1448     /* discrete jobs go on the end of the ready queue */
1449     jobId = packet->message.jobId;
1451     if (jobId == RcmClient_DISCRETEJOBID) {
1452         listH = List_handle(&pool->readyQueue);
1453         List_put(listH, (List_Elem *)packet);
1455         /* dispatch a new worker thread */
1456         Semaphore_post(pool->sem, &eb);
1458         if (Error_check(&eb)) {
1459             Log_error0(FXNN": semaphore post failed");
1460         }
1461     }
1463     /* must be a job stream message */
1464     else {
1465         /* must protect job list while searching it */
1466         gateH = GateThread_handle(&obj->gate);
1467         key = GateThread_enter(gateH);
1469         /* find the job stream object in the list */
1470         elem = NULL;
1471         while ((elem = List_next(obj->jobList, elem)) != NULL) {
1472             job = (RcmServer_JobStream *)elem;
1473             if (job->jobId == jobId) {
1474                 break;
1475             }
1476         }
1478         if (elem == NULL) {
1479             Log_error1(FXNN": failed to find jobId=%d", (IArg)jobId);
1480             status = RcmServer_E_JobIdNotFound;
1481         }
1483         /* if job object is empty, place message directly on ready queue */
1484         else if (job->empty) {
1485             job->empty = FALSE;
1486             listH = List_handle(&pool->readyQueue);
1487             List_put(listH, (List_Elem *)packet);
1489             /* dispatch a new worker thread */
1490             Semaphore_post(pool->sem, &eb);
1492             if (Error_check(&eb)) {
1493                 Log_error0(FXNN": semaphore post failed");
1494             }
1495         }
1497         /* place message on job queue */
1498         else {
1499             listH = List_handle(&job->msgQue);
1500             List_put(listH, (List_Elem *)packet);
1501         }
1503         GateThread_leave(gateH, key);
1504     }
1507 leave:
1508     Log_print1(Diags_EXIT, "<-- "FXNN": %d", (IArg)status);
1509     return(status);
1511 #undef FXNN
1514 /*
1515  *  ======== RcmServer_execMsg_I ========
1516  */
1517 Int RcmServer_execMsg_I(RcmServer_Object *obj, RcmClient_Message *msg)
1519     RcmServer_MsgFxn fxn;
1520 #if USE_RPMESSAGE
1521     RcmServer_MsgCreateFxn createFxn = NULL;
1522 #endif
1523     Int status;
1525     status = RcmServer_getFxnAddr_P(obj, msg->fxnIdx, &fxn, &createFxn);
1527     if (status >= 0) {
1528 #if 0
1529         System_printf("RcmServer_execMsg_I: Calling fxnIdx: %d\n",
1530                       (msg->fxnIdx & 0x0000FFFF));
1531 #endif
1532         Task_setEnv(Task_self(), (Ptr)RcmServer_getLocalAddress(obj));
1533 #if USE_RPMESSAGE
1534         if (createFxn)  {
1535             msg->result = (*createFxn)(obj, msg->dataSize, msg->data);
1536         }
1537         else {
1538             msg->result = (*fxn)(msg->dataSize, msg->data);
1539         }
1540 #else
1541         msg->result = (*fxn)(msg->dataSize, msg->data);
1542 #endif
1543         Task_setEnv(Task_self(), NULL);
1544     }
1546     return(status);
1550 /*
1551  *  ======== RcmServer_getFxnAddr_P ========
1552  *
1553  *  The function index (fxnIdx) uses the following format. Note that the
1554  *  format differs for static vs. dynamic functions. All static functions
1555  *  are in fxnTab[0], all dynamic functions are in fxnTab[1 - 8].
1556  *
1557  *  Bits    Description
1558  *
1559  *  Static Function Index
1560  *  --------------------------------------------------------------------
1561  *  31      static/dynamic function flag
1562  *              0 = dynamic function
1563  *              1 = static function
1564  *  30:20   reserved
1565  *  19:16   reserved
1566  *  15:0    offset: 0 - 65,535
1567  *
1568  *  Dynamic Function Index
1569  *  --------------------------------------------------------------------
1570  *  31      static/dynamic function flag
1571  *              0 = dynamic function
1572  *              1 = static function
1573  *  30:20   key
1574  *  19:16   reserved
1575  *  15:12   index: 1 - 8
1576  *  11:0    offset: 0 - [31, 63, 127, 255, 511, 1023, 2047, 4095]
1577  */
1578 #define FXNN "RcmServer_getFxnAddr_P"
1579 Int RcmServer_getFxnAddr_P(RcmServer_Object *obj, UInt32 fxnIdx,
1580         RcmServer_MsgFxn *addrPtr, RcmServer_MsgCreateFxn *createPtr)
1582     UInt i, j;
1583     UInt16 key;
1584     RcmServer_FxnTabElem *slot;
1585     RcmServer_MsgFxn addr = NULL;
1586     RcmServer_MsgCreateFxn createAddr = NULL;
1587     Int status = RcmServer_S_SUCCESS;
1589     /* static functions have bit-31 set */
1590     if (fxnIdx & 0x80000000) {
1591         j = (fxnIdx & 0x0000FFFF);
1592         if (j < (obj->fxnTabStatic.length)) {
1594             /* fetch the function address from the table */
1595             slot = (obj->fxnTab[0])+j;
1596             if (j == 0)  {
1597                  createAddr = slot->addr.createFxn;
1598             }
1599             else {
1600                  addr = slot->addr.fxn;
1601             }
1602         }
1603         else {
1604             Log_error1(FXNN": invalid function index 0x%x", (IArg)fxnIdx);
1605             status = RcmServer_E_InvalidFxnIdx;
1606             goto leave;
1607         }
1608     }
1609     /* must be a dynamic function */
1610     else {
1611         /* extract the key from the function index */
1612         key = (fxnIdx & _RCM_KeyMask) >> _RCM_KeyShift;
1614         i = (fxnIdx & 0xF000) >> 12;
1615         if ((i > 0) && (i < RcmServer_MAX_TABLES) && (obj->fxnTab[i] != NULL)) {
1617             /* fetch the function address from the table */
1618             j = (fxnIdx & 0x0FFF);
1619             slot = (obj->fxnTab[i])+j;
1620             addr = slot->addr.fxn;
1622             /* validate the key */
1623             if (key != slot->key) {
1624                 Log_error1(FXNN": invalid function index 0x%x", (IArg)fxnIdx);
1625                 status = RcmServer_E_InvalidFxnIdx;
1626                 goto leave;
1627             }
1628         }
1629         else {
1630             Log_error1(FXNN": invalid function index 0x%x", (IArg)fxnIdx);
1631             status = RcmServer_E_InvalidFxnIdx;
1632             goto leave;
1633         }
1634     }
1636 leave:
1637     if (status >= 0) {
1638        if (j == 0)  {
1639            *createPtr = createAddr;
1640        }
1641        else {
1642            *addrPtr = addr;
1643        }
1644     }
1645     return(status);
1647 #undef FXNN
1650 /* *  ======== RcmServer_getSymIdx_P ========
1651  *
1652  *  Must have table gate before calling this function.
1653  */
1654 #define FXNN "RcmServer_getSymIdx_P"
1655 Int RcmServer_getSymIdx_P(RcmServer_Object *obj, String name, UInt32 *index)
1657     UInt i, j, len;
1658     RcmServer_FxnTabElem *slot;
1659     UInt32 fxnIdx = 0xFFFFFFFF;
1660     Int status = RcmServer_S_SUCCESS;
1663     Log_print3(Diags_ENTRY,
1664         "--> "FXNN": (obj=0x%x, name=0x%x, index=0x%x)",
1665         (IArg)obj, (IArg)name, (IArg)index);
1667     /* search tables for given function name */
1668     for (i = 0; i < RcmServer_MAX_TABLES; i++) {
1669         if (obj->fxnTab[i] != NULL) {
1670             len = (i == 0) ? obj->fxnTabStatic.length : (1 << (i + 4));
1671             for (j = 0; j < len; j++) {
1672                 slot = (obj->fxnTab[i]) + j;
1673                 if ((((obj->fxnTab[i])+j)->name != NULL) &&
1674                     (_strcmp(((obj->fxnTab[i])+j)->name, name) == 0)) {
1675                     /* found function name */
1676                     if (i == 0) {
1677                         fxnIdx = 0x80000000 | j;
1678                     } else {
1679                         fxnIdx = ((UInt32)(slot->key) << _RCM_KeyShift) |
1680                                 (i << 12) | j;
1681                     }
1682                     break;
1683                 }
1684             }
1685         }
1687         if (0xFFFFFFFF != fxnIdx) {
1688             break;
1689         }
1690     }
1692     /* log an error if the symbol was not found */
1693     if (fxnIdx == 0xFFFFFFFF) {
1694         Log_error0(FXNN": given symbol not found");
1695         status = RcmServer_E_SYMBOLNOTFOUND;
1696     }
1698     /* if success, return symbol index */
1699     if (status >= 0) {
1700         *index = fxnIdx;
1701     }
1703     Log_print1(Diags_EXIT, "<-- "FXNN": %d", (IArg)status);
1704     return(status);
1706 #undef FXNN
1709 /*
1710  *  ======== RcmServer_getNextKey_P ========
1711  */
1712 UInt16 RcmServer_getNextKey_P(RcmServer_Object *obj)
1714     GateThread_Handle gateH;
1715     IArg gateKey;
1716     UInt16 key;
1719     gateH = GateThread_handle(&obj->gate);
1720     gateKey = GateThread_enter(gateH);
1722     if (obj->key <= 1) {
1723         obj->key = _RCM_KeyResetValue;  /* don't use 0 as a key value */
1724     }
1725     else {
1726         (obj->key)--;
1727     }
1728     key = obj->key;
1730     GateThread_leave(gateH, gateKey);
1732     return(key);
1736 /*
1737  *  ======== RcmServer_getPool_P ========
1738  */
1739 #define FXNN "RcmServer_getPool_P"
1740 Int RcmServer_getPool_P(RcmServer_Object *obj,
1741         RcmClient_Packet *packet, RcmServer_ThreadPool **poolP)
1743     UInt16 poolId;
1744     UInt16 offset;
1745     Int status = RcmServer_S_SUCCESS;
1748     poolId = packet->message.poolId;
1750     /* static pools have bit-15 set */
1751     if (poolId & 0x8000) {
1752         offset = (poolId & 0x00FF);
1753         if (offset < obj->poolMap0Len) {
1754             *poolP = &(obj->poolMap[0])[offset];
1755         }
1756         else {
1757             Log_error1(FXNN": pool id=0x%x not found", (IArg)poolId);
1758             *poolP = NULL;
1759             status = RcmServer_E_PoolIdNotFound;
1760             goto leave;
1761         }
1762     }
1764 leave:
1765     return(status);
1767 #undef FXNN
1770 /*
1771  *  ======== RcmServer_process_P ========
1772  */
1773 #define FXNN "RcmServer_process_P"
1774 Void RcmServer_process_P(RcmServer_Object *obj, RcmClient_Packet *packet)
1776     String name;
1777     UInt32 fxnIdx;
1778     RcmServer_MsgFxn fxn;
1779     RcmClient_Message *rcmMsg;
1780 #if USE_RPMESSAGE
1781     RcmServer_MsgCreateFxn createFxn = NULL;
1782 #endif
1783 #if USE_RPMESSAGE == 0
1784     MessageQ_Msg msgqMsg;
1785 #endif
1786     UInt16 messageType;
1787     Error_Block eb;
1788     UInt16 jobId;
1789     Int rval;
1790     Int status = RcmServer_S_SUCCESS;
1793     Log_print2(Diags_ENTRY,
1794         "--> "FXNN": (obj=0x%x, packet=0x%x)", (IArg)obj, (IArg)packet);
1796     Error_init(&eb);
1798     /* decode the message */
1799     rcmMsg = &packet->message;
1800 #if USE_RPMESSAGE == 0
1801     msgqMsg = &packet->msgqHeader;
1802 #endif
1803     Log_print1(Diags_INFO, FXNN": message desc=0x%x", (IArg)packet->desc);
1805     /* extract the message type from the packet descriptor field */
1806     messageType = (RcmClient_Desc_TYPE_MASK & packet->desc) >>
1807         RcmClient_Desc_TYPE_SHIFT;
1809     /* process the given message */
1810     switch (messageType) {
1812         case RcmClient_Desc_RCM_MSG:
1813             rval = RcmServer_execMsg_I(obj, rcmMsg);
1815             if (rval < 0) {
1816                 switch (rval) {
1817                     case RcmServer_E_InvalidFxnIdx:
1818                         RcmServer_setStatusCode_I(
1819                             packet, RcmServer_Status_INVALID_FXN);
1820                         break;
1821                     default:
1822                         RcmServer_setStatusCode_I(
1823                             packet, RcmServer_Status_Error);
1824                         break;
1825                 }
1826             }
1827             else if (rcmMsg->result < 0) {
1828                 RcmServer_setStatusCode_I(packet, RcmServer_Status_MSG_FXN_ERR);
1829             }
1830             else {
1831                 RcmServer_setStatusCode_I(packet, RcmServer_Status_SUCCESS);
1832             }
1834 #if USE_RPMESSAGE
1835 #if 0
1836             System_printf("RcmServer_process_P: Sending reply from: %d to: %d\n",
1837                       obj->localAddr, obj->replyAddr);
1838 #endif
1840             packet->hdr.type = OMX_RAW_MSG;
1841             packet->hdr.len = PACKET_DATA_SIZE + packet->message.dataSize;
1842             status = RPMessage_send(obj->dstProc, obj->replyAddr,
1843                                  obj->localAddr, (Ptr)&packet->hdr,
1844                                  PACKET_HDR_SIZE + packet->message.dataSize);
1845 #else
1846             status = MessageQ_put(MessageQ_getReplyQueue(msgqMsg), msgqMsg);
1847 #endif
1848             if (status < 0) {
1849                 Log_error1(FXNN": unknown ipc error, 0x%x", (IArg)status);
1850             }
1851             break;
1853         case RcmClient_Desc_CMD:
1854             status = RcmServer_execMsg_I(obj, rcmMsg);
1856             /* if all went well, free the message */
1857             if ((status >= 0) && (rcmMsg->result >= 0)) {
1859 #if USE_RPMESSAGE == 0
1860                 status = MessageQ_free(msgqMsg);
1861 #endif
1862                 if (status < 0) {
1863                     Log_error1(
1864                         FXNN": MessageQ_free returned error %d", (IArg)status);
1865                 }
1866             }
1868             /* an error occurred, must return message to client */
1869             else {
1870                 if (status < 0) {
1871                     /* error trying to process the message */
1872                     switch (status) {
1873                         case RcmServer_E_InvalidFxnIdx:
1874                             RcmServer_setStatusCode_I(
1875                                 packet, RcmServer_Status_INVALID_FXN);
1876                             break;
1877                         default:
1878                             RcmServer_setStatusCode_I(
1879                                 packet, RcmServer_Status_Error);
1880                             break;
1881                     }
1882                 }
1883                 else  {
1884                     /* error in message function */
1885                     RcmServer_setStatusCode_I(
1886                         packet, RcmServer_Status_MSG_FXN_ERR);
1887                 }
1889                 /* send error message back to client */
1890 #if USE_RPMESSAGE
1891                 packet->hdr.type = OMX_RAW_MSG;
1892                 packet->hdr.len = PACKET_DATA_SIZE + packet->message.dataSize;
1893                 status = RPMessage_send(obj->dstProc, obj->replyAddr,
1894                                  obj->localAddr, (Ptr)&packet->hdr,
1895                                  PACKET_HDR_SIZE + packet->message.dataSize);
1896 #else
1897                 status = MessageQ_put(MessageQ_getReplyQueue(msgqMsg), msgqMsg);
1898 #endif
1899                 if (status < 0) {
1900                     Log_error1(FXNN": unknown ipc error, 0x%x", (IArg)status);
1901                 }
1902             }
1903             break;
1905         case RcmClient_Desc_DPC:
1906             rval = RcmServer_getFxnAddr_P(obj, rcmMsg->fxnIdx, &fxn,
1907                                           &createFxn);
1909             if (rval < 0) {
1910                 RcmServer_setStatusCode_I(
1911                     packet, RcmServer_Status_SYMBOL_NOT_FOUND);
1912                 Error_init(&eb);
1913             }
1915 #if USE_RPMESSAGE
1916             packet->hdr.type = OMX_RAW_MSG;
1917             packet->hdr.len = PACKET_DATA_SIZE + packet->message.dataSize;
1918             status = RPMessage_send(obj->dstProc, obj->replyAddr,
1919                                  obj->localAddr, (Ptr)&packet->hdr,
1920                                  PACKET_HDR_SIZE + packet->message.dataSize);
1921 #else
1922             status = MessageQ_put(MessageQ_getReplyQueue(msgqMsg), msgqMsg);
1923 #endif
1924             if (status < 0) {
1925                 Log_error1(FXNN": unknown ipc error, 0x%x", (IArg)status);
1926             }
1928             /* invoke the function with a null context */
1929 #if USE_RPMESSAGE
1930             if (createFxn)  {
1931                  (*createFxn)(obj, 0, NULL);
1932             }
1933             else {
1934                  (*fxn)(0, NULL);
1935             }
1936 #else
1937             (*fxn)(0, NULL);
1938 #endif
1939             break;
1941         case RcmClient_Desc_SYM_ADD:
1942             break;
1944         case RcmClient_Desc_SYM_IDX:
1945             name = (String)rcmMsg->data;
1946             rval = RcmServer_getSymIdx_P(obj, name, &fxnIdx);
1948             if (rval < 0) {
1949                 RcmServer_setStatusCode_I(
1950                     packet, RcmServer_Status_SYMBOL_NOT_FOUND);
1951             }
1952             else {
1953                 RcmServer_setStatusCode_I(packet, RcmServer_Status_SUCCESS);
1954                 rcmMsg->data[0] = fxnIdx;
1955                 rcmMsg->result = 0;
1956             }
1958 #if USE_RPMESSAGE
1959             packet->hdr.type = OMX_RAW_MSG;
1960             packet->hdr.len = PACKET_DATA_SIZE + packet->message.dataSize;
1961             status = RPMessage_send(obj->dstProc, obj->replyAddr,
1962                                  obj->localAddr, (Ptr)&packet->hdr,
1963                                  PACKET_HDR_SIZE + packet->message.dataSize);
1964 #else
1965             status = MessageQ_put(MessageQ_getReplyQueue(msgqMsg), msgqMsg);
1966 #endif
1967             if (status < 0) {
1968                 Log_error1(FXNN": unknown ipc error, 0x%x", (IArg)status);
1969             }
1970             break;
1972         case RcmClient_Desc_JOB_ACQ:
1973             rval = RcmServer_acqJobId_P(obj, &jobId);
1975             if (rval < 0) {
1976                 RcmServer_setStatusCode_I(packet, RcmServer_Status_Error);
1977             }
1978             else {
1979                 RcmServer_setStatusCode_I(packet, RcmServer_Status_SUCCESS);
1980                 *(UInt16 *)(&rcmMsg->data[0]) = jobId;
1981                 rcmMsg->result = 0;
1982             }
1984 #if USE_RPMESSAGE
1985             packet->hdr.type = OMX_RAW_MSG;
1986             packet->hdr.len = PACKET_DATA_SIZE + packet->message.dataSize;
1987             status = RPMessage_send(obj->dstProc, obj->replyAddr,
1988                                  obj->localAddr, (Ptr)&packet->hdr,
1989                                  PACKET_HDR_SIZE + packet->message.dataSize);
1990 #else
1991             status = MessageQ_put(MessageQ_getReplyQueue(msgqMsg), msgqMsg);
1992 #endif
1993             if (status < 0) {
1994                 Log_error1(FXNN": unknown ipc error, 0x%x", (IArg)status);
1995             }
1996             break;
1998         case RcmClient_Desc_JOB_REL:
1999             jobId = (UInt16)(rcmMsg->data[0]);
2000             rval = RcmServer_relJobId_P(obj, jobId);
2002             if (rval < 0) {
2003                 switch (rval) {
2004                     case RcmServer_E_JobIdNotFound:
2005                         RcmServer_setStatusCode_I(
2006                             packet, RcmServer_Status_JobNotFound);
2007                         break;
2008                     default:
2009                         RcmServer_setStatusCode_I(
2010                             packet, RcmServer_Status_Error);
2011                         break;
2012                 }
2013                 rcmMsg->result = rval;
2014             }
2015             else {
2016                 RcmServer_setStatusCode_I(packet, RcmServer_Status_SUCCESS);
2017                 rcmMsg->result = 0;
2018             }
2020 #if USE_RPMESSAGE
2021             packet->hdr.type = OMX_RAW_MSG;
2022             packet->hdr.len = PACKET_DATA_SIZE + packet->message.dataSize;
2023             status = RPMessage_send(obj->dstProc, obj->replyAddr,
2024                                  obj->localAddr, (Ptr)&packet->hdr,
2025                                  PACKET_HDR_SIZE + packet->message.dataSize);
2026 #else
2027             status = MessageQ_put(MessageQ_getReplyQueue(msgqMsg), msgqMsg);
2028 #endif
2029             if (status < 0) {
2030                 Log_error1(FXNN": unknown ipc error, 0x%x", (IArg)status);
2031             }
2032             break;
2034         default:
2035             Log_error1(FXNN": unknown message type recieved, 0x%x",
2036                 (IArg)messageType);
2037             break;
2038     }
2040     Log_print0(Diags_EXIT, "<-- "FXNN":");
2042 #undef FXNN
2045 /*
2046  *  ======== RcmServer_relJobId_P ========
2047  */
2048 #define FXNN "RcmServer_relJobId_P"
2049 Int RcmServer_relJobId_P(RcmServer_Object *obj, UInt16 jobId)
2051     GateThread_Handle gateH;
2052     IArg key;
2053     List_Elem *elem;
2054     List_Handle msgQueH;
2055     RcmClient_Packet *packet;
2056     RcmServer_JobStream *job;
2057 #if USE_RPMESSAGE == 0
2058     MessageQ_Msg msgqMsg;
2059 #endif
2060     Int rval;
2061     Int status = RcmServer_S_SUCCESS;
2064     Log_print2(Diags_ENTRY,
2065         "--> "FXNN": (obj=0x%x, jobId=0x%x)", (IArg)obj, (IArg)jobId);
2068     /* must protect job list while searching and modifying it */
2069     gateH = GateThread_handle(&obj->gate);
2070     key = GateThread_enter(gateH);
2072     /* find the job stream object in the list */
2073     elem = NULL;
2074     while ((elem = List_next(obj->jobList, elem)) != NULL) {
2075         job = (RcmServer_JobStream *)elem;
2077         /* remove the job stream object from the list */
2078         if (job->jobId == jobId) {
2079             List_remove(obj->jobList, elem);
2080             break;
2081         }
2082     }
2084     GateThread_leave(gateH, key);
2086     if (elem == NULL) {
2087         status = RcmServer_E_JobIdNotFound;
2088         Log_error1(FXNN": failed to find jobId=%d", (IArg)jobId);
2089         goto leave;
2090     }
2092     /* return any pending messages on the message queue */
2093     msgQueH = List_handle(&job->msgQue);
2095     while ((elem = List_get(msgQueH)) != NULL) {
2096         packet = (RcmClient_Packet *)elem;
2097         Log_warning2(
2098             FXNN": returning unprocessed message, jobId=0x%x, packet=0x%x",
2099             (IArg)jobId, (IArg)packet);
2101         RcmServer_setStatusCode_I(packet, RcmServer_Status_Unprocessed);
2103 #if USE_RPMESSAGE
2104         packet->hdr.type = OMX_RAW_MSG;
2105         packet->hdr.len = PACKET_DATA_SIZE + packet->message.dataSize;
2106         rval = RPMessage_send(obj->dstProc, obj->replyAddr,
2107                                  obj->localAddr, (Ptr)&packet->hdr,
2108                                  PACKET_HDR_SIZE + packet->message.dataSize);
2109 #else
2110         msgqMsg = &packet->msgqHeader;
2111         rval = MessageQ_put(MessageQ_getReplyQueue(msgqMsg), msgqMsg);
2112 #endif
2113         if (rval < 0) {
2114             Log_error1(FXNN": unknown ipc error, 0x%x", (IArg)rval);
2115         }
2116     }
2118     /* finalize the job stream object */
2119     List_destruct(&job->msgQue);
2121     xdc_runtime_Memory_free(RcmServer_Module_heap(),
2122         (Ptr)job, sizeof(RcmServer_JobStream));
2125 leave:
2126     Log_print1(Diags_EXIT, "<-- "FXNN": %d", (IArg)status);
2127     return(status);
2129 #undef FXNN
2132 /*
2133  *  ======== RcmServer_serverThrFxn_P ========
2134  */
2135 #define FXNN "RcmServer_serverThrFxn_P"
2136 Void RcmServer_serverThrFxn_P(IArg arg)
2138     Error_Block eb;
2139     RcmClient_Packet *packet;
2140 #if USE_RPMESSAGE
2141     Char         recvBuf[MSGBUFFERSIZE];
2142     UInt16       len;
2143 #else
2144     MessageQ_Msg msgqMsg = NULL;
2145 #endif
2146     Int rval;
2147     Bool running = TRUE;
2148     RcmServer_Object *obj = (RcmServer_Object *)arg;
2149     Int dataSize;
2151 #if USE_RPMESSAGE
2152     packet = (RcmClient_Packet *)&recvBuf[0];
2153 #endif
2155     Log_print1(Diags_ENTRY, "--> "FXNN": (arg=0x%x)", arg);
2157     Error_init(&eb);
2159     /* wait until ready to run */
2160     Semaphore_pend(obj->run, Semaphore_FOREVER, &eb);
2162     if (Error_check(&eb)) {
2163         Log_error0(FXNN": Semaphore_pend failure in server thread");
2164     }
2166     /* main processing loop */
2167     while (running) {
2168         Log_print1(Diags_INFO,
2169             FXNN": waiting for message, thread=0x%x",
2170             (IArg)(obj->serverThread));
2172         /* block until message arrives */
2173         do {
2174 #if USE_RPMESSAGE
2175             rval = RPMessage_recv(obj->serverQue, (Ptr)&packet->hdr, &len,
2176                       &obj->replyAddr, RPMessage_FOREVER);
2177 #if 0
2178             System_printf("RcmServer_serverThrFxn_P: Received msg of len %d "
2179                           "from: %d\n",
2180                          len, obj->replyAddr);
2182             System_printf("hdr - t:%d l:%d\n", packet->hdr.type,
2183                           packet->hdr.len);
2185             System_printf("pkt - d:%d m:%d\n", packet->desc, packet->msgId);
2186 #endif
2187             if (packet->hdr.type == OMX_DISC_REQ) {
2188                 System_printf("RcmServer_serverThrFxn_P: Got OMX_DISCONNECT\n");
2189             }
2190             if (!obj->shutdown) {
2191                 Assert_isTrue((len <= MSGBUFFERSIZE), NULL);
2192                 Assert_isTrue((packet->hdr.type == OMX_RAW_MSG) ||
2193                               (packet->hdr.type == OMX_DISC_REQ) , NULL);
2194             }
2196             if ((rval < 0) && (rval != RPMessage_E_UNBLOCKED)) {
2197 #else
2198             rval = MessageQ_get(obj->serverQue, &msgqMsg, MessageQ_FOREVER);
2199             if ((rval < 0) && (rval != MessageQ_E_UNBLOCKED)) {
2200 #endif
2201                 Log_error1(FXNN": ipc error 0x%x", (IArg)rval);
2202                 /* keep running and hope for the best */
2203             }
2204 #if USE_RPMESSAGE
2205         } while (FALSE);
2206 #else
2207         } while ((msgqMsg == NULL) && !obj->shutdown);
2208 #endif
2210         /* if shutdown, exit this thread */
2211 #if USE_RPMESSAGE
2212         if (obj->shutdown || packet->hdr.type == OMX_DISC_REQ) {
2213             running = FALSE;
2214             Log_print1(Diags_INFO,
2215                 FXNN": terminating, thread=0x%x", (IArg)(obj->serverThread));
2216             continue;
2217         }
2218 #else
2219         if (obj->shutdown) {
2220             running = FALSE;
2221             Log_print1(Diags_INFO,
2222                 FXNN": terminating, thread=0x%x", (IArg)(obj->serverThread));
2223             if (msgqMsg == NULL ) {
2224                 continue;
2225             }
2226         }
2227 #endif
2229 #if USE_RPMESSAGE == 0
2230         packet = (RcmClient_Packet *)msgqMsg;
2231 #endif
2233         Log_print2(Diags_INFO,
2234             FXNN": message received, thread=0x%x packet=0x%x",
2235             (IArg)(obj->serverThread), (IArg)packet);
2237         if ((packet->message.poolId == RcmClient_DEFAULTPOOLID)
2238             && ((obj->poolMap[0])[0].count == 0)) {
2240             /* in-band (server thread) message processing */
2241             RcmServer_process_P(obj, packet);
2242         }
2243         else {
2244             /* out-of-band (worker thread) message processing */
2245             rval = RcmServer_dispatch_P(obj, packet);
2247             /* if error, message was not dispatched; must return to client */
2248             if (rval < 0) {
2249                 switch (rval) {
2250                     case RcmServer_E_JobIdNotFound:
2251                         RcmServer_setStatusCode_I(
2252                             packet, RcmServer_Status_JobNotFound);
2253                         break;
2255                     case RcmServer_E_PoolIdNotFound:
2256                         RcmServer_setStatusCode_I(
2257                             packet, RcmServer_Status_PoolNotFound);
2258                         break;
2260                     default:
2261                         RcmServer_setStatusCode_I(
2262                             packet, RcmServer_Status_Error);
2263                         break;
2264                 }
2265                 packet->message.result = rval;
2267                 /* return the message to the client */
2268 #if USE_RPMESSAGE
2269 #if 0
2270                 System_printf("RcmServer_serverThrFxn_P: "
2271                               "Sending response from: %d to: %d\n",
2272                               obj->localAddr, obj->replyAddr);
2273                 System_printf("sending: %d + %d\n", PACKET_HDR_SIZE,
2274                               packet->message.dataSize);
2275 #endif
2276                 packet->hdr.type = OMX_RAW_MSG;
2277                 if (rval < 0) {
2278                     packet->hdr.len = sizeof(UInt32);
2279                     packet->desc = 0x4142;
2280                     packet->msgId = 0x0044;
2281                     dataSize = sizeof(struct rpmsg_omx_hdr) + sizeof(UInt32);
2282                 }
2283                 else {
2284                     packet->hdr.len = PACKET_DATA_SIZE +
2285                         packet->message.dataSize;
2286                     dataSize = PACKET_HDR_SIZE + packet->message.dataSize;
2287                 }
2288                 rval = RPMessage_send(obj->dstProc, obj->replyAddr,
2289                                  obj->localAddr, (Ptr)&packet->hdr, dataSize);
2290 #else
2291                 rval = MessageQ_put(MessageQ_getReplyQueue(msgqMsg), msgqMsg);
2292 #endif
2293                 if (rval < 0) {
2294                     Log_error1(FXNN": unknown ipc error, 0x%x", (IArg)rval);
2295                 }
2296             }
2297         }
2298     }
2300     System_printf("RcmServer_serverThrFxn_P: Exiting thread.\n");
2302     Log_print0(Diags_EXIT, "<-- "FXNN":");
2304 #undef FXNN
2307 /*
2308  *  ======== RcmServer_setStatusCode_I ========
2309  */
2310 Void RcmServer_setStatusCode_I(RcmClient_Packet *packet, UInt16 code)
2313     /* code must be 0 - 15, it has to fit in a 4-bit field */
2314     Assert_isTrue((code < 16), NULL);
2316     packet->desc &= ~(RcmClient_Desc_TYPE_MASK);
2317     packet->desc |= ((code << RcmClient_Desc_TYPE_SHIFT)
2318         & RcmClient_Desc_TYPE_MASK);
2322 /*
2323  *  ======== RcmServer_workerThrFxn_P ========
2324  */
2325 #define FXNN "RcmServer_workerThrFxn_P"
2326 Void RcmServer_workerThrFxn_P(IArg arg)
2328     Error_Block eb;
2329     RcmClient_Packet *packet;
2330     List_Elem *elem;
2331     List_Handle listH;
2332     List_Handle readyQueueH;
2333     UInt16 jobId;
2334     GateThread_Handle gateH;
2335     IArg key;
2336     RcmServer_ThreadPool *pool;
2337     RcmServer_JobStream *job;
2338     RcmServer_WorkerThread *obj;
2339     Bool running;
2340     Int rval;
2343     Log_print1(Diags_ENTRY, "--> "FXNN": (arg=0x%x)", arg);
2345     Error_init(&eb);
2346     obj = (RcmServer_WorkerThread *)arg;
2347     readyQueueH = List_handle(&obj->pool->readyQueue);
2348     packet = NULL;
2349     running = TRUE;
2351     /* main processing loop */
2352     while (running) {
2353         Log_print1(Diags_INFO,
2354             FXNN": waiting for job, thread=0x%x", (IArg)(obj->thread));
2356         /* if no current message, wait until signaled to run */
2357         if (packet == NULL) {
2358             Semaphore_pend(obj->pool->sem, Semaphore_FOREVER, &eb);
2360             if (Error_check(&eb)) {
2361                 Log_error0(FXNN": semaphore pend failed");
2362             }
2363         }
2365         /* check if thread should terminate */
2366         if (obj->terminate) {
2367             running = FALSE;
2368             Log_print1(Diags_INFO,
2369                 FXNN": terminating, thread=0x%x", (IArg)(obj->thread));
2370             continue;
2371         }
2373         /* get next message from ready queue */
2374         if (packet == NULL) {
2375             packet = (RcmClient_Packet *)List_get(readyQueueH);
2376         }
2378         if (packet == NULL) {
2379             Log_error1(FXNN": ready queue is empty, thread=0x%x",
2380                 (IArg)(obj->thread));
2381             continue;
2382         }
2384         Log_print2(Diags_INFO, FXNN": job received, thread=0x%x packet=0x%x",
2385             (IArg)obj->thread, (IArg)packet);
2387         /* remember the message job id */
2388         jobId = packet->message.jobId;
2390         /* process the message */
2391         RcmServer_process_P(obj->server, packet);
2392         packet = NULL;
2394         /* If this worker thread just finished processing a job message,
2395          * queue up the next message for this job id. As an optimization,
2396          * if the message is addressed to this worker's pool, then don't
2397          * signal the semaphore, just get the next message from the queue
2398          * and processes it. This keeps the current thread running instead
2399          * of switching to another thread.
2400          */
2401         if (jobId != RcmClient_DISCRETEJOBID) {
2403             /* must protect job list while searching it */
2404             gateH = GateThread_handle(&obj->server->gate);
2405             key = GateThread_enter(gateH);
2407             /* find the job object in the list */
2408             elem = NULL;
2409             while ((elem = List_next(obj->server->jobList, elem)) != NULL) {
2410                 job = (RcmServer_JobStream *)elem;
2411                 if (job->jobId == jobId) {
2412                     break;
2413                 }
2414             }
2416             /* if job object not found, it is not an error */
2417             if (elem == NULL) {
2418                 GateThread_leave(gateH, key);
2419                 continue;
2420             }
2422             /* found the job object */
2423             listH = List_handle(&job->msgQue);
2425             /* get next job message and either process it or queue it */
2426             do {
2427                 elem = List_get(listH);
2429                 if (elem == NULL) {
2430                     job->empty = TRUE;  /* no more messages */
2431                     break;
2432                 }
2433                 else {
2434                     /* get target pool id */
2435                     packet = (RcmClient_Packet *)elem;
2436                     rval = RcmServer_getPool_P(obj->server, packet, &pool);
2438                     /* if error, return the message to the client */
2439                     if (rval < 0) {
2440                         switch (rval) {
2441                             case RcmServer_E_PoolIdNotFound:
2442                                 RcmServer_setStatusCode_I(
2443                                     packet, RcmServer_Status_PoolNotFound);
2444                                 break;
2446                             default:
2447                                 RcmServer_setStatusCode_I(
2448                                     packet, RcmServer_Status_Error);
2449                                 break;
2450                         }
2451                         packet->message.result = rval;
2453 #if USE_RPMESSAGE
2454                         packet->hdr.type = OMX_RAW_MSG;
2455                         packet->hdr.len = PACKET_DATA_SIZE + packet->message.dataSize;
2456                         rval = RPMessage_send(
2457                                  (obj->server)->dstProc,
2458                                  (obj->server)->replyAddr,
2459                                  (obj->server)->localAddr, (Ptr)&packet->hdr,
2460                                  PACKET_HDR_SIZE + packet->message.dataSize);
2461 #else
2462                         rval = MessageQ_put(
2463                             MessageQ_getReplyQueue(&packet->msgqHeader),
2464                             &packet->msgqHeader);
2465 #endif
2466                         if (rval < 0) {
2467                             Log_error1(
2468                                 FXNN": unknown ipc error, 0x%x", (IArg)rval);
2469                         }
2470                     }
2471                     /* packet is valid, queue it in the corresponding pool's
2472                      * ready queue */
2473                     else {
2474                         listH = List_handle(&pool->readyQueue);
2475                         List_put(listH, elem);
2476                         packet = NULL;
2477                         Semaphore_post(pool->sem, &eb);
2479                         if (Error_check(&eb)) {
2480                             Log_error0(FXNN": semaphore post failed");
2481                         }
2482                     }
2484                     /* loop around and wait to be run again */
2485                 }
2487             } while (rval < 0);
2489             GateThread_leave(gateH, key);
2490         }
2491     }  /* while (running) */
2493     Log_print0(Diags_EXIT, "<-- "FXNN":");
2495 #undef FXNN
2498 /*
2499  *  ======== RcmServer_getLocalAddress ========
2500  */
2501 UInt32 RcmServer_getLocalAddress(RcmServer_Object *obj)
2503     return(obj->localAddr);
2507 /*
2508  *  ======== RcmServer_getRemoteAddress ========
2509  */
2510 UInt32 RcmServer_getRemoteAddress(RcmServer_Object *obj)
2512     return(obj->replyAddr);
2517 /*
2518  *  ======== RcmServer_getRemoteProc ========
2519  */
2520 UInt16  RcmServer_getRemoteProc(RcmServer_Object *obj)
2522     return(obj->dstProc);