1 /*
2 * Copyright (c) 2011-2019, Texas Instruments Incorporated
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
7 * are met:
8 *
9 * * Redistributions of source code must retain the above copyright
10 * notice, this list of conditions and the following disclaimer.
11 *
12 * * Redistributions in binary form must reproduce the above copyright
13 * notice, this list of conditions and the following disclaimer in the
14 * documentation and/or other materials provided with the distribution.
15 *
16 * * Neither the name of Texas Instruments Incorporated nor the names of
17 * its contributors may be used to endorse or promote products derived
18 * from this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
21 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
22 * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
23 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
24 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
25 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
26 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
27 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
28 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
29 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
30 * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 */
33 /*
34 * ======== RcmServer.c ========
35 *
36 */
38 /* this define must precede inclusion of any xdc header file */
39 #define Registry_CURDESC ti_grcm_RcmServer__Desc
40 #define MODULE_NAME "ti.grcm.RcmServer"
42 #define xdc_runtime_Memory__nolocalnames /* short name clashes with SysLink */
44 /* rtsc header files */
45 #include <xdc/std.h>
46 #include <xdc/runtime/Assert.h>
47 #include <xdc/runtime/Diags.h>
48 #include <xdc/runtime/Error.h>
49 #include <xdc/runtime/IHeap.h>
50 #include <xdc/runtime/Log.h>
51 #include <xdc/runtime/Memory.h>
52 #include <xdc/runtime/Registry.h>
53 #include <xdc/runtime/Startup.h>
54 #include <xdc/runtime/knl/GateThread.h>
55 #include <xdc/runtime/knl/ISemaphore.h>
56 #include <xdc/runtime/knl/Semaphore.h>
57 #include <xdc/runtime/knl/SemThread.h>
58 #include <xdc/runtime/knl/Thread.h>
59 #include <xdc/runtime/System.h>
61 #include <ti/sysbios/knl/Task.h>
63 #define MSGBUFFERSIZE 512 /* Make global and move to RPMessage.h */
65 #if defined(RCM_ti_ipc)
66 #include <ti/sdo/utils/List.h>
67 #include <ti/ipc/MultiProc.h>
69 #elif defined(RCM_ti_syslink)
70 #include <ti/syslink/utils/List.h>
71 #define List_Struct List_Object
72 #define List_handle(exp) (exp)
74 #else
75 #error "undefined ipc binding";
76 #endif
78 /* local header files */
79 #include "RcmClient.h"
80 #include "RcmTypes.h"
81 #include "RcmServer.h"
83 #if USE_RPMESSAGE
84 #include <ti/srvmgr/rpmsg_omx.h>
85 #endif
/* key handling: the key lives in the upper bits of a function index */
#define _RCM_KeyResetValue          0x07FF      /* key reset value */
#define _RCM_KeyMask                0x7FF00000  /* mask selecting the key bits */
#define _RCM_KeyShift               20          /* bit position of the key */

/* table dimensions */
#define RcmServer_MAX_TABLES        9   /* max number of function tables */
#define RcmServer_POOL_MAP_LEN      4   /* number of entries in the pool map */

/* status codes private to this file (all negative) */
#define RcmServer_E_InvalidFxnIdx   (-101)
#define RcmServer_E_JobIdNotFound   (-102)
#define RcmServer_E_PoolIdNotFound  (-103)
98 typedef struct { /* function table element*/
99 String name;
100 #if USE_RPMESSAGE
101 union {
102 RcmServer_MsgFxn fxn;
103 RcmServer_MsgCreateFxn createFxn;
104 }addr;
105 #else
106 RcmServer_MsgFxn addr;
107 #endif
108 UInt16 key;
109 } RcmServer_FxnTabElem;
111 typedef struct {
112 Int length;
113 RcmServer_FxnTabElem * elem;
114 } RcmServer_FxnTabElemAry;
116 typedef struct {
117 String name; /* pool name*/
118 Int count; /* thread count (at create time)*/
119 Thread_Priority priority; /* thread priority*/
120 Int osPriority;
121 SizeT stackSize; /* thread stack size*/
122 String stackSeg; /* thread stack placement*/
123 ISemaphore_Handle sem; /* message semaphore (counting)*/
124 List_Struct threadList; /* list of worker threads*/
125 List_Struct readyQueue; /* queue of messages*/
126 } RcmServer_ThreadPool;
128 typedef struct RcmServer_Object_tag {
129 GateThread_Struct gate; /* instance gate*/
130 Ptr run; /* run semaphore for the server*/
131 #if USE_RPMESSAGE
132 RPMessage_Handle serverQue; /* inbound message queue */
133 UInt32 localAddr; /* inbound message queue address */
134 UInt32 replyAddr; /* Reply address (same per inst.) */
135 UInt32 dstProc; /* Reply processor. */
136 #else
137 MessageQ_Handle serverQue; /* inbound message queue*/
138 #endif
139 Thread_Handle serverThread; /* server thread object*/
140 RcmServer_FxnTabElemAry fxnTabStatic; /* static function table*/
141 RcmServer_FxnTabElem * fxnTab[RcmServer_MAX_TABLES]; /* base pointers*/
142 UInt16 key; /* function index key*/
143 UInt16 jobId; /* job id tracker*/
144 Bool shutdown; /* server shutdown flag*/
145 Int poolMap0Len;/* length of static table*/
146 RcmServer_ThreadPool * poolMap[RcmServer_POOL_MAP_LEN];
147 List_Handle jobList; /* list of job stream queues*/
148 } RcmServer_Object;
150 typedef struct {
151 List_Elem elem;
152 UInt16 jobId; /* current job stream id*/
153 Thread_Handle thread; /* server thread object*/
154 Bool terminate; /* thread terminate flag*/
155 RcmServer_ThreadPool* pool; /* worker pool*/
156 RcmServer_Object * server; /* server instance*/
157 } RcmServer_WorkerThread;
159 typedef struct {
160 List_Elem elem;
161 UInt16 jobId; /* job stream id*/
162 Bool empty; /* true if no messages on server*/
163 List_Struct msgQue; /* queue of messages*/
164 } RcmServer_JobStream;
166 typedef struct RcmServer_Module_tag {
167 String name;
168 IHeap_Handle heap;
169 } RcmServer_Module;
172 /* private functions */
173 static
174 Int RcmServer_Instance_init_P(
175 RcmServer_Object * obj,
176 String name,
177 const RcmServer_Params * params
178 );
180 static
181 Int RcmServer_Instance_finalize_P(
182 RcmServer_Object * obj
183 );
185 static
186 Int RcmServer_acqJobId_P(
187 RcmServer_Object * obj,
188 UInt16 * jobIdPtr
189 );
191 static
192 Int RcmServer_dispatch_P(
193 RcmServer_Object * obj,
194 RcmClient_Packet * packet
195 );
197 static
198 Int RcmServer_execMsg_I(
199 RcmServer_Object * obj,
200 RcmClient_Message * msg
201 );
203 static
204 Int RcmServer_getFxnAddr_P(
205 RcmServer_Object * obj,
206 UInt32 fxnIdx,
207 RcmServer_MsgFxn * addrPtr,
208 RcmServer_MsgCreateFxn * createPtr
209 );
211 static
212 UInt16 RcmServer_getNextKey_P(
213 RcmServer_Object * obj
214 );
216 static
217 Int RcmServer_getSymIdx_P(
218 RcmServer_Object * obj,
219 String name,
220 UInt32 * index
221 );
223 static
224 Int RcmServer_getPool_P(
225 RcmServer_Object * obj,
226 RcmClient_Packet * packet,
227 RcmServer_ThreadPool ** poolP
228 );
230 static
231 Void RcmServer_process_P(
232 RcmServer_Object * obj,
233 RcmClient_Packet * packet
234 );
236 static
237 Int RcmServer_relJobId_P(
238 RcmServer_Object * obj,
239 UInt16 jobId
240 );
242 static
243 Void RcmServer_serverThrFxn_P(
244 IArg arg
245 );
247 static inline
248 Void RcmServer_setStatusCode_I(
249 RcmClient_Packet * packet,
250 UInt16 code
251 );
253 static
254 Void RcmServer_workerThrFxn_P(
255 IArg arg
256 );
259 #define RcmServer_Module_heap() (RcmServer_Mod.heap)
262 /* static objects */
263 static Int curInit = 0;
264 static Char ti_grcm_RcmServer_Name[] = {
265 't','i','.','s','d','o','.','r','c','m','.',
266 'R','c','m','S','e','r','v','e','r','\0'
267 };
269 static RcmServer_Module RcmServer_Mod = {
270 MODULE_NAME, /* name */
271 (IHeap_Handle)NULL /* heap */
272 };
274 /* module diags mask */
275 Registry_Desc Registry_CURDESC;
278 /*
279 * ======== RcmServer_init ========
280 *
281 * This function must be serialized by the caller
282 */
283 Void RcmServer_init(Void)
284 {
285 Registry_Result result;
288 if (curInit++ != 0) {
289 return; /* module already initialized */
290 }
292 /* register with xdc.runtime to get a diags mask */
293 /* result = Registry_addModule(&Registry_CURDESC, MODULE_NAME);*/
294 result = Registry_addModule(&Registry_CURDESC, ti_grcm_RcmServer_Name);
295 Assert_isTrue(result == Registry_SUCCESS, (Assert_Id)NULL);
297 /* the size of object and struct must be the same */
298 Assert_isTrue(sizeof(RcmServer_Object) == sizeof(RcmServer_Struct), NULL);
299 }
302 /*
303 * ======== RcmServer_exit ========
304 *
305 * This function must be serialized by the caller
306 */
307 Void RcmServer_exit(Void)
308 {
309 /* Registry_Result result;*/
312 if (--curInit != 0) {
313 return; /* module still in use */
314 }
316 /* unregister from xdc.runtime */
317 /* result = Registry_removeModule(MODULE_NAME);*/
318 /* Assert_isTrue(result == Registry_SUCCESS, (Assert_Id)NULL);*/
319 }
322 /*
323 * ======== RcmServer_Params_init ========
324 */
325 Void RcmServer_Params_init(RcmServer_Params *params)
326 {
327 /* server thread */
328 params->priority = Thread_Priority_HIGHEST;
329 params->osPriority = Thread_INVALID_OS_PRIORITY;
330 params->stackSize = 0; /* use system default*/
331 params->stackSeg = "";
333 /* default pool */
334 params->defaultPool.name = NULL;
335 params->defaultPool.count = 0;
336 params->defaultPool.priority = Thread_Priority_NORMAL;
337 params->defaultPool.osPriority = Thread_INVALID_OS_PRIORITY;
338 params->defaultPool.stackSize = 0; /* use system default*/
339 params->defaultPool.stackSeg = "";
341 /* worker pools */
342 params->workerPools.length = 0;
343 params->workerPools.elem = NULL;
345 /* function table */
346 params->fxns.length = 0;
347 params->fxns.elem = NULL;
348 }
351 /*
352 * ======== RcmServer_create ========
353 */
354 #define FXNN "RcmServer_create"
355 Int RcmServer_create(String name, RcmServer_Params *params,
356 RcmServer_Handle *handle)
357 {
358 RcmServer_Object *obj;
359 Error_Block eb;
360 Int status = RcmServer_S_SUCCESS;
363 Log_print3(Diags_ENTRY, "--> "FXNN": (name=0x%x, params=0x%x, hP=0x%x)",
364 (IArg)name, (IArg)params, (IArg)handle);
366 /* initialize the error block */
367 Error_init(&eb);
369 if (NULL == handle) {
370 Log_error0(FXNN": Invalid pointer");
371 status = RcmServer_E_FAIL;
372 goto leave;
373 }
375 *handle = (RcmServer_Handle)NULL;
377 /* check for valid params */
378 if (NULL == params) {
379 Log_error0(FXNN": params ptr must not be NULL");
380 status = RcmServer_E_FAIL;
381 goto leave;
382 }
383 if (NULL == name) {
384 Log_error0(FXNN": name passed is NULL!");
385 status = RcmServer_E_FAIL;
386 goto leave;
387 }
389 /* allocate the object */
390 obj = (RcmServer_Handle)xdc_runtime_Memory_calloc(RcmServer_Module_heap(),
391 sizeof(RcmServer_Object), sizeof(Int), &eb);
393 if (NULL == obj) {
394 Log_error2(FXNN": out of memory: heap=0x%x, size=%u",
395 (IArg)RcmServer_Module_heap(), sizeof(RcmServer_Object));
396 status = RcmServer_E_NOMEMORY;
397 goto leave;
398 }
400 Log_print1(Diags_LIFECYCLE, FXNN": instance create: 0x%x", (IArg)obj);
402 /* object-specific initialization */
403 status = RcmServer_Instance_init_P(obj, name, params);
405 if (status < 0) {
406 RcmServer_Instance_finalize_P(obj);
407 xdc_runtime_Memory_free(RcmServer_Module_heap(),
408 (Ptr)obj, sizeof(RcmServer_Object));
409 goto leave;
410 }
412 /* success, return opaque pointer */
413 *handle = (RcmServer_Handle)obj;
416 leave:
417 Log_print1(Diags_EXIT, "<-- "FXNN": %d", (IArg)status);
418 return(status);
419 }
420 #undef FXNN
423 /*
424 * ======== RcmServer_construct ========
425 */
426 #define FXNN "RcmServer_construct"
427 Int RcmServer_construct(RcmServer_Struct *structPtr, String name,
428 const RcmServer_Params *params)
429 {
430 RcmServer_Object *obj = (RcmServer_Object*)structPtr;
431 Int status = RcmServer_S_SUCCESS;
434 Log_print1(Diags_ENTRY, "--> "FXNN": (structPtr=0x%x)", (IArg)structPtr);
435 Log_print1(Diags_LIFECYCLE, FXNN": instance construct: 0x%x", (IArg)obj);
437 /* ensure the constructed object is zeroed */
438 _memset((Void *)obj, 0, sizeof(RcmServer_Object));
440 /* object-specific initialization */
441 status = RcmServer_Instance_init_P(obj, name, params);
443 if (status < 0) {
444 RcmServer_Instance_finalize_P(obj);
445 goto leave;
446 }
449 leave:
450 Log_print1(Diags_EXIT, "<-- "FXNN": %d", (IArg)status);
451 return(status);
452 }
453 #undef FXNN
456 /*
457 * ======== RcmServer_Instance_init_P ========
458 */
459 #define FXNN "RcmServer_Instance_init_P"
460 Int RcmServer_Instance_init_P(RcmServer_Object *obj, String name,
461 const RcmServer_Params *params)
462 {
463 Error_Block eb;
464 List_Params listP;
465 #if USE_RPMESSAGE == 0
466 MessageQ_Params mqParams;
467 #endif
468 Thread_Params threadP;
469 SemThread_Params semThreadP;
470 SemThread_Handle semThreadH;
471 Int i, j;
472 SizeT size;
473 Char *cp;
474 RcmServer_ThreadPool *poolAry;
475 RcmServer_WorkerThread *worker;
476 List_Handle listH;
477 Int status = RcmServer_S_SUCCESS;
480 Log_print1(Diags_ENTRY, "--> "FXNN": (obj=0x%x)", (IArg)obj);
482 Error_init(&eb);
484 /* initialize instance state */
485 obj->shutdown = FALSE;
486 obj->key = 0;
487 obj->jobId = 0xFFFF;
488 obj->run = NULL;
489 obj->serverQue = NULL;
490 obj->serverThread = NULL;
491 obj->fxnTabStatic.length = 0;
492 obj->fxnTabStatic.elem = NULL;
493 obj->poolMap0Len = 0;
494 obj->jobList = NULL;
497 /* initialize the function table */
498 for (i = 0; i < RcmServer_MAX_TABLES; i++) {
499 obj->fxnTab[i] = NULL;
500 }
502 /* initialize the worker pool map */
503 for (i = 0; i < RcmServer_POOL_MAP_LEN; i++) {
504 obj->poolMap[i] = NULL;
505 }
507 /* create the instance gate */
508 GateThread_construct(&obj->gate, NULL, &eb);
510 if (Error_check(&eb)) {
511 Log_error0(FXNN": could not create gate object");
512 status = RcmServer_E_FAIL;
513 goto leave;
514 }
516 /* create list for job objects */
517 #if defined(RCM_ti_ipc)
518 List_Params_init(&listP);
519 obj->jobList = List_create(&listP, &eb);
521 if (Error_check(&eb)) {
522 Log_error0(FXNN": could not create list object");
523 status = RcmServer_E_FAIL;
524 goto leave;
525 }
526 #elif defined(RCM_ti_syslink)
527 List_Params_init(&listP);
528 obj->jobList = List_create(&listP, NULL);
530 if (obj->jobList == NULL) {
531 Log_error0(FXNN": could not create list object");
532 status = RcmServer_E_FAIL;
533 goto leave;
534 }
535 #endif
537 /* create the static function table */
538 if (params->fxns.length > 0) {
539 obj->fxnTabStatic.length = params->fxns.length;
541 /* allocate static function table */
542 size = params->fxns.length * sizeof(RcmServer_FxnTabElem);
543 obj->fxnTabStatic.elem = xdc_runtime_Memory_alloc(
544 RcmServer_Module_heap(), size, sizeof(Ptr), &eb);
546 if (Error_check(&eb)) {
547 Log_error2(FXNN": out of memory: heap=0x%x, size=%u",
548 (IArg)RcmServer_Module_heap(), size);
549 status = RcmServer_E_NOMEMORY;
550 goto leave;
551 }
552 obj->fxnTabStatic.elem[0].name = NULL;
554 /* allocate a single block to store all name strings */
555 for (size = 0, i = 0; i < params->fxns.length; i++) {
556 size += _strlen(params->fxns.elem[i].name) + 1;
557 }
558 cp = xdc_runtime_Memory_alloc(
559 RcmServer_Module_heap(), size, sizeof(Ptr), &eb);
561 if (Error_check(&eb)) {
562 Log_error2(FXNN": out of memory: heap=0x%x, size=%u",
563 (IArg)RcmServer_Module_heap(), size);
564 status = RcmServer_E_NOMEMORY;
565 goto leave;
566 }
568 /* copy function table data into allocated memory blocks */
569 for (i = 0; i < params->fxns.length; i++) {
570 _strcpy(cp, params->fxns.elem[i].name);
571 obj->fxnTabStatic.elem[i].name = cp;
572 cp += (_strlen(params->fxns.elem[i].name) + 1);
573 obj->fxnTabStatic.elem[i].addr.fxn = params->fxns.elem[i].addr.fxn;
574 obj->fxnTabStatic.elem[i].key = 0;
575 }
577 /* hook up the static function table */
578 obj->fxnTab[0] = obj->fxnTabStatic.elem;
579 }
581 /* create static worker pools */
582 if ((params->workerPools.length + 1) > RcmServer_POOL_MAP_LEN) {
583 Log_error1(FXNN": Exceeded maximum number of worker pools =%d",
584 (IArg) (params->workerPools.length) );
585 status = RcmServer_E_NOMEMORY;
586 goto leave;
587 }
588 obj->poolMap0Len = params->workerPools.length + 1; /* workers + default */
590 /* allocate the static worker pool table */
591 size = obj->poolMap0Len * sizeof(RcmServer_ThreadPool);
592 obj->poolMap[0] = (RcmServer_ThreadPool *)xdc_runtime_Memory_alloc(
593 RcmServer_Module_heap(), size, sizeof(Ptr), &eb);
595 if (Error_check(&eb)) {
596 Log_error2(FXNN": out of memory: heap=0x%x, size=%u",
597 (IArg)RcmServer_Module_heap(), size);
598 status = RcmServer_E_NOMEMORY;
599 goto leave;
600 }
602 /* convenience alias */
603 poolAry = obj->poolMap[0];
605 /* allocate a single block to store all name strings */
606 /* Buffer format is: [SIZE][DPN][\0][WPN1][\0][WPN2][\0].... */
607 /* DPN = Default Pool Name, WPN = Worker Pool Name */
608 /* In case, DPN is NULL, format is: [SIZE][\0][WPN1][\0].... */
609 /* In case, WPN is NULL, format is: [SIZE][DPN][\0] */
610 size = sizeof(SizeT) /* block size */
611 + (params->defaultPool.name == NULL ? 1 :
612 _strlen(params->defaultPool.name) + 1);
614 for (i = 0; i < params->workerPools.length; i++) {
615 size += (params->workerPools.elem[i].name == NULL ? 0 :
616 _strlen(params->workerPools.elem[i].name) + 1);
617 }
618 cp = xdc_runtime_Memory_alloc(
619 RcmServer_Module_heap(), size, sizeof(Ptr), &eb);
621 if (Error_check(&eb)) {
622 Log_error2(FXNN": out of memory: heap=0x%x, size=%u",
623 (IArg)RcmServer_Module_heap(), size);
624 status = RcmServer_E_NOMEMORY;
625 goto leave;
626 }
628 *(SizeT *)cp = size;
629 cp += sizeof(SizeT);
631 /* initialize the default worker pool, poolAry[0] */
632 if (params->defaultPool.name != NULL) {
633 _strcpy(cp, params->defaultPool.name);
634 poolAry[0].name = cp;
635 cp += (_strlen(params->defaultPool.name) + 1);
636 }
637 else {
638 poolAry[0].name = cp;
639 *cp++ = '\0';
640 }
642 poolAry[0].count = params->defaultPool.count;
643 poolAry[0].priority = params->defaultPool.priority;
644 poolAry[0].osPriority = params->defaultPool.osPriority;
645 poolAry[0].stackSize = params->defaultPool.stackSize;
646 poolAry[0].stackSeg = NULL;
647 poolAry[0].sem = NULL;
649 List_construct(&(poolAry[0].threadList), NULL);
650 List_construct(&(poolAry[0].readyQueue), NULL);
652 SemThread_Params_init(&semThreadP);
653 semThreadP.mode = SemThread_Mode_COUNTING;
655 semThreadH = SemThread_create(0, &semThreadP, &eb);
657 if (Error_check(&eb)) {
658 Log_error0(FXNN": could not create semaphore");
659 status = RcmServer_E_FAIL;
660 goto leave;
661 }
663 poolAry[0].sem = SemThread_Handle_upCast(semThreadH);
665 /* initialize the static worker pools, poolAry[1..(n-1)] */
666 for (i = 0; i < params->workerPools.length; i++) {
667 if (params->workerPools.elem[i].name != NULL) {
668 _strcpy(cp, params->workerPools.elem[i].name);
669 poolAry[i+1].name = cp;
670 cp += (_strlen(params->workerPools.elem[i].name) + 1);
671 }
672 else {
673 poolAry[i+1].name = NULL;
674 }
676 poolAry[i+1].count = params->workerPools.elem[i].count;
677 poolAry[i+1].priority = params->workerPools.elem[i].priority;
678 poolAry[i+1].osPriority =params->workerPools.elem[i].osPriority;
679 poolAry[i+1].stackSize = params->workerPools.elem[i].stackSize;
680 poolAry[i+1].stackSeg = NULL;
682 List_construct(&(poolAry[i+1].threadList), NULL);
683 List_construct(&(poolAry[i+1].readyQueue), NULL);
685 SemThread_Params_init(&semThreadP);
686 semThreadP.mode = SemThread_Mode_COUNTING;
688 semThreadH = SemThread_create(0, &semThreadP, &eb);
690 if (Error_check(&eb)) {
691 Log_error0(FXNN": could not create semaphore");
692 status = RcmServer_E_FAIL;
693 goto leave;
694 }
696 poolAry[i+1].sem = SemThread_Handle_upCast(semThreadH);
697 }
699 /* create the worker threads in each static pool */
700 for (i = 0; i < obj->poolMap0Len; i++) {
701 for (j = 0; j < poolAry[i].count; j++) {
703 /* allocate worker thread object */
704 size = sizeof(RcmServer_WorkerThread);
705 worker = (RcmServer_WorkerThread *)xdc_runtime_Memory_alloc(
706 RcmServer_Module_heap(), size, sizeof(Ptr), &eb);
708 if (Error_check(&eb)) {
709 Log_error2(FXNN": out of memory: heap=0x%x, size=%u",
710 (IArg)RcmServer_Module_heap(), size);
711 status = RcmServer_E_NOMEMORY;
712 goto leave;
713 }
715 /* initialize worker thread object */
716 worker->jobId = RcmClient_DISCRETEJOBID;
717 worker->thread = NULL;
718 worker->terminate = FALSE;
719 worker->pool = &(poolAry[i]);
720 worker->server = obj;
722 /* add worker thread to worker pool */
723 listH = List_handle(&(poolAry[i].threadList));
724 List_putHead(listH, &(worker->elem));
726 /* create worker thread */
727 Thread_Params_init(&threadP);
728 threadP.arg = (IArg)worker;
729 threadP.priority = poolAry[i].priority;
730 threadP.osPriority = poolAry[i].osPriority;
731 threadP.stackSize = poolAry[i].stackSize;
732 threadP.instance->name = "RcmServer_workerThr";
734 worker->thread = Thread_create(
735 (Thread_RunFxn)(RcmServer_workerThrFxn_P), &threadP, &eb);
737 if (Error_check(&eb)) {
738 Log_error2(FXNN": could not create worker thread, "
739 "pool=%d, thread=%d", (IArg)i, (IArg)j);
740 status = RcmServer_E_FAIL;
741 goto leave;
742 }
743 }
744 }
746 /* create the semaphore used to release the server thread */
747 SemThread_Params_init(&semThreadP);
748 semThreadP.mode = SemThread_Mode_COUNTING;
750 obj->run = SemThread_create(0, &semThreadP, &eb);
752 if (Error_check(&eb)) {
753 Log_error0(FXNN": could not create semaphore");
754 status = RcmServer_E_FAIL;
755 goto leave;
756 }
758 /* create the message queue for inbound messages */
759 #if USE_RPMESSAGE
760 obj->serverQue = RPMessage_create(RPMessage_ASSIGN_ANY, NULL, NULL,
761 &obj->localAddr);
762 #ifdef BIOS_ONLY_TEST
763 obj->dstProc = MultiProc_self();
764 #else
765 obj->dstProc = MultiProc_getId("HOST");
766 #endif
768 #else
769 MessageQ_Params_init(&mqParams);
770 obj->serverQue = MessageQ_create(name, &mqParams);
771 #endif
773 if (NULL == obj->serverQue) {
774 Log_error0(FXNN": could not create server message queue");
775 status = RcmServer_E_FAIL;
776 goto leave;
777 }
779 /* create the server thread */
780 Thread_Params_init(&threadP);
781 threadP.arg = (IArg)obj;
782 threadP.priority = params->priority;
783 threadP.osPriority = params->osPriority;
784 threadP.stackSize = params->stackSize;
785 threadP.instance->name = "RcmServer_serverThr";
787 obj->serverThread = Thread_create(
788 (Thread_RunFxn)(RcmServer_serverThrFxn_P), &threadP, &eb);
790 if (Error_check(&eb)) {
791 Log_error0(FXNN": could not create server thread");
792 status = RcmServer_E_FAIL;
793 goto leave;
794 }
797 leave:
798 Log_print1(Diags_EXIT, "<-- "FXNN": %d", (IArg)status);
799 return(status);
800 }
801 #undef FXNN
804 /*
805 * ======== RcmServer_delete ========
806 */
807 #define FXNN "RcmServer_delete"
808 Int RcmServer_delete(RcmServer_Handle *handlePtr)
809 {
810 RcmServer_Object *obj = (RcmServer_Object *)(*handlePtr);
811 Int status = RcmClient_S_SUCCESS;
814 Log_print1(Diags_ENTRY, "--> "FXNN": (handlePtr=0x%x)", (IArg)handlePtr);
816 /* finalize the object */
817 status = RcmServer_Instance_finalize_P(obj);
819 /* free the object memory */
820 Log_print1(Diags_LIFECYCLE, FXNN": instance delete: 0x%x", (IArg)obj);
822 xdc_runtime_Memory_free(RcmServer_Module_heap(),
823 (Ptr)obj, sizeof(RcmServer_Object));
824 *handlePtr = NULL;
826 Log_print1(Diags_EXIT, "<-- "FXNN": %d", (IArg)status);
827 return(status);
828 }
829 #undef FXNN
832 /*
833 * ======== RcmServer_destruct ========
834 */
835 #define FXNN "RcmServer_destruct"
836 Int RcmServer_destruct(RcmServer_Struct *structPtr)
837 {
838 RcmServer_Object *obj = (RcmServer_Object *)(structPtr);
839 Int status = RcmClient_S_SUCCESS;
842 Log_print1(Diags_ENTRY, "--> "FXNN": (structPtr=0x%x)", (IArg)structPtr);
843 Log_print1(Diags_LIFECYCLE, FXNN": instance destruct: 0x%x", (IArg)obj);
845 /* finalize the object */
846 status = RcmServer_Instance_finalize_P(obj);
848 Log_print1(Diags_EXIT, "<-- "FXNN": %d", (IArg)status);
849 return(status);
850 }
851 #undef FXNN
854 /*
855 * ======== RcmServer_Instance_finalize_P ========
856 */
857 #define FXNN "RcmServer_Instance_finalize_P"
858 Int RcmServer_Instance_finalize_P(RcmServer_Object *obj)
859 {
860 Int i, j;
861 Int size;
862 Char *cp;
863 UInt tabCount;
864 RcmServer_FxnTabElem *fdp;
865 Error_Block eb;
866 RcmServer_ThreadPool *poolAry;
867 RcmServer_WorkerThread *worker;
868 List_Elem *elem;
869 List_Handle listH;
870 List_Handle msgQueH;
871 RcmClient_Packet *packet;
872 #if USE_RPMESSAGE == 0
873 MessageQ_Msg msgqMsg;
874 #endif
875 SemThread_Handle semThreadH;
876 RcmServer_JobStream *job;
877 Int rval;
878 Int status = RcmClient_S_SUCCESS;
880 Log_print1(Diags_ENTRY, "--> "FXNN": (obj=0x%x)", (IArg)obj);
882 /* must initialize the error block before using it */
883 Error_init(&eb);
885 /* block until server thread exits */
886 obj->shutdown = TRUE;
888 if (obj->serverThread != NULL) {
889 #if USE_RPMESSAGE
890 RPMessage_unblock(obj->serverQue);
891 #else
892 MessageQ_unblock(obj->serverQue);
893 #endif
894 Thread_join(obj->serverThread, &eb);
896 if (Error_check(&eb)) {
897 Log_error0(FXNN": server thread did not exit properly");
898 status = RcmServer_E_FAIL;
899 goto leave;
900 }
901 }
903 /* delete any remaining job objects (there should not be any) */
904 while ((elem = List_get(obj->jobList)) != NULL) {
905 job = (RcmServer_JobStream *)elem;
907 /* return any remaining messages (there should not be any) */
908 msgQueH = List_handle(&job->msgQue);
910 while ((elem = List_get(msgQueH)) != NULL) {
911 packet = (RcmClient_Packet *)elem;
912 Log_warning2(
913 FXNN": returning unprocessed message, jobId=0x%x, packet=0x%x",
914 (IArg)job->jobId, (IArg)packet);
916 RcmServer_setStatusCode_I(packet, RcmServer_Status_Unprocessed);
917 #if USE_RPMESSAGE
918 packet->hdr.type = OMX_RAW_MSG;
919 packet->hdr.len = PACKET_DATA_SIZE + packet->message.dataSize;
920 rval = RPMessage_send(obj->dstProc, obj->replyAddr,
921 obj->localAddr, (Ptr)&packet->hdr,
922 PACKET_HDR_SIZE + packet->message.dataSize);
923 #else
924 msgqMsg = &packet->msgqHeader;
925 rval = MessageQ_put(MessageQ_getReplyQueue(msgqMsg), msgqMsg);
926 #endif
927 if (rval < 0) {
928 Log_error1(FXNN": unknown ipc error, 0x%x", (IArg)rval);
929 }
930 }
932 /* finalize the job stream object */
933 List_destruct(&job->msgQue);
935 xdc_runtime_Memory_free(RcmServer_Module_heap(),
936 (Ptr)job, sizeof(RcmServer_JobStream));
937 }
938 List_delete(&(obj->jobList));
940 /* convenience alias */
941 poolAry = obj->poolMap[0];
943 /* free all the static pool resources */
944 for (i = 0; i < obj->poolMap0Len; i++) {
946 /* free all the worker thread objects */
947 listH = List_handle(&(poolAry[i].threadList));
949 /* mark each worker thread for termination */
950 elem = NULL;
951 while ((elem = List_next(listH, elem)) != NULL) {
952 worker = (RcmServer_WorkerThread *)elem;
953 worker->terminate = TRUE;
954 }
956 /* unblock each worker thread so it can terminate */
957 elem = NULL;
958 while ((elem = List_next(listH, elem)) != NULL) {
959 Semaphore_post(poolAry[i].sem, &eb);
961 if (Error_check(&eb)) {
962 Log_error0(FXNN": post failed on thread");
963 status = RcmServer_E_FAIL;
964 goto leave;
965 }
966 }
968 /* wait for each worker thread to terminate */
969 elem = NULL;
970 while ((elem = List_get(listH)) != NULL) {
971 worker = (RcmServer_WorkerThread *)elem;
973 Thread_join(worker->thread, &eb);
975 if (Error_check(&eb)) {
976 Log_error1(
977 FXNN": worker thread did not exit properly, thread=0x%x",
978 (IArg)worker->thread);
979 status = RcmServer_E_FAIL;
980 goto leave;
981 }
983 Thread_delete(&worker->thread);
985 /* free the worker thread object */
986 xdc_runtime_Memory_free(RcmServer_Module_heap(), (Ptr)worker,
987 sizeof(RcmServer_WorkerThread));
988 }
990 /* free up pool resources */
991 semThreadH = SemThread_Handle_downCast(poolAry[i].sem);
992 SemThread_delete(&semThreadH);
993 List_destruct(&(poolAry[i].threadList));
995 /* return any remaining messages on the readyQueue */
996 msgQueH = List_handle(&poolAry[i].readyQueue);
998 while ((elem = List_get(msgQueH)) != NULL) {
999 packet = (RcmClient_Packet *)elem;
1000 Log_warning2(
1001 FXNN": returning unprocessed message, msgId=0x%x, packet=0x%x",
1002 (IArg)packet->msgId, (IArg)packet);
1004 RcmServer_setStatusCode_I(packet, RcmServer_Status_Unprocessed);
1005 #if USE_RPMESSAGE
1006 packet->hdr.type = OMX_RAW_MSG;
1007 packet->hdr.len = PACKET_DATA_SIZE + packet->message.dataSize;
1008 rval = RPMessage_send(obj->dstProc, obj->replyAddr,
1009 obj->localAddr, (Ptr)&packet->hdr,
1010 PACKET_HDR_SIZE + packet->message.dataSize);
1011 #else
1012 msgqMsg = &packet->msgqHeader;
1013 rval = MessageQ_put(MessageQ_getReplyQueue(msgqMsg), msgqMsg);
1014 #endif
1015 if (rval < 0) {
1016 Log_error1(FXNN": unknown ipc error, 0x%x", (IArg)rval);
1017 }
1018 }
1020 List_destruct(&(poolAry[i].readyQueue));
1021 }
1023 /* free the name block for the static pools */
1024 if ((obj->poolMap[0] != NULL) && (obj->poolMap[0]->name != NULL)) {
1025 cp = obj->poolMap[0]->name;
1026 cp -= sizeof(SizeT);
1027 xdc_runtime_Memory_free(RcmServer_Module_heap(), (Ptr)cp, *(SizeT *)cp);
1028 }
1030 /* free the static worker pool table */
1031 if (obj->poolMap[0] != NULL) {
1032 xdc_runtime_Memory_free(RcmServer_Module_heap(), (Ptr)(obj->poolMap[0]),
1033 obj->poolMap0Len * sizeof(RcmServer_ThreadPool));
1034 obj->poolMap[0] = NULL;
1035 }
1037 #if 0
1038 /* free all dynamic worker pools */
1039 for (p = 1; p < RcmServer_POOL_MAP_LEN; p++) {
1040 if ((poolAry = obj->poolMap[p]) == NULL) {
1041 continue;
1042 }
1043 }
1044 #endif
1046 /* free up the dynamic function tables and any leftover name strings */
1047 for (i = 1; i < RcmServer_MAX_TABLES; i++) {
1048 if (obj->fxnTab[i] != NULL) {
1049 tabCount = (1 << (i + 4));
1050 for (j = 0; j < (Int)tabCount; j++) {
1051 if (((obj->fxnTab[i])+j)->name != NULL) {
1052 cp = ((obj->fxnTab[i])+j)->name;
1053 size = _strlen(cp) + 1;
1054 xdc_runtime_Memory_free(RcmServer_Module_heap(), cp, size);
1055 }
1056 }
1057 fdp = obj->fxnTab[i];
1058 size = tabCount * sizeof(RcmServer_FxnTabElem);
1059 xdc_runtime_Memory_free(RcmServer_Module_heap(), fdp, size);
1060 obj->fxnTab[i] = NULL;
1061 }
1062 }
1064 if (NULL != obj->serverThread) {
1065 Thread_delete(&obj->serverThread);
1066 }
1068 if (NULL != obj->serverQue) {
1069 #if USE_RPMESSAGE
1070 RPMessage_delete(&obj->serverQue);
1071 #else
1072 MessageQ_delete(&obj->serverQue);
1073 #endif
1074 }
1076 if (NULL != obj->run) {
1077 SemThread_delete((SemThread_Handle *)(&obj->run));
1078 }
1080 /* free the name block for the static function table */
1081 if ((NULL != obj->fxnTabStatic.elem) &&
1082 (NULL != obj->fxnTabStatic.elem[0].name)) {
1083 for (size = 0, i = 0; i < obj->fxnTabStatic.length; i++) {
1084 size += _strlen(obj->fxnTabStatic.elem[i].name) + 1;
1085 }
1086 xdc_runtime_Memory_free(
1087 RcmServer_Module_heap(),
1088 obj->fxnTabStatic.elem[0].name, size);
1089 }
1091 /* free the static function table */
1092 if (NULL != obj->fxnTabStatic.elem) {
1093 xdc_runtime_Memory_free(RcmServer_Module_heap(),
1094 obj->fxnTabStatic.elem,
1095 obj->fxnTabStatic.length * sizeof(RcmServer_FxnTabElem));
1096 }
1098 /* destruct the instance gate */
1099 GateThread_destruct(&obj->gate);
1102 leave:
1103 Log_print1(Diags_EXIT, "<-- "FXNN": %d", (IArg)status);
1104 return(status);
1105 }
1106 #undef FXNN
1109 /*
1110 * ======== RcmServer_addSymbol ========
1111 */
1112 #define FXNN "RcmServer_addSymbol"
1113 Int RcmServer_addSymbol(RcmServer_Object *obj, String funcName,
1114 RcmServer_MsgFxn addr, UInt32 *index)
1115 {
1116 GateThread_Handle gateH;
1117 IArg key;
1118 Int len;
1119 UInt i, j;
1120 UInt tabCount;
1121 SizeT tabSize;
1122 UInt32 fxnIdx = 0xFFFF;
1123 RcmServer_FxnTabElem *slot = NULL;
1124 Error_Block eb;
1125 Int status = RcmServer_S_SUCCESS;
1128 Log_print3(Diags_ENTRY,
1129 "--> "FXNN": (obj=0x%x, name=0x%x, addr=0x%x)",
1130 (IArg)obj, (IArg)funcName, (IArg)addr);
1132 Error_init(&eb);
1134 /* protect the symbol table while changing it */
1135 gateH = GateThread_handle(&obj->gate);
1136 key = GateThread_enter(gateH);
1138 /* look for an empty slot to use */
1139 for (i = 1; i < RcmServer_MAX_TABLES; i++) {
1140 if (obj->fxnTab[i] != NULL) {
1141 for (j = 0; j < (Int)(1 << (i + 4)); j++) {
1142 if (((obj->fxnTab[i])+j)->addr.fxn == 0) {
1143 slot = (obj->fxnTab[i]) + j; /* found empty slot*/
1144 break;
1145 }
1146 }
1147 }
1148 else {
1149 /* all previous tables are full, allocate a new table */
1150 tabCount = (1 << (i + 4));
1151 tabSize = tabCount * sizeof(RcmServer_FxnTabElem);
1152 obj->fxnTab[i] = (RcmServer_FxnTabElem *)xdc_runtime_Memory_alloc(
1153 RcmServer_Module_heap(), tabSize, sizeof(Ptr), &eb);
1155 if (Error_check(&eb)) {
1156 Log_error0(FXNN": unable to allocate new function table");
1157 obj->fxnTab[i] = NULL;
1158 status = RcmServer_E_NOMEMORY;
1159 goto leave;
1160 }
1162 /* initialize the new table */
1163 for (j = 0; j < tabCount; j++) {
1164 ((obj->fxnTab[i])+j)->addr.fxn = 0;
1165 ((obj->fxnTab[i])+j)->name = NULL;
1166 ((obj->fxnTab[i])+j)->key = 0;
1167 }
1169 /* use first slot in new table */
1170 j = 0;
1171 slot = (obj->fxnTab[i])+j;
1172 }
1174 /* if new slot found, break out of loop */
1175 if (slot != NULL) {
1176 break;
1177 }
1178 }
1180 /* insert new symbol into slot */
1181 if (slot != NULL) {
1182 slot->addr.fxn = addr;
1183 len = _strlen(funcName) + 1;
1184 slot->name = (String)xdc_runtime_Memory_alloc(
1185 RcmServer_Module_heap(), len, 0, &eb);
1187 if (Error_check(&eb)) {
1188 Log_error0(FXNN": unable to allocate function name");
1189 slot->name = NULL;
1190 status = RcmServer_E_NOMEMORY;
1191 goto leave;
1192 }
1194 _strcpy(slot->name, funcName);
1195 slot->key = RcmServer_getNextKey_P(obj);
1196 fxnIdx = ((UInt32)(slot->key) << _RCM_KeyShift) | (i << 12) | j;
1197 }
1199 /* error, no more room to add new symbol */
1200 else {
1201 Log_error0(FXNN": cannot add symbol, table is full");
1202 status = RcmServer_E_SYMBOLTABLEFULL;
1203 goto leave;
1204 }
1207 leave:
1208 GateThread_leave(gateH, key);
1210 /* on success, return new function index */
1211 if (status >= 0) {
1212 *index = fxnIdx;
1213 }
1214 Log_print1(Diags_EXIT, "<-- "FXNN": %d", (IArg)status);
1215 return(status);
1216 }
1217 #undef FXNN
1220 /*
1221 * ======== RcmServer_removeSymbol ========
1222 */
1223 #define FXNN "RcmServer_removeSymbol"
1224 Int RcmServer_removeSymbol(RcmServer_Object *obj, String name)
1225 {
1226 GateThread_Handle gateH;
1227 IArg key;
1228 UInt32 fxnIdx;
1229 UInt tabIdx, tabOff;
1230 RcmServer_FxnTabElem *slot;
1231 Int status = RcmServer_S_SUCCESS;
1234 Log_print2(Diags_ENTRY,
1235 "--> "FXNN": (obj=0x%x, name=0x%x)", (IArg)obj, (IArg)name);
1237 /* protect the symbol table while changing it */
1238 gateH = GateThread_handle(&obj->gate);
1239 key = GateThread_enter(gateH);
1241 /* find the symbol in the table */
1242 status = RcmServer_getSymIdx_P(obj, name, &fxnIdx);
1244 if (status < 0) {
1245 Log_error0(FXNN": given symbol not found");
1246 goto leave;
1247 }
1249 /* static symbols have bit-31 set, cannot remove these symbols */
1250 if (fxnIdx & 0x80000000) {
1251 Log_error0(FXNN": cannot remove static symbol");
1252 status = RcmServer_E_SYMBOLSTATIC;
1253 goto leave;
1254 }
1256 /* get slot pointer */
1257 tabIdx = (fxnIdx & 0xF000) >> 12;
1258 tabOff = (fxnIdx & 0xFFF);
1259 slot = (obj->fxnTab[tabIdx]) + tabOff;
1261 /* clear the table index */
1262 slot->addr.fxn = 0;
1263 if (slot->name != NULL) {
1264 xdc_runtime_Memory_free(
1265 RcmServer_Module_heap(), slot->name, _strlen(slot->name) + 1);
1266 slot->name = NULL;
1267 }
1268 slot->key = 0;
1270 leave:
1271 GateThread_leave(gateH, key);
1272 Log_print1(Diags_EXIT, "<-- "FXNN": %d", (IArg)status);
1273 return(status);
1274 }
1275 #undef FXNN
1278 /*
1279 * ======== RcmServer_start ========
1280 */
1281 #define FXNN "RcmServer_start"
1282 Int RcmServer_start(RcmServer_Object *obj)
1283 {
1284 Error_Block eb;
1285 Int status = RcmServer_S_SUCCESS;
1288 Log_print1(Diags_ENTRY, "--> "FXNN": (obj=0x%x)", (IArg)obj);
1290 Error_init(&eb);
1292 /* unblock the server thread */
1293 Semaphore_post(obj->run, &eb);
1295 if (Error_check(&eb)) {
1296 Log_error0(FXNN": semaphore post failed");
1297 status = RcmServer_E_FAIL;
1298 }
1300 Log_print1(Diags_EXIT, "<-- "FXNN": %d", (IArg)status);
1301 return(status);
1302 }
1303 #undef FXNN
1306 /*
1307 * ======== RcmServer_acqJobId_P ========
1308 */
1309 #define FXNN "RcmServer_acqJobId_P"
1310 Int RcmServer_acqJobId_P(RcmServer_Object *obj, UInt16 *jobIdPtr)
1311 {
1312 Error_Block eb;
1313 GateThread_Handle gateH;
1314 IArg key;
1315 Int count;
1316 UInt16 jobId;
1317 List_Elem *elem;
1318 RcmServer_JobStream *job;
1319 Int status = RcmServer_S_SUCCESS;
1322 Log_print2(Diags_ENTRY,
1323 "--> "FXNN": (obj=0x%x, jobIdPtr=0x%x)", (IArg)obj, (IArg)jobIdPtr);
1325 Error_init(&eb);
1326 gateH = GateThread_handle(&obj->gate);
1328 /* enter critical section */
1329 key = GateThread_enter(gateH);
1331 /* compute new job id */
1332 for (count = 0xFFFF; count > 0; count--) {
1334 /* generate a new job id */
1335 jobId = (obj->jobId == 0xFFFF ? obj->jobId = 1 : ++(obj->jobId));
1337 /* verify job id is not in use */
1338 elem = NULL;
1339 while ((elem = List_next(obj->jobList, elem)) != NULL) {
1340 job = (RcmServer_JobStream *)elem;
1341 if (jobId == job->jobId) {
1342 jobId = RcmClient_DISCRETEJOBID;
1343 break;
1344 }
1345 }
1347 if (jobId != RcmClient_DISCRETEJOBID) {
1348 break;
1349 }
1350 }
1352 /* check if job id was acquired */
1353 if (jobId == RcmClient_DISCRETEJOBID) {
1354 *jobIdPtr = RcmClient_DISCRETEJOBID;
1355 Log_error0(FXNN": no job id available");
1356 status = RcmServer_E_FAIL;
1357 GateThread_leave(gateH, key);
1358 goto leave;
1359 }
1361 /* create a new job steam object */
1362 job = xdc_runtime_Memory_alloc(RcmServer_Module_heap(),
1363 sizeof(RcmServer_JobStream), sizeof(Ptr), &eb);
1365 if (Error_check(&eb)) {
1366 Log_error2(FXNN": out of memory: heap=0x%x, size=%u",
1367 (IArg)RcmServer_Module_heap(), sizeof(RcmServer_JobStream));
1368 status = RcmServer_E_NOMEMORY;
1369 GateThread_leave(gateH, key);
1370 goto leave;
1371 }
1373 /* initialize new job stream object */
1374 job->jobId = jobId;
1375 job->empty = TRUE;
1376 List_construct(&(job->msgQue), NULL);
1378 /* put new job stream object at end of server list */
1379 List_put(obj->jobList, (List_Elem *)job);
1381 /* leave critical section */
1382 GateThread_leave(gateH, key);
1384 /* return new job id */
1385 *jobIdPtr = jobId;
1388 leave:
1389 Log_print1(Diags_EXIT, "<-- "FXNN": %d", (IArg)status);
1390 return(status);
1391 }
1392 #undef FXNN
1395 /*
1396 * ======== RcmServer_dispatch_P ========
1397 *
1398 * Return Value
1399 * < 0: error
1400 * 0: success, job stream queue
1401 * > 0: success, ready queue
1402 *
1403 * Pool id description
1404 *
1405 * Static Worker Pools
1406 * --------------------------------------------------------------------
1407 * 15 1 = static pool
1408 * 14:8 reserved
1409 * 7:0 offset: 0 - 255
1410 *
1411 * Dynamic Worker Pools
1412 * --------------------------------------------------------------------
1413 * 15 0 = dynamic pool
1414 * 14:7 key: 0 - 255
1415 * 6:5 index: 1 - 3
1416 * 4:0 offset: 0 - [7, 15, 31]
1417 */
1418 #define FXNN "RcmServer_dispatch_P"
1419 Int RcmServer_dispatch_P(RcmServer_Object *obj, RcmClient_Packet *packet)
1420 {
1421 GateThread_Handle gateH;
1422 IArg key;
1423 List_Elem *elem;
1424 List_Handle listH;
1425 RcmServer_ThreadPool *pool;
1426 UInt16 jobId;
1427 RcmServer_JobStream *job;
1428 Error_Block eb;
1429 Int status = RcmServer_S_SUCCESS;
1432 Log_print2(Diags_ENTRY,
1433 "--> "FXNN": (obj=0x%x, packet=0x%x)", (IArg)obj, (IArg)packet);
1435 Error_init(&eb);
1437 /* get the target pool id from the message */
1438 status = RcmServer_getPool_P(obj, packet, &pool);
1440 if (status < 0) {
1441 goto leave;
1442 }
1444 Log_print4(Diags_INFO, FXNN": Rcm dispatch: p:%d j:%d f:%d l:%d",
1445 packet->message.poolId, packet->message.jobId,
1446 packet->message.fxnIdx, packet->message.dataSize);
1448 /* discrete jobs go on the end of the ready queue */
1449 jobId = packet->message.jobId;
1451 if (jobId == RcmClient_DISCRETEJOBID) {
1452 listH = List_handle(&pool->readyQueue);
1453 List_put(listH, (List_Elem *)packet);
1455 /* dispatch a new worker thread */
1456 Semaphore_post(pool->sem, &eb);
1458 if (Error_check(&eb)) {
1459 Log_error0(FXNN": semaphore post failed");
1460 }
1461 }
1463 /* must be a job stream message */
1464 else {
1465 /* must protect job list while searching it */
1466 gateH = GateThread_handle(&obj->gate);
1467 key = GateThread_enter(gateH);
1469 /* find the job stream object in the list */
1470 elem = NULL;
1471 while ((elem = List_next(obj->jobList, elem)) != NULL) {
1472 job = (RcmServer_JobStream *)elem;
1473 if (job->jobId == jobId) {
1474 break;
1475 }
1476 }
1478 if (elem == NULL) {
1479 Log_error1(FXNN": failed to find jobId=%d", (IArg)jobId);
1480 status = RcmServer_E_JobIdNotFound;
1481 }
1483 /* if job object is empty, place message directly on ready queue */
1484 else if (job->empty) {
1485 job->empty = FALSE;
1486 listH = List_handle(&pool->readyQueue);
1487 List_put(listH, (List_Elem *)packet);
1489 /* dispatch a new worker thread */
1490 Semaphore_post(pool->sem, &eb);
1492 if (Error_check(&eb)) {
1493 Log_error0(FXNN": semaphore post failed");
1494 }
1495 }
1497 /* place message on job queue */
1498 else {
1499 listH = List_handle(&job->msgQue);
1500 List_put(listH, (List_Elem *)packet);
1501 }
1503 GateThread_leave(gateH, key);
1504 }
1507 leave:
1508 Log_print1(Diags_EXIT, "<-- "FXNN": %d", (IArg)status);
1509 return(status);
1510 }
1511 #undef FXNN
1514 /*
1515 * ======== RcmServer_execMsg_I ========
1516 */
1517 Int RcmServer_execMsg_I(RcmServer_Object *obj, RcmClient_Message *msg)
1518 {
1519 RcmServer_MsgFxn fxn;
1520 #if USE_RPMESSAGE
1521 RcmServer_MsgCreateFxn createFxn = NULL;
1522 #endif
1523 Int status;
1525 status = RcmServer_getFxnAddr_P(obj, msg->fxnIdx, &fxn, &createFxn);
1527 if (status >= 0) {
1528 #if 0
1529 System_printf("RcmServer_execMsg_I: Calling fxnIdx: %d\n",
1530 (msg->fxnIdx & 0x0000FFFF));
1531 #endif
1532 Task_setEnv(Task_self(), (Ptr)((UArg)RcmServer_getLocalAddress(obj)));
1533 #if USE_RPMESSAGE
1534 if (createFxn) {
1535 msg->result = (*createFxn)(obj, msg->dataSize, msg->data);
1536 }
1537 else {
1538 msg->result = (*fxn)(msg->dataSize, msg->data);
1539 }
1540 #else
1541 msg->result = (*fxn)(msg->dataSize, msg->data);
1542 #endif
1543 Task_setEnv(Task_self(), NULL);
1544 }
1546 return(status);
1547 }
1550 /*
1551 * ======== RcmServer_getFxnAddr_P ========
1552 *
1553 * The function index (fxnIdx) uses the following format. Note that the
1554 * format differs for static vs. dynamic functions. All static functions
1555 * are in fxnTab[0], all dynamic functions are in fxnTab[1 - 8].
1556 *
1557 * Bits Description
1558 *
1559 * Static Function Index
1560 * --------------------------------------------------------------------
1561 * 31 static/dynamic function flag
1562 * 0 = dynamic function
1563 * 1 = static function
1564 * 30:20 reserved
1565 * 19:16 reserved
1566 * 15:0 offset: 0 - 65,535
1567 *
1568 * Dynamic Function Index
1569 * --------------------------------------------------------------------
1570 * 31 static/dynamic function flag
1571 * 0 = dynamic function
1572 * 1 = static function
1573 * 30:20 key
1574 * 19:16 reserved
1575 * 15:12 index: 1 - 8
1576 * 11:0 offset: 0 - [31, 63, 127, 255, 511, 1023, 2047, 4095]
1577 */
1578 #define FXNN "RcmServer_getFxnAddr_P"
1579 Int RcmServer_getFxnAddr_P(RcmServer_Object *obj, UInt32 fxnIdx,
1580 RcmServer_MsgFxn *addrPtr, RcmServer_MsgCreateFxn *createPtr)
1581 {
1582 UInt i, j;
1583 UInt16 key;
1584 RcmServer_FxnTabElem *slot;
1585 RcmServer_MsgFxn addr = NULL;
1586 RcmServer_MsgCreateFxn createAddr = NULL;
1587 Int status = RcmServer_S_SUCCESS;
1589 /* static functions have bit-31 set */
1590 if (fxnIdx & 0x80000000) {
1591 j = (fxnIdx & 0x0000FFFF);
1592 if (j < (UInt)(obj->fxnTabStatic.length)) {
1594 /* fetch the function address from the table */
1595 slot = (obj->fxnTab[0])+j;
1596 if (j == 0) {
1597 createAddr = slot->addr.createFxn;
1598 }
1599 else {
1600 addr = slot->addr.fxn;
1601 }
1602 }
1603 else {
1604 Log_error1(FXNN": invalid function index 0x%x", (IArg)fxnIdx);
1605 status = RcmServer_E_InvalidFxnIdx;
1606 goto leave;
1607 }
1608 }
1609 /* must be a dynamic function */
1610 else {
1611 /* extract the key from the function index */
1612 key = (fxnIdx & _RCM_KeyMask) >> _RCM_KeyShift;
1614 i = (fxnIdx & 0xF000) >> 12;
1615 if ((i > 0) && (i < RcmServer_MAX_TABLES) && (obj->fxnTab[i] != NULL)) {
1617 /* fetch the function address from the table */
1618 j = (fxnIdx & 0x0FFF);
1619 slot = (obj->fxnTab[i])+j;
1620 addr = slot->addr.fxn;
1622 /* validate the key */
1623 if (key != slot->key) {
1624 Log_error1(FXNN": invalid function index 0x%x", (IArg)fxnIdx);
1625 status = RcmServer_E_InvalidFxnIdx;
1626 goto leave;
1627 }
1628 }
1629 else {
1630 Log_error1(FXNN": invalid function index 0x%x", (IArg)fxnIdx);
1631 status = RcmServer_E_InvalidFxnIdx;
1632 goto leave;
1633 }
1634 }
1636 leave:
1637 if (status >= 0) {
1638 if (j == 0) {
1639 *createPtr = createAddr;
1640 }
1641 else {
1642 *addrPtr = addr;
1643 }
1644 }
1645 return(status);
1646 }
1647 #undef FXNN
/*
 *  ======== RcmServer_getSymIdx_P ========
 *
 *  The caller must hold the instance (table) gate before calling this
 *  function.
 */
1654 #define FXNN "RcmServer_getSymIdx_P"
1655 Int RcmServer_getSymIdx_P(RcmServer_Object *obj, String name, UInt32 *index)
1656 {
1657 UInt i, j, len;
1658 RcmServer_FxnTabElem *slot;
1659 UInt32 fxnIdx = 0xFFFFFFFF;
1660 Int status = RcmServer_S_SUCCESS;
1663 Log_print3(Diags_ENTRY,
1664 "--> "FXNN": (obj=0x%x, name=0x%x, index=0x%x)",
1665 (IArg)obj, (IArg)name, (IArg)index);
1667 /* search tables for given function name */
1668 for (i = 0; i < RcmServer_MAX_TABLES; i++) {
1669 if (obj->fxnTab[i] != NULL) {
1670 len = (i == 0) ? obj->fxnTabStatic.length : (1 << (i + 4));
1671 for (j = 0; j < len; j++) {
1672 slot = (obj->fxnTab[i]) + j;
1673 if ((((obj->fxnTab[i])+j)->name != NULL) &&
1674 (_strcmp(((obj->fxnTab[i])+j)->name, name) == 0)) {
1675 /* found function name */
1676 if (i == 0) {
1677 fxnIdx = 0x80000000 | j;
1678 } else {
1679 fxnIdx = ((UInt32)(slot->key) << _RCM_KeyShift) |
1680 (i << 12) | j;
1681 }
1682 break;
1683 }
1684 }
1685 }
1687 if (0xFFFFFFFF != fxnIdx) {
1688 break;
1689 }
1690 }
1692 /* log an error if the symbol was not found */
1693 if (fxnIdx == 0xFFFFFFFF) {
1694 Log_error0(FXNN": given symbol not found");
1695 status = RcmServer_E_SYMBOLNOTFOUND;
1696 }
1698 /* if success, return symbol index */
1699 if (status >= 0) {
1700 *index = fxnIdx;
1701 }
1703 Log_print1(Diags_EXIT, "<-- "FXNN": %d", (IArg)status);
1704 return(status);
1705 }
1706 #undef FXNN
1709 /*
1710 * ======== RcmServer_getNextKey_P ========
1711 */
1712 UInt16 RcmServer_getNextKey_P(RcmServer_Object *obj)
1713 {
1714 GateThread_Handle gateH;
1715 IArg gateKey;
1716 UInt16 key;
1719 gateH = GateThread_handle(&obj->gate);
1720 gateKey = GateThread_enter(gateH);
1722 if (obj->key <= 1) {
1723 obj->key = _RCM_KeyResetValue; /* don't use 0 as a key value */
1724 }
1725 else {
1726 (obj->key)--;
1727 }
1728 key = obj->key;
1730 GateThread_leave(gateH, gateKey);
1732 return(key);
1733 }
1736 /*
1737 * ======== RcmServer_getPool_P ========
1738 */
1739 #define FXNN "RcmServer_getPool_P"
1740 Int RcmServer_getPool_P(RcmServer_Object *obj,
1741 RcmClient_Packet *packet, RcmServer_ThreadPool **poolP)
1742 {
1743 UInt16 poolId;
1744 UInt16 offset;
1745 Int status = RcmServer_S_SUCCESS;
1748 poolId = packet->message.poolId;
1750 /* static pools have bit-15 set */
1751 if (poolId & 0x8000) {
1752 offset = (poolId & 0x00FF);
1753 if (offset < obj->poolMap0Len) {
1754 *poolP = &(obj->poolMap[0])[offset];
1755 }
1756 else {
1757 Log_error1(FXNN": pool id=0x%x not found", (IArg)poolId);
1758 *poolP = NULL;
1759 status = RcmServer_E_PoolIdNotFound;
1760 goto leave;
1761 }
1762 }
1764 leave:
1765 return(status);
1766 }
1767 #undef FXNN
1770 /*
1771 * ======== RcmServer_process_P ========
1772 */
1773 #define FXNN "RcmServer_process_P"
1774 Void RcmServer_process_P(RcmServer_Object *obj, RcmClient_Packet *packet)
1775 {
1776 String name;
1777 UInt32 fxnIdx;
1778 RcmServer_MsgFxn fxn;
1779 RcmClient_Message *rcmMsg;
1780 #if USE_RPMESSAGE
1781 RcmServer_MsgCreateFxn createFxn = NULL;
1782 #endif
1783 #if USE_RPMESSAGE == 0
1784 MessageQ_Msg msgqMsg;
1785 #endif
1786 UInt16 messageType;
1787 Error_Block eb;
1788 UInt16 jobId;
1789 Int rval;
1790 Int status = RcmServer_S_SUCCESS;
1793 Log_print2(Diags_ENTRY,
1794 "--> "FXNN": (obj=0x%x, packet=0x%x)", (IArg)obj, (IArg)packet);
1796 Error_init(&eb);
1798 /* decode the message */
1799 rcmMsg = &packet->message;
1800 #if USE_RPMESSAGE == 0
1801 msgqMsg = &packet->msgqHeader;
1802 #endif
1803 Log_print1(Diags_INFO, FXNN": message desc=0x%x", (IArg)packet->desc);
1805 /* extract the message type from the packet descriptor field */
1806 messageType = (RcmClient_Desc_TYPE_MASK & packet->desc) >>
1807 RcmClient_Desc_TYPE_SHIFT;
1809 /* process the given message */
1810 switch (messageType) {
1812 case RcmClient_Desc_RCM_MSG:
1813 rval = RcmServer_execMsg_I(obj, rcmMsg);
1815 if (rval < 0) {
1816 switch (rval) {
1817 case RcmServer_E_InvalidFxnIdx:
1818 RcmServer_setStatusCode_I(
1819 packet, RcmServer_Status_INVALID_FXN);
1820 break;
1821 default:
1822 RcmServer_setStatusCode_I(
1823 packet, RcmServer_Status_Error);
1824 break;
1825 }
1826 }
1827 else if (rcmMsg->result < 0) {
1828 RcmServer_setStatusCode_I(packet, RcmServer_Status_MSG_FXN_ERR);
1829 }
1830 else {
1831 RcmServer_setStatusCode_I(packet, RcmServer_Status_SUCCESS);
1832 }
1834 #if USE_RPMESSAGE
1835 #if 0
1836 System_printf("RcmServer_process_P: Sending reply from: %d to: %d\n",
1837 obj->localAddr, obj->replyAddr);
1838 #endif
1840 packet->hdr.type = OMX_RAW_MSG;
1841 packet->hdr.len = PACKET_DATA_SIZE + packet->message.dataSize;
1842 status = RPMessage_send(obj->dstProc, obj->replyAddr,
1843 obj->localAddr, (Ptr)&packet->hdr,
1844 PACKET_HDR_SIZE + packet->message.dataSize);
1845 #else
1846 status = MessageQ_put(MessageQ_getReplyQueue(msgqMsg), msgqMsg);
1847 #endif
1848 if (status < 0) {
1849 Log_error1(FXNN": unknown ipc error, 0x%x", (IArg)status);
1850 }
1851 break;
1853 case RcmClient_Desc_CMD:
1854 status = RcmServer_execMsg_I(obj, rcmMsg);
1856 /* if all went well, free the message */
1857 if ((status >= 0) && (rcmMsg->result >= 0)) {
1859 #if USE_RPMESSAGE == 0
1860 status = MessageQ_free(msgqMsg);
1861 #endif
1862 if (status < 0) {
1863 Log_error1(
1864 FXNN": MessageQ_free returned error %d", (IArg)status);
1865 }
1866 }
1868 /* an error occurred, must return message to client */
1869 else {
1870 if (status < 0) {
1871 /* error trying to process the message */
1872 switch (status) {
1873 case RcmServer_E_InvalidFxnIdx:
1874 RcmServer_setStatusCode_I(
1875 packet, RcmServer_Status_INVALID_FXN);
1876 break;
1877 default:
1878 RcmServer_setStatusCode_I(
1879 packet, RcmServer_Status_Error);
1880 break;
1881 }
1882 }
1883 else {
1884 /* error in message function */
1885 RcmServer_setStatusCode_I(
1886 packet, RcmServer_Status_MSG_FXN_ERR);
1887 }
1889 /* send error message back to client */
1890 #if USE_RPMESSAGE
1891 packet->hdr.type = OMX_RAW_MSG;
1892 packet->hdr.len = PACKET_DATA_SIZE + packet->message.dataSize;
1893 status = RPMessage_send(obj->dstProc, obj->replyAddr,
1894 obj->localAddr, (Ptr)&packet->hdr,
1895 PACKET_HDR_SIZE + packet->message.dataSize);
1896 #else
1897 status = MessageQ_put(MessageQ_getReplyQueue(msgqMsg), msgqMsg);
1898 #endif
1899 if (status < 0) {
1900 Log_error1(FXNN": unknown ipc error, 0x%x", (IArg)status);
1901 }
1902 }
1903 break;
1905 case RcmClient_Desc_DPC:
1906 rval = RcmServer_getFxnAddr_P(obj, rcmMsg->fxnIdx, &fxn,
1907 &createFxn);
1909 if (rval < 0) {
1910 RcmServer_setStatusCode_I(
1911 packet, RcmServer_Status_SYMBOL_NOT_FOUND);
1912 Error_init(&eb);
1913 }
1915 #if USE_RPMESSAGE
1916 packet->hdr.type = OMX_RAW_MSG;
1917 packet->hdr.len = PACKET_DATA_SIZE + packet->message.dataSize;
1918 status = RPMessage_send(obj->dstProc, obj->replyAddr,
1919 obj->localAddr, (Ptr)&packet->hdr,
1920 PACKET_HDR_SIZE + packet->message.dataSize);
1921 #else
1922 status = MessageQ_put(MessageQ_getReplyQueue(msgqMsg), msgqMsg);
1923 #endif
1924 if (status < 0) {
1925 Log_error1(FXNN": unknown ipc error, 0x%x", (IArg)status);
1926 }
1928 /* invoke the function with a null context */
1929 #if USE_RPMESSAGE
1930 if (createFxn) {
1931 (*createFxn)(obj, 0, NULL);
1932 }
1933 else {
1934 (*fxn)(0, NULL);
1935 }
1936 #else
1937 (*fxn)(0, NULL);
1938 #endif
1939 break;
1941 case RcmClient_Desc_SYM_ADD:
1942 break;
1944 case RcmClient_Desc_SYM_IDX:
1945 name = (String)rcmMsg->data;
1946 rval = RcmServer_getSymIdx_P(obj, name, &fxnIdx);
1948 if (rval < 0) {
1949 RcmServer_setStatusCode_I(
1950 packet, RcmServer_Status_SYMBOL_NOT_FOUND);
1951 }
1952 else {
1953 RcmServer_setStatusCode_I(packet, RcmServer_Status_SUCCESS);
1954 rcmMsg->data[0] = fxnIdx;
1955 rcmMsg->result = 0;
1956 }
1958 #if USE_RPMESSAGE
1959 packet->hdr.type = OMX_RAW_MSG;
1960 packet->hdr.len = PACKET_DATA_SIZE + packet->message.dataSize;
1961 status = RPMessage_send(obj->dstProc, obj->replyAddr,
1962 obj->localAddr, (Ptr)&packet->hdr,
1963 PACKET_HDR_SIZE + packet->message.dataSize);
1964 #else
1965 status = MessageQ_put(MessageQ_getReplyQueue(msgqMsg), msgqMsg);
1966 #endif
1967 if (status < 0) {
1968 Log_error1(FXNN": unknown ipc error, 0x%x", (IArg)status);
1969 }
1970 break;
1972 case RcmClient_Desc_JOB_ACQ:
1973 rval = RcmServer_acqJobId_P(obj, &jobId);
1975 if (rval < 0) {
1976 RcmServer_setStatusCode_I(packet, RcmServer_Status_Error);
1977 }
1978 else {
1979 RcmServer_setStatusCode_I(packet, RcmServer_Status_SUCCESS);
1980 *(UInt16 *)(&rcmMsg->data[0]) = jobId;
1981 rcmMsg->result = 0;
1982 }
1984 #if USE_RPMESSAGE
1985 packet->hdr.type = OMX_RAW_MSG;
1986 packet->hdr.len = PACKET_DATA_SIZE + packet->message.dataSize;
1987 status = RPMessage_send(obj->dstProc, obj->replyAddr,
1988 obj->localAddr, (Ptr)&packet->hdr,
1989 PACKET_HDR_SIZE + packet->message.dataSize);
1990 #else
1991 status = MessageQ_put(MessageQ_getReplyQueue(msgqMsg), msgqMsg);
1992 #endif
1993 if (status < 0) {
1994 Log_error1(FXNN": unknown ipc error, 0x%x", (IArg)status);
1995 }
1996 break;
1998 case RcmClient_Desc_JOB_REL:
1999 jobId = (UInt16)(rcmMsg->data[0]);
2000 rval = RcmServer_relJobId_P(obj, jobId);
2002 if (rval < 0) {
2003 switch (rval) {
2004 case RcmServer_E_JobIdNotFound:
2005 RcmServer_setStatusCode_I(
2006 packet, RcmServer_Status_JobNotFound);
2007 break;
2008 default:
2009 RcmServer_setStatusCode_I(
2010 packet, RcmServer_Status_Error);
2011 break;
2012 }
2013 rcmMsg->result = rval;
2014 }
2015 else {
2016 RcmServer_setStatusCode_I(packet, RcmServer_Status_SUCCESS);
2017 rcmMsg->result = 0;
2018 }
2020 #if USE_RPMESSAGE
2021 packet->hdr.type = OMX_RAW_MSG;
2022 packet->hdr.len = PACKET_DATA_SIZE + packet->message.dataSize;
2023 status = RPMessage_send(obj->dstProc, obj->replyAddr,
2024 obj->localAddr, (Ptr)&packet->hdr,
2025 PACKET_HDR_SIZE + packet->message.dataSize);
2026 #else
2027 status = MessageQ_put(MessageQ_getReplyQueue(msgqMsg), msgqMsg);
2028 #endif
2029 if (status < 0) {
2030 Log_error1(FXNN": unknown ipc error, 0x%x", (IArg)status);
2031 }
2032 break;
2034 default:
2035 Log_error1(FXNN": unknown message type recieved, 0x%x",
2036 (IArg)messageType);
2037 break;
2038 }
2040 Log_print0(Diags_EXIT, "<-- "FXNN":");
2041 }
2042 #undef FXNN
2045 /*
2046 * ======== RcmServer_relJobId_P ========
2047 */
2048 #define FXNN "RcmServer_relJobId_P"
2049 Int RcmServer_relJobId_P(RcmServer_Object *obj, UInt16 jobId)
2050 {
2051 GateThread_Handle gateH;
2052 IArg key;
2053 List_Elem *elem;
2054 List_Handle msgQueH;
2055 RcmClient_Packet *packet;
2056 RcmServer_JobStream *job;
2057 #if USE_RPMESSAGE == 0
2058 MessageQ_Msg msgqMsg;
2059 #endif
2060 Int rval;
2061 Int status = RcmServer_S_SUCCESS;
2064 Log_print2(Diags_ENTRY,
2065 "--> "FXNN": (obj=0x%x, jobId=0x%x)", (IArg)obj, (IArg)jobId);
2068 /* must protect job list while searching and modifying it */
2069 gateH = GateThread_handle(&obj->gate);
2070 key = GateThread_enter(gateH);
2072 /* find the job stream object in the list */
2073 elem = NULL;
2074 while ((elem = List_next(obj->jobList, elem)) != NULL) {
2075 job = (RcmServer_JobStream *)elem;
2077 /* remove the job stream object from the list */
2078 if (job->jobId == jobId) {
2079 List_remove(obj->jobList, elem);
2080 break;
2081 }
2082 }
2084 GateThread_leave(gateH, key);
2086 if (elem == NULL) {
2087 status = RcmServer_E_JobIdNotFound;
2088 Log_error1(FXNN": failed to find jobId=%d", (IArg)jobId);
2089 goto leave;
2090 }
2092 /* return any pending messages on the message queue */
2093 msgQueH = List_handle(&job->msgQue);
2095 while ((elem = List_get(msgQueH)) != NULL) {
2096 packet = (RcmClient_Packet *)elem;
2097 Log_warning2(
2098 FXNN": returning unprocessed message, jobId=0x%x, packet=0x%x",
2099 (IArg)jobId, (IArg)packet);
2101 RcmServer_setStatusCode_I(packet, RcmServer_Status_Unprocessed);
2103 #if USE_RPMESSAGE
2104 packet->hdr.type = OMX_RAW_MSG;
2105 packet->hdr.len = PACKET_DATA_SIZE + packet->message.dataSize;
2106 rval = RPMessage_send(obj->dstProc, obj->replyAddr,
2107 obj->localAddr, (Ptr)&packet->hdr,
2108 PACKET_HDR_SIZE + packet->message.dataSize);
2109 #else
2110 msgqMsg = &packet->msgqHeader;
2111 rval = MessageQ_put(MessageQ_getReplyQueue(msgqMsg), msgqMsg);
2112 #endif
2113 if (rval < 0) {
2114 Log_error1(FXNN": unknown ipc error, 0x%x", (IArg)rval);
2115 }
2116 }
2118 /* finalize the job stream object */
2119 List_destruct(&job->msgQue);
2121 xdc_runtime_Memory_free(RcmServer_Module_heap(),
2122 (Ptr)job, sizeof(RcmServer_JobStream));
2125 leave:
2126 Log_print1(Diags_EXIT, "<-- "FXNN": %d", (IArg)status);
2127 return(status);
2128 }
2129 #undef FXNN
2132 /*
2133 * ======== RcmServer_serverThrFxn_P ========
2134 */
2135 #define FXNN "RcmServer_serverThrFxn_P"
2136 Void RcmServer_serverThrFxn_P(IArg arg)
2137 {
2138 Error_Block eb;
2139 RcmClient_Packet *packet;
2140 #if USE_RPMESSAGE
2141 Char recvBuf[MSGBUFFERSIZE];
2142 UInt16 len;
2143 #else
2144 MessageQ_Msg msgqMsg = NULL;
2145 #endif
2146 Int rval;
2147 Bool running = TRUE;
2148 RcmServer_Object *obj = (RcmServer_Object *)arg;
2149 Int dataSize;
2151 #if USE_RPMESSAGE
2152 packet = (RcmClient_Packet *)&recvBuf[0];
2153 #endif
2155 Log_print1(Diags_ENTRY, "--> "FXNN": (arg=0x%x)", arg);
2157 Error_init(&eb);
2159 /* wait until ready to run */
2160 Semaphore_pend(obj->run, Semaphore_FOREVER, &eb);
2162 if (Error_check(&eb)) {
2163 Log_error0(FXNN": Semaphore_pend failure in server thread");
2164 }
2166 /* main processing loop */
2167 while (running) {
2168 Log_print1(Diags_INFO,
2169 FXNN": waiting for message, thread=0x%x",
2170 (IArg)(obj->serverThread));
2172 /* block until message arrives */
2173 do {
2174 #if USE_RPMESSAGE
2175 rval = RPMessage_recv(obj->serverQue, (Ptr)&packet->hdr, &len,
2176 &obj->replyAddr, RPMessage_FOREVER);
2177 #if 0
2178 System_printf("RcmServer_serverThrFxn_P: Received msg of len %d "
2179 "from: %d\n",
2180 len, obj->replyAddr);
2182 System_printf("hdr - t:%d l:%d\n", packet->hdr.type,
2183 packet->hdr.len);
2185 System_printf("pkt - d:%d m:%d\n", packet->desc, packet->msgId);
2186 #endif
2187 if (packet->hdr.type == OMX_DISC_REQ) {
2188 System_printf("RcmServer_serverThrFxn_P: Got OMX_DISCONNECT\n");
2189 }
2190 if (!obj->shutdown) {
2191 Assert_isTrue((len <= MSGBUFFERSIZE), NULL);
2192 Assert_isTrue((packet->hdr.type == OMX_RAW_MSG) ||
2193 (packet->hdr.type == OMX_DISC_REQ) , NULL);
2194 }
2196 if ((rval < 0) && (rval != RPMessage_E_UNBLOCKED)) {
2197 #else
2198 rval = MessageQ_get(obj->serverQue, &msgqMsg, MessageQ_FOREVER);
2199 if ((rval < 0) && (rval != MessageQ_E_UNBLOCKED)) {
2200 #endif
2201 Log_error1(FXNN": ipc error 0x%x", (IArg)rval);
2202 /* keep running and hope for the best */
2203 }
2204 #if USE_RPMESSAGE
2205 } while (FALSE);
2206 #else
2207 } while ((msgqMsg == NULL) && !obj->shutdown);
2208 #endif
2210 /* if shutdown, exit this thread */
2211 #if USE_RPMESSAGE
2212 if (obj->shutdown || packet->hdr.type == OMX_DISC_REQ) {
2213 running = FALSE;
2214 Log_print1(Diags_INFO,
2215 FXNN": terminating, thread=0x%x", (IArg)(obj->serverThread));
2216 continue;
2217 }
2218 #else
2219 if (obj->shutdown) {
2220 running = FALSE;
2221 Log_print1(Diags_INFO,
2222 FXNN": terminating, thread=0x%x", (IArg)(obj->serverThread));
2223 if (msgqMsg == NULL ) {
2224 continue;
2225 }
2226 }
2227 #endif
2229 #if USE_RPMESSAGE == 0
2230 packet = (RcmClient_Packet *)msgqMsg;
2231 #endif
2233 Log_print2(Diags_INFO,
2234 FXNN": message received, thread=0x%x packet=0x%x",
2235 (IArg)(obj->serverThread), (IArg)packet);
2237 if ((packet->message.poolId == RcmClient_DEFAULTPOOLID)
2238 && ((obj->poolMap[0])[0].count == 0)) {
2240 /* in-band (server thread) message processing */
2241 RcmServer_process_P(obj, packet);
2242 }
2243 else {
2244 /* out-of-band (worker thread) message processing */
2245 rval = RcmServer_dispatch_P(obj, packet);
2247 /* if error, message was not dispatched; must return to client */
2248 if (rval < 0) {
2249 switch (rval) {
2250 case RcmServer_E_JobIdNotFound:
2251 RcmServer_setStatusCode_I(
2252 packet, RcmServer_Status_JobNotFound);
2253 break;
2255 case RcmServer_E_PoolIdNotFound:
2256 RcmServer_setStatusCode_I(
2257 packet, RcmServer_Status_PoolNotFound);
2258 break;
2260 default:
2261 RcmServer_setStatusCode_I(
2262 packet, RcmServer_Status_Error);
2263 break;
2264 }
2265 packet->message.result = rval;
2267 /* return the message to the client */
2268 #if USE_RPMESSAGE
2269 #if 0
2270 System_printf("RcmServer_serverThrFxn_P: "
2271 "Sending response from: %d to: %d\n",
2272 obj->localAddr, obj->replyAddr);
2273 System_printf("sending: %d + %d\n", PACKET_HDR_SIZE,
2274 packet->message.dataSize);
2275 #endif
2276 packet->hdr.type = OMX_RAW_MSG;
2277 if (rval < 0) {
2278 packet->hdr.len = sizeof(UInt32);
2279 packet->desc = 0x4142;
2280 packet->msgId = 0x0044;
2281 dataSize = sizeof(struct rpmsg_omx_hdr) + sizeof(UInt32);
2282 }
2283 else {
2284 packet->hdr.len = PACKET_DATA_SIZE +
2285 packet->message.dataSize;
2286 dataSize = PACKET_HDR_SIZE + packet->message.dataSize;
2287 }
2288 rval = RPMessage_send(obj->dstProc, obj->replyAddr,
2289 obj->localAddr, (Ptr)&packet->hdr, dataSize);
2290 #else
2291 rval = MessageQ_put(MessageQ_getReplyQueue(msgqMsg), msgqMsg);
2292 #endif
2293 if (rval < 0) {
2294 Log_error1(FXNN": unknown ipc error, 0x%x", (IArg)rval);
2295 }
2296 }
2297 }
2298 }
2300 System_printf("RcmServer_serverThrFxn_P: Exiting thread.\n");
2302 Log_print0(Diags_EXIT, "<-- "FXNN":");
2303 }
2304 #undef FXNN
2307 /*
2308 * ======== RcmServer_setStatusCode_I ========
2309 */
2310 Void RcmServer_setStatusCode_I(RcmClient_Packet *packet, UInt16 code)
2311 {
2313 /* code must be 0 - 15, it has to fit in a 4-bit field */
2314 Assert_isTrue((code < 16), NULL);
2316 packet->desc &= ~(RcmClient_Desc_TYPE_MASK);
2317 packet->desc |= ((code << RcmClient_Desc_TYPE_SHIFT)
2318 & RcmClient_Desc_TYPE_MASK);
2319 }
2322 /*
2323 * ======== RcmServer_workerThrFxn_P ========
2324 */
2325 #define FXNN "RcmServer_workerThrFxn_P"
2326 Void RcmServer_workerThrFxn_P(IArg arg)
2327 {
2328 Error_Block eb;
2329 RcmClient_Packet *packet;
2330 List_Elem *elem;
2331 List_Handle listH;
2332 List_Handle readyQueueH;
2333 UInt16 jobId;
2334 GateThread_Handle gateH;
2335 IArg key;
2336 RcmServer_ThreadPool *pool;
2337 RcmServer_JobStream *job;
2338 RcmServer_WorkerThread *obj;
2339 Bool running;
2340 Int rval;
2343 Log_print1(Diags_ENTRY, "--> "FXNN": (arg=0x%x)", arg);
2345 Error_init(&eb);
2346 obj = (RcmServer_WorkerThread *)arg;
2347 readyQueueH = List_handle(&obj->pool->readyQueue);
2348 packet = NULL;
2349 running = TRUE;
2351 /* main processing loop */
2352 while (running) {
2353 Log_print1(Diags_INFO,
2354 FXNN": waiting for job, thread=0x%x", (IArg)(obj->thread));
2356 /* if no current message, wait until signaled to run */
2357 if (packet == NULL) {
2358 Semaphore_pend(obj->pool->sem, Semaphore_FOREVER, &eb);
2360 if (Error_check(&eb)) {
2361 Log_error0(FXNN": semaphore pend failed");
2362 }
2363 }
2365 /* check if thread should terminate */
2366 if (obj->terminate) {
2367 running = FALSE;
2368 Log_print1(Diags_INFO,
2369 FXNN": terminating, thread=0x%x", (IArg)(obj->thread));
2370 continue;
2371 }
2373 /* get next message from ready queue */
2374 if (packet == NULL) {
2375 packet = (RcmClient_Packet *)List_get(readyQueueH);
2376 }
2378 if (packet == NULL) {
2379 Log_error1(FXNN": ready queue is empty, thread=0x%x",
2380 (IArg)(obj->thread));
2381 continue;
2382 }
2384 Log_print2(Diags_INFO, FXNN": job received, thread=0x%x packet=0x%x",
2385 (IArg)obj->thread, (IArg)packet);
2387 /* remember the message job id */
2388 jobId = packet->message.jobId;
2390 /* process the message */
2391 RcmServer_process_P(obj->server, packet);
2392 packet = NULL;
2394 /* If this worker thread just finished processing a job message,
2395 * queue up the next message for this job id. As an optimization,
2396 * if the message is addressed to this worker's pool, then don't
2397 * signal the semaphore, just get the next message from the queue
2398 * and processes it. This keeps the current thread running instead
2399 * of switching to another thread.
2400 */
2401 if (jobId != RcmClient_DISCRETEJOBID) {
2403 /* must protect job list while searching it */
2404 gateH = GateThread_handle(&obj->server->gate);
2405 key = GateThread_enter(gateH);
2407 /* find the job object in the list */
2408 elem = NULL;
2409 while ((elem = List_next(obj->server->jobList, elem)) != NULL) {
2410 job = (RcmServer_JobStream *)elem;
2411 if (job->jobId == jobId) {
2412 break;
2413 }
2414 }
2416 /* if job object not found, it is not an error */
2417 if (elem == NULL) {
2418 GateThread_leave(gateH, key);
2419 continue;
2420 }
2422 /* found the job object */
2423 listH = List_handle(&job->msgQue);
2425 /* get next job message and either process it or queue it */
2426 do {
2427 elem = List_get(listH);
2429 if (elem == NULL) {
2430 job->empty = TRUE; /* no more messages */
2431 break;
2432 }
2433 else {
2434 /* get target pool id */
2435 packet = (RcmClient_Packet *)elem;
2436 rval = RcmServer_getPool_P(obj->server, packet, &pool);
2438 /* if error, return the message to the client */
2439 if (rval < 0) {
2440 switch (rval) {
2441 case RcmServer_E_PoolIdNotFound:
2442 RcmServer_setStatusCode_I(
2443 packet, RcmServer_Status_PoolNotFound);
2444 break;
2446 default:
2447 RcmServer_setStatusCode_I(
2448 packet, RcmServer_Status_Error);
2449 break;
2450 }
2451 packet->message.result = rval;
2453 #if USE_RPMESSAGE
2454 packet->hdr.type = OMX_RAW_MSG;
2455 packet->hdr.len = PACKET_DATA_SIZE + packet->message.dataSize;
2456 rval = RPMessage_send(
2457 (obj->server)->dstProc,
2458 (obj->server)->replyAddr,
2459 (obj->server)->localAddr, (Ptr)&packet->hdr,
2460 PACKET_HDR_SIZE + packet->message.dataSize);
2461 #else
2462 rval = MessageQ_put(
2463 MessageQ_getReplyQueue(&packet->msgqHeader),
2464 &packet->msgqHeader);
2465 #endif
2466 if (rval < 0) {
2467 Log_error1(
2468 FXNN": unknown ipc error, 0x%x", (IArg)rval);
2469 }
2470 }
2471 /* packet is valid, queue it in the corresponding pool's
2472 * ready queue */
2473 else {
2474 listH = List_handle(&pool->readyQueue);
2475 List_put(listH, elem);
2476 packet = NULL;
2477 Semaphore_post(pool->sem, &eb);
2479 if (Error_check(&eb)) {
2480 Log_error0(FXNN": semaphore post failed");
2481 }
2482 }
2484 /* loop around and wait to be run again */
2485 }
2487 } while (rval < 0);
2489 GateThread_leave(gateH, key);
2490 }
2491 } /* while (running) */
2493 Log_print0(Diags_EXIT, "<-- "FXNN":");
2494 }
2495 #undef FXNN
2498 /*
2499 * ======== RcmServer_getLocalAddress ========
2500 */
2501 UInt32 RcmServer_getLocalAddress(RcmServer_Object *obj)
2502 {
2503 return(obj->localAddr);
2504 }
2507 /*
2508 * ======== RcmServer_getRemoteAddress ========
2509 */
2510 UInt32 RcmServer_getRemoteAddress(RcmServer_Object *obj)
2511 {
2512 return(obj->replyAddr);
2513 }
2517 /*
2518 * ======== RcmServer_getRemoteProc ========
2519 */
2520 UInt16 RcmServer_getRemoteProc(RcmServer_Object *obj)
2521 {
2522 return(obj->dstProc);
2523 }