168030b010824fa451008eed0e1d99f3e1e72022
[ipc/ipcdev.git] / packages / ti / grcm / RcmServer.c
1 /*
2  * Copyright (c) 2011-2013, Texas Instruments Incorporated
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions
7  * are met:
8  *
9  * *  Redistributions of source code must retain the above copyright
10  *    notice, this list of conditions and the following disclaimer.
11  *
12  * *  Redistributions in binary form must reproduce the above copyright
13  *    notice, this list of conditions and the following disclaimer in the
14  *    documentation and/or other materials provided with the distribution.
15  *
16  * *  Neither the name of Texas Instruments Incorporated nor the names of
17  *    its contributors may be used to endorse or promote products derived
18  *    from this software without specific prior written permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
21  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
22  * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
23  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
24  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
25  * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
26  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
27  * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
28  * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
29  * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
30  * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  */
33 /*
34  *  ======== RcmServer.c ========
35  *
36  */
38 /* this define must precede inclusion of any xdc header file */
39 #define Registry_CURDESC ti_grcm_RcmServer__Desc
40 #define MODULE_NAME "ti.grcm.RcmServer"
42 #define xdc_runtime_Memory__nolocalnames  /* short name clashes with SysLink */
44 /* rtsc header files */
45 #include <xdc/std.h>
46 #include <xdc/runtime/Assert.h>
47 #include <xdc/runtime/Diags.h>
48 #include <xdc/runtime/Error.h>
49 #include <xdc/runtime/IHeap.h>
50 #include <xdc/runtime/Log.h>
51 #include <xdc/runtime/Memory.h>
52 #include <xdc/runtime/Registry.h>
53 #include <xdc/runtime/Startup.h>
54 #include <xdc/runtime/knl/GateThread.h>
55 #include <xdc/runtime/knl/ISemaphore.h>
56 #include <xdc/runtime/knl/Semaphore.h>
57 #include <xdc/runtime/knl/SemThread.h>
58 #include <xdc/runtime/knl/Thread.h>
59 #include <xdc/runtime/System.h>
61 #define MSGBUFFERSIZE    512   // Make global and move to RPMessage.h
63 #if defined(RCM_ti_ipc)
64 #include <ti/sdo/utils/List.h>
65 #include <ti/ipc/MultiProc.h>
67 #elif defined(RCM_ti_syslink)
68 #include <ti/syslink/utils/List.h>
69 #define List_Struct List_Object
70 #define List_handle(exp) (exp)
72 #else
73     #error "undefined ipc binding";
74 #endif
76 /* local header files */
77 #include "RcmClient.h"
78 #include "RcmTypes.h"
79 #include "RcmServer.h"
81 #if USE_RPMESSAGE
82 #include <ti/srvmgr/rpmsg_omx.h>
83 #endif
85 #define _RCM_KeyResetValue 0x07FF       // key reset value
86 #define _RCM_KeyMask 0x7FF00000         // key mask in function index
87 #define _RCM_KeyShift 20                // key bit position in function index
89 #define RcmServer_MAX_TABLES 9          // max number of function tables
90 #define RcmServer_POOL_MAP_LEN 4        // pool map length
92 #define RcmServer_E_InvalidFxnIdx       (-101)
93 #define RcmServer_E_JobIdNotFound       (-102)
94 #define RcmServer_E_PoolIdNotFound      (-103)
96 typedef struct {                        // function table element
97     String                      name;
98 #if USE_RPMESSAGE
99     union  {
100        RcmServer_MsgFxn         fxn;
101        RcmServer_MsgCreateFxn   createFxn;
102     }addr;
103 #else
104     RcmServer_MsgFxn            addr;
105 #endif
106     UInt16                      key;
107 } RcmServer_FxnTabElem;
109 typedef struct {
110     Int                         length;
111     RcmServer_FxnTabElem *      elem;
112 } RcmServer_FxnTabElemAry;
114 typedef struct {
115     String                      name;       // pool name
116     Int                         count;      // thread count (at create time)
117     Thread_Priority             priority;   // thread priority
118     Int                         osPriority;
119     SizeT                       stackSize;  // thread stack size
120     String                      stackSeg;   // thread stack placement
121     ISemaphore_Handle           sem;        // message semaphore (counting)
122     List_Struct                 threadList; // list of worker threads
123     List_Struct                 readyQueue; // queue of messages
124 } RcmServer_ThreadPool;
126 typedef struct RcmServer_Object_tag {
127     GateThread_Struct           gate;       // instance gate
128     Ptr                         run;        // run semaphore for the server
129 #if USE_RPMESSAGE
130     RPMessage_Handle         serverQue;  // inbound message queue
131     UInt32                      localAddr;  // inbound message queue address
132     UInt32                      replyAddr;  // Reply address (same per inst.)
133     UInt32                      dstProc;    // Reply processor.
134 #else
135     MessageQ_Handle             serverQue;  // inbound message queue
136 #endif
137     Thread_Handle               serverThread; // server thread object
138     RcmServer_FxnTabElemAry     fxnTabStatic; // static function table
139     RcmServer_FxnTabElem *      fxnTab[RcmServer_MAX_TABLES]; // base pointers
140     UInt16                      key;        // function index key
141     UInt16                      jobId;      // job id tracker
142     Bool                        shutdown;   // server shutdown flag
143     Int                         poolMap0Len;// length of static table
144     RcmServer_ThreadPool *      poolMap[RcmServer_POOL_MAP_LEN];
145     List_Handle                 jobList;    // list of job stream queues
146 } RcmServer_Object;
148 typedef struct {
149     List_Elem                   elem;
150     UInt16                      jobId;      // current job stream id
151     Thread_Handle               thread;     // server thread object
152     Bool                        terminate;  // thread terminate flag
153     RcmServer_ThreadPool*       pool;       // worker pool
154     RcmServer_Object *          server;     // server instance
155 } RcmServer_WorkerThread;
157 typedef struct {
158     List_Elem                   elem;
159     UInt16                      jobId;      // job stream id
160     Bool                        empty;      // true if no messages on server
161     List_Struct                 msgQue;     // queue of messages
162 } RcmServer_JobStream;
164 typedef struct RcmServer_Module_tag {
165     String              name;
166     IHeap_Handle        heap;
167 } RcmServer_Module;
170 /* private functions */
171 static
172 Int RcmServer_Instance_init_P(
173         RcmServer_Object *              obj,
174         String                          name,
175         const RcmServer_Params *        params
176     );
178 static
179 Int RcmServer_Instance_finalize_P(
180         RcmServer_Object *              obj
181     );
183 static
184 Int RcmServer_acqJobId_P(
185         RcmServer_Object *              obj,
186         UInt16 *                        jobIdPtr
187     );
189 static
190 Int RcmServer_dispatch_P(
191         RcmServer_Object *              obj,
192         RcmClient_Packet *              packet
193     );
195 static
196 Int RcmServer_execMsg_I(
197         RcmServer_Object *              obj,
198         RcmClient_Message *             msg
199     );
201 static
202 Int RcmServer_getFxnAddr_P(
203         RcmServer_Object *              obj,
204         UInt32                          fxnIdx,
205         RcmServer_MsgFxn *              addrPtr,
206         RcmServer_MsgCreateFxn *        createPtr
207     );
209 static
210 UInt16 RcmServer_getNextKey_P(
211         RcmServer_Object *              obj
212     );
214 static
215 Int RcmServer_getSymIdx_P(
216         RcmServer_Object *              obj,
217         String                          name,
218         UInt32 *                        index
219     );
221 static
222 Int RcmServer_getPool_P(
223         RcmServer_Object *              obj,
224         RcmClient_Packet *              packet,
225         RcmServer_ThreadPool **         poolP
226     );
228 static
229 Void RcmServer_process_P(
230         RcmServer_Object *              obj,
231         RcmClient_Packet *              packet
232     );
234 static
235 Int RcmServer_relJobId_P(
236         RcmServer_Object *              obj,
237         UInt16                          jobId
238     );
240 static
241 Void RcmServer_serverThrFxn_P(
242         IArg                            arg
243     );
245 static inline
246 Void RcmServer_setStatusCode_I(
247         RcmClient_Packet *              packet,
248         UInt16                          code
249     );
251 static
252 Void RcmServer_workerThrFxn_P(
253         IArg                            arg
254     );
257 #define RcmServer_Module_heap() (RcmServer_Mod.heap)
260 /* static objects */
261 static Int curInit = 0;
262 static Char ti_grcm_RcmServer_Name[] = {
263         't','i','.','s','d','o','.','r','c','m','.',
264         'R','c','m','S','e','r','v','e','r','\0'
265     };
267 static RcmServer_Module RcmServer_Mod = {
268     MODULE_NAME,        /* name */
269     (IHeap_Handle)NULL  /* heap */
270 };
272 /* module diags mask */
273 Registry_Desc Registry_CURDESC;
276 /*
277  *  ======== RcmServer_init ========
278  *
279  *  This function must be serialized by the caller
280  */
281 Void RcmServer_init(Void)
283     Registry_Result result;
286     if (curInit++ != 0) {
287         return; /* module already initialized */
288     }
290     /* register with xdc.runtime to get a diags mask */
291 //  result = Registry_addModule(&Registry_CURDESC, MODULE_NAME);
292     result = Registry_addModule(&Registry_CURDESC, ti_grcm_RcmServer_Name);
293     Assert_isTrue(result == Registry_SUCCESS, (Assert_Id)NULL);
295     /* the size of object and struct must be the same */
296     Assert_isTrue(sizeof(RcmServer_Object) == sizeof(RcmServer_Struct), NULL);
300 /*
301  *  ======== RcmServer_exit ========
302  *
303  *  This function must be serialized by the caller
304  */
305 Void RcmServer_exit(Void)
307 //  Registry_Result result;
310     if (--curInit != 0) {
311         return; /* module still in use */
312     }
314     /* unregister from xdc.runtime */
315 //  result = Registry_removeModule(MODULE_NAME);
316 //  Assert_isTrue(result == Registry_SUCCESS, (Assert_Id)NULL);
320 /*
321  *  ======== RcmServer_Params_init ========
322  */
323 Void RcmServer_Params_init(RcmServer_Params *params)
325     /* server thread */
326     params->priority = Thread_Priority_HIGHEST;
327     params->osPriority = Thread_INVALID_OS_PRIORITY;
328     params->stackSize = 0;  // use system default
329     params->stackSeg = "";
331     /* default pool */
332     params->defaultPool.name = NULL;
333     params->defaultPool.count = 0;
334     params->defaultPool.priority = Thread_Priority_NORMAL;
335     params->defaultPool.osPriority = Thread_INVALID_OS_PRIORITY;
336     params->defaultPool.stackSize = 0;  // use system default
337     params->defaultPool.stackSeg = "";
339     /* worker pools */
340     params->workerPools.length = 0;
341     params->workerPools.elem = NULL;
343     /* function table */
344     params->fxns.length = 0;
345     params->fxns.elem = NULL;
349 /*
350  *  ======== RcmServer_create ========
351  */
352 #define FXNN "RcmServer_create"
353 Int RcmServer_create(String name, RcmServer_Params *params,
354         RcmServer_Handle *handle)
356     RcmServer_Object *obj;
357     Error_Block eb;
358     Int status = RcmServer_S_SUCCESS;
361     Log_print3(Diags_ENTRY, "--> "FXNN": (name=0x%x, params=0x%x, hP=0x%x)",
362         (IArg)name, (IArg)params, (IArg)handle);
364     /* initialize the error block */
365     Error_init(&eb);
367     if (NULL == handle) {
368         Log_error0(FXNN": Invalid pointer");
369         status = RcmServer_E_FAIL;
370         goto leave;
371     }
373     *handle = (RcmServer_Handle)NULL;
375     /* check for valid params */
376     if (NULL == params) {
377         Log_error0(FXNN": params ptr must not be NULL");
378         status = RcmServer_E_FAIL;
379         goto leave;
380     }
381     if (NULL == name) {
382         Log_error0(FXNN": name passed is NULL!");
383         status = RcmServer_E_FAIL;
384         goto leave;
385     }
387     /* allocate the object */
388     obj = (RcmServer_Handle)xdc_runtime_Memory_calloc(RcmServer_Module_heap(),
389         sizeof(RcmServer_Object), sizeof(Int), &eb);
391     if (NULL == obj) {
392         Log_error2(FXNN": out of memory: heap=0x%x, size=%u",
393             (IArg)RcmServer_Module_heap(), sizeof(RcmServer_Object));
394         status = RcmServer_E_NOMEMORY;
395         goto leave;
396     }
398     Log_print1(Diags_LIFECYCLE, FXNN": instance create: 0x%x", (IArg)obj);
400     /* object-specific initialization */
401     status = RcmServer_Instance_init_P(obj, name, params);
403     if (status < 0) {
404         RcmServer_Instance_finalize_P(obj);
405         xdc_runtime_Memory_free(RcmServer_Module_heap(),
406             (Ptr)obj, sizeof(RcmServer_Object));
407         goto leave;
408     }
410     /* success, return opaque pointer */
411     *handle = (RcmServer_Handle)obj;
414 leave:
415     Log_print1(Diags_EXIT, "<-- "FXNN": %d", (IArg)status);
416     return(status);
418 #undef FXNN
421 /*
422  *  ======== RcmServer_construct ========
423  */
424 #define FXNN "RcmServer_construct"
425 Int RcmServer_construct(RcmServer_Struct *structPtr, String name,
426     const RcmServer_Params *params)
428     RcmServer_Object *obj = (RcmServer_Object*)structPtr;
429     Int status = RcmServer_S_SUCCESS;
432     Log_print1(Diags_ENTRY, "--> "FXNN": (structPtr=0x%x)", (IArg)structPtr);
433     Log_print1(Diags_LIFECYCLE, FXNN": instance construct: 0x%x", (IArg)obj);
435     /* ensure the constructed object is zeroed */
436     _memset((Void *)obj, 0, sizeof(RcmServer_Object));
438     /* object-specific initialization */
439     status = RcmServer_Instance_init_P(obj, name, params);
441     if (status < 0) {
442         RcmServer_Instance_finalize_P(obj);
443         goto leave;
444     }
447 leave:
448     Log_print1(Diags_EXIT, "<-- "FXNN": %d", (IArg)status);
449     return(status);
451 #undef FXNN
454 /*
455  *  ======== RcmServer_Instance_init_P ========
456  */
457 #define FXNN "RcmServer_Instance_init_P"
458 Int RcmServer_Instance_init_P(RcmServer_Object *obj, String name,
459         const RcmServer_Params *params)
461     Error_Block eb;
462     List_Params listP;
463 #if USE_RPMESSAGE == 0
464     MessageQ_Params mqParams;
465 #endif
466     Thread_Params threadP;
467     SemThread_Params semThreadP;
468     SemThread_Handle semThreadH;
469     Int i, j;
470     SizeT size;
471     Char *cp;
472     RcmServer_ThreadPool *poolAry;
473     RcmServer_WorkerThread *worker;
474     List_Handle listH;
475     Int status = RcmServer_S_SUCCESS;
478     Log_print1(Diags_ENTRY, "--> "FXNN": (obj=0x%x)", (IArg)obj);
480     Error_init(&eb);
482     /* initialize instance state */
483     obj->shutdown = FALSE;
484     obj->key = 0;
485     obj->jobId = 0xFFFF;
486     obj->run = NULL;
487     obj->serverQue = NULL;
488     obj->serverThread = NULL;
489     obj->fxnTabStatic.length = 0;
490     obj->fxnTabStatic.elem = NULL;
491     obj->poolMap0Len = 0;
492     obj->jobList = NULL;
495     /* initialize the function table */
496     for (i = 0; i < RcmServer_MAX_TABLES; i++) {
497         obj->fxnTab[i] = NULL;
498     }
500     /* initialize the worker pool map */
501     for (i = 0; i < RcmServer_POOL_MAP_LEN; i++) {
502         obj->poolMap[i] = NULL;
503     }
505     /* create the instance gate */
506     GateThread_construct(&obj->gate, NULL, &eb);
508     if (Error_check(&eb)) {
509         Log_error0(FXNN": could not create gate object");
510         status = RcmServer_E_FAIL;
511         goto leave;
512     }
514     /* create list for job objects */
515 #if defined(RCM_ti_ipc)
516     List_Params_init(&listP);
517     obj->jobList = List_create(&listP, &eb);
519     if (Error_check(&eb)) {
520         Log_error0(FXNN": could not create list object");
521         status = RcmServer_E_FAIL;
522         goto leave;
523     }
524 #elif defined(RCM_ti_syslink)
525     List_Params_init(&listP);
526     obj->jobList = List_create(&listP, NULL);
528     if (obj->jobList == NULL) {
529         Log_error0(FXNN": could not create list object");
530         status = RcmServer_E_FAIL;
531         goto leave;
532     }
533 #endif
535     /* create the static function table */
536     if (params->fxns.length > 0) {
537         obj->fxnTabStatic.length = params->fxns.length;
539         /* allocate static function table */
540         size = params->fxns.length * sizeof(RcmServer_FxnTabElem);
541         obj->fxnTabStatic.elem = xdc_runtime_Memory_alloc(
542             RcmServer_Module_heap(), size, sizeof(Ptr), &eb);
544         if (Error_check(&eb)) {
545             Log_error2(FXNN": out of memory: heap=0x%x, size=%u",
546                 (IArg)RcmServer_Module_heap(), size);
547             status = RcmServer_E_NOMEMORY;
548             goto leave;
549         }
550         obj->fxnTabStatic.elem[0].name = NULL;
552         /* allocate a single block to store all name strings */
553         for (size = 0, i = 0; i < params->fxns.length; i++) {
554             size += _strlen(params->fxns.elem[i].name) + 1;
555         }
556         cp = xdc_runtime_Memory_alloc(
557             RcmServer_Module_heap(), size, sizeof(Ptr), &eb);
559         if (Error_check(&eb)) {
560             Log_error2(FXNN": out of memory: heap=0x%x, size=%u",
561                 (IArg)RcmServer_Module_heap(), size);
562             status = RcmServer_E_NOMEMORY;
563             goto leave;
564         }
566         /* copy function table data into allocated memory blocks */
567         for (i = 0; i < params->fxns.length; i++) {
568             _strcpy(cp, params->fxns.elem[i].name);
569             obj->fxnTabStatic.elem[i].name = cp;
570             cp += (_strlen(params->fxns.elem[i].name) + 1);
571             obj->fxnTabStatic.elem[i].addr.fxn = params->fxns.elem[i].addr.fxn;
572             obj->fxnTabStatic.elem[i].key = 0;
573         }
575         /* hook up the static function table */
576         obj->fxnTab[0] = obj->fxnTabStatic.elem;
577     }
579     /* create static worker pools */
580     if ((params->workerPools.length + 1) > RcmServer_POOL_MAP_LEN) {
581         Log_error1(FXNN": Exceeded maximum number of worker pools =%d",
582             (IArg) (params->workerPools.length) );
583         status = RcmServer_E_NOMEMORY;
584         goto leave;
585     }
586     obj->poolMap0Len = params->workerPools.length + 1; /* workers + default */
588     /* allocate the static worker pool table */
589     size = obj->poolMap0Len * sizeof(RcmServer_ThreadPool);
590     obj->poolMap[0] = (RcmServer_ThreadPool *)xdc_runtime_Memory_alloc(
591         RcmServer_Module_heap(), size, sizeof(Ptr), &eb);
593     if (Error_check(&eb)) {
594         Log_error2(FXNN": out of memory: heap=0x%x, size=%u",
595             (IArg)RcmServer_Module_heap(), size);
596         status = RcmServer_E_NOMEMORY;
597         goto leave;
598     }
600     /* convenience alias */
601     poolAry = obj->poolMap[0];
603     /* allocate a single block to store all name strings         */
604     /* Buffer format is: [SIZE][DPN][\0][WPN1][\0][WPN2][\0].... */
605     /* DPN = Default Pool Name, WPN = Worker Pool Name           */
606     /* In case, DPN is NULL, format is: [SIZE][\0][WPN1][\0].... */
607     /* In case, WPN is NULL, format is: [SIZE][DPN][\0]          */
608     size = sizeof(SizeT) /* block size */
609         + (params->defaultPool.name == NULL ? 1 :
610         _strlen(params->defaultPool.name) + 1);
612     for (i = 0; i < params->workerPools.length; i++) {
613         size += (params->workerPools.elem[i].name == NULL ? 0 :
614             _strlen(params->workerPools.elem[i].name) + 1);
615     }
616     cp = xdc_runtime_Memory_alloc(
617         RcmServer_Module_heap(), size, sizeof(Ptr), &eb);
619     if (Error_check(&eb)) {
620         Log_error2(FXNN": out of memory: heap=0x%x, size=%u",
621             (IArg)RcmServer_Module_heap(), size);
622         status = RcmServer_E_NOMEMORY;
623         goto leave;
624     }
626     *(SizeT *)cp = size;
627     cp += sizeof(SizeT);
629     /* initialize the default worker pool, poolAry[0] */
630     if (params->defaultPool.name != NULL) {
631         _strcpy(cp, params->defaultPool.name);
632         poolAry[0].name = cp;
633         cp += (_strlen(params->defaultPool.name) + 1);
634     }
635     else {
636         poolAry[0].name = cp;
637         *cp++ = '\0';
638     }
640     poolAry[0].count = params->defaultPool.count;
641     poolAry[0].priority = params->defaultPool.priority;
642     poolAry[0].osPriority = params->defaultPool.osPriority;
643     poolAry[0].stackSize = params->defaultPool.stackSize;
644     poolAry[0].stackSeg = NULL;
645     poolAry[0].sem = NULL;
647     List_construct(&(poolAry[0].threadList), NULL);
648     List_construct(&(poolAry[0].readyQueue), NULL);
650     SemThread_Params_init(&semThreadP);
651     semThreadP.mode = SemThread_Mode_COUNTING;
653     semThreadH = SemThread_create(0, &semThreadP, &eb);
655     if (Error_check(&eb)) {
656         Log_error0(FXNN": could not create semaphore");
657         status = RcmServer_E_FAIL;
658         goto leave;
659     }
661     poolAry[0].sem = SemThread_Handle_upCast(semThreadH);
663     /* initialize the static worker pools, poolAry[1..(n-1)] */
664     for (i = 0; i < params->workerPools.length; i++) {
665         if (params->workerPools.elem[i].name != NULL) {
666             _strcpy(cp, params->workerPools.elem[i].name);
667             poolAry[i+1].name = cp;
668             cp += (_strlen(params->workerPools.elem[i].name) + 1);
669         }
670         else {
671             poolAry[i+1].name = NULL;
672         }
674         poolAry[i+1].count = params->workerPools.elem[i].count;
675         poolAry[i+1].priority = params->workerPools.elem[i].priority;
676         poolAry[i+1].osPriority =params->workerPools.elem[i].osPriority;
677         poolAry[i+1].stackSize = params->workerPools.elem[i].stackSize;
678         poolAry[i+1].stackSeg = NULL;
680         List_construct(&(poolAry[i+1].threadList), NULL);
681         List_construct(&(poolAry[i+1].readyQueue), NULL);
683         SemThread_Params_init(&semThreadP);
684         semThreadP.mode = SemThread_Mode_COUNTING;
686         semThreadH = SemThread_create(0, &semThreadP, &eb);
688         if (Error_check(&eb)) {
689             Log_error0(FXNN": could not create semaphore");
690             status = RcmServer_E_FAIL;
691             goto leave;
692         }
694         poolAry[i+1].sem = SemThread_Handle_upCast(semThreadH);
695     }
697     /* create the worker threads in each static pool */
698     for (i = 0; i < obj->poolMap0Len; i++) {
699         for (j = 0; j < poolAry[i].count; j++) {
701             /* allocate worker thread object */
702             size = sizeof(RcmServer_WorkerThread);
703             worker = (RcmServer_WorkerThread *)xdc_runtime_Memory_alloc(
704                 RcmServer_Module_heap(), size, sizeof(Ptr), &eb);
706             if (Error_check(&eb)) {
707                 Log_error2(FXNN": out of memory: heap=0x%x, size=%u",
708                     (IArg)RcmServer_Module_heap(), size);
709                 status = RcmServer_E_NOMEMORY;
710                 goto leave;
711             }
713             /* initialize worker thread object */
714             worker->jobId = RcmClient_DISCRETEJOBID;
715             worker->thread = NULL;
716             worker->terminate = FALSE;
717             worker->pool = &(poolAry[i]);
718             worker->server = obj;
720             /* add worker thread to worker pool */
721             listH = List_handle(&(poolAry[i].threadList));
722             List_putHead(listH, &(worker->elem));
724             /* create worker thread */
725             Thread_Params_init(&threadP);
726             threadP.arg = (IArg)worker;
727             threadP.priority = poolAry[i].priority;
728             threadP.osPriority = poolAry[i].osPriority;
729             threadP.stackSize = poolAry[i].stackSize;
730             threadP.instance->name = "RcmServer_workerThr";
732             worker->thread = Thread_create(
733                 (Thread_RunFxn)(RcmServer_workerThrFxn_P), &threadP, &eb);
735             if (Error_check(&eb)) {
736                 Log_error2(FXNN": could not create worker thread, "
737                     "pool=%d, thread=%d", (IArg)i, (IArg)j);
738                 status = RcmServer_E_FAIL;
739                 goto leave;
740             }
741         }
742     }
744     /* create the semaphore used to release the server thread */
745     SemThread_Params_init(&semThreadP);
746     semThreadP.mode = SemThread_Mode_COUNTING;
748     obj->run = SemThread_create(0, &semThreadP, &eb);
750     if (Error_check(&eb)) {
751         Log_error0(FXNN": could not create semaphore");
752         status = RcmServer_E_FAIL;
753         goto leave;
754     }
756     /* create the message queue for inbound messages */
757 #if USE_RPMESSAGE
758     obj->serverQue = RPMessage_create(RPMessage_ASSIGN_ANY, NULL, NULL,
759                                          &obj->localAddr);
760 #ifdef BIOS_ONLY_TEST
761     obj->dstProc = MultiProc_self();
762 #else
763     obj->dstProc = MultiProc_getId("HOST");
764 #endif
766 #else
767     MessageQ_Params_init(&mqParams);
768     obj->serverQue = MessageQ_create(name, &mqParams);
769 #endif
771     if (NULL == obj->serverQue) {
772         Log_error0(FXNN": could not create server message queue");
773         status = RcmServer_E_FAIL;
774         goto leave;
775     }
777     /* create the server thread */
778     Thread_Params_init(&threadP);
779     threadP.arg = (IArg)obj;
780     threadP.priority = params->priority;
781     threadP.osPriority = params->osPriority;
782     threadP.stackSize = params->stackSize;
783     threadP.instance->name = "RcmServer_serverThr";
785     obj->serverThread = Thread_create(
786         (Thread_RunFxn)(RcmServer_serverThrFxn_P), &threadP, &eb);
788     if (Error_check(&eb)) {
789         Log_error0(FXNN": could not create server thread");
790         status = RcmServer_E_FAIL;
791         goto leave;
792     }
795 leave:
796     Log_print1(Diags_EXIT, "<-- "FXNN": %d", (IArg)status);
797     return(status);
799 #undef FXNN
802 /*
803  *  ======== RcmServer_delete ========
804  */
805 #define FXNN "RcmServer_delete"
806 Int RcmServer_delete(RcmServer_Handle *handlePtr)
808     RcmServer_Object *obj = (RcmServer_Object *)(*handlePtr);
809     Int status = RcmClient_S_SUCCESS;
812     Log_print1(Diags_ENTRY, "--> "FXNN": (handlePtr=0x%x)", (IArg)handlePtr);
814     /* finalize the object */
815     status = RcmServer_Instance_finalize_P(obj);
817     /* free the object memory */
818     Log_print1(Diags_LIFECYCLE, FXNN": instance delete: 0x%x", (IArg)obj);
820     xdc_runtime_Memory_free(RcmServer_Module_heap(),
821         (Ptr)obj, sizeof(RcmServer_Object));
822     *handlePtr = NULL;
824     Log_print1(Diags_EXIT, "<-- "FXNN": %d", (IArg)status);
825     return(status);
827 #undef FXNN
830 /*
831  *  ======== RcmServer_destruct ========
832  */
833 #define FXNN "RcmServer_destruct"
834 Int RcmServer_destruct(RcmServer_Struct *structPtr)
836     RcmServer_Object *obj = (RcmServer_Object *)(structPtr);
837     Int status = RcmClient_S_SUCCESS;
840     Log_print1(Diags_ENTRY, "--> "FXNN": (structPtr=0x%x)", (IArg)structPtr);
841     Log_print1(Diags_LIFECYCLE, FXNN": instance destruct: 0x%x", (IArg)obj);
843     /* finalize the object */
844     status = RcmServer_Instance_finalize_P(obj);
846     Log_print1(Diags_EXIT, "<-- "FXNN": %d", (IArg)status);
847     return(status);
849 #undef FXNN
852 /*
853  *  ======== RcmServer_Instance_finalize_P ========
854  */
855 #define FXNN "RcmServer_Instance_finalize_P"
856 Int RcmServer_Instance_finalize_P(RcmServer_Object *obj)
858     Int i, j;
859     Int size;
860     Char *cp;
861     UInt tabCount;
862     RcmServer_FxnTabElem *fdp;
863     Error_Block eb;
864     RcmServer_ThreadPool *poolAry;
865     RcmServer_WorkerThread *worker;
866     List_Elem *elem;
867     List_Handle listH;
868     List_Handle msgQueH;
869     RcmClient_Packet *packet;
870 #if USE_RPMESSAGE == 0
871     MessageQ_Msg msgqMsg;
872 #endif
873     SemThread_Handle semThreadH;
874     RcmServer_JobStream *job;
875     Int rval;
876     Int status = RcmClient_S_SUCCESS;
878     Log_print1(Diags_ENTRY, "--> "FXNN": (obj=0x%x)", (IArg)obj);
880     /* must initialize the error block before using it */
881     Error_init(&eb);
883     /* block until server thread exits */
884     obj->shutdown = TRUE;
886     if (obj->serverThread != NULL) {
887 #if USE_RPMESSAGE
888         RPMessage_unblock(obj->serverQue);
889 #else
890         MessageQ_unblock(obj->serverQue);
891 #endif
892         Thread_join(obj->serverThread, &eb);
894         if (Error_check(&eb)) {
895             Log_error0(FXNN": server thread did not exit properly");
896             status = RcmServer_E_FAIL;
897             goto leave;
898         }
899     }
901     /* delete any remaining job objects (there should not be any) */
902     while ((elem = List_get(obj->jobList)) != NULL) {
903         job = (RcmServer_JobStream *)elem;
905         /* return any remaining messages (there should not be any) */
906         msgQueH = List_handle(&job->msgQue);
908         while ((elem = List_get(msgQueH)) != NULL) {
909             packet = (RcmClient_Packet *)elem;
910             Log_warning2(
911                 FXNN": returning unprocessed message, jobId=0x%x, packet=0x%x",
912                 (IArg)job->jobId, (IArg)packet);
914             RcmServer_setStatusCode_I(packet, RcmServer_Status_Unprocessed);
915 #if USE_RPMESSAGE
916             packet->hdr.type = OMX_RAW_MSG;
917             packet->hdr.len = PACKET_DATA_SIZE + packet->message.dataSize;
918             rval = RPMessage_send(obj->dstProc, obj->replyAddr,
919                                  obj->localAddr, (Ptr)&packet->hdr,
920                                  PACKET_HDR_SIZE + packet->message.dataSize);
921 #else
922             msgqMsg = &packet->msgqHeader;
923             rval = MessageQ_put(MessageQ_getReplyQueue(msgqMsg), msgqMsg);
924 #endif
925             if (rval < 0) {
926                 Log_error1(FXNN": unknown ipc error, 0x%x", (IArg)rval);
927             }
928         }
930         /* finalize the job stream object */
931         List_destruct(&job->msgQue);
933         xdc_runtime_Memory_free(RcmServer_Module_heap(),
934             (Ptr)job, sizeof(RcmServer_JobStream));
935     }
936     List_delete(&(obj->jobList));
938     /* convenience alias */
939     poolAry = obj->poolMap[0];
941     /* free all the static pool resources */
942     for (i = 0; i < obj->poolMap0Len; i++) {
944         /* free all the worker thread objects */
945         listH = List_handle(&(poolAry[i].threadList));
947         /* mark each worker thread for termination */
948         elem = NULL;
949         while ((elem = List_next(listH, elem)) != NULL) {
950             worker = (RcmServer_WorkerThread *)elem;
951             worker->terminate = TRUE;
952         }
954         /* unblock each worker thread so it can terminate */
955         elem = NULL;
956         while ((elem = List_next(listH, elem)) != NULL) {
957             Semaphore_post(poolAry[i].sem, &eb);
959             if (Error_check(&eb)) {
960                 Log_error0(FXNN": post failed on thread");
961                 status = RcmServer_E_FAIL;
962                 goto leave;
963             }
964         }
966         /* wait for each worker thread to terminate */
967         elem = NULL;
968         while ((elem = List_get(listH)) != NULL) {
969             worker = (RcmServer_WorkerThread *)elem;
971             Thread_join(worker->thread, &eb);
973             if (Error_check(&eb)) {
974                 Log_error1(
975                     FXNN": worker thread did not exit properly, thread=0x%x",
976                     (IArg)worker->thread);
977                 status = RcmServer_E_FAIL;
978                 goto leave;
979             }
981             Thread_delete(&worker->thread);
983             /* free the worker thread object */
984             xdc_runtime_Memory_free(RcmServer_Module_heap(), (Ptr)worker,
985                 sizeof(RcmServer_WorkerThread));
986         }
988         /* free up pool resources */
989         semThreadH = SemThread_Handle_downCast(poolAry[i].sem);
990         SemThread_delete(&semThreadH);
991         List_destruct(&(poolAry[i].threadList));
993         /* return any remaining messages on the readyQueue */
994         msgQueH = List_handle(&poolAry[i].readyQueue);
996         while ((elem = List_get(msgQueH)) != NULL) {
997             packet = (RcmClient_Packet *)elem;
998             Log_warning2(
999                 FXNN": returning unprocessed message, msgId=0x%x, packet=0x%x",
1000                 (IArg)packet->msgId, (IArg)packet);
1002             RcmServer_setStatusCode_I(packet, RcmServer_Status_Unprocessed);
1003 #if USE_RPMESSAGE
1004             packet->hdr.type = OMX_RAW_MSG;
1005             packet->hdr.len = PACKET_DATA_SIZE + packet->message.dataSize;
1006             rval = RPMessage_send(obj->dstProc, obj->replyAddr,
1007                                  obj->localAddr, (Ptr)&packet->hdr,
1008                                  PACKET_HDR_SIZE + packet->message.dataSize);
1009 #else
1010             msgqMsg = &packet->msgqHeader;
1011             rval = MessageQ_put(MessageQ_getReplyQueue(msgqMsg), msgqMsg);
1012 #endif
1013             if (rval < 0) {
1014                 Log_error1(FXNN": unknown ipc error, 0x%x", (IArg)rval);
1015             }
1016         }
1018         List_destruct(&(poolAry[i].readyQueue));
1019     }
1021     /* free the name block for the static pools */
1022     if ((obj->poolMap[0] != NULL) && (obj->poolMap[0]->name != NULL)) {
1023         cp = obj->poolMap[0]->name;
1024         cp -= sizeof(SizeT);
1025         xdc_runtime_Memory_free(RcmServer_Module_heap(), (Ptr)cp, *(SizeT *)cp);
1026     }
1028     /* free the static worker pool table */
1029     if (obj->poolMap[0] != NULL) {
1030         xdc_runtime_Memory_free(RcmServer_Module_heap(), (Ptr)(obj->poolMap[0]),
1031             obj->poolMap0Len * sizeof(RcmServer_ThreadPool));
1032         obj->poolMap[0] = NULL;
1033     }
1035 #if 0
1036     /* free all dynamic worker pools */
1037     for (p = 1; p < RcmServer_POOL_MAP_LEN; p++) {
1038         if ((poolAry = obj->poolMap[p]) == NULL) {
1039             continue;
1040         }
1041     }
1042 #endif
1044     /* free up the dynamic function tables and any leftover name strings */
1045     for (i = 1; i < RcmServer_MAX_TABLES; i++) {
1046         if (obj->fxnTab[i] != NULL) {
1047             tabCount = (1 << (i + 4));
1048             for (j = 0; j < tabCount; j++) {
1049                 if (((obj->fxnTab[i])+j)->name != NULL) {
1050                     cp = ((obj->fxnTab[i])+j)->name;
1051                     size = _strlen(cp) + 1;
1052                     xdc_runtime_Memory_free(RcmServer_Module_heap(), cp, size);
1053                 }
1054             }
1055             fdp = obj->fxnTab[i];
1056             size = tabCount * sizeof(RcmServer_FxnTabElem);
1057             xdc_runtime_Memory_free(RcmServer_Module_heap(), fdp, size);
1058             obj->fxnTab[i] = NULL;
1059         }
1060     }
1062     if (NULL != obj->serverThread) {
1063         Thread_delete(&obj->serverThread);
1064     }
1066     if (NULL != obj->serverQue) {
1067 #if USE_RPMESSAGE
1068         RPMessage_delete(&obj->serverQue);
1069 #else
1070         MessageQ_delete(&obj->serverQue);
1071 #endif
1072     }
1074     if (NULL != obj->run) {
1075         SemThread_delete((SemThread_Handle *)(&obj->run));
1076     }
1078     /* free the name block for the static function table */
1079     if ((NULL != obj->fxnTabStatic.elem) &&
1080         (NULL != obj->fxnTabStatic.elem[0].name)) {
1081         for (size = 0, i = 0; i < obj->fxnTabStatic.length; i++) {
1082             size += _strlen(obj->fxnTabStatic.elem[i].name) + 1;
1083         }
1084         xdc_runtime_Memory_free(
1085             RcmServer_Module_heap(),
1086             obj->fxnTabStatic.elem[0].name, size);
1087     }
1089     /* free the static function table */
1090     if (NULL != obj->fxnTabStatic.elem) {
1091         xdc_runtime_Memory_free(RcmServer_Module_heap(),
1092             obj->fxnTabStatic.elem,
1093             obj->fxnTabStatic.length * sizeof(RcmServer_FxnTabElem));
1094     }
1096     /* destruct the instance gate */
1097     GateThread_destruct(&obj->gate);
1100 leave:
1101     Log_print1(Diags_EXIT, "<-- "FXNN": %d", (IArg)status);
1102     return(status);
1104 #undef FXNN
1107 /*
1108  *  ======== RcmServer_addSymbol ========
1109  */
1110 #define FXNN "RcmServer_addSymbol"
1111 Int RcmServer_addSymbol(RcmServer_Object *obj, String funcName,
1112         RcmServer_MsgFxn addr, UInt32 *index)
1114     GateThread_Handle gateH;
1115     IArg key;
1116     Int len;
1117     UInt i, j;
1118     UInt tabCount;
1119     SizeT tabSize;
1120     UInt32 fxnIdx = 0xFFFF;
1121     RcmServer_FxnTabElem *slot = NULL;
1122     Error_Block eb;
1123     Int status = RcmServer_S_SUCCESS;
1126     Log_print3(Diags_ENTRY,
1127         "--> "FXNN": (obj=0x%x, name=0x%x, addr=0x%x)",
1128         (IArg)obj, (IArg)funcName, (IArg)addr);
1130     Error_init(&eb);
1132     /* protect the symbol table while changing it */
1133     gateH = GateThread_handle(&obj->gate);
1134     key = GateThread_enter(gateH);
1136     /* look for an empty slot to use */
1137     for (i = 1; i < RcmServer_MAX_TABLES; i++) {
1138         if (obj->fxnTab[i] != NULL) {
1139             for (j = 0; j < (1 << (i + 4)); j++) {
1140                 if (((obj->fxnTab[i])+j)->addr.fxn == 0) {
1141                     slot = (obj->fxnTab[i]) + j;  // found empty slot
1142                     break;
1143                 }
1144             }
1145         }
1146         else {
1147             /* all previous tables are full, allocate a new table */
1148             tabCount = (1 << (i + 4));
1149             tabSize = tabCount * sizeof(RcmServer_FxnTabElem);
1150             obj->fxnTab[i] = (RcmServer_FxnTabElem *)xdc_runtime_Memory_alloc(
1151                 RcmServer_Module_heap(), tabSize, sizeof(Ptr), &eb);
1153             if (Error_check(&eb)) {
1154                 Log_error0(FXNN": unable to allocate new function table");
1155                 obj->fxnTab[i] = NULL;
1156                 status = RcmServer_E_NOMEMORY;
1157                 goto leave;
1158             }
1160             /* initialize the new table */
1161             for (j = 0; j < tabCount; j++) {
1162                 ((obj->fxnTab[i])+j)->addr.fxn = 0;
1163                 ((obj->fxnTab[i])+j)->name = NULL;
1164                 ((obj->fxnTab[i])+j)->key = 0;
1165             }
1167             /* use first slot in new table */
1168             j = 0;
1169             slot = (obj->fxnTab[i])+j;
1170         }
1172         /* if new slot found, break out of loop */
1173         if (slot != NULL) {
1174             break;
1175         }
1176     }
1178     /* insert new symbol into slot */
1179     if (slot != NULL) {
1180         slot->addr.fxn = addr;
1181         len = _strlen(funcName) + 1;
1182         slot->name = (String)xdc_runtime_Memory_alloc(
1183             RcmServer_Module_heap(), len, 0, &eb);
1185         if (Error_check(&eb)) {
1186             Log_error0(FXNN": unable to allocate function name");
1187             slot->name = NULL;
1188             status = RcmServer_E_NOMEMORY;
1189             goto leave;
1190         }
1192         _strcpy(slot->name, funcName);
1193         slot->key = RcmServer_getNextKey_P(obj);
1194         fxnIdx = (slot->key << _RCM_KeyShift) | (i << 12) | j;
1195     }
1197     /* error, no more room to add new symbol */
1198     else {
1199         Log_error0(FXNN": cannot add symbol, table is full");
1200         status = RcmServer_E_SYMBOLTABLEFULL;
1201         goto leave;
1202     }
1205 leave:
1206     GateThread_leave(gateH, key);
1208     /* on success, return new function index */
1209     if (status >= 0) {
1210         *index = fxnIdx;
1211     }
1212     Log_print1(Diags_EXIT, "<-- "FXNN": %d", (IArg)status);
1213     return(status);
1215 #undef FXNN
1218 /*
1219  *  ======== RcmServer_removeSymbol ========
1220  */
1221 #define FXNN "RcmServer_removeSymbol"
1222 Int RcmServer_removeSymbol(RcmServer_Object *obj, String name)
1224     GateThread_Handle gateH;
1225     IArg key;
1226     UInt32 fxnIdx;
1227     UInt tabIdx, tabOff;
1228     RcmServer_FxnTabElem *slot;
1229     Int status = RcmServer_S_SUCCESS;
1232     Log_print2(Diags_ENTRY,
1233         "--> "FXNN": (obj=0x%x, name=0x%x)", (IArg)obj, (IArg)name);
1235     /* protect the symbol table while changing it */
1236     gateH = GateThread_handle(&obj->gate);
1237     key = GateThread_enter(gateH);
1239     /* find the symbol in the table */
1240     status = RcmServer_getSymIdx_P(obj, name, &fxnIdx);
1242     if (status < 0) {
1243         Log_error0(FXNN": given symbol not found");
1244         goto leave;
1245     }
1247     /* static symbols have bit-31 set, cannot remove these symbols */
1248     if (fxnIdx & 0x80000000) {
1249         Log_error0(FXNN": cannot remove static symbol");
1250         status = RcmServer_E_SYMBOLSTATIC;
1251         goto leave;
1252     }
1254     /* get slot pointer */
1255     tabIdx = (fxnIdx & 0xF000) >> 12;
1256     tabOff = (fxnIdx & 0xFFF);
1257     slot = (obj->fxnTab[tabIdx]) + tabOff;
1259     /* clear the table index */
1260     slot->addr.fxn = 0;
1261     if (slot->name != NULL) {
1262         xdc_runtime_Memory_free(
1263             RcmServer_Module_heap(), slot->name, _strlen(slot->name) + 1);
1264         slot->name = NULL;
1265     }
1266     slot->key = 0;
1268 leave:
1269     GateThread_leave(gateH, key);
1270     Log_print1(Diags_EXIT, "<-- "FXNN": %d", (IArg)status);
1271     return(status);
1273 #undef FXNN
1276 /*
1277  *  ======== RcmServer_start ========
1278  */
1279 #define FXNN "RcmServer_start"
1280 Int RcmServer_start(RcmServer_Object *obj)
1282     Error_Block eb;
1283     Int status = RcmServer_S_SUCCESS;
1286     Log_print1(Diags_ENTRY, "--> "FXNN": (obj=0x%x)", (IArg)obj);
1288     Error_init(&eb);
1290     /* unblock the server thread */
1291     Semaphore_post(obj->run, &eb);
1293     if (Error_check(&eb)) {
1294         Log_error0(FXNN": semaphore post failed");
1295         status = RcmServer_E_FAIL;
1296     }
1298     Log_print1(Diags_EXIT, "<-- "FXNN": %d", (IArg)status);
1299     return(status);
1301 #undef FXNN
1304 /*
1305  *  ======== RcmServer_acqJobId_P ========
1306  */
1307 #define FXNN "RcmServer_acqJobId_P"
1308 Int RcmServer_acqJobId_P(RcmServer_Object *obj, UInt16 *jobIdPtr)
1310     Error_Block eb;
1311     GateThread_Handle gateH;
1312     IArg key;
1313     Int count;
1314     UInt16 jobId;
1315     List_Elem *elem;
1316     RcmServer_JobStream *job;
1317     Int status = RcmServer_S_SUCCESS;
1320     Log_print2(Diags_ENTRY,
1321         "--> "FXNN": (obj=0x%x, jobIdPtr=0x%x)", (IArg)obj, (IArg)jobIdPtr);
1323     Error_init(&eb);
1324     gateH = GateThread_handle(&obj->gate);
1326     /* enter critical section */
1327     key = GateThread_enter(gateH);
1329     /* compute new job id */
1330     for (count = 0xFFFF; count > 0; count--) {
1332         /* generate a new job id */
1333         jobId = (obj->jobId == 0xFFFF ? obj->jobId = 1 : ++(obj->jobId));
1335         /* verify job id is not in use */
1336         elem = NULL;
1337         while ((elem = List_next(obj->jobList, elem)) != NULL) {
1338             job = (RcmServer_JobStream *)elem;
1339             if (jobId == job->jobId) {
1340                 jobId = RcmClient_DISCRETEJOBID;
1341                 break;
1342             }
1343         }
1345         if (jobId != RcmClient_DISCRETEJOBID) {
1346             break;
1347         }
1348     }
1350     /* check if job id was acquired */
1351     if (jobId == RcmClient_DISCRETEJOBID) {
1352         *jobIdPtr = RcmClient_DISCRETEJOBID;
1353         Log_error0(FXNN": no job id available");
1354         status = RcmServer_E_FAIL;
1355         GateThread_leave(gateH, key);
1356         goto leave;
1357     }
1359     /* create a new job steam object */
1360     job = xdc_runtime_Memory_alloc(RcmServer_Module_heap(),
1361         sizeof(RcmServer_JobStream), sizeof(Ptr), &eb);
1363     if (Error_check(&eb)) {
1364         Log_error2(FXNN": out of memory: heap=0x%x, size=%u",
1365             (IArg)RcmServer_Module_heap(), sizeof(RcmServer_JobStream));
1366         status = RcmServer_E_NOMEMORY;
1367         GateThread_leave(gateH, key);
1368         goto leave;
1369     }
1371     /* initialize new job stream object */
1372     job->jobId = jobId;
1373     job->empty = TRUE;
1374     List_construct(&(job->msgQue), NULL);
1376     /* put new job stream object at end of server list */
1377     List_put(obj->jobList, (List_Elem *)job);
1379     /* leave critical section */
1380     GateThread_leave(gateH, key);
1382     /* return new job id */
1383     *jobIdPtr = jobId;
1386 leave:
1387     Log_print1(Diags_EXIT, "<-- "FXNN": %d", (IArg)status);
1388     return(status);
1390 #undef FXNN
1393 /*
1394  *  ======== RcmServer_dispatch_P ========
1395  *
1396  *  Return Value
1397  *      < 0: error
1398  *        0: success, job stream queue
1399  *      > 0: success, ready queue
1400  *
1401  *  Pool id description
1402  *
1403  *  Static Worker Pools
1404  *  --------------------------------------------------------------------
1405  *  15      1 = static pool
1406  *  14:8    reserved
1407  *  7:0     offset: 0 - 255
1408  *
1409  *  Dynamic Worker Pools
1410  *  --------------------------------------------------------------------
1411  *  15      0 = dynamic pool
1412  *  14:7    key: 0 - 255
1413  *  6:5     index: 1 - 3
1414  *  4:0     offset: 0 - [7, 15, 31]
1415  */
1416 #define FXNN "RcmServer_dispatch_P"
1417 Int RcmServer_dispatch_P(RcmServer_Object *obj, RcmClient_Packet *packet)
1419     GateThread_Handle gateH;
1420     IArg key;
1421     List_Elem *elem;
1422     List_Handle listH;
1423     RcmServer_ThreadPool *pool;
1424     UInt16 jobId;
1425     RcmServer_JobStream *job;
1426     Error_Block eb;
1427     Int status = RcmServer_S_SUCCESS;
1430     Log_print2(Diags_ENTRY,
1431         "--> "FXNN": (obj=0x%x, packet=0x%x)", (IArg)obj, (IArg)packet);
1433     Error_init(&eb);
1435     /* get the target pool id from the message */
1436     status = RcmServer_getPool_P(obj, packet, &pool);
1438     if (status < 0) {
1439         goto leave;
1440     }
1442     System_printf("Rcm dispatch: p:%d j:%d f:%d l:%d\n",
1443                   packet->message.poolId, packet->message.jobId,
1444                   packet->message.fxnIdx, packet->message.dataSize);
1446     /* discrete jobs go on the end of the ready queue */
1447     jobId = packet->message.jobId;
1449     if (jobId == RcmClient_DISCRETEJOBID) {
1450         listH = List_handle(&pool->readyQueue);
1451         List_put(listH, (List_Elem *)packet);
1453         /* dispatch a new worker thread */
1454         Semaphore_post(pool->sem, &eb);
1456         if (Error_check(&eb)) {
1457             Log_error0(FXNN": semaphore post failed");
1458         }
1459     }
1461     /* must be a job stream message */
1462     else {
1463         /* must protect job list while searching it */
1464         gateH = GateThread_handle(&obj->gate);
1465         key = GateThread_enter(gateH);
1467         /* find the job stream object in the list */
1468         elem = NULL;
1469         while ((elem = List_next(obj->jobList, elem)) != NULL) {
1470             job = (RcmServer_JobStream *)elem;
1471             if (job->jobId == jobId) {
1472                 break;
1473             }
1474         }
1476         if (elem == NULL) {
1477             Log_error1(FXNN": failed to find jobId=%d", (IArg)jobId);
1478             status = RcmServer_E_JobIdNotFound;
1479         }
1481         /* if job object is empty, place message directly on ready queue */
1482         else if (job->empty) {
1483             job->empty = FALSE;
1484             listH = List_handle(&pool->readyQueue);
1485             List_put(listH, (List_Elem *)packet);
1487             /* dispatch a new worker thread */
1488             Semaphore_post(pool->sem, &eb);
1490             if (Error_check(&eb)) {
1491                 Log_error0(FXNN": semaphore post failed");
1492             }
1493         }
1495         /* place message on job queue */
1496         else {
1497             listH = List_handle(&job->msgQue);
1498             List_put(listH, (List_Elem *)packet);
1499         }
1501         GateThread_leave(gateH, key);
1502     }
1505 leave:
1506     Log_print1(Diags_EXIT, "<-- "FXNN": %d", (IArg)status);
1507     return(status);
1509 #undef FXNN
1512 /*
1513  *  ======== RcmServer_execMsg_I ========
1514  */
1515 Int RcmServer_execMsg_I(RcmServer_Object *obj, RcmClient_Message *msg)
1517     RcmServer_MsgFxn fxn;
1518 #if USE_RPMESSAGE
1519     RcmServer_MsgCreateFxn createFxn = NULL;
1520 #endif
1521     Int status;
1523     status = RcmServer_getFxnAddr_P(obj, msg->fxnIdx, &fxn, &createFxn);
1525     if (status >= 0) {
1526 #if 0
1527         System_printf("RcmServer_execMsg_I: Calling fxnIdx: %d\n",
1528                       (msg->fxnIdx & 0x0000FFFF));
1529 #endif
1530 #if USE_RPMESSAGE
1531         if (createFxn)  {
1532             msg->result = (*createFxn)(obj, msg->dataSize, msg->data);
1533         }
1534         else {
1535             msg->result = (*fxn)(msg->dataSize, msg->data);
1536         }
1537 #else
1538         msg->result = (*fxn)(msg->dataSize, msg->data);
1539 #endif
1540     }
1542     return(status);
1546 /*
1547  *  ======== RcmServer_getFxnAddr_P ========
1548  *
1549  *  The function index (fxnIdx) uses the following format. Note that the
1550  *  format differs for static vs. dynamic functions. All static functions
1551  *  are in fxnTab[0], all dynamic functions are in fxnTab[1 - 8].
1552  *
1553  *  Bits    Description
1554  *
1555  *  Static Function Index
1556  *  --------------------------------------------------------------------
1557  *  31      static/dynamic function flag
1558  *              0 = dynamic function
1559  *              1 = static function
1560  *  30:20   reserved
1561  *  19:16   reserved
1562  *  15:0    offset: 0 - 65,535
1563  *
1564  *  Dynamic Function Index
1565  *  --------------------------------------------------------------------
1566  *  31      static/dynamic function flag
1567  *              0 = dynamic function
1568  *              1 = static function
1569  *  30:20   key
1570  *  19:16   reserved
1571  *  15:12   index: 1 - 8
1572  *  11:0    offset: 0 - [31, 63, 127, 255, 511, 1023, 2047, 4095]
1573  */
1574 #define FXNN "RcmServer_getFxnAddr_P"
1575 Int RcmServer_getFxnAddr_P(RcmServer_Object *obj, UInt32 fxnIdx,
1576         RcmServer_MsgFxn *addrPtr, RcmServer_MsgCreateFxn *createPtr)
1578     UInt i, j;
1579     UInt16 key;
1580     RcmServer_FxnTabElem *slot;
1581     RcmServer_MsgFxn addr = NULL;
1582     RcmServer_MsgCreateFxn createAddr = NULL;
1583     Int status = RcmServer_S_SUCCESS;
1585     /* static functions have bit-31 set */
1586     if (fxnIdx & 0x80000000) {
1587         j = (fxnIdx & 0x0000FFFF);
1588         if (j < (obj->fxnTabStatic.length)) {
1590             /* fetch the function address from the table */
1591             slot = (obj->fxnTab[0])+j;
1592             if (j == 0)  {
1593                  createAddr = slot->addr.createFxn;
1594             }
1595             else {
1596                  addr = slot->addr.fxn;
1597             }
1598         }
1599         else {
1600             Log_error1(FXNN": invalid function index 0x%x", (IArg)fxnIdx);
1601             status = RcmServer_E_InvalidFxnIdx;
1602             goto leave;
1603         }
1604     }
1605     /* must be a dynamic function */
1606     else {
1607         /* extract the key from the function index */
1608         key = (fxnIdx & _RCM_KeyMask) >> _RCM_KeyShift;
1610         i = (fxnIdx & 0xF000) >> 12;
1611         if ((i > 0) && (i < RcmServer_MAX_TABLES) && (obj->fxnTab[i] != NULL)) {
1613             /* fetch the function address from the table */
1614             j = (fxnIdx & 0x0FFF);
1615             slot = (obj->fxnTab[i])+j;
1616             addr = slot->addr.fxn;
1618             /* validate the key */
1619             if (key != slot->key) {
1620                 Log_error1(FXNN": invalid function index 0x%x", (IArg)fxnIdx);
1621                 status = RcmServer_E_InvalidFxnIdx;
1622                 goto leave;
1623             }
1624         }
1625         else {
1626             Log_error1(FXNN": invalid function index 0x%x", (IArg)fxnIdx);
1627             status = RcmServer_E_InvalidFxnIdx;
1628             goto leave;
1629         }
1630     }
1632 leave:
1633     if (status >= 0) {
1634        if (j == 0)  {
1635            *createPtr = createAddr;
1636        }
1637        else {
1638            *addrPtr = addr;
1639        }
1640     }
1641     return(status);
1643 #undef FXNN
1646 /* *  ======== RcmServer_getSymIdx_P ========
1647  *
1648  *  Must have table gate before calling this function.
1649  */
1650 #define FXNN "RcmServer_getSymIdx_P"
1651 Int RcmServer_getSymIdx_P(RcmServer_Object *obj, String name, UInt32 *index)
1653     UInt i, j, len;
1654     RcmServer_FxnTabElem *slot;
1655     UInt32 fxnIdx = 0xFFFFFFFF;
1656     Int status = RcmServer_S_SUCCESS;
1659     Log_print3(Diags_ENTRY,
1660         "--> "FXNN": (obj=0x%x, name=0x%x, index=0x%x)",
1661         (IArg)obj, (IArg)name, (IArg)index);
1663     /* search tables for given function name */
1664     for (i = 0; i < RcmServer_MAX_TABLES; i++) {
1665         if (obj->fxnTab[i] != NULL) {
1666             len = (i == 0) ? obj->fxnTabStatic.length : (1 << (i + 4));
1667             for (j = 0; j < len; j++) {
1668                 slot = (obj->fxnTab[i]) + j;
1669                 if ((((obj->fxnTab[i])+j)->name != NULL) &&
1670                     (_strcmp(((obj->fxnTab[i])+j)->name, name) == 0)) {
1671                     /* found function name */
1672                     if (i == 0) {
1673                         fxnIdx = 0x80000000 | j;
1674                     } else {
1675                         fxnIdx = (slot->key << _RCM_KeyShift) | (i << 12) | j;
1676                     }
1677                     break;
1678                 }
1679             }
1680         }
1682         if (0xFFFFFFFF != fxnIdx) {
1683             break;
1684         }
1685     }
1687     /* log an error if the symbol was not found */
1688     if (fxnIdx == 0xFFFFFFFF) {
1689         Log_error0(FXNN": given symbol not found");
1690         status = RcmServer_E_SYMBOLNOTFOUND;
1691     }
1693     /* if success, return symbol index */
1694     if (status >= 0) {
1695         *index = fxnIdx;
1696     }
1698     Log_print1(Diags_EXIT, "<-- "FXNN": %d", (IArg)status);
1699     return(status);
1701 #undef FXNN
1704 /*
1705  *  ======== RcmServer_getNextKey_P ========
1706  */
1707 UInt16 RcmServer_getNextKey_P(RcmServer_Object *obj)
1709     GateThread_Handle gateH;
1710     IArg gateKey;
1711     UInt16 key;
1714     gateH = GateThread_handle(&obj->gate);
1715     gateKey = GateThread_enter(gateH);
1717     if (obj->key <= 1) {
1718         obj->key = _RCM_KeyResetValue;  /* don't use 0 as a key value */
1719     }
1720     else {
1721         (obj->key)--;
1722     }
1723     key = obj->key;
1725     GateThread_leave(gateH, gateKey);
1727     return(key);
1731 /*
1732  *  ======== RcmServer_getPool_P ========
1733  */
1734 #define FXNN "RcmServer_getPool_P"
1735 Int RcmServer_getPool_P(RcmServer_Object *obj,
1736         RcmClient_Packet *packet, RcmServer_ThreadPool **poolP)
1738     UInt16 poolId;
1739     UInt16 offset;
1740     Int status = RcmServer_S_SUCCESS;
1743     poolId = packet->message.poolId;
1745     /* static pools have bit-15 set */
1746     if (poolId & 0x8000) {
1747         offset = (poolId & 0x00FF);
1748         if (offset < obj->poolMap0Len) {
1749             *poolP = &(obj->poolMap[0])[offset];
1750         }
1751         else {
1752             Log_error1(FXNN": pool id=0x%x not found", (IArg)poolId);
1753             *poolP = NULL;
1754             status = RcmServer_E_PoolIdNotFound;
1755             goto leave;
1756         }
1757     }
1759 leave:
1760     return(status);
1762 #undef FXNN
1765 /*
1766  *  ======== RcmServer_process_P ========
1767  */
1768 #define FXNN "RcmServer_process_P"
1769 Void RcmServer_process_P(RcmServer_Object *obj, RcmClient_Packet *packet)
1771     String name;
1772     UInt32 fxnIdx;
1773     RcmServer_MsgFxn fxn;
1774     RcmClient_Message *rcmMsg;
1775 #if USE_RPMESSAGE
1776     RcmServer_MsgCreateFxn createFxn = NULL;
1777 #endif
1778 #if USE_RPMESSAGE == 0
1779     MessageQ_Msg msgqMsg;
1780 #endif
1781     UInt16 messageType;
1782     Error_Block eb;
1783     UInt16 jobId;
1784     Int rval;
1785     Int status = RcmServer_S_SUCCESS;
1788     Log_print2(Diags_ENTRY,
1789         "--> "FXNN": (obj=0x%x, packet=0x%x)", (IArg)obj, (IArg)packet);
1791     Error_init(&eb);
1793     /* decode the message */
1794     rcmMsg = &packet->message;
1795 #if USE_RPMESSAGE == 0
1796     msgqMsg = &packet->msgqHeader;
1797 #endif
1798     Log_print1(Diags_INFO, FXNN": message desc=0x%x", (IArg)packet->desc);
1800     /* extract the message type from the packet descriptor field */
1801     messageType = (RcmClient_Desc_TYPE_MASK & packet->desc) >>
1802         RcmClient_Desc_TYPE_SHIFT;
1804     /* process the given message */
1805     switch (messageType) {
1807         case RcmClient_Desc_RCM_MSG:
1808             rval = RcmServer_execMsg_I(obj, rcmMsg);
1810             if (rval < 0) {
1811                 switch (rval) {
1812                     case RcmServer_E_InvalidFxnIdx:
1813                         RcmServer_setStatusCode_I(
1814                             packet, RcmServer_Status_INVALID_FXN);
1815                         break;
1816                     default:
1817                         RcmServer_setStatusCode_I(
1818                             packet, RcmServer_Status_Error);
1819                         break;
1820                 }
1821             }
1822             else if (rcmMsg->result < 0) {
1823                 RcmServer_setStatusCode_I(packet, RcmServer_Status_MSG_FXN_ERR);
1824             }
1825             else {
1826                 RcmServer_setStatusCode_I(packet, RcmServer_Status_SUCCESS);
1827             }
1829 #if USE_RPMESSAGE
1830 #if 0
1831             System_printf("RcmServer_process_P: Sending reply from: %d to: %d\n",
1832                       obj->localAddr, obj->replyAddr);
1833 #endif
1835             packet->hdr.type = OMX_RAW_MSG;
1836             packet->hdr.len = PACKET_DATA_SIZE + packet->message.dataSize;
1837             status = RPMessage_send(obj->dstProc, obj->replyAddr,
1838                                  obj->localAddr, (Ptr)&packet->hdr,
1839                                  PACKET_HDR_SIZE + packet->message.dataSize);
1840 #else
1841             status = MessageQ_put(MessageQ_getReplyQueue(msgqMsg), msgqMsg);
1842 #endif
1843             if (status < 0) {
1844                 Log_error1(FXNN": unknown ipc error, 0x%x", (IArg)status);
1845             }
1846             break;
1848         case RcmClient_Desc_CMD:
1849             status = RcmServer_execMsg_I(obj, rcmMsg);
1851             /* if all went well, free the message */
1852             if ((status >= 0) && (rcmMsg->result >= 0)) {
1854 #if USE_RPMESSAGE == 0
1855                 status = MessageQ_free(msgqMsg);
1856 #endif
1857                 if (status < 0) {
1858                     Log_error1(
1859                         FXNN": MessageQ_free returned error %d", (IArg)status);
1860                 }
1861             }
1863             /* an error occurred, must return message to client */
1864             else {
1865                 if (status < 0) {
1866                     /* error trying to process the message */
1867                     switch (status) {
1868                         case RcmServer_E_InvalidFxnIdx:
1869                             RcmServer_setStatusCode_I(
1870                                 packet, RcmServer_Status_INVALID_FXN);
1871                             break;
1872                         default:
1873                             RcmServer_setStatusCode_I(
1874                                 packet, RcmServer_Status_Error);
1875                             break;
1876                     }
1877                 }
1878                 else  {
1879                     /* error in message function */
1880                     RcmServer_setStatusCode_I(
1881                         packet, RcmServer_Status_MSG_FXN_ERR);
1882                 }
1884                 /* send error message back to client */
1885 #if USE_RPMESSAGE
1886                 packet->hdr.type = OMX_RAW_MSG;
1887                 packet->hdr.len = PACKET_DATA_SIZE + packet->message.dataSize;
1888                 status = RPMessage_send(obj->dstProc, obj->replyAddr,
1889                                  obj->localAddr, (Ptr)&packet->hdr,
1890                                  PACKET_HDR_SIZE + packet->message.dataSize);
1891 #else
1892                 status = MessageQ_put(MessageQ_getReplyQueue(msgqMsg), msgqMsg);
1893 #endif
1894                 if (status < 0) {
1895                     Log_error1(FXNN": unknown ipc error, 0x%x", (IArg)status);
1896                 }
1897             }
1898             break;
1900         case RcmClient_Desc_DPC:
1901             rval = RcmServer_getFxnAddr_P(obj, rcmMsg->fxnIdx, &fxn,
1902                                           &createFxn);
1904             if (rval < 0) {
1905                 RcmServer_setStatusCode_I(
1906                     packet, RcmServer_Status_SYMBOL_NOT_FOUND);
1907                 Error_init(&eb);
1908             }
1910 #if USE_RPMESSAGE
1911             packet->hdr.type = OMX_RAW_MSG;
1912             packet->hdr.len = PACKET_DATA_SIZE + packet->message.dataSize;
1913             status = RPMessage_send(obj->dstProc, obj->replyAddr,
1914                                  obj->localAddr, (Ptr)&packet->hdr,
1915                                  PACKET_HDR_SIZE + packet->message.dataSize);
1916 #else
1917             status = MessageQ_put(MessageQ_getReplyQueue(msgqMsg), msgqMsg);
1918 #endif
1919             if (status < 0) {
1920                 Log_error1(FXNN": unknown ipc error, 0x%x", (IArg)status);
1921             }
1923             /* invoke the function with a null context */
1924 #if USE_RPMESSAGE
1925             if (createFxn)  {
1926                  (*createFxn)(obj, 0, NULL);
1927             }
1928             else {
1929                  (*fxn)(0, NULL);
1930             }
1931 #else
1932             (*fxn)(0, NULL);
1933 #endif
1934             break;
1936         case RcmClient_Desc_SYM_ADD:
1937             break;
1939         case RcmClient_Desc_SYM_IDX:
1940             name = (String)rcmMsg->data;
1941             rval = RcmServer_getSymIdx_P(obj, name, &fxnIdx);
1943             if (rval < 0) {
1944                 RcmServer_setStatusCode_I(
1945                     packet, RcmServer_Status_SYMBOL_NOT_FOUND);
1946             }
1947             else {
1948                 RcmServer_setStatusCode_I(packet, RcmServer_Status_SUCCESS);
1949                 rcmMsg->data[0] = fxnIdx;
1950                 rcmMsg->result = 0;
1951             }
1953 #if USE_RPMESSAGE
1954             packet->hdr.type = OMX_RAW_MSG;
1955             packet->hdr.len = PACKET_DATA_SIZE + packet->message.dataSize;
1956             status = RPMessage_send(obj->dstProc, obj->replyAddr,
1957                                  obj->localAddr, (Ptr)&packet->hdr,
1958                                  PACKET_HDR_SIZE + packet->message.dataSize);
1959 #else
1960             status = MessageQ_put(MessageQ_getReplyQueue(msgqMsg), msgqMsg);
1961 #endif
1962             if (status < 0) {
1963                 Log_error1(FXNN": unknown ipc error, 0x%x", (IArg)status);
1964             }
1965             break;
1967         case RcmClient_Desc_JOB_ACQ:
1968             rval = RcmServer_acqJobId_P(obj, &jobId);
1970             if (rval < 0) {
1971                 RcmServer_setStatusCode_I(packet, RcmServer_Status_Error);
1972             }
1973             else {
1974                 RcmServer_setStatusCode_I(packet, RcmServer_Status_SUCCESS);
1975                 *(UInt16 *)(&rcmMsg->data[0]) = jobId;
1976                 rcmMsg->result = 0;
1977             }
1979 #if USE_RPMESSAGE
1980             packet->hdr.type = OMX_RAW_MSG;
1981             packet->hdr.len = PACKET_DATA_SIZE + packet->message.dataSize;
1982             status = RPMessage_send(obj->dstProc, obj->replyAddr,
1983                                  obj->localAddr, (Ptr)&packet->hdr,
1984                                  PACKET_HDR_SIZE + packet->message.dataSize);
1985 #else
1986             status = MessageQ_put(MessageQ_getReplyQueue(msgqMsg), msgqMsg);
1987 #endif
1988             if (status < 0) {
1989                 Log_error1(FXNN": unknown ipc error, 0x%x", (IArg)status);
1990             }
1991             break;
1993         case RcmClient_Desc_JOB_REL:
1994             jobId = (UInt16)(rcmMsg->data[0]);
1995             rval = RcmServer_relJobId_P(obj, jobId);
1997             if (rval < 0) {
1998                 switch (rval) {
1999                     case RcmServer_E_JobIdNotFound:
2000                         RcmServer_setStatusCode_I(
2001                             packet, RcmServer_Status_JobNotFound);
2002                         break;
2003                     default:
2004                         RcmServer_setStatusCode_I(
2005                             packet, RcmServer_Status_Error);
2006                         break;
2007                 }
2008                 rcmMsg->result = rval;
2009             }
2010             else {
2011                 RcmServer_setStatusCode_I(packet, RcmServer_Status_SUCCESS);
2012                 rcmMsg->result = 0;
2013             }
2015 #if USE_RPMESSAGE
2016             packet->hdr.type = OMX_RAW_MSG;
2017             packet->hdr.len = PACKET_DATA_SIZE + packet->message.dataSize;
2018             status = RPMessage_send(obj->dstProc, obj->replyAddr,
2019                                  obj->localAddr, (Ptr)&packet->hdr,
2020                                  PACKET_HDR_SIZE + packet->message.dataSize);
2021 #else
2022             status = MessageQ_put(MessageQ_getReplyQueue(msgqMsg), msgqMsg);
2023 #endif
2024             if (status < 0) {
2025                 Log_error1(FXNN": unknown ipc error, 0x%x", (IArg)status);
2026             }
2027             break;
2029         default:
2030             Log_error1(FXNN": unknown message type recieved, 0x%x",
2031                 (IArg)messageType);
2032             break;
2033     }
2035     Log_print0(Diags_EXIT, "<-- "FXNN":");
2037 #undef FXNN
2040 /*
2041  *  ======== RcmServer_relJobId_P ========
2042  */
2043 #define FXNN "RcmServer_relJobId_P"
2044 Int RcmServer_relJobId_P(RcmServer_Object *obj, UInt16 jobId)
2046     GateThread_Handle gateH;
2047     IArg key;
2048     List_Elem *elem;
2049     List_Handle msgQueH;
2050     RcmClient_Packet *packet;
2051     RcmServer_JobStream *job;
2052 #if USE_RPMESSAGE == 0
2053     MessageQ_Msg msgqMsg;
2054 #endif
2055     Int rval;
2056     Int status = RcmServer_S_SUCCESS;
2059     Log_print2(Diags_ENTRY,
2060         "--> "FXNN": (obj=0x%x, jobId=0x%x)", (IArg)obj, (IArg)jobId);
2063     /* must protect job list while searching and modifying it */
2064     gateH = GateThread_handle(&obj->gate);
2065     key = GateThread_enter(gateH);
2067     /* find the job stream object in the list */
2068     elem = NULL;
2069     while ((elem = List_next(obj->jobList, elem)) != NULL) {
2070         job = (RcmServer_JobStream *)elem;
2072         /* remove the job stream object from the list */
2073         if (job->jobId == jobId) {
2074             List_remove(obj->jobList, elem);
2075             break;
2076         }
2077     }
2079     GateThread_leave(gateH, key);
2081     if (elem == NULL) {
2082         status = RcmServer_E_JobIdNotFound;
2083         Log_error1(FXNN": failed to find jobId=%d", (IArg)jobId);
2084         goto leave;
2085     }
2087     /* return any pending messages on the message queue */
2088     msgQueH = List_handle(&job->msgQue);
2090     while ((elem = List_get(msgQueH)) != NULL) {
2091         packet = (RcmClient_Packet *)elem;
2092         Log_warning2(
2093             FXNN": returning unprocessed message, jobId=0x%x, packet=0x%x",
2094             (IArg)jobId, (IArg)packet);
2096         RcmServer_setStatusCode_I(packet, RcmServer_Status_Unprocessed);
2098 #if USE_RPMESSAGE
2099         packet->hdr.type = OMX_RAW_MSG;
2100         packet->hdr.len = PACKET_DATA_SIZE + packet->message.dataSize;
2101         rval = RPMessage_send(obj->dstProc, obj->replyAddr,
2102                                  obj->localAddr, (Ptr)&packet->hdr,
2103                                  PACKET_HDR_SIZE + packet->message.dataSize);
2104 #else
2105         msgqMsg = &packet->msgqHeader;
2106         rval = MessageQ_put(MessageQ_getReplyQueue(msgqMsg), msgqMsg);
2107 #endif
2108         if (rval < 0) {
2109             Log_error1(FXNN": unknown ipc error, 0x%x", (IArg)rval);
2110         }
2111     }
2113     /* finalize the job stream object */
2114     List_destruct(&job->msgQue);
2116     xdc_runtime_Memory_free(RcmServer_Module_heap(),
2117         (Ptr)job, sizeof(RcmServer_JobStream));
2120 leave:
2121     Log_print1(Diags_EXIT, "<-- "FXNN": %d", (IArg)status);
2122     return(status);
2124 #undef FXNN
2127 /*
2128  *  ======== RcmServer_serverThrFxn_P ========
2129  */
2130 #define FXNN "RcmServer_serverThrFxn_P"
2131 Void RcmServer_serverThrFxn_P(IArg arg)
2133     Error_Block eb;
2134     RcmClient_Packet *packet;
2135 #if USE_RPMESSAGE
2136     Char         recvBuf[MSGBUFFERSIZE];
2137     UInt16       len;
2138 #else
2139     MessageQ_Msg msgqMsg = NULL;
2140 #endif
2141     Int rval;
2142     Bool running = TRUE;
2143     RcmServer_Object *obj = (RcmServer_Object *)arg;
2144     Int dataSize;
2146 #if USE_RPMESSAGE
2147     packet = (RcmClient_Packet *)&recvBuf[0];
2148 #endif
2150     Log_print1(Diags_ENTRY, "--> "FXNN": (arg=0x%x)", arg);
2152     Error_init(&eb);
2154     /* wait until ready to run */
2155     Semaphore_pend(obj->run, Semaphore_FOREVER, &eb);
2157     if (Error_check(&eb)) {
2158         Log_error0(FXNN": Semaphore_pend failure in server thread");
2159     }
2161     /* main processing loop */
2162     while (running) {
2163         Log_print1(Diags_INFO,
2164             FXNN": waiting for message, thread=0x%x",
2165             (IArg)(obj->serverThread));
2167         /* block until message arrives */
2168         do {
2169 #if USE_RPMESSAGE
2170             rval = RPMessage_recv(obj->serverQue, (Ptr)&packet->hdr, &len,
2171                       &obj->replyAddr, RPMessage_FOREVER);
2172 #if 0
2173             System_printf("RcmServer_serverThrFxn_P: Received msg of len %d "
2174                           "from: %d\n",
2175                          len, obj->replyAddr);
2177             System_printf("hdr - t:%d f:%d l:%d\n", packet->hdr.type,
2178                           packet->hdr.flags, packet->hdr.len);
2180             System_printf("pkt - d:%d m:%d\n", packet->desc, packet->msgId);
2181 #endif
2182             if (packet->hdr.type == OMX_DISC_REQ) {
2183                 System_printf("RcmServer_serverThrFxn_P: Got OMX_DISCONNECT\n");
2184             }
2185             Assert_isTrue((len <= MSGBUFFERSIZE), NULL);
2186             Assert_isTrue((packet->hdr.type == OMX_RAW_MSG) ||
2187                           (packet->hdr.type == OMX_DISC_REQ) , NULL);
2189             if ((rval < 0) && (rval != RPMessage_E_UNBLOCKED)) {
2190 #else
2191             rval = MessageQ_get(obj->serverQue, &msgqMsg, MessageQ_FOREVER);
2192             if ((rval < 0) && (rval != MessageQ_E_UNBLOCKED)) {
2193 #endif
2194                 Log_error1(FXNN": ipc error 0x%x", (IArg)rval);
2195                 /* keep running and hope for the best */
2196             }
2197 #if USE_RPMESSAGE
2198         } while (FALSE);
2199 #else
2200         } while ((msgqMsg == NULL) && !obj->shutdown);
2201 #endif
2203         /* if shutdown, exit this thread */
2204 #if USE_RPMESSAGE
2205         if (obj->shutdown || packet->hdr.type == OMX_DISC_REQ) {
2206             running = FALSE;
2207             Log_print1(Diags_INFO,
2208                 FXNN": terminating, thread=0x%x", (IArg)(obj->serverThread));
2209             continue;
2210         }
2211 #else
2212         if (obj->shutdown) {
2213             running = FALSE;
2214             Log_print1(Diags_INFO,
2215                 FXNN": terminating, thread=0x%x", (IArg)(obj->serverThread));
2216             if (msgqMsg == NULL ) {
2217                 continue;
2218             }
2219         }
2220 #endif
2222 #if USE_RPMESSAGE == 0
2223         packet = (RcmClient_Packet *)msgqMsg;
2224 #endif
2226         Log_print2(Diags_INFO,
2227             FXNN": message received, thread=0x%x packet=0x%x",
2228             (IArg)(obj->serverThread), (IArg)packet);
2230         if ((packet->message.poolId == RcmClient_DEFAULTPOOLID)
2231             && ((obj->poolMap[0])[0].count == 0)) {
2233             /* in-band (server thread) message processing */
2234             RcmServer_process_P(obj, packet);
2235         }
2236         else {
2237             /* out-of-band (worker thread) message processing */
2238             rval = RcmServer_dispatch_P(obj, packet);
2240             /* if error, message was not dispatched; must return to client */
2241             if (rval < 0) {
2242                 switch (rval) {
2243                     case RcmServer_E_JobIdNotFound:
2244                         RcmServer_setStatusCode_I(
2245                             packet, RcmServer_Status_JobNotFound);
2246                         break;
2248                     case RcmServer_E_PoolIdNotFound:
2249                         RcmServer_setStatusCode_I(
2250                             packet, RcmServer_Status_PoolNotFound);
2251                         break;
2253                     default:
2254                         RcmServer_setStatusCode_I(
2255                             packet, RcmServer_Status_Error);
2256                         break;
2257                 }
2258                 packet->message.result = rval;
2260                 /* return the message to the client */
2261 #if USE_RPMESSAGE
2262 #if 0
2263                 System_printf("RcmServer_serverThrFxn_P: "
2264                               "Sending response from: %d to: %d\n",
2265                               obj->localAddr, obj->replyAddr);
2266                 System_printf("sending: %d + %d\n", PACKET_HDR_SIZE,
2267                               packet->message.dataSize);
2268 #endif
2269                 packet->hdr.type = OMX_RAW_MSG;
2270                 if (rval < 0) {
2271                     packet->hdr.len = sizeof(UInt32);
2272                     packet->desc = 0x4142;
2273                     packet->msgId = 0x0044;
2274                     dataSize = sizeof(struct rpmsg_omx_hdr) + sizeof(UInt32);
2275                 }
2276                 else {
2277                     packet->hdr.len = PACKET_DATA_SIZE +
2278                         packet->message.dataSize;
2279                     dataSize = PACKET_HDR_SIZE + packet->message.dataSize;
2280                 }
2281                 rval = RPMessage_send(obj->dstProc, obj->replyAddr,
2282                                  obj->localAddr, (Ptr)&packet->hdr, dataSize);
2283 #else
2284                 rval = MessageQ_put(MessageQ_getReplyQueue(msgqMsg), msgqMsg);
2285 #endif
2286                 if (rval < 0) {
2287                     Log_error1(FXNN": unknown ipc error, 0x%x", (IArg)rval);
2288                 }
2289             }
2290         }
2291     }
2293     System_printf("RcmServer_serverThrFxn_P: Exiting thread.\n");
2295     Log_print0(Diags_EXIT, "<-- "FXNN":");
2297 #undef FXNN
2300 /*
2301  *  ======== RcmServer_setStatusCode_I ========
2302  */
2303 Void RcmServer_setStatusCode_I(RcmClient_Packet *packet, UInt16 code)
2306     /* code must be 0 - 15, it has to fit in a 4-bit field */
2307     Assert_isTrue((code < 16), NULL);
2309     packet->desc &= ~(RcmClient_Desc_TYPE_MASK);
2310     packet->desc |= ((code << RcmClient_Desc_TYPE_SHIFT)
2311         & RcmClient_Desc_TYPE_MASK);
2315 /*
2316  *  ======== RcmServer_workerThrFxn_P ========
2317  */
2318 #define FXNN "RcmServer_workerThrFxn_P"
2319 Void RcmServer_workerThrFxn_P(IArg arg)
2321     Error_Block eb;
2322     RcmClient_Packet *packet;
2323     List_Elem *elem;
2324     List_Handle listH;
2325     List_Handle readyQueueH;
2326     UInt16 jobId;
2327     GateThread_Handle gateH;
2328     IArg key;
2329     RcmServer_ThreadPool *pool;
2330     RcmServer_JobStream *job;
2331     RcmServer_WorkerThread *obj;
2332     Bool running;
2333     Int rval;
2336     Log_print1(Diags_ENTRY, "--> "FXNN": (arg=0x%x)", arg);
2338     Error_init(&eb);
2339     obj = (RcmServer_WorkerThread *)arg;
2340     readyQueueH = List_handle(&obj->pool->readyQueue);
2341     packet = NULL;
2342     running = TRUE;
2344     /* main processing loop */
2345     while (running) {
2346         Log_print1(Diags_INFO,
2347             FXNN": waiting for job, thread=0x%x", (IArg)(obj->thread));
2349         /* if no current message, wait until signaled to run */
2350         if (packet == NULL) {
2351             Semaphore_pend(obj->pool->sem, Semaphore_FOREVER, &eb);
2353             if (Error_check(&eb)) {
2354                 Log_error0(FXNN": semaphore pend failed");
2355             }
2356         }
2358         /* check if thread should terminate */
2359         if (obj->terminate) {
2360             running = FALSE;
2361             Log_print1(Diags_INFO,
2362                 FXNN": terminating, thread=0x%x", (IArg)(obj->thread));
2363             continue;
2364         }
2366         /* get next message from ready queue */
2367         if (packet == NULL) {
2368             packet = (RcmClient_Packet *)List_get(readyQueueH);
2369         }
2371         if (packet == NULL) {
2372             Log_error1(FXNN": ready queue is empty, thread=0x%x",
2373                 (IArg)(obj->thread));
2374             continue;
2375         }
2377         Log_print2(Diags_INFO, FXNN": job received, thread=0x%x packet=0x%x",
2378             (IArg)obj->thread, (IArg)packet);
2380         /* remember the message job id */
2381         jobId = packet->message.jobId;
2383         /* process the message */
2384         RcmServer_process_P(obj->server, packet);
2385         packet = NULL;
2387         /* If this worker thread just finished processing a job message,
2388          * queue up the next message for this job id. As an optimization,
2389          * if the message is addressed to this worker's pool, then don't
2390          * signal the semaphore, just get the next message from the queue
2391          * and processes it. This keeps the current thread running instead
2392          * of switching to another thread.
2393          */
2394         if (jobId != RcmClient_DISCRETEJOBID) {
2396             /* must protect job list while searching it */
2397             gateH = GateThread_handle(&obj->server->gate);
2398             key = GateThread_enter(gateH);
2400             /* find the job object in the list */
2401             elem = NULL;
2402             while ((elem = List_next(obj->server->jobList, elem)) != NULL) {
2403                 job = (RcmServer_JobStream *)elem;
2404                 if (job->jobId == jobId) {
2405                     break;
2406                 }
2407             }
2409             /* if job object not found, it is not an error */
2410             if (elem == NULL) {
2411                 GateThread_leave(gateH, key);
2412                 continue;
2413             }
2415             /* found the job object */
2416             listH = List_handle(&job->msgQue);
2418             /* get next job message and either process it or queue it */
2419             do {
2420                 elem = List_get(listH);
2422                 if (elem == NULL) {
2423                     job->empty = TRUE;  /* no more messages */
2424                     break;
2425                 }
2426                 else {
2427                     /* get target pool id */
2428                     packet = (RcmClient_Packet *)elem;
2429                     rval = RcmServer_getPool_P(obj->server, packet, &pool);
2431                     /* if error, return the message to the client */
2432                     if (rval < 0) {
2433                         switch (rval) {
2434                             case RcmServer_E_PoolIdNotFound:
2435                                 RcmServer_setStatusCode_I(
2436                                     packet, RcmServer_Status_PoolNotFound);
2437                                 break;
2439                             default:
2440                                 RcmServer_setStatusCode_I(
2441                                     packet, RcmServer_Status_Error);
2442                                 break;
2443                         }
2444                         packet->message.result = rval;
2446 #if USE_RPMESSAGE
2447                         packet->hdr.type = OMX_RAW_MSG;
2448                         packet->hdr.len = PACKET_DATA_SIZE + packet->message.dataSize;
2449                         rval = RPMessage_send(
2450                                  (obj->server)->dstProc,
2451                                  (obj->server)->replyAddr,
2452                                  (obj->server)->localAddr, (Ptr)&packet->hdr,
2453                                  PACKET_HDR_SIZE + packet->message.dataSize);
2454 #else
2455                         rval = MessageQ_put(
2456                             MessageQ_getReplyQueue(&packet->msgqHeader),
2457                             &packet->msgqHeader);
2458 #endif
2459                         if (rval < 0) {
2460                             Log_error1(
2461                                 FXNN": unknown ipc error, 0x%x", (IArg)rval);
2462                         }
2463                     }
2464                     /* packet is valid, queue it in the corresponding pool's
2465                      * ready queue */
2466                     else {
2467                         listH = List_handle(&pool->readyQueue);
2468                         List_put(listH, elem);
2469                         packet = NULL;
2470                         Semaphore_post(pool->sem, &eb);
2472                         if (Error_check(&eb)) {
2473                             Log_error0(FXNN": semaphore post failed");
2474                         }
2475                     }
2477                     /* loop around and wait to be run again */
2478                 }
2480             } while (rval < 0);
2482             GateThread_leave(gateH, key);
2483         }
2484     }  /* while (running) */
2486     Log_print0(Diags_EXIT, "<-- "FXNN":");
2488 #undef FXNN
2491 /*
2492  *  ======== RcmServer_getLocalAddress ========
2493  */
2494 UInt32 RcmServer_getLocalAddress(RcmServer_Object *obj)
2496     return(obj->localAddr);
2500 /*
2501  *  ======== RcmServer_getRemoteAddress ========
2502  */
2503 UInt32 RcmServer_getRemoteAddress(RcmServer_Object *obj)
2505     return(obj->replyAddr);
2510 /*
2511  *  ======== RcmServer_getRemoteProc ========
2512  */
2513 UInt16  RcmServer_getRemoteProc(RcmServer_Object *obj)
2515     return(obj->dstProc);