e668738db7d83761ee51edd2374405570f9b2457
[ipc/ipcdev.git] / packages / ti / grcm / RcmServer.c
1 /*
2  * Copyright (c) 2011-2013, Texas Instruments Incorporated
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions
7  * are met:
8  *
9  * *  Redistributions of source code must retain the above copyright
10  *    notice, this list of conditions and the following disclaimer.
11  *
12  * *  Redistributions in binary form must reproduce the above copyright
13  *    notice, this list of conditions and the following disclaimer in the
14  *    documentation and/or other materials provided with the distribution.
15  *
16  * *  Neither the name of Texas Instruments Incorporated nor the names of
17  *    its contributors may be used to endorse or promote products derived
18  *    from this software without specific prior written permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
21  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
22  * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
23  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
24  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
25  * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
26  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
27  * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
28  * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
29  * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
30  * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  */
33 /*
34  *  ======== RcmServer.c ========
35  *
36  */
38 /* this define must precede inclusion of any xdc header file */
39 #define Registry_CURDESC ti_grcm_RcmServer__Desc
40 #define MODULE_NAME "ti.grcm.RcmServer"
42 #define xdc_runtime_Memory__nolocalnames  /* short name clashes with SysLink */
44 /* rtsc header files */
45 #include <xdc/std.h>
46 #include <xdc/runtime/Assert.h>
47 #include <xdc/runtime/Diags.h>
48 #include <xdc/runtime/Error.h>
49 #include <xdc/runtime/IHeap.h>
50 #include <xdc/runtime/Log.h>
51 #include <xdc/runtime/Memory.h>
52 #include <xdc/runtime/Registry.h>
53 #include <xdc/runtime/Startup.h>
54 #include <xdc/runtime/knl/GateThread.h>
55 #include <xdc/runtime/knl/ISemaphore.h>
56 #include <xdc/runtime/knl/Semaphore.h>
57 #include <xdc/runtime/knl/SemThread.h>
58 #include <xdc/runtime/knl/Thread.h>
59 #include <xdc/runtime/System.h>
61 #define MSGBUFFERSIZE    512   /* Make global and move to RPMessage.h */
63 #if defined(RCM_ti_ipc)
64 #include <ti/sdo/utils/List.h>
65 #include <ti/ipc/MultiProc.h>
67 #elif defined(RCM_ti_syslink)
68 #include <ti/syslink/utils/List.h>
69 #define List_Struct List_Object
70 #define List_handle(exp) (exp)
72 #else
73     #error "undefined ipc binding";
74 #endif
76 /* local header files */
77 #include "RcmClient.h"
78 #include "RcmTypes.h"
79 #include "RcmServer.h"
81 #if USE_RPMESSAGE
82 #include <ti/srvmgr/rpmsg_omx.h>
83 #endif
85 #define _RCM_KeyResetValue 0x07FF       /* key reset value*/
86 #define _RCM_KeyMask 0x7FF00000         /* key mask in function index*/
87 #define _RCM_KeyShift 20                /* key bit position in function index*/
89 #define RcmServer_MAX_TABLES 9          /* max number of function tables*/
90 #define RcmServer_POOL_MAP_LEN 4        /* pool map length*/
92 #define RcmServer_E_InvalidFxnIdx       (-101)
93 #define RcmServer_E_JobIdNotFound       (-102)
94 #define RcmServer_E_PoolIdNotFound      (-103)
96 typedef struct {                        /* function table element*/
97     String                      name;
98 #if USE_RPMESSAGE
99     union  {
100        RcmServer_MsgFxn         fxn;
101        RcmServer_MsgCreateFxn   createFxn;
102     }addr;
103 #else
104     RcmServer_MsgFxn            addr;
105 #endif
106     UInt16                      key;
107 } RcmServer_FxnTabElem;
109 typedef struct {
110     Int                         length;
111     RcmServer_FxnTabElem *      elem;
112 } RcmServer_FxnTabElemAry;
114 typedef struct {
115     String                      name;       /* pool name*/
116     Int                         count;      /* thread count (at create time)*/
117     Thread_Priority             priority;   /* thread priority*/
118     Int                         osPriority;
119     SizeT                       stackSize;  /* thread stack size*/
120     String                      stackSeg;   /* thread stack placement*/
121     ISemaphore_Handle           sem;        /* message semaphore (counting)*/
122     List_Struct                 threadList; /* list of worker threads*/
123     List_Struct                 readyQueue; /* queue of messages*/
124 } RcmServer_ThreadPool;
126 typedef struct RcmServer_Object_tag {
127     GateThread_Struct           gate;       /* instance gate*/
128     Ptr                         run;        /* run semaphore for the server*/
129 #if USE_RPMESSAGE
130     RPMessage_Handle         serverQue;  /* inbound message queue */
131     UInt32                      localAddr;  /* inbound message queue address */
132     UInt32                      replyAddr;  /* Reply address (same per inst.) */
133     UInt32                      dstProc;    /* Reply processor. */
134 #else
135     MessageQ_Handle             serverQue;  /* inbound message queue*/
136 #endif
137     Thread_Handle               serverThread; /* server thread object*/
138     RcmServer_FxnTabElemAry     fxnTabStatic; /* static function table*/
139     RcmServer_FxnTabElem *      fxnTab[RcmServer_MAX_TABLES]; /* base pointers*/
140     UInt16                      key;        /* function index key*/
141     UInt16                      jobId;      /* job id tracker*/
142     Bool                        shutdown;   /* server shutdown flag*/
143     Int                         poolMap0Len;/* length of static table*/
144     RcmServer_ThreadPool *      poolMap[RcmServer_POOL_MAP_LEN];
145     List_Handle                 jobList;    /* list of job stream queues*/
146 } RcmServer_Object;
148 typedef struct {
149     List_Elem                   elem;
150     UInt16                      jobId;      /* current job stream id*/
151     Thread_Handle               thread;     /* server thread object*/
152     Bool                        terminate;  /* thread terminate flag*/
153     RcmServer_ThreadPool*       pool;       /* worker pool*/
154     RcmServer_Object *          server;     /* server instance*/
155 } RcmServer_WorkerThread;
157 typedef struct {
158     List_Elem                   elem;
159     UInt16                      jobId;      /* job stream id*/
160     Bool                        empty;      /* true if no messages on server*/
161     List_Struct                 msgQue;     /* queue of messages*/
162 } RcmServer_JobStream;
164 typedef struct RcmServer_Module_tag {
165     String              name;
166     IHeap_Handle        heap;
167 } RcmServer_Module;
170 /* private functions */
171 static
172 Int RcmServer_Instance_init_P(
173         RcmServer_Object *              obj,
174         String                          name,
175         const RcmServer_Params *        params
176     );
178 static
179 Int RcmServer_Instance_finalize_P(
180         RcmServer_Object *              obj
181     );
183 static
184 Int RcmServer_acqJobId_P(
185         RcmServer_Object *              obj,
186         UInt16 *                        jobIdPtr
187     );
189 static
190 Int RcmServer_dispatch_P(
191         RcmServer_Object *              obj,
192         RcmClient_Packet *              packet
193     );
195 static
196 Int RcmServer_execMsg_I(
197         RcmServer_Object *              obj,
198         RcmClient_Message *             msg
199     );
201 static
202 Int RcmServer_getFxnAddr_P(
203         RcmServer_Object *              obj,
204         UInt32                          fxnIdx,
205         RcmServer_MsgFxn *              addrPtr,
206         RcmServer_MsgCreateFxn *        createPtr
207     );
209 static
210 UInt16 RcmServer_getNextKey_P(
211         RcmServer_Object *              obj
212     );
214 static
215 Int RcmServer_getSymIdx_P(
216         RcmServer_Object *              obj,
217         String                          name,
218         UInt32 *                        index
219     );
221 static
222 Int RcmServer_getPool_P(
223         RcmServer_Object *              obj,
224         RcmClient_Packet *              packet,
225         RcmServer_ThreadPool **         poolP
226     );
228 static
229 Void RcmServer_process_P(
230         RcmServer_Object *              obj,
231         RcmClient_Packet *              packet
232     );
234 static
235 Int RcmServer_relJobId_P(
236         RcmServer_Object *              obj,
237         UInt16                          jobId
238     );
240 static
241 Void RcmServer_serverThrFxn_P(
242         IArg                            arg
243     );
245 static inline
246 Void RcmServer_setStatusCode_I(
247         RcmClient_Packet *              packet,
248         UInt16                          code
249     );
251 static
252 Void RcmServer_workerThrFxn_P(
253         IArg                            arg
254     );
257 #define RcmServer_Module_heap() (RcmServer_Mod.heap)
260 /* static objects */
261 static Int curInit = 0;
262 static Char ti_grcm_RcmServer_Name[] = {
263         't','i','.','s','d','o','.','r','c','m','.',
264         'R','c','m','S','e','r','v','e','r','\0'
265     };
267 static RcmServer_Module RcmServer_Mod = {
268     MODULE_NAME,        /* name */
269     (IHeap_Handle)NULL  /* heap */
270 };
272 /* module diags mask */
273 Registry_Desc Registry_CURDESC;
276 /*
277  *  ======== RcmServer_init ========
278  *
279  *  This function must be serialized by the caller
280  */
281 Void RcmServer_init(Void)
283     Registry_Result result;
286     if (curInit++ != 0) {
287         return; /* module already initialized */
288     }
290     /* register with xdc.runtime to get a diags mask */
291 /*  result = Registry_addModule(&Registry_CURDESC, MODULE_NAME);*/
292     result = Registry_addModule(&Registry_CURDESC, ti_grcm_RcmServer_Name);
293     Assert_isTrue(result == Registry_SUCCESS, (Assert_Id)NULL);
295     /* the size of object and struct must be the same */
296     Assert_isTrue(sizeof(RcmServer_Object) == sizeof(RcmServer_Struct), NULL);
300 /*
301  *  ======== RcmServer_exit ========
302  *
303  *  This function must be serialized by the caller
304  */
305 Void RcmServer_exit(Void)
307 /*  Registry_Result result;*/
310     if (--curInit != 0) {
311         return; /* module still in use */
312     }
314     /* unregister from xdc.runtime */
315 /*  result = Registry_removeModule(MODULE_NAME);*/
316 /*  Assert_isTrue(result == Registry_SUCCESS, (Assert_Id)NULL);*/
320 /*
321  *  ======== RcmServer_Params_init ========
322  */
323 Void RcmServer_Params_init(RcmServer_Params *params)
325     /* server thread */
326     params->priority = Thread_Priority_HIGHEST;
327     params->osPriority = Thread_INVALID_OS_PRIORITY;
328     params->stackSize = 0;  /* use system default*/
329     params->stackSeg = "";
331     /* default pool */
332     params->defaultPool.name = NULL;
333     params->defaultPool.count = 0;
334     params->defaultPool.priority = Thread_Priority_NORMAL;
335     params->defaultPool.osPriority = Thread_INVALID_OS_PRIORITY;
336     params->defaultPool.stackSize = 0;  /* use system default*/
337     params->defaultPool.stackSeg = "";
339     /* worker pools */
340     params->workerPools.length = 0;
341     params->workerPools.elem = NULL;
343     /* function table */
344     params->fxns.length = 0;
345     params->fxns.elem = NULL;
349 /*
350  *  ======== RcmServer_create ========
351  */
352 #define FXNN "RcmServer_create"
353 Int RcmServer_create(String name, RcmServer_Params *params,
354         RcmServer_Handle *handle)
356     RcmServer_Object *obj;
357     Error_Block eb;
358     Int status = RcmServer_S_SUCCESS;
361     Log_print3(Diags_ENTRY, "--> "FXNN": (name=0x%x, params=0x%x, hP=0x%x)",
362         (IArg)name, (IArg)params, (IArg)handle);
364     /* initialize the error block */
365     Error_init(&eb);
367     if (NULL == handle) {
368         Log_error0(FXNN": Invalid pointer");
369         status = RcmServer_E_FAIL;
370         goto leave;
371     }
373     *handle = (RcmServer_Handle)NULL;
375     /* check for valid params */
376     if (NULL == params) {
377         Log_error0(FXNN": params ptr must not be NULL");
378         status = RcmServer_E_FAIL;
379         goto leave;
380     }
381     if (NULL == name) {
382         Log_error0(FXNN": name passed is NULL!");
383         status = RcmServer_E_FAIL;
384         goto leave;
385     }
387     /* allocate the object */
388     obj = (RcmServer_Handle)xdc_runtime_Memory_calloc(RcmServer_Module_heap(),
389         sizeof(RcmServer_Object), sizeof(Int), &eb);
391     if (NULL == obj) {
392         Log_error2(FXNN": out of memory: heap=0x%x, size=%u",
393             (IArg)RcmServer_Module_heap(), sizeof(RcmServer_Object));
394         status = RcmServer_E_NOMEMORY;
395         goto leave;
396     }
398     Log_print1(Diags_LIFECYCLE, FXNN": instance create: 0x%x", (IArg)obj);
400     /* object-specific initialization */
401     status = RcmServer_Instance_init_P(obj, name, params);
403     if (status < 0) {
404         RcmServer_Instance_finalize_P(obj);
405         xdc_runtime_Memory_free(RcmServer_Module_heap(),
406             (Ptr)obj, sizeof(RcmServer_Object));
407         goto leave;
408     }
410     /* success, return opaque pointer */
411     *handle = (RcmServer_Handle)obj;
414 leave:
415     Log_print1(Diags_EXIT, "<-- "FXNN": %d", (IArg)status);
416     return(status);
418 #undef FXNN
421 /*
422  *  ======== RcmServer_construct ========
423  */
424 #define FXNN "RcmServer_construct"
425 Int RcmServer_construct(RcmServer_Struct *structPtr, String name,
426     const RcmServer_Params *params)
428     RcmServer_Object *obj = (RcmServer_Object*)structPtr;
429     Int status = RcmServer_S_SUCCESS;
432     Log_print1(Diags_ENTRY, "--> "FXNN": (structPtr=0x%x)", (IArg)structPtr);
433     Log_print1(Diags_LIFECYCLE, FXNN": instance construct: 0x%x", (IArg)obj);
435     /* ensure the constructed object is zeroed */
436     _memset((Void *)obj, 0, sizeof(RcmServer_Object));
438     /* object-specific initialization */
439     status = RcmServer_Instance_init_P(obj, name, params);
441     if (status < 0) {
442         RcmServer_Instance_finalize_P(obj);
443         goto leave;
444     }
447 leave:
448     Log_print1(Diags_EXIT, "<-- "FXNN": %d", (IArg)status);
449     return(status);
451 #undef FXNN
454 /*
455  *  ======== RcmServer_Instance_init_P ========
456  */
457 #define FXNN "RcmServer_Instance_init_P"
458 Int RcmServer_Instance_init_P(RcmServer_Object *obj, String name,
459         const RcmServer_Params *params)
461     Error_Block eb;
462     List_Params listP;
463 #if USE_RPMESSAGE == 0
464     MessageQ_Params mqParams;
465 #endif
466     Thread_Params threadP;
467     SemThread_Params semThreadP;
468     SemThread_Handle semThreadH;
469     Int i, j;
470     SizeT size;
471     Char *cp;
472     RcmServer_ThreadPool *poolAry;
473     RcmServer_WorkerThread *worker;
474     List_Handle listH;
475     Int status = RcmServer_S_SUCCESS;
478     Log_print1(Diags_ENTRY, "--> "FXNN": (obj=0x%x)", (IArg)obj);
480     Error_init(&eb);
482     /* initialize instance state */
483     obj->shutdown = FALSE;
484     obj->key = 0;
485     obj->jobId = 0xFFFF;
486     obj->run = NULL;
487     obj->serverQue = NULL;
488     obj->serverThread = NULL;
489     obj->fxnTabStatic.length = 0;
490     obj->fxnTabStatic.elem = NULL;
491     obj->poolMap0Len = 0;
492     obj->jobList = NULL;
495     /* initialize the function table */
496     for (i = 0; i < RcmServer_MAX_TABLES; i++) {
497         obj->fxnTab[i] = NULL;
498     }
500     /* initialize the worker pool map */
501     for (i = 0; i < RcmServer_POOL_MAP_LEN; i++) {
502         obj->poolMap[i] = NULL;
503     }
505     /* create the instance gate */
506     GateThread_construct(&obj->gate, NULL, &eb);
508     if (Error_check(&eb)) {
509         Log_error0(FXNN": could not create gate object");
510         status = RcmServer_E_FAIL;
511         goto leave;
512     }
514     /* create list for job objects */
515 #if defined(RCM_ti_ipc)
516     List_Params_init(&listP);
517     obj->jobList = List_create(&listP, &eb);
519     if (Error_check(&eb)) {
520         Log_error0(FXNN": could not create list object");
521         status = RcmServer_E_FAIL;
522         goto leave;
523     }
524 #elif defined(RCM_ti_syslink)
525     List_Params_init(&listP);
526     obj->jobList = List_create(&listP, NULL);
528     if (obj->jobList == NULL) {
529         Log_error0(FXNN": could not create list object");
530         status = RcmServer_E_FAIL;
531         goto leave;
532     }
533 #endif
535     /* create the static function table */
536     if (params->fxns.length > 0) {
537         obj->fxnTabStatic.length = params->fxns.length;
539         /* allocate static function table */
540         size = params->fxns.length * sizeof(RcmServer_FxnTabElem);
541         obj->fxnTabStatic.elem = xdc_runtime_Memory_alloc(
542             RcmServer_Module_heap(), size, sizeof(Ptr), &eb);
544         if (Error_check(&eb)) {
545             Log_error2(FXNN": out of memory: heap=0x%x, size=%u",
546                 (IArg)RcmServer_Module_heap(), size);
547             status = RcmServer_E_NOMEMORY;
548             goto leave;
549         }
550         obj->fxnTabStatic.elem[0].name = NULL;
552         /* allocate a single block to store all name strings */
553         for (size = 0, i = 0; i < params->fxns.length; i++) {
554             size += _strlen(params->fxns.elem[i].name) + 1;
555         }
556         cp = xdc_runtime_Memory_alloc(
557             RcmServer_Module_heap(), size, sizeof(Ptr), &eb);
559         if (Error_check(&eb)) {
560             Log_error2(FXNN": out of memory: heap=0x%x, size=%u",
561                 (IArg)RcmServer_Module_heap(), size);
562             status = RcmServer_E_NOMEMORY;
563             goto leave;
564         }
566         /* copy function table data into allocated memory blocks */
567         for (i = 0; i < params->fxns.length; i++) {
568             _strcpy(cp, params->fxns.elem[i].name);
569             obj->fxnTabStatic.elem[i].name = cp;
570             cp += (_strlen(params->fxns.elem[i].name) + 1);
571             obj->fxnTabStatic.elem[i].addr.fxn = params->fxns.elem[i].addr.fxn;
572             obj->fxnTabStatic.elem[i].key = 0;
573         }
575         /* hook up the static function table */
576         obj->fxnTab[0] = obj->fxnTabStatic.elem;
577     }
579     /* create static worker pools */
580     if ((params->workerPools.length + 1) > RcmServer_POOL_MAP_LEN) {
581         Log_error1(FXNN": Exceeded maximum number of worker pools =%d",
582             (IArg) (params->workerPools.length) );
583         status = RcmServer_E_NOMEMORY;
584         goto leave;
585     }
586     obj->poolMap0Len = params->workerPools.length + 1; /* workers + default */
588     /* allocate the static worker pool table */
589     size = obj->poolMap0Len * sizeof(RcmServer_ThreadPool);
590     obj->poolMap[0] = (RcmServer_ThreadPool *)xdc_runtime_Memory_alloc(
591         RcmServer_Module_heap(), size, sizeof(Ptr), &eb);
593     if (Error_check(&eb)) {
594         Log_error2(FXNN": out of memory: heap=0x%x, size=%u",
595             (IArg)RcmServer_Module_heap(), size);
596         status = RcmServer_E_NOMEMORY;
597         goto leave;
598     }
600     /* convenience alias */
601     poolAry = obj->poolMap[0];
603     /* allocate a single block to store all name strings         */
604     /* Buffer format is: [SIZE][DPN][\0][WPN1][\0][WPN2][\0].... */
605     /* DPN = Default Pool Name, WPN = Worker Pool Name           */
606     /* In case, DPN is NULL, format is: [SIZE][\0][WPN1][\0].... */
607     /* In case, WPN is NULL, format is: [SIZE][DPN][\0]          */
608     size = sizeof(SizeT) /* block size */
609         + (params->defaultPool.name == NULL ? 1 :
610         _strlen(params->defaultPool.name) + 1);
612     for (i = 0; i < params->workerPools.length; i++) {
613         size += (params->workerPools.elem[i].name == NULL ? 0 :
614             _strlen(params->workerPools.elem[i].name) + 1);
615     }
616     cp = xdc_runtime_Memory_alloc(
617         RcmServer_Module_heap(), size, sizeof(Ptr), &eb);
619     if (Error_check(&eb)) {
620         Log_error2(FXNN": out of memory: heap=0x%x, size=%u",
621             (IArg)RcmServer_Module_heap(), size);
622         status = RcmServer_E_NOMEMORY;
623         goto leave;
624     }
626     *(SizeT *)cp = size;
627     cp += sizeof(SizeT);
629     /* initialize the default worker pool, poolAry[0] */
630     if (params->defaultPool.name != NULL) {
631         _strcpy(cp, params->defaultPool.name);
632         poolAry[0].name = cp;
633         cp += (_strlen(params->defaultPool.name) + 1);
634     }
635     else {
636         poolAry[0].name = cp;
637         *cp++ = '\0';
638     }
640     poolAry[0].count = params->defaultPool.count;
641     poolAry[0].priority = params->defaultPool.priority;
642     poolAry[0].osPriority = params->defaultPool.osPriority;
643     poolAry[0].stackSize = params->defaultPool.stackSize;
644     poolAry[0].stackSeg = NULL;
645     poolAry[0].sem = NULL;
647     List_construct(&(poolAry[0].threadList), NULL);
648     List_construct(&(poolAry[0].readyQueue), NULL);
650     SemThread_Params_init(&semThreadP);
651     semThreadP.mode = SemThread_Mode_COUNTING;
653     semThreadH = SemThread_create(0, &semThreadP, &eb);
655     if (Error_check(&eb)) {
656         Log_error0(FXNN": could not create semaphore");
657         status = RcmServer_E_FAIL;
658         goto leave;
659     }
661     poolAry[0].sem = SemThread_Handle_upCast(semThreadH);
663     /* initialize the static worker pools, poolAry[1..(n-1)] */
664     for (i = 0; i < params->workerPools.length; i++) {
665         if (params->workerPools.elem[i].name != NULL) {
666             _strcpy(cp, params->workerPools.elem[i].name);
667             poolAry[i+1].name = cp;
668             cp += (_strlen(params->workerPools.elem[i].name) + 1);
669         }
670         else {
671             poolAry[i+1].name = NULL;
672         }
674         poolAry[i+1].count = params->workerPools.elem[i].count;
675         poolAry[i+1].priority = params->workerPools.elem[i].priority;
676         poolAry[i+1].osPriority =params->workerPools.elem[i].osPriority;
677         poolAry[i+1].stackSize = params->workerPools.elem[i].stackSize;
678         poolAry[i+1].stackSeg = NULL;
680         List_construct(&(poolAry[i+1].threadList), NULL);
681         List_construct(&(poolAry[i+1].readyQueue), NULL);
683         SemThread_Params_init(&semThreadP);
684         semThreadP.mode = SemThread_Mode_COUNTING;
686         semThreadH = SemThread_create(0, &semThreadP, &eb);
688         if (Error_check(&eb)) {
689             Log_error0(FXNN": could not create semaphore");
690             status = RcmServer_E_FAIL;
691             goto leave;
692         }
694         poolAry[i+1].sem = SemThread_Handle_upCast(semThreadH);
695     }
697     /* create the worker threads in each static pool */
698     for (i = 0; i < obj->poolMap0Len; i++) {
699         for (j = 0; j < poolAry[i].count; j++) {
701             /* allocate worker thread object */
702             size = sizeof(RcmServer_WorkerThread);
703             worker = (RcmServer_WorkerThread *)xdc_runtime_Memory_alloc(
704                 RcmServer_Module_heap(), size, sizeof(Ptr), &eb);
706             if (Error_check(&eb)) {
707                 Log_error2(FXNN": out of memory: heap=0x%x, size=%u",
708                     (IArg)RcmServer_Module_heap(), size);
709                 status = RcmServer_E_NOMEMORY;
710                 goto leave;
711             }
713             /* initialize worker thread object */
714             worker->jobId = RcmClient_DISCRETEJOBID;
715             worker->thread = NULL;
716             worker->terminate = FALSE;
717             worker->pool = &(poolAry[i]);
718             worker->server = obj;
720             /* add worker thread to worker pool */
721             listH = List_handle(&(poolAry[i].threadList));
722             List_putHead(listH, &(worker->elem));
724             /* create worker thread */
725             Thread_Params_init(&threadP);
726             threadP.arg = (IArg)worker;
727             threadP.priority = poolAry[i].priority;
728             threadP.osPriority = poolAry[i].osPriority;
729             threadP.stackSize = poolAry[i].stackSize;
730             threadP.instance->name = "RcmServer_workerThr";
732             worker->thread = Thread_create(
733                 (Thread_RunFxn)(RcmServer_workerThrFxn_P), &threadP, &eb);
735             if (Error_check(&eb)) {
736                 Log_error2(FXNN": could not create worker thread, "
737                     "pool=%d, thread=%d", (IArg)i, (IArg)j);
738                 status = RcmServer_E_FAIL;
739                 goto leave;
740             }
741         }
742     }
744     /* create the semaphore used to release the server thread */
745     SemThread_Params_init(&semThreadP);
746     semThreadP.mode = SemThread_Mode_COUNTING;
748     obj->run = SemThread_create(0, &semThreadP, &eb);
750     if (Error_check(&eb)) {
751         Log_error0(FXNN": could not create semaphore");
752         status = RcmServer_E_FAIL;
753         goto leave;
754     }
756     /* create the message queue for inbound messages */
757 #if USE_RPMESSAGE
758     obj->serverQue = RPMessage_create(RPMessage_ASSIGN_ANY, NULL, NULL,
759                                          &obj->localAddr);
760 #ifdef BIOS_ONLY_TEST
761     obj->dstProc = MultiProc_self();
762 #else
763     obj->dstProc = MultiProc_getId("HOST");
764 #endif
766 #else
767     MessageQ_Params_init(&mqParams);
768     obj->serverQue = MessageQ_create(name, &mqParams);
769 #endif
771     if (NULL == obj->serverQue) {
772         Log_error0(FXNN": could not create server message queue");
773         status = RcmServer_E_FAIL;
774         goto leave;
775     }
777     /* create the server thread */
778     Thread_Params_init(&threadP);
779     threadP.arg = (IArg)obj;
780     threadP.priority = params->priority;
781     threadP.osPriority = params->osPriority;
782     threadP.stackSize = params->stackSize;
783     threadP.instance->name = "RcmServer_serverThr";
785     obj->serverThread = Thread_create(
786         (Thread_RunFxn)(RcmServer_serverThrFxn_P), &threadP, &eb);
788     if (Error_check(&eb)) {
789         Log_error0(FXNN": could not create server thread");
790         status = RcmServer_E_FAIL;
791         goto leave;
792     }
795 leave:
796     Log_print1(Diags_EXIT, "<-- "FXNN": %d", (IArg)status);
797     return(status);
799 #undef FXNN
802 /*
803  *  ======== RcmServer_delete ========
804  */
805 #define FXNN "RcmServer_delete"
806 Int RcmServer_delete(RcmServer_Handle *handlePtr)
808     RcmServer_Object *obj = (RcmServer_Object *)(*handlePtr);
809     Int status = RcmClient_S_SUCCESS;
812     Log_print1(Diags_ENTRY, "--> "FXNN": (handlePtr=0x%x)", (IArg)handlePtr);
814     /* finalize the object */
815     status = RcmServer_Instance_finalize_P(obj);
817     /* free the object memory */
818     Log_print1(Diags_LIFECYCLE, FXNN": instance delete: 0x%x", (IArg)obj);
820     xdc_runtime_Memory_free(RcmServer_Module_heap(),
821         (Ptr)obj, sizeof(RcmServer_Object));
822     *handlePtr = NULL;
824     Log_print1(Diags_EXIT, "<-- "FXNN": %d", (IArg)status);
825     return(status);
827 #undef FXNN
830 /*
831  *  ======== RcmServer_destruct ========
832  */
833 #define FXNN "RcmServer_destruct"
834 Int RcmServer_destruct(RcmServer_Struct *structPtr)
836     RcmServer_Object *obj = (RcmServer_Object *)(structPtr);
837     Int status = RcmClient_S_SUCCESS;
840     Log_print1(Diags_ENTRY, "--> "FXNN": (structPtr=0x%x)", (IArg)structPtr);
841     Log_print1(Diags_LIFECYCLE, FXNN": instance destruct: 0x%x", (IArg)obj);
843     /* finalize the object */
844     status = RcmServer_Instance_finalize_P(obj);
846     Log_print1(Diags_EXIT, "<-- "FXNN": %d", (IArg)status);
847     return(status);
849 #undef FXNN
852 /*
853  *  ======== RcmServer_Instance_finalize_P ========
854  */
855 #define FXNN "RcmServer_Instance_finalize_P"
856 Int RcmServer_Instance_finalize_P(RcmServer_Object *obj)
858     Int i, j;
859     Int size;
860     Char *cp;
861     UInt tabCount;
862     RcmServer_FxnTabElem *fdp;
863     Error_Block eb;
864     RcmServer_ThreadPool *poolAry;
865     RcmServer_WorkerThread *worker;
866     List_Elem *elem;
867     List_Handle listH;
868     List_Handle msgQueH;
869     RcmClient_Packet *packet;
870 #if USE_RPMESSAGE == 0
871     MessageQ_Msg msgqMsg;
872 #endif
873     SemThread_Handle semThreadH;
874     RcmServer_JobStream *job;
875     Int rval;
876     Int status = RcmClient_S_SUCCESS;
878     Log_print1(Diags_ENTRY, "--> "FXNN": (obj=0x%x)", (IArg)obj);
880     /* must initialize the error block before using it */
881     Error_init(&eb);
883     /* block until server thread exits */
884     obj->shutdown = TRUE;
886     if (obj->serverThread != NULL) {
887 #if USE_RPMESSAGE
888         RPMessage_unblock(obj->serverQue);
889 #else
890         MessageQ_unblock(obj->serverQue);
891 #endif
892         Thread_join(obj->serverThread, &eb);
894         if (Error_check(&eb)) {
895             Log_error0(FXNN": server thread did not exit properly");
896             status = RcmServer_E_FAIL;
897             goto leave;
898         }
899     }
901     /* delete any remaining job objects (there should not be any) */
902     while ((elem = List_get(obj->jobList)) != NULL) {
903         job = (RcmServer_JobStream *)elem;
905         /* return any remaining messages (there should not be any) */
906         msgQueH = List_handle(&job->msgQue);
908         while ((elem = List_get(msgQueH)) != NULL) {
909             packet = (RcmClient_Packet *)elem;
910             Log_warning2(
911                 FXNN": returning unprocessed message, jobId=0x%x, packet=0x%x",
912                 (IArg)job->jobId, (IArg)packet);
914             RcmServer_setStatusCode_I(packet, RcmServer_Status_Unprocessed);
915 #if USE_RPMESSAGE
916             packet->hdr.type = OMX_RAW_MSG;
917             packet->hdr.len = PACKET_DATA_SIZE + packet->message.dataSize;
918             rval = RPMessage_send(obj->dstProc, obj->replyAddr,
919                                  obj->localAddr, (Ptr)&packet->hdr,
920                                  PACKET_HDR_SIZE + packet->message.dataSize);
921 #else
922             msgqMsg = &packet->msgqHeader;
923             rval = MessageQ_put(MessageQ_getReplyQueue(msgqMsg), msgqMsg);
924 #endif
925             if (rval < 0) {
926                 Log_error1(FXNN": unknown ipc error, 0x%x", (IArg)rval);
927             }
928         }
930         /* finalize the job stream object */
931         List_destruct(&job->msgQue);
933         xdc_runtime_Memory_free(RcmServer_Module_heap(),
934             (Ptr)job, sizeof(RcmServer_JobStream));
935     }
936     List_delete(&(obj->jobList));
938     /* convenience alias */
939     poolAry = obj->poolMap[0];
941     /* free all the static pool resources */
942     for (i = 0; i < obj->poolMap0Len; i++) {
944         /* free all the worker thread objects */
945         listH = List_handle(&(poolAry[i].threadList));
947         /* mark each worker thread for termination */
948         elem = NULL;
949         while ((elem = List_next(listH, elem)) != NULL) {
950             worker = (RcmServer_WorkerThread *)elem;
951             worker->terminate = TRUE;
952         }
954         /* unblock each worker thread so it can terminate */
955         elem = NULL;
956         while ((elem = List_next(listH, elem)) != NULL) {
957             Semaphore_post(poolAry[i].sem, &eb);
959             if (Error_check(&eb)) {
960                 Log_error0(FXNN": post failed on thread");
961                 status = RcmServer_E_FAIL;
962                 goto leave;
963             }
964         }
966         /* wait for each worker thread to terminate */
967         elem = NULL;
968         while ((elem = List_get(listH)) != NULL) {
969             worker = (RcmServer_WorkerThread *)elem;
971             Thread_join(worker->thread, &eb);
973             if (Error_check(&eb)) {
974                 Log_error1(
975                     FXNN": worker thread did not exit properly, thread=0x%x",
976                     (IArg)worker->thread);
977                 status = RcmServer_E_FAIL;
978                 goto leave;
979             }
981             Thread_delete(&worker->thread);
983             /* free the worker thread object */
984             xdc_runtime_Memory_free(RcmServer_Module_heap(), (Ptr)worker,
985                 sizeof(RcmServer_WorkerThread));
986         }
988         /* free up pool resources */
989         semThreadH = SemThread_Handle_downCast(poolAry[i].sem);
990         SemThread_delete(&semThreadH);
991         List_destruct(&(poolAry[i].threadList));
993         /* return any remaining messages on the readyQueue */
994         msgQueH = List_handle(&poolAry[i].readyQueue);
996         while ((elem = List_get(msgQueH)) != NULL) {
997             packet = (RcmClient_Packet *)elem;
998             Log_warning2(
999                 FXNN": returning unprocessed message, msgId=0x%x, packet=0x%x",
1000                 (IArg)packet->msgId, (IArg)packet);
1002             RcmServer_setStatusCode_I(packet, RcmServer_Status_Unprocessed);
1003 #if USE_RPMESSAGE
1004             packet->hdr.type = OMX_RAW_MSG;
1005             packet->hdr.len = PACKET_DATA_SIZE + packet->message.dataSize;
1006             rval = RPMessage_send(obj->dstProc, obj->replyAddr,
1007                                  obj->localAddr, (Ptr)&packet->hdr,
1008                                  PACKET_HDR_SIZE + packet->message.dataSize);
1009 #else
1010             msgqMsg = &packet->msgqHeader;
1011             rval = MessageQ_put(MessageQ_getReplyQueue(msgqMsg), msgqMsg);
1012 #endif
1013             if (rval < 0) {
1014                 Log_error1(FXNN": unknown ipc error, 0x%x", (IArg)rval);
1015             }
1016         }
1018         List_destruct(&(poolAry[i].readyQueue));
1019     }
1021     /* free the name block for the static pools */
1022     if ((obj->poolMap[0] != NULL) && (obj->poolMap[0]->name != NULL)) {
1023         cp = obj->poolMap[0]->name;
1024         cp -= sizeof(SizeT);
1025         xdc_runtime_Memory_free(RcmServer_Module_heap(), (Ptr)cp, *(SizeT *)cp);
1026     }
1028     /* free the static worker pool table */
1029     if (obj->poolMap[0] != NULL) {
1030         xdc_runtime_Memory_free(RcmServer_Module_heap(), (Ptr)(obj->poolMap[0]),
1031             obj->poolMap0Len * sizeof(RcmServer_ThreadPool));
1032         obj->poolMap[0] = NULL;
1033     }
1035 #if 0
1036     /* free all dynamic worker pools */
1037     for (p = 1; p < RcmServer_POOL_MAP_LEN; p++) {
1038         if ((poolAry = obj->poolMap[p]) == NULL) {
1039             continue;
1040         }
1041     }
1042 #endif
1044     /* free up the dynamic function tables and any leftover name strings */
1045     for (i = 1; i < RcmServer_MAX_TABLES; i++) {
1046         if (obj->fxnTab[i] != NULL) {
1047             tabCount = (1 << (i + 4));
1048             for (j = 0; j < tabCount; j++) {
1049                 if (((obj->fxnTab[i])+j)->name != NULL) {
1050                     cp = ((obj->fxnTab[i])+j)->name;
1051                     size = _strlen(cp) + 1;
1052                     xdc_runtime_Memory_free(RcmServer_Module_heap(), cp, size);
1053                 }
1054             }
1055             fdp = obj->fxnTab[i];
1056             size = tabCount * sizeof(RcmServer_FxnTabElem);
1057             xdc_runtime_Memory_free(RcmServer_Module_heap(), fdp, size);
1058             obj->fxnTab[i] = NULL;
1059         }
1060     }
1062     if (NULL != obj->serverThread) {
1063         Thread_delete(&obj->serverThread);
1064     }
1066     if (NULL != obj->serverQue) {
1067 #if USE_RPMESSAGE
1068         RPMessage_delete(&obj->serverQue);
1069 #else
1070         MessageQ_delete(&obj->serverQue);
1071 #endif
1072     }
1074     if (NULL != obj->run) {
1075         SemThread_delete((SemThread_Handle *)(&obj->run));
1076     }
1078     /* free the name block for the static function table */
1079     if ((NULL != obj->fxnTabStatic.elem) &&
1080         (NULL != obj->fxnTabStatic.elem[0].name)) {
1081         for (size = 0, i = 0; i < obj->fxnTabStatic.length; i++) {
1082             size += _strlen(obj->fxnTabStatic.elem[i].name) + 1;
1083         }
1084         xdc_runtime_Memory_free(
1085             RcmServer_Module_heap(),
1086             obj->fxnTabStatic.elem[0].name, size);
1087     }
1089     /* free the static function table */
1090     if (NULL != obj->fxnTabStatic.elem) {
1091         xdc_runtime_Memory_free(RcmServer_Module_heap(),
1092             obj->fxnTabStatic.elem,
1093             obj->fxnTabStatic.length * sizeof(RcmServer_FxnTabElem));
1094     }
1096     /* destruct the instance gate */
1097     GateThread_destruct(&obj->gate);
1100 leave:
1101     Log_print1(Diags_EXIT, "<-- "FXNN": %d", (IArg)status);
1102     return(status);
1104 #undef FXNN
1107 /*
1108  *  ======== RcmServer_addSymbol ========
1109  */
1110 #define FXNN "RcmServer_addSymbol"
1111 Int RcmServer_addSymbol(RcmServer_Object *obj, String funcName,
1112         RcmServer_MsgFxn addr, UInt32 *index)
1114     GateThread_Handle gateH;
1115     IArg key;
1116     Int len;
1117     UInt i, j;
1118     UInt tabCount;
1119     SizeT tabSize;
1120     UInt32 fxnIdx = 0xFFFF;
1121     RcmServer_FxnTabElem *slot = NULL;
1122     Error_Block eb;
1123     Int status = RcmServer_S_SUCCESS;
1126     Log_print3(Diags_ENTRY,
1127         "--> "FXNN": (obj=0x%x, name=0x%x, addr=0x%x)",
1128         (IArg)obj, (IArg)funcName, (IArg)addr);
1130     Error_init(&eb);
1132     /* protect the symbol table while changing it */
1133     gateH = GateThread_handle(&obj->gate);
1134     key = GateThread_enter(gateH);
1136     /* look for an empty slot to use */
1137     for (i = 1; i < RcmServer_MAX_TABLES; i++) {
1138         if (obj->fxnTab[i] != NULL) {
1139             for (j = 0; j < (1 << (i + 4)); j++) {
1140                 if (((obj->fxnTab[i])+j)->addr.fxn == 0) {
1141                     slot = (obj->fxnTab[i]) + j;  /* found empty slot*/
1142                     break;
1143                 }
1144             }
1145         }
1146         else {
1147             /* all previous tables are full, allocate a new table */
1148             tabCount = (1 << (i + 4));
1149             tabSize = tabCount * sizeof(RcmServer_FxnTabElem);
1150             obj->fxnTab[i] = (RcmServer_FxnTabElem *)xdc_runtime_Memory_alloc(
1151                 RcmServer_Module_heap(), tabSize, sizeof(Ptr), &eb);
1153             if (Error_check(&eb)) {
1154                 Log_error0(FXNN": unable to allocate new function table");
1155                 obj->fxnTab[i] = NULL;
1156                 status = RcmServer_E_NOMEMORY;
1157                 goto leave;
1158             }
1160             /* initialize the new table */
1161             for (j = 0; j < tabCount; j++) {
1162                 ((obj->fxnTab[i])+j)->addr.fxn = 0;
1163                 ((obj->fxnTab[i])+j)->name = NULL;
1164                 ((obj->fxnTab[i])+j)->key = 0;
1165             }
1167             /* use first slot in new table */
1168             j = 0;
1169             slot = (obj->fxnTab[i])+j;
1170         }
1172         /* if new slot found, break out of loop */
1173         if (slot != NULL) {
1174             break;
1175         }
1176     }
1178     /* insert new symbol into slot */
1179     if (slot != NULL) {
1180         slot->addr.fxn = addr;
1181         len = _strlen(funcName) + 1;
1182         slot->name = (String)xdc_runtime_Memory_alloc(
1183             RcmServer_Module_heap(), len, 0, &eb);
1185         if (Error_check(&eb)) {
1186             Log_error0(FXNN": unable to allocate function name");
1187             slot->name = NULL;
1188             status = RcmServer_E_NOMEMORY;
1189             goto leave;
1190         }
1192         _strcpy(slot->name, funcName);
1193         slot->key = RcmServer_getNextKey_P(obj);
1194         fxnIdx = ((UInt32)(slot->key) << _RCM_KeyShift) | (i << 12) | j;
1195     }
1197     /* error, no more room to add new symbol */
1198     else {
1199         Log_error0(FXNN": cannot add symbol, table is full");
1200         status = RcmServer_E_SYMBOLTABLEFULL;
1201         goto leave;
1202     }
1205 leave:
1206     GateThread_leave(gateH, key);
1208     /* on success, return new function index */
1209     if (status >= 0) {
1210         *index = fxnIdx;
1211     }
1212     Log_print1(Diags_EXIT, "<-- "FXNN": %d", (IArg)status);
1213     return(status);
1215 #undef FXNN
1218 /*
1219  *  ======== RcmServer_removeSymbol ========
1220  */
1221 #define FXNN "RcmServer_removeSymbol"
1222 Int RcmServer_removeSymbol(RcmServer_Object *obj, String name)
1224     GateThread_Handle gateH;
1225     IArg key;
1226     UInt32 fxnIdx;
1227     UInt tabIdx, tabOff;
1228     RcmServer_FxnTabElem *slot;
1229     Int status = RcmServer_S_SUCCESS;
1232     Log_print2(Diags_ENTRY,
1233         "--> "FXNN": (obj=0x%x, name=0x%x)", (IArg)obj, (IArg)name);
1235     /* protect the symbol table while changing it */
1236     gateH = GateThread_handle(&obj->gate);
1237     key = GateThread_enter(gateH);
1239     /* find the symbol in the table */
1240     status = RcmServer_getSymIdx_P(obj, name, &fxnIdx);
1242     if (status < 0) {
1243         Log_error0(FXNN": given symbol not found");
1244         goto leave;
1245     }
1247     /* static symbols have bit-31 set, cannot remove these symbols */
1248     if (fxnIdx & 0x80000000) {
1249         Log_error0(FXNN": cannot remove static symbol");
1250         status = RcmServer_E_SYMBOLSTATIC;
1251         goto leave;
1252     }
1254     /* get slot pointer */
1255     tabIdx = (fxnIdx & 0xF000) >> 12;
1256     tabOff = (fxnIdx & 0xFFF);
1257     slot = (obj->fxnTab[tabIdx]) + tabOff;
1259     /* clear the table index */
1260     slot->addr.fxn = 0;
1261     if (slot->name != NULL) {
1262         xdc_runtime_Memory_free(
1263             RcmServer_Module_heap(), slot->name, _strlen(slot->name) + 1);
1264         slot->name = NULL;
1265     }
1266     slot->key = 0;
1268 leave:
1269     GateThread_leave(gateH, key);
1270     Log_print1(Diags_EXIT, "<-- "FXNN": %d", (IArg)status);
1271     return(status);
1273 #undef FXNN
1276 /*
1277  *  ======== RcmServer_start ========
1278  */
1279 #define FXNN "RcmServer_start"
1280 Int RcmServer_start(RcmServer_Object *obj)
1282     Error_Block eb;
1283     Int status = RcmServer_S_SUCCESS;
1286     Log_print1(Diags_ENTRY, "--> "FXNN": (obj=0x%x)", (IArg)obj);
1288     Error_init(&eb);
1290     /* unblock the server thread */
1291     Semaphore_post(obj->run, &eb);
1293     if (Error_check(&eb)) {
1294         Log_error0(FXNN": semaphore post failed");
1295         status = RcmServer_E_FAIL;
1296     }
1298     Log_print1(Diags_EXIT, "<-- "FXNN": %d", (IArg)status);
1299     return(status);
1301 #undef FXNN
1304 /*
1305  *  ======== RcmServer_acqJobId_P ========
1306  */
1307 #define FXNN "RcmServer_acqJobId_P"
1308 Int RcmServer_acqJobId_P(RcmServer_Object *obj, UInt16 *jobIdPtr)
1310     Error_Block eb;
1311     GateThread_Handle gateH;
1312     IArg key;
1313     Int count;
1314     UInt16 jobId;
1315     List_Elem *elem;
1316     RcmServer_JobStream *job;
1317     Int status = RcmServer_S_SUCCESS;
1320     Log_print2(Diags_ENTRY,
1321         "--> "FXNN": (obj=0x%x, jobIdPtr=0x%x)", (IArg)obj, (IArg)jobIdPtr);
1323     Error_init(&eb);
1324     gateH = GateThread_handle(&obj->gate);
1326     /* enter critical section */
1327     key = GateThread_enter(gateH);
1329     /* compute new job id */
1330     for (count = 0xFFFF; count > 0; count--) {
1332         /* generate a new job id */
1333         jobId = (obj->jobId == 0xFFFF ? obj->jobId = 1 : ++(obj->jobId));
1335         /* verify job id is not in use */
1336         elem = NULL;
1337         while ((elem = List_next(obj->jobList, elem)) != NULL) {
1338             job = (RcmServer_JobStream *)elem;
1339             if (jobId == job->jobId) {
1340                 jobId = RcmClient_DISCRETEJOBID;
1341                 break;
1342             }
1343         }
1345         if (jobId != RcmClient_DISCRETEJOBID) {
1346             break;
1347         }
1348     }
1350     /* check if job id was acquired */
1351     if (jobId == RcmClient_DISCRETEJOBID) {
1352         *jobIdPtr = RcmClient_DISCRETEJOBID;
1353         Log_error0(FXNN": no job id available");
1354         status = RcmServer_E_FAIL;
1355         GateThread_leave(gateH, key);
1356         goto leave;
1357     }
1359     /* create a new job steam object */
1360     job = xdc_runtime_Memory_alloc(RcmServer_Module_heap(),
1361         sizeof(RcmServer_JobStream), sizeof(Ptr), &eb);
1363     if (Error_check(&eb)) {
1364         Log_error2(FXNN": out of memory: heap=0x%x, size=%u",
1365             (IArg)RcmServer_Module_heap(), sizeof(RcmServer_JobStream));
1366         status = RcmServer_E_NOMEMORY;
1367         GateThread_leave(gateH, key);
1368         goto leave;
1369     }
1371     /* initialize new job stream object */
1372     job->jobId = jobId;
1373     job->empty = TRUE;
1374     List_construct(&(job->msgQue), NULL);
1376     /* put new job stream object at end of server list */
1377     List_put(obj->jobList, (List_Elem *)job);
1379     /* leave critical section */
1380     GateThread_leave(gateH, key);
1382     /* return new job id */
1383     *jobIdPtr = jobId;
1386 leave:
1387     Log_print1(Diags_EXIT, "<-- "FXNN": %d", (IArg)status);
1388     return(status);
1390 #undef FXNN
1393 /*
1394  *  ======== RcmServer_dispatch_P ========
1395  *
1396  *  Return Value
1397  *      < 0: error
1398  *        0: success, job stream queue
1399  *      > 0: success, ready queue
1400  *
1401  *  Pool id description
1402  *
1403  *  Static Worker Pools
1404  *  --------------------------------------------------------------------
1405  *  15      1 = static pool
1406  *  14:8    reserved
1407  *  7:0     offset: 0 - 255
1408  *
1409  *  Dynamic Worker Pools
1410  *  --------------------------------------------------------------------
1411  *  15      0 = dynamic pool
1412  *  14:7    key: 0 - 255
1413  *  6:5     index: 1 - 3
1414  *  4:0     offset: 0 - [7, 15, 31]
1415  */
1416 #define FXNN "RcmServer_dispatch_P"
1417 Int RcmServer_dispatch_P(RcmServer_Object *obj, RcmClient_Packet *packet)
1419     GateThread_Handle gateH;
1420     IArg key;
1421     List_Elem *elem;
1422     List_Handle listH;
1423     RcmServer_ThreadPool *pool;
1424     UInt16 jobId;
1425     RcmServer_JobStream *job;
1426     Error_Block eb;
1427     Int status = RcmServer_S_SUCCESS;
1430     Log_print2(Diags_ENTRY,
1431         "--> "FXNN": (obj=0x%x, packet=0x%x)", (IArg)obj, (IArg)packet);
1433     Error_init(&eb);
1435     /* get the target pool id from the message */
1436     status = RcmServer_getPool_P(obj, packet, &pool);
1438     if (status < 0) {
1439         goto leave;
1440     }
1442     System_printf("Rcm dispatch: p:%d j:%d f:%d l:%d\n",
1443                   packet->message.poolId, packet->message.jobId,
1444                   packet->message.fxnIdx, packet->message.dataSize);
1446     /* discrete jobs go on the end of the ready queue */
1447     jobId = packet->message.jobId;
1449     if (jobId == RcmClient_DISCRETEJOBID) {
1450         listH = List_handle(&pool->readyQueue);
1451         List_put(listH, (List_Elem *)packet);
1453         /* dispatch a new worker thread */
1454         Semaphore_post(pool->sem, &eb);
1456         if (Error_check(&eb)) {
1457             Log_error0(FXNN": semaphore post failed");
1458         }
1459     }
1461     /* must be a job stream message */
1462     else {
1463         /* must protect job list while searching it */
1464         gateH = GateThread_handle(&obj->gate);
1465         key = GateThread_enter(gateH);
1467         /* find the job stream object in the list */
1468         elem = NULL;
1469         while ((elem = List_next(obj->jobList, elem)) != NULL) {
1470             job = (RcmServer_JobStream *)elem;
1471             if (job->jobId == jobId) {
1472                 break;
1473             }
1474         }
1476         if (elem == NULL) {
1477             Log_error1(FXNN": failed to find jobId=%d", (IArg)jobId);
1478             status = RcmServer_E_JobIdNotFound;
1479         }
1481         /* if job object is empty, place message directly on ready queue */
1482         else if (job->empty) {
1483             job->empty = FALSE;
1484             listH = List_handle(&pool->readyQueue);
1485             List_put(listH, (List_Elem *)packet);
1487             /* dispatch a new worker thread */
1488             Semaphore_post(pool->sem, &eb);
1490             if (Error_check(&eb)) {
1491                 Log_error0(FXNN": semaphore post failed");
1492             }
1493         }
1495         /* place message on job queue */
1496         else {
1497             listH = List_handle(&job->msgQue);
1498             List_put(listH, (List_Elem *)packet);
1499         }
1501         GateThread_leave(gateH, key);
1502     }
1505 leave:
1506     Log_print1(Diags_EXIT, "<-- "FXNN": %d", (IArg)status);
1507     return(status);
1509 #undef FXNN
1512 /*
1513  *  ======== RcmServer_execMsg_I ========
1514  */
1515 Int RcmServer_execMsg_I(RcmServer_Object *obj, RcmClient_Message *msg)
1517     RcmServer_MsgFxn fxn;
1518 #if USE_RPMESSAGE
1519     RcmServer_MsgCreateFxn createFxn = NULL;
1520 #endif
1521     Int status;
1523     status = RcmServer_getFxnAddr_P(obj, msg->fxnIdx, &fxn, &createFxn);
1525     if (status >= 0) {
1526 #if 0
1527         System_printf("RcmServer_execMsg_I: Calling fxnIdx: %d\n",
1528                       (msg->fxnIdx & 0x0000FFFF));
1529 #endif
1530 #if USE_RPMESSAGE
1531         if (createFxn)  {
1532             msg->result = (*createFxn)(obj, msg->dataSize, msg->data);
1533         }
1534         else {
1535             msg->result = (*fxn)(msg->dataSize, msg->data);
1536         }
1537 #else
1538         msg->result = (*fxn)(msg->dataSize, msg->data);
1539 #endif
1540     }
1542     return(status);
1546 /*
1547  *  ======== RcmServer_getFxnAddr_P ========
1548  *
1549  *  The function index (fxnIdx) uses the following format. Note that the
1550  *  format differs for static vs. dynamic functions. All static functions
1551  *  are in fxnTab[0], all dynamic functions are in fxnTab[1 - 8].
1552  *
1553  *  Bits    Description
1554  *
1555  *  Static Function Index
1556  *  --------------------------------------------------------------------
1557  *  31      static/dynamic function flag
1558  *              0 = dynamic function
1559  *              1 = static function
1560  *  30:20   reserved
1561  *  19:16   reserved
1562  *  15:0    offset: 0 - 65,535
1563  *
1564  *  Dynamic Function Index
1565  *  --------------------------------------------------------------------
1566  *  31      static/dynamic function flag
1567  *              0 = dynamic function
1568  *              1 = static function
1569  *  30:20   key
1570  *  19:16   reserved
1571  *  15:12   index: 1 - 8
1572  *  11:0    offset: 0 - [31, 63, 127, 255, 511, 1023, 2047, 4095]
1573  */
1574 #define FXNN "RcmServer_getFxnAddr_P"
1575 Int RcmServer_getFxnAddr_P(RcmServer_Object *obj, UInt32 fxnIdx,
1576         RcmServer_MsgFxn *addrPtr, RcmServer_MsgCreateFxn *createPtr)
1578     UInt i, j;
1579     UInt16 key;
1580     RcmServer_FxnTabElem *slot;
1581     RcmServer_MsgFxn addr = NULL;
1582     RcmServer_MsgCreateFxn createAddr = NULL;
1583     Int status = RcmServer_S_SUCCESS;
1585     /* static functions have bit-31 set */
1586     if (fxnIdx & 0x80000000) {
1587         j = (fxnIdx & 0x0000FFFF);
1588         if (j < (obj->fxnTabStatic.length)) {
1590             /* fetch the function address from the table */
1591             slot = (obj->fxnTab[0])+j;
1592             if (j == 0)  {
1593                  createAddr = slot->addr.createFxn;
1594             }
1595             else {
1596                  addr = slot->addr.fxn;
1597             }
1598         }
1599         else {
1600             Log_error1(FXNN": invalid function index 0x%x", (IArg)fxnIdx);
1601             status = RcmServer_E_InvalidFxnIdx;
1602             goto leave;
1603         }
1604     }
1605     /* must be a dynamic function */
1606     else {
1607         /* extract the key from the function index */
1608         key = (fxnIdx & _RCM_KeyMask) >> _RCM_KeyShift;
1610         i = (fxnIdx & 0xF000) >> 12;
1611         if ((i > 0) && (i < RcmServer_MAX_TABLES) && (obj->fxnTab[i] != NULL)) {
1613             /* fetch the function address from the table */
1614             j = (fxnIdx & 0x0FFF);
1615             slot = (obj->fxnTab[i])+j;
1616             addr = slot->addr.fxn;
1618             /* validate the key */
1619             if (key != slot->key) {
1620                 Log_error1(FXNN": invalid function index 0x%x", (IArg)fxnIdx);
1621                 status = RcmServer_E_InvalidFxnIdx;
1622                 goto leave;
1623             }
1624         }
1625         else {
1626             Log_error1(FXNN": invalid function index 0x%x", (IArg)fxnIdx);
1627             status = RcmServer_E_InvalidFxnIdx;
1628             goto leave;
1629         }
1630     }
1632 leave:
1633     if (status >= 0) {
1634        if (j == 0)  {
1635            *createPtr = createAddr;
1636        }
1637        else {
1638            *addrPtr = addr;
1639        }
1640     }
1641     return(status);
1643 #undef FXNN
1646 /* *  ======== RcmServer_getSymIdx_P ========
1647  *
1648  *  Must have table gate before calling this function.
1649  */
1650 #define FXNN "RcmServer_getSymIdx_P"
1651 Int RcmServer_getSymIdx_P(RcmServer_Object *obj, String name, UInt32 *index)
1653     UInt i, j, len;
1654     RcmServer_FxnTabElem *slot;
1655     UInt32 fxnIdx = 0xFFFFFFFF;
1656     Int status = RcmServer_S_SUCCESS;
1659     Log_print3(Diags_ENTRY,
1660         "--> "FXNN": (obj=0x%x, name=0x%x, index=0x%x)",
1661         (IArg)obj, (IArg)name, (IArg)index);
1663     /* search tables for given function name */
1664     for (i = 0; i < RcmServer_MAX_TABLES; i++) {
1665         if (obj->fxnTab[i] != NULL) {
1666             len = (i == 0) ? obj->fxnTabStatic.length : (1 << (i + 4));
1667             for (j = 0; j < len; j++) {
1668                 slot = (obj->fxnTab[i]) + j;
1669                 if ((((obj->fxnTab[i])+j)->name != NULL) &&
1670                     (_strcmp(((obj->fxnTab[i])+j)->name, name) == 0)) {
1671                     /* found function name */
1672                     if (i == 0) {
1673                         fxnIdx = 0x80000000 | j;
1674                     } else {
1675                         fxnIdx = ((UInt32)(slot->key) << _RCM_KeyShift) |
1676                                 (i << 12) | j;
1677                     }
1678                     break;
1679                 }
1680             }
1681         }
1683         if (0xFFFFFFFF != fxnIdx) {
1684             break;
1685         }
1686     }
1688     /* log an error if the symbol was not found */
1689     if (fxnIdx == 0xFFFFFFFF) {
1690         Log_error0(FXNN": given symbol not found");
1691         status = RcmServer_E_SYMBOLNOTFOUND;
1692     }
1694     /* if success, return symbol index */
1695     if (status >= 0) {
1696         *index = fxnIdx;
1697     }
1699     Log_print1(Diags_EXIT, "<-- "FXNN": %d", (IArg)status);
1700     return(status);
1702 #undef FXNN
1705 /*
1706  *  ======== RcmServer_getNextKey_P ========
1707  */
1708 UInt16 RcmServer_getNextKey_P(RcmServer_Object *obj)
1710     GateThread_Handle gateH;
1711     IArg gateKey;
1712     UInt16 key;
1715     gateH = GateThread_handle(&obj->gate);
1716     gateKey = GateThread_enter(gateH);
1718     if (obj->key <= 1) {
1719         obj->key = _RCM_KeyResetValue;  /* don't use 0 as a key value */
1720     }
1721     else {
1722         (obj->key)--;
1723     }
1724     key = obj->key;
1726     GateThread_leave(gateH, gateKey);
1728     return(key);
1732 /*
1733  *  ======== RcmServer_getPool_P ========
1734  */
1735 #define FXNN "RcmServer_getPool_P"
1736 Int RcmServer_getPool_P(RcmServer_Object *obj,
1737         RcmClient_Packet *packet, RcmServer_ThreadPool **poolP)
1739     UInt16 poolId;
1740     UInt16 offset;
1741     Int status = RcmServer_S_SUCCESS;
1744     poolId = packet->message.poolId;
1746     /* static pools have bit-15 set */
1747     if (poolId & 0x8000) {
1748         offset = (poolId & 0x00FF);
1749         if (offset < obj->poolMap0Len) {
1750             *poolP = &(obj->poolMap[0])[offset];
1751         }
1752         else {
1753             Log_error1(FXNN": pool id=0x%x not found", (IArg)poolId);
1754             *poolP = NULL;
1755             status = RcmServer_E_PoolIdNotFound;
1756             goto leave;
1757         }
1758     }
1760 leave:
1761     return(status);
1763 #undef FXNN
1766 /*
1767  *  ======== RcmServer_process_P ========
1768  */
1769 #define FXNN "RcmServer_process_P"
1770 Void RcmServer_process_P(RcmServer_Object *obj, RcmClient_Packet *packet)
1772     String name;
1773     UInt32 fxnIdx;
1774     RcmServer_MsgFxn fxn;
1775     RcmClient_Message *rcmMsg;
1776 #if USE_RPMESSAGE
1777     RcmServer_MsgCreateFxn createFxn = NULL;
1778 #endif
1779 #if USE_RPMESSAGE == 0
1780     MessageQ_Msg msgqMsg;
1781 #endif
1782     UInt16 messageType;
1783     Error_Block eb;
1784     UInt16 jobId;
1785     Int rval;
1786     Int status = RcmServer_S_SUCCESS;
1789     Log_print2(Diags_ENTRY,
1790         "--> "FXNN": (obj=0x%x, packet=0x%x)", (IArg)obj, (IArg)packet);
1792     Error_init(&eb);
1794     /* decode the message */
1795     rcmMsg = &packet->message;
1796 #if USE_RPMESSAGE == 0
1797     msgqMsg = &packet->msgqHeader;
1798 #endif
1799     Log_print1(Diags_INFO, FXNN": message desc=0x%x", (IArg)packet->desc);
1801     /* extract the message type from the packet descriptor field */
1802     messageType = (RcmClient_Desc_TYPE_MASK & packet->desc) >>
1803         RcmClient_Desc_TYPE_SHIFT;
1805     /* process the given message */
1806     switch (messageType) {
1808         case RcmClient_Desc_RCM_MSG:
1809             rval = RcmServer_execMsg_I(obj, rcmMsg);
1811             if (rval < 0) {
1812                 switch (rval) {
1813                     case RcmServer_E_InvalidFxnIdx:
1814                         RcmServer_setStatusCode_I(
1815                             packet, RcmServer_Status_INVALID_FXN);
1816                         break;
1817                     default:
1818                         RcmServer_setStatusCode_I(
1819                             packet, RcmServer_Status_Error);
1820                         break;
1821                 }
1822             }
1823             else if (rcmMsg->result < 0) {
1824                 RcmServer_setStatusCode_I(packet, RcmServer_Status_MSG_FXN_ERR);
1825             }
1826             else {
1827                 RcmServer_setStatusCode_I(packet, RcmServer_Status_SUCCESS);
1828             }
1830 #if USE_RPMESSAGE
1831 #if 0
1832             System_printf("RcmServer_process_P: Sending reply from: %d to: %d\n",
1833                       obj->localAddr, obj->replyAddr);
1834 #endif
1836             packet->hdr.type = OMX_RAW_MSG;
1837             packet->hdr.len = PACKET_DATA_SIZE + packet->message.dataSize;
1838             status = RPMessage_send(obj->dstProc, obj->replyAddr,
1839                                  obj->localAddr, (Ptr)&packet->hdr,
1840                                  PACKET_HDR_SIZE + packet->message.dataSize);
1841 #else
1842             status = MessageQ_put(MessageQ_getReplyQueue(msgqMsg), msgqMsg);
1843 #endif
1844             if (status < 0) {
1845                 Log_error1(FXNN": unknown ipc error, 0x%x", (IArg)status);
1846             }
1847             break;
1849         case RcmClient_Desc_CMD:
1850             status = RcmServer_execMsg_I(obj, rcmMsg);
1852             /* if all went well, free the message */
1853             if ((status >= 0) && (rcmMsg->result >= 0)) {
1855 #if USE_RPMESSAGE == 0
1856                 status = MessageQ_free(msgqMsg);
1857 #endif
1858                 if (status < 0) {
1859                     Log_error1(
1860                         FXNN": MessageQ_free returned error %d", (IArg)status);
1861                 }
1862             }
1864             /* an error occurred, must return message to client */
1865             else {
1866                 if (status < 0) {
1867                     /* error trying to process the message */
1868                     switch (status) {
1869                         case RcmServer_E_InvalidFxnIdx:
1870                             RcmServer_setStatusCode_I(
1871                                 packet, RcmServer_Status_INVALID_FXN);
1872                             break;
1873                         default:
1874                             RcmServer_setStatusCode_I(
1875                                 packet, RcmServer_Status_Error);
1876                             break;
1877                     }
1878                 }
1879                 else  {
1880                     /* error in message function */
1881                     RcmServer_setStatusCode_I(
1882                         packet, RcmServer_Status_MSG_FXN_ERR);
1883                 }
1885                 /* send error message back to client */
1886 #if USE_RPMESSAGE
1887                 packet->hdr.type = OMX_RAW_MSG;
1888                 packet->hdr.len = PACKET_DATA_SIZE + packet->message.dataSize;
1889                 status = RPMessage_send(obj->dstProc, obj->replyAddr,
1890                                  obj->localAddr, (Ptr)&packet->hdr,
1891                                  PACKET_HDR_SIZE + packet->message.dataSize);
1892 #else
1893                 status = MessageQ_put(MessageQ_getReplyQueue(msgqMsg), msgqMsg);
1894 #endif
1895                 if (status < 0) {
1896                     Log_error1(FXNN": unknown ipc error, 0x%x", (IArg)status);
1897                 }
1898             }
1899             break;
1901         case RcmClient_Desc_DPC:
1902             rval = RcmServer_getFxnAddr_P(obj, rcmMsg->fxnIdx, &fxn,
1903                                           &createFxn);
1905             if (rval < 0) {
1906                 RcmServer_setStatusCode_I(
1907                     packet, RcmServer_Status_SYMBOL_NOT_FOUND);
1908                 Error_init(&eb);
1909             }
1911 #if USE_RPMESSAGE
1912             packet->hdr.type = OMX_RAW_MSG;
1913             packet->hdr.len = PACKET_DATA_SIZE + packet->message.dataSize;
1914             status = RPMessage_send(obj->dstProc, obj->replyAddr,
1915                                  obj->localAddr, (Ptr)&packet->hdr,
1916                                  PACKET_HDR_SIZE + packet->message.dataSize);
1917 #else
1918             status = MessageQ_put(MessageQ_getReplyQueue(msgqMsg), msgqMsg);
1919 #endif
1920             if (status < 0) {
1921                 Log_error1(FXNN": unknown ipc error, 0x%x", (IArg)status);
1922             }
1924             /* invoke the function with a null context */
1925 #if USE_RPMESSAGE
1926             if (createFxn)  {
1927                  (*createFxn)(obj, 0, NULL);
1928             }
1929             else {
1930                  (*fxn)(0, NULL);
1931             }
1932 #else
1933             (*fxn)(0, NULL);
1934 #endif
1935             break;
1937         case RcmClient_Desc_SYM_ADD:
1938             break;
1940         case RcmClient_Desc_SYM_IDX:
1941             name = (String)rcmMsg->data;
1942             rval = RcmServer_getSymIdx_P(obj, name, &fxnIdx);
1944             if (rval < 0) {
1945                 RcmServer_setStatusCode_I(
1946                     packet, RcmServer_Status_SYMBOL_NOT_FOUND);
1947             }
1948             else {
1949                 RcmServer_setStatusCode_I(packet, RcmServer_Status_SUCCESS);
1950                 rcmMsg->data[0] = fxnIdx;
1951                 rcmMsg->result = 0;
1952             }
1954 #if USE_RPMESSAGE
1955             packet->hdr.type = OMX_RAW_MSG;
1956             packet->hdr.len = PACKET_DATA_SIZE + packet->message.dataSize;
1957             status = RPMessage_send(obj->dstProc, obj->replyAddr,
1958                                  obj->localAddr, (Ptr)&packet->hdr,
1959                                  PACKET_HDR_SIZE + packet->message.dataSize);
1960 #else
1961             status = MessageQ_put(MessageQ_getReplyQueue(msgqMsg), msgqMsg);
1962 #endif
1963             if (status < 0) {
1964                 Log_error1(FXNN": unknown ipc error, 0x%x", (IArg)status);
1965             }
1966             break;
1968         case RcmClient_Desc_JOB_ACQ:
1969             rval = RcmServer_acqJobId_P(obj, &jobId);
1971             if (rval < 0) {
1972                 RcmServer_setStatusCode_I(packet, RcmServer_Status_Error);
1973             }
1974             else {
1975                 RcmServer_setStatusCode_I(packet, RcmServer_Status_SUCCESS);
1976                 *(UInt16 *)(&rcmMsg->data[0]) = jobId;
1977                 rcmMsg->result = 0;
1978             }
1980 #if USE_RPMESSAGE
1981             packet->hdr.type = OMX_RAW_MSG;
1982             packet->hdr.len = PACKET_DATA_SIZE + packet->message.dataSize;
1983             status = RPMessage_send(obj->dstProc, obj->replyAddr,
1984                                  obj->localAddr, (Ptr)&packet->hdr,
1985                                  PACKET_HDR_SIZE + packet->message.dataSize);
1986 #else
1987             status = MessageQ_put(MessageQ_getReplyQueue(msgqMsg), msgqMsg);
1988 #endif
1989             if (status < 0) {
1990                 Log_error1(FXNN": unknown ipc error, 0x%x", (IArg)status);
1991             }
1992             break;
1994         case RcmClient_Desc_JOB_REL:
1995             jobId = (UInt16)(rcmMsg->data[0]);
1996             rval = RcmServer_relJobId_P(obj, jobId);
1998             if (rval < 0) {
1999                 switch (rval) {
2000                     case RcmServer_E_JobIdNotFound:
2001                         RcmServer_setStatusCode_I(
2002                             packet, RcmServer_Status_JobNotFound);
2003                         break;
2004                     default:
2005                         RcmServer_setStatusCode_I(
2006                             packet, RcmServer_Status_Error);
2007                         break;
2008                 }
2009                 rcmMsg->result = rval;
2010             }
2011             else {
2012                 RcmServer_setStatusCode_I(packet, RcmServer_Status_SUCCESS);
2013                 rcmMsg->result = 0;
2014             }
2016 #if USE_RPMESSAGE
2017             packet->hdr.type = OMX_RAW_MSG;
2018             packet->hdr.len = PACKET_DATA_SIZE + packet->message.dataSize;
2019             status = RPMessage_send(obj->dstProc, obj->replyAddr,
2020                                  obj->localAddr, (Ptr)&packet->hdr,
2021                                  PACKET_HDR_SIZE + packet->message.dataSize);
2022 #else
2023             status = MessageQ_put(MessageQ_getReplyQueue(msgqMsg), msgqMsg);
2024 #endif
2025             if (status < 0) {
2026                 Log_error1(FXNN": unknown ipc error, 0x%x", (IArg)status);
2027             }
2028             break;
2030         default:
2031             Log_error1(FXNN": unknown message type recieved, 0x%x",
2032                 (IArg)messageType);
2033             break;
2034     }
2036     Log_print0(Diags_EXIT, "<-- "FXNN":");
2038 #undef FXNN
2041 /*
2042  *  ======== RcmServer_relJobId_P ========
2043  */
2044 #define FXNN "RcmServer_relJobId_P"
2045 Int RcmServer_relJobId_P(RcmServer_Object *obj, UInt16 jobId)
2047     GateThread_Handle gateH;
2048     IArg key;
2049     List_Elem *elem;
2050     List_Handle msgQueH;
2051     RcmClient_Packet *packet;
2052     RcmServer_JobStream *job;
2053 #if USE_RPMESSAGE == 0
2054     MessageQ_Msg msgqMsg;
2055 #endif
2056     Int rval;
2057     Int status = RcmServer_S_SUCCESS;
2060     Log_print2(Diags_ENTRY,
2061         "--> "FXNN": (obj=0x%x, jobId=0x%x)", (IArg)obj, (IArg)jobId);
2064     /* must protect job list while searching and modifying it */
2065     gateH = GateThread_handle(&obj->gate);
2066     key = GateThread_enter(gateH);
2068     /* find the job stream object in the list */
2069     elem = NULL;
2070     while ((elem = List_next(obj->jobList, elem)) != NULL) {
2071         job = (RcmServer_JobStream *)elem;
2073         /* remove the job stream object from the list */
2074         if (job->jobId == jobId) {
2075             List_remove(obj->jobList, elem);
2076             break;
2077         }
2078     }
2080     GateThread_leave(gateH, key);
2082     if (elem == NULL) {
2083         status = RcmServer_E_JobIdNotFound;
2084         Log_error1(FXNN": failed to find jobId=%d", (IArg)jobId);
2085         goto leave;
2086     }
2088     /* return any pending messages on the message queue */
2089     msgQueH = List_handle(&job->msgQue);
2091     while ((elem = List_get(msgQueH)) != NULL) {
2092         packet = (RcmClient_Packet *)elem;
2093         Log_warning2(
2094             FXNN": returning unprocessed message, jobId=0x%x, packet=0x%x",
2095             (IArg)jobId, (IArg)packet);
2097         RcmServer_setStatusCode_I(packet, RcmServer_Status_Unprocessed);
2099 #if USE_RPMESSAGE
2100         packet->hdr.type = OMX_RAW_MSG;
2101         packet->hdr.len = PACKET_DATA_SIZE + packet->message.dataSize;
2102         rval = RPMessage_send(obj->dstProc, obj->replyAddr,
2103                                  obj->localAddr, (Ptr)&packet->hdr,
2104                                  PACKET_HDR_SIZE + packet->message.dataSize);
2105 #else
2106         msgqMsg = &packet->msgqHeader;
2107         rval = MessageQ_put(MessageQ_getReplyQueue(msgqMsg), msgqMsg);
2108 #endif
2109         if (rval < 0) {
2110             Log_error1(FXNN": unknown ipc error, 0x%x", (IArg)rval);
2111         }
2112     }
2114     /* finalize the job stream object */
2115     List_destruct(&job->msgQue);
2117     xdc_runtime_Memory_free(RcmServer_Module_heap(),
2118         (Ptr)job, sizeof(RcmServer_JobStream));
2121 leave:
2122     Log_print1(Diags_EXIT, "<-- "FXNN": %d", (IArg)status);
2123     return(status);
2125 #undef FXNN
2128 /*
2129  *  ======== RcmServer_serverThrFxn_P ========
2130  */
2131 #define FXNN "RcmServer_serverThrFxn_P"
2132 Void RcmServer_serverThrFxn_P(IArg arg)
2134     Error_Block eb;
2135     RcmClient_Packet *packet;
2136 #if USE_RPMESSAGE
2137     Char         recvBuf[MSGBUFFERSIZE];
2138     UInt16       len;
2139 #else
2140     MessageQ_Msg msgqMsg = NULL;
2141 #endif
2142     Int rval;
2143     Bool running = TRUE;
2144     RcmServer_Object *obj = (RcmServer_Object *)arg;
2145     Int dataSize;
2147 #if USE_RPMESSAGE
2148     packet = (RcmClient_Packet *)&recvBuf[0];
2149 #endif
2151     Log_print1(Diags_ENTRY, "--> "FXNN": (arg=0x%x)", arg);
2153     Error_init(&eb);
2155     /* wait until ready to run */
2156     Semaphore_pend(obj->run, Semaphore_FOREVER, &eb);
2158     if (Error_check(&eb)) {
2159         Log_error0(FXNN": Semaphore_pend failure in server thread");
2160     }
2162     /* main processing loop */
2163     while (running) {
2164         Log_print1(Diags_INFO,
2165             FXNN": waiting for message, thread=0x%x",
2166             (IArg)(obj->serverThread));
2168         /* block until message arrives */
2169         do {
2170 #if USE_RPMESSAGE
2171             rval = RPMessage_recv(obj->serverQue, (Ptr)&packet->hdr, &len,
2172                       &obj->replyAddr, RPMessage_FOREVER);
2173 #if 0
2174             System_printf("RcmServer_serverThrFxn_P: Received msg of len %d "
2175                           "from: %d\n",
2176                          len, obj->replyAddr);
2178             System_printf("hdr - t:%d f:%d l:%d\n", packet->hdr.type,
2179                           packet->hdr.flags, packet->hdr.len);
2181             System_printf("pkt - d:%d m:%d\n", packet->desc, packet->msgId);
2182 #endif
2183             if (packet->hdr.type == OMX_DISC_REQ) {
2184                 System_printf("RcmServer_serverThrFxn_P: Got OMX_DISCONNECT\n");
2185             }
2186             Assert_isTrue((len <= MSGBUFFERSIZE), NULL);
2187             Assert_isTrue((packet->hdr.type == OMX_RAW_MSG) ||
2188                           (packet->hdr.type == OMX_DISC_REQ) , NULL);
2190             if ((rval < 0) && (rval != RPMessage_E_UNBLOCKED)) {
2191 #else
2192             rval = MessageQ_get(obj->serverQue, &msgqMsg, MessageQ_FOREVER);
2193             if ((rval < 0) && (rval != MessageQ_E_UNBLOCKED)) {
2194 #endif
2195                 Log_error1(FXNN": ipc error 0x%x", (IArg)rval);
2196                 /* keep running and hope for the best */
2197             }
2198 #if USE_RPMESSAGE
2199         } while (FALSE);
2200 #else
2201         } while ((msgqMsg == NULL) && !obj->shutdown);
2202 #endif
2204         /* if shutdown, exit this thread */
2205 #if USE_RPMESSAGE
2206         if (obj->shutdown || packet->hdr.type == OMX_DISC_REQ) {
2207             running = FALSE;
2208             Log_print1(Diags_INFO,
2209                 FXNN": terminating, thread=0x%x", (IArg)(obj->serverThread));
2210             continue;
2211         }
2212 #else
2213         if (obj->shutdown) {
2214             running = FALSE;
2215             Log_print1(Diags_INFO,
2216                 FXNN": terminating, thread=0x%x", (IArg)(obj->serverThread));
2217             if (msgqMsg == NULL ) {
2218                 continue;
2219             }
2220         }
2221 #endif
2223 #if USE_RPMESSAGE == 0
2224         packet = (RcmClient_Packet *)msgqMsg;
2225 #endif
2227         Log_print2(Diags_INFO,
2228             FXNN": message received, thread=0x%x packet=0x%x",
2229             (IArg)(obj->serverThread), (IArg)packet);
2231         if ((packet->message.poolId == RcmClient_DEFAULTPOOLID)
2232             && ((obj->poolMap[0])[0].count == 0)) {
2234             /* in-band (server thread) message processing */
2235             RcmServer_process_P(obj, packet);
2236         }
2237         else {
2238             /* out-of-band (worker thread) message processing */
2239             rval = RcmServer_dispatch_P(obj, packet);
2241             /* if error, message was not dispatched; must return to client */
2242             if (rval < 0) {
2243                 switch (rval) {
2244                     case RcmServer_E_JobIdNotFound:
2245                         RcmServer_setStatusCode_I(
2246                             packet, RcmServer_Status_JobNotFound);
2247                         break;
2249                     case RcmServer_E_PoolIdNotFound:
2250                         RcmServer_setStatusCode_I(
2251                             packet, RcmServer_Status_PoolNotFound);
2252                         break;
2254                     default:
2255                         RcmServer_setStatusCode_I(
2256                             packet, RcmServer_Status_Error);
2257                         break;
2258                 }
2259                 packet->message.result = rval;
2261                 /* return the message to the client */
2262 #if USE_RPMESSAGE
2263 #if 0
2264                 System_printf("RcmServer_serverThrFxn_P: "
2265                               "Sending response from: %d to: %d\n",
2266                               obj->localAddr, obj->replyAddr);
2267                 System_printf("sending: %d + %d\n", PACKET_HDR_SIZE,
2268                               packet->message.dataSize);
2269 #endif
2270                 packet->hdr.type = OMX_RAW_MSG;
2271                 if (rval < 0) {
2272                     packet->hdr.len = sizeof(UInt32);
2273                     packet->desc = 0x4142;
2274                     packet->msgId = 0x0044;
2275                     dataSize = sizeof(struct rpmsg_omx_hdr) + sizeof(UInt32);
2276                 }
2277                 else {
2278                     packet->hdr.len = PACKET_DATA_SIZE +
2279                         packet->message.dataSize;
2280                     dataSize = PACKET_HDR_SIZE + packet->message.dataSize;
2281                 }
2282                 rval = RPMessage_send(obj->dstProc, obj->replyAddr,
2283                                  obj->localAddr, (Ptr)&packet->hdr, dataSize);
2284 #else
2285                 rval = MessageQ_put(MessageQ_getReplyQueue(msgqMsg), msgqMsg);
2286 #endif
2287                 if (rval < 0) {
2288                     Log_error1(FXNN": unknown ipc error, 0x%x", (IArg)rval);
2289                 }
2290             }
2291         }
2292     }
2294     System_printf("RcmServer_serverThrFxn_P: Exiting thread.\n");
2296     Log_print0(Diags_EXIT, "<-- "FXNN":");
2298 #undef FXNN
2301 /*
2302  *  ======== RcmServer_setStatusCode_I ========
2303  */
2304 Void RcmServer_setStatusCode_I(RcmClient_Packet *packet, UInt16 code)
2307     /* code must be 0 - 15, it has to fit in a 4-bit field */
2308     Assert_isTrue((code < 16), NULL);
2310     packet->desc &= ~(RcmClient_Desc_TYPE_MASK);
2311     packet->desc |= ((code << RcmClient_Desc_TYPE_SHIFT)
2312         & RcmClient_Desc_TYPE_MASK);
2316 /*
2317  *  ======== RcmServer_workerThrFxn_P ========
2318  */
2319 #define FXNN "RcmServer_workerThrFxn_P"
2320 Void RcmServer_workerThrFxn_P(IArg arg)
2322     Error_Block eb;
2323     RcmClient_Packet *packet;
2324     List_Elem *elem;
2325     List_Handle listH;
2326     List_Handle readyQueueH;
2327     UInt16 jobId;
2328     GateThread_Handle gateH;
2329     IArg key;
2330     RcmServer_ThreadPool *pool;
2331     RcmServer_JobStream *job;
2332     RcmServer_WorkerThread *obj;
2333     Bool running;
2334     Int rval;
2337     Log_print1(Diags_ENTRY, "--> "FXNN": (arg=0x%x)", arg);
2339     Error_init(&eb);
2340     obj = (RcmServer_WorkerThread *)arg;
2341     readyQueueH = List_handle(&obj->pool->readyQueue);
2342     packet = NULL;
2343     running = TRUE;
2345     /* main processing loop */
2346     while (running) {
2347         Log_print1(Diags_INFO,
2348             FXNN": waiting for job, thread=0x%x", (IArg)(obj->thread));
2350         /* if no current message, wait until signaled to run */
2351         if (packet == NULL) {
2352             Semaphore_pend(obj->pool->sem, Semaphore_FOREVER, &eb);
2354             if (Error_check(&eb)) {
2355                 Log_error0(FXNN": semaphore pend failed");
2356             }
2357         }
2359         /* check if thread should terminate */
2360         if (obj->terminate) {
2361             running = FALSE;
2362             Log_print1(Diags_INFO,
2363                 FXNN": terminating, thread=0x%x", (IArg)(obj->thread));
2364             continue;
2365         }
2367         /* get next message from ready queue */
2368         if (packet == NULL) {
2369             packet = (RcmClient_Packet *)List_get(readyQueueH);
2370         }
2372         if (packet == NULL) {
2373             Log_error1(FXNN": ready queue is empty, thread=0x%x",
2374                 (IArg)(obj->thread));
2375             continue;
2376         }
2378         Log_print2(Diags_INFO, FXNN": job received, thread=0x%x packet=0x%x",
2379             (IArg)obj->thread, (IArg)packet);
2381         /* remember the message job id */
2382         jobId = packet->message.jobId;
2384         /* process the message */
2385         RcmServer_process_P(obj->server, packet);
2386         packet = NULL;
2388         /* If this worker thread just finished processing a job message,
2389          * queue up the next message for this job id. As an optimization,
2390          * if the message is addressed to this worker's pool, then don't
2391          * signal the semaphore, just get the next message from the queue
2392          * and processes it. This keeps the current thread running instead
2393          * of switching to another thread.
2394          */
2395         if (jobId != RcmClient_DISCRETEJOBID) {
2397             /* must protect job list while searching it */
2398             gateH = GateThread_handle(&obj->server->gate);
2399             key = GateThread_enter(gateH);
2401             /* find the job object in the list */
2402             elem = NULL;
2403             while ((elem = List_next(obj->server->jobList, elem)) != NULL) {
2404                 job = (RcmServer_JobStream *)elem;
2405                 if (job->jobId == jobId) {
2406                     break;
2407                 }
2408             }
2410             /* if job object not found, it is not an error */
2411             if (elem == NULL) {
2412                 GateThread_leave(gateH, key);
2413                 continue;
2414             }
2416             /* found the job object */
2417             listH = List_handle(&job->msgQue);
2419             /* get next job message and either process it or queue it */
2420             do {
2421                 elem = List_get(listH);
2423                 if (elem == NULL) {
2424                     job->empty = TRUE;  /* no more messages */
2425                     break;
2426                 }
2427                 else {
2428                     /* get target pool id */
2429                     packet = (RcmClient_Packet *)elem;
2430                     rval = RcmServer_getPool_P(obj->server, packet, &pool);
2432                     /* if error, return the message to the client */
2433                     if (rval < 0) {
2434                         switch (rval) {
2435                             case RcmServer_E_PoolIdNotFound:
2436                                 RcmServer_setStatusCode_I(
2437                                     packet, RcmServer_Status_PoolNotFound);
2438                                 break;
2440                             default:
2441                                 RcmServer_setStatusCode_I(
2442                                     packet, RcmServer_Status_Error);
2443                                 break;
2444                         }
2445                         packet->message.result = rval;
2447 #if USE_RPMESSAGE
2448                         packet->hdr.type = OMX_RAW_MSG;
2449                         packet->hdr.len = PACKET_DATA_SIZE + packet->message.dataSize;
2450                         rval = RPMessage_send(
2451                                  (obj->server)->dstProc,
2452                                  (obj->server)->replyAddr,
2453                                  (obj->server)->localAddr, (Ptr)&packet->hdr,
2454                                  PACKET_HDR_SIZE + packet->message.dataSize);
2455 #else
2456                         rval = MessageQ_put(
2457                             MessageQ_getReplyQueue(&packet->msgqHeader),
2458                             &packet->msgqHeader);
2459 #endif
2460                         if (rval < 0) {
2461                             Log_error1(
2462                                 FXNN": unknown ipc error, 0x%x", (IArg)rval);
2463                         }
2464                     }
2465                     /* packet is valid, queue it in the corresponding pool's
2466                      * ready queue */
2467                     else {
2468                         listH = List_handle(&pool->readyQueue);
2469                         List_put(listH, elem);
2470                         packet = NULL;
2471                         Semaphore_post(pool->sem, &eb);
2473                         if (Error_check(&eb)) {
2474                             Log_error0(FXNN": semaphore post failed");
2475                         }
2476                     }
2478                     /* loop around and wait to be run again */
2479                 }
2481             } while (rval < 0);
2483             GateThread_leave(gateH, key);
2484         }
2485     }  /* while (running) */
2487     Log_print0(Diags_EXIT, "<-- "FXNN":");
2489 #undef FXNN
2492 /*
2493  *  ======== RcmServer_getLocalAddress ========
2494  */
2495 UInt32 RcmServer_getLocalAddress(RcmServer_Object *obj)
2497     return(obj->localAddr);
2501 /*
2502  *  ======== RcmServer_getRemoteAddress ========
2503  */
2504 UInt32 RcmServer_getRemoteAddress(RcmServer_Object *obj)
2506     return(obj->replyAddr);
2511 /*
2512  *  ======== RcmServer_getRemoteProc ========
2513  */
2514 UInt16  RcmServer_getRemoteProc(RcmServer_Object *obj)
2516     return(obj->dstProc);