1 /*
2 * Copyright (c) 2011-2013, Texas Instruments Incorporated
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
7 * are met:
8 *
9 * * Redistributions of source code must retain the above copyright
10 * notice, this list of conditions and the following disclaimer.
11 *
12 * * Redistributions in binary form must reproduce the above copyright
13 * notice, this list of conditions and the following disclaimer in the
14 * documentation and/or other materials provided with the distribution.
15 *
16 * * Neither the name of Texas Instruments Incorporated nor the names of
17 * its contributors may be used to endorse or promote products derived
18 * from this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
21 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
22 * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
23 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
24 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
25 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
26 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
27 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
28 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
29 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
30 * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 */
33 /*
34 * ======== RcmServer.c ========
35 *
36 */
38 /* this define must precede inclusion of any xdc header file */
39 #define Registry_CURDESC ti_grcm_RcmServer__Desc
40 #define MODULE_NAME "ti.grcm.RcmServer"
42 #define xdc_runtime_Memory__nolocalnames /* short name clashes with SysLink */
44 /* rtsc header files */
45 #include <xdc/std.h>
46 #include <xdc/runtime/Assert.h>
47 #include <xdc/runtime/Diags.h>
48 #include <xdc/runtime/Error.h>
49 #include <xdc/runtime/IHeap.h>
50 #include <xdc/runtime/Log.h>
51 #include <xdc/runtime/Memory.h>
52 #include <xdc/runtime/Registry.h>
53 #include <xdc/runtime/Startup.h>
54 #include <xdc/runtime/knl/GateThread.h>
55 #include <xdc/runtime/knl/ISemaphore.h>
56 #include <xdc/runtime/knl/Semaphore.h>
57 #include <xdc/runtime/knl/SemThread.h>
58 #include <xdc/runtime/knl/Thread.h>
59 #include <xdc/runtime/System.h>
61 #define MSGBUFFERSIZE 512 // Make global and move to MessageQCopy.h
63 #if defined(RCM_ti_ipc)
64 #include <ti/sdo/utils/List.h>
65 #include <ti/ipc/MultiProc.h>
67 #elif defined(RCM_ti_syslink)
68 #include <ti/syslink/utils/List.h>
69 #define List_Struct List_Object
70 #define List_handle(exp) (exp)
72 #else
73 #error "undefined ipc binding";
74 #endif
76 /* local header files */
77 #include "RcmClient.h"
78 #include "RcmTypes.h"
79 #include "RcmServer.h"
81 #if USE_MESSAGEQCOPY
82 #include <ti/srvmgr/rpmsg_omx.h>
83 #endif
85 #define _RCM_KeyResetValue 0x07FF // key reset value
86 #define _RCM_KeyMask 0x7FF00000 // key mask in function index
87 #define _RCM_KeyShift 20 // key bit position in function index
89 #define RcmServer_MAX_TABLES 9 // max number of function tables
90 #define RcmServer_POOL_MAP_LEN 4 // pool map length
92 #define RcmServer_E_InvalidFxnIdx (-101)
93 #define RcmServer_E_JobIdNotFound (-102)
94 #define RcmServer_E_PoolIdNotFound (-103)
96 typedef struct { // function table element
97 String name;
98 #if USE_MESSAGEQCOPY
99 union {
100 RcmServer_MsgFxn fxn;
101 RcmServer_MsgCreateFxn createFxn;
102 }addr;
103 #else
104 RcmServer_MsgFxn addr;
105 #endif
106 UInt16 key;
107 } RcmServer_FxnTabElem;
109 typedef struct {
110 Int length;
111 RcmServer_FxnTabElem * elem;
112 } RcmServer_FxnTabElemAry;
114 typedef struct {
115 String name; // pool name
116 Int count; // thread count (at create time)
117 Thread_Priority priority; // thread priority
118 Int osPriority;
119 SizeT stackSize; // thread stack size
120 String stackSeg; // thread stack placement
121 ISemaphore_Handle sem; // message semaphore (counting)
122 List_Struct threadList; // list of worker threads
123 List_Struct readyQueue; // queue of messages
124 } RcmServer_ThreadPool;
126 typedef struct RcmServer_Object_tag {
127 GateThread_Struct gate; // instance gate
128 Ptr run; // run semaphore for the server
129 #if USE_MESSAGEQCOPY
130 MessageQCopy_Handle serverQue; // inbound message queue
131 UInt32 localAddr; // inbound message queue address
132 UInt32 replyAddr; // Reply address (same per inst.)
133 UInt32 dstProc; // Reply processor.
134 #else
135 MessageQ_Handle serverQue; // inbound message queue
136 #endif
137 Thread_Handle serverThread; // server thread object
138 RcmServer_FxnTabElemAry fxnTabStatic; // static function table
139 RcmServer_FxnTabElem * fxnTab[RcmServer_MAX_TABLES]; // base pointers
140 UInt16 key; // function index key
141 UInt16 jobId; // job id tracker
142 Bool shutdown; // server shutdown flag
143 Int poolMap0Len;// length of static table
144 RcmServer_ThreadPool * poolMap[RcmServer_POOL_MAP_LEN];
145 List_Handle jobList; // list of job stream queues
146 } RcmServer_Object;
148 typedef struct {
149 List_Elem elem;
150 UInt16 jobId; // current job stream id
151 Thread_Handle thread; // server thread object
152 Bool terminate; // thread terminate flag
153 RcmServer_ThreadPool* pool; // worker pool
154 RcmServer_Object * server; // server instance
155 } RcmServer_WorkerThread;
157 typedef struct {
158 List_Elem elem;
159 UInt16 jobId; // job stream id
160 Bool empty; // true if no messages on server
161 List_Struct msgQue; // queue of messages
162 } RcmServer_JobStream;
164 typedef struct RcmServer_Module_tag {
165 String name;
166 IHeap_Handle heap;
167 } RcmServer_Module;
170 /* private functions */
171 static
172 Int RcmServer_Instance_init_P(
173 RcmServer_Object * obj,
174 String name,
175 const RcmServer_Params * params
176 );
178 static
179 Int RcmServer_Instance_finalize_P(
180 RcmServer_Object * obj
181 );
183 static
184 Int RcmServer_acqJobId_P(
185 RcmServer_Object * obj,
186 UInt16 * jobIdPtr
187 );
189 static
190 Int RcmServer_dispatch_P(
191 RcmServer_Object * obj,
192 RcmClient_Packet * packet
193 );
195 static
196 Int RcmServer_execMsg_I(
197 RcmServer_Object * obj,
198 RcmClient_Message * msg
199 );
201 static
202 Int RcmServer_getFxnAddr_P(
203 RcmServer_Object * obj,
204 UInt32 fxnIdx,
205 RcmServer_MsgFxn * addrPtr,
206 RcmServer_MsgCreateFxn * createPtr
207 );
209 static
210 UInt16 RcmServer_getNextKey_P(
211 RcmServer_Object * obj
212 );
214 static
215 Int RcmServer_getSymIdx_P(
216 RcmServer_Object * obj,
217 String name,
218 UInt32 * index
219 );
221 static
222 Int RcmServer_getPool_P(
223 RcmServer_Object * obj,
224 RcmClient_Packet * packet,
225 RcmServer_ThreadPool ** poolP
226 );
228 static
229 Void RcmServer_process_P(
230 RcmServer_Object * obj,
231 RcmClient_Packet * packet
232 );
234 static
235 Int RcmServer_relJobId_P(
236 RcmServer_Object * obj,
237 UInt16 jobId
238 );
240 static
241 Void RcmServer_serverThrFxn_P(
242 IArg arg
243 );
245 static inline
246 Void RcmServer_setStatusCode_I(
247 RcmClient_Packet * packet,
248 UInt16 code
249 );
251 static
252 Void RcmServer_workerThrFxn_P(
253 IArg arg
254 );
257 #define RcmServer_Module_heap() (RcmServer_Mod.heap)
260 /* static objects */
261 static Int curInit = 0;
262 static Char ti_grcm_RcmServer_Name[] = {
263 't','i','.','s','d','o','.','r','c','m','.',
264 'R','c','m','S','e','r','v','e','r','\0'
265 };
267 static RcmServer_Module RcmServer_Mod = {
268 MODULE_NAME, /* name */
269 (IHeap_Handle)NULL /* heap */
270 };
272 /* module diags mask */
273 Registry_Desc Registry_CURDESC;
276 /*
277 * ======== RcmServer_init ========
278 *
279 * This function must be serialized by the caller
280 */
281 Void RcmServer_init(Void)
282 {
283 Registry_Result result;
286 if (curInit++ != 0) {
287 return; /* module already initialized */
288 }
290 /* register with xdc.runtime to get a diags mask */
291 // result = Registry_addModule(&Registry_CURDESC, MODULE_NAME);
292 result = Registry_addModule(&Registry_CURDESC, ti_grcm_RcmServer_Name);
293 Assert_isTrue(result == Registry_SUCCESS, (Assert_Id)NULL);
295 /* the size of object and struct must be the same */
296 Assert_isTrue(sizeof(RcmServer_Object) == sizeof(RcmServer_Struct), NULL);
297 }
300 /*
301 * ======== RcmServer_exit ========
302 *
303 * This function must be serialized by the caller
304 */
305 Void RcmServer_exit(Void)
306 {
307 // Registry_Result result;
310 if (--curInit != 0) {
311 return; /* module still in use */
312 }
314 /* unregister from xdc.runtime */
315 // result = Registry_removeModule(MODULE_NAME);
316 // Assert_isTrue(result == Registry_SUCCESS, (Assert_Id)NULL);
317 }
320 /*
321 * ======== RcmServer_Params_init ========
322 */
323 Void RcmServer_Params_init(RcmServer_Params *params)
324 {
325 /* server thread */
326 params->priority = Thread_Priority_HIGHEST;
327 params->osPriority = Thread_INVALID_OS_PRIORITY;
328 params->stackSize = 0; // use system default
329 params->stackSeg = "";
331 /* default pool */
332 params->defaultPool.name = NULL;
333 params->defaultPool.count = 0;
334 params->defaultPool.priority = Thread_Priority_NORMAL;
335 params->defaultPool.osPriority = Thread_INVALID_OS_PRIORITY;
336 params->defaultPool.stackSize = 0; // use system default
337 params->defaultPool.stackSeg = "";
339 /* worker pools */
340 params->workerPools.length = 0;
341 params->workerPools.elem = NULL;
343 /* function table */
344 params->fxns.length = 0;
345 params->fxns.elem = NULL;
346 }
349 /*
350 * ======== RcmServer_create ========
351 */
352 #define FXNN "RcmServer_create"
353 Int RcmServer_create(String name, RcmServer_Params *params,
354 RcmServer_Handle *handle)
355 {
356 RcmServer_Object *obj;
357 Error_Block eb;
358 Int status = RcmServer_S_SUCCESS;
361 Log_print3(Diags_ENTRY, "--> "FXNN": (name=0x%x, params=0x%x, hP=0x%x)",
362 (IArg)name, (IArg)params, (IArg)handle);
364 /* initialize the error block */
365 Error_init(&eb);
366 *handle = (RcmServer_Handle)NULL;
368 /* check for valid params */
369 if (NULL == params) {
370 Log_error0(FXNN": params ptr must not be NULL");
371 status = RcmServer_E_FAIL;
372 goto leave;
373 }
374 if (NULL == handle) {
375 Log_error0(FXNN": Invalid pointer");
376 status = RcmServer_E_FAIL;
377 goto leave;
378 }
379 if (NULL == name) {
380 Log_error0(FXNN": name passed is NULL!");
381 status = RcmServer_E_FAIL;
382 goto leave;
383 }
385 /* allocate the object */
386 obj = (RcmServer_Handle)xdc_runtime_Memory_calloc(RcmServer_Module_heap(),
387 sizeof(RcmServer_Object), sizeof(Int), &eb);
389 if (NULL == obj) {
390 Log_error2(FXNN": out of memory: heap=0x%x, size=%u",
391 (IArg)RcmServer_Module_heap(), sizeof(RcmServer_Object));
392 status = RcmServer_E_NOMEMORY;
393 goto leave;
394 }
396 Log_print1(Diags_LIFECYCLE, FXNN": instance create: 0x%x", (IArg)obj);
398 /* object-specific initialization */
399 status = RcmServer_Instance_init_P(obj, name, params);
401 if (status < 0) {
402 RcmServer_Instance_finalize_P(obj);
403 xdc_runtime_Memory_free(RcmServer_Module_heap(),
404 (Ptr)obj, sizeof(RcmServer_Object));
405 goto leave;
406 }
408 /* success, return opaque pointer */
409 *handle = (RcmServer_Handle)obj;
412 leave:
413 Log_print1(Diags_EXIT, "<-- "FXNN": %d", (IArg)status);
414 return(status);
415 }
416 #undef FXNN
419 /*
420 * ======== RcmServer_construct ========
421 */
422 #define FXNN "RcmServer_construct"
423 Int RcmServer_construct(RcmServer_Struct *structPtr, String name,
424 const RcmServer_Params *params)
425 {
426 RcmServer_Object *obj = (RcmServer_Object*)structPtr;
427 Int status = RcmServer_S_SUCCESS;
430 Log_print1(Diags_ENTRY, "--> "FXNN": (structPtr=0x%x)", (IArg)structPtr);
431 Log_print1(Diags_LIFECYCLE, FXNN": instance construct: 0x%x", (IArg)obj);
433 /* ensure the constructed object is zeroed */
434 _memset((Void *)obj, 0, sizeof(RcmServer_Object));
436 /* object-specific initialization */
437 status = RcmServer_Instance_init_P(obj, name, params);
439 if (status < 0) {
440 RcmServer_Instance_finalize_P(obj);
441 goto leave;
442 }
445 leave:
446 Log_print1(Diags_EXIT, "<-- "FXNN": %d", (IArg)status);
447 return(status);
448 }
449 #undef FXNN
452 /*
453 * ======== RcmServer_Instance_init_P ========
454 */
455 #define FXNN "RcmServer_Instance_init_P"
456 Int RcmServer_Instance_init_P(RcmServer_Object *obj, String name,
457 const RcmServer_Params *params)
458 {
459 Error_Block eb;
460 List_Params listP;
461 #if USE_MESSAGEQCOPY == 0
462 MessageQ_Params mqParams;
463 #endif
464 Thread_Params threadP;
465 SemThread_Params semThreadP;
466 SemThread_Handle semThreadH;
467 Int i, j;
468 SizeT size;
469 Char *cp;
470 RcmServer_ThreadPool *poolAry;
471 RcmServer_WorkerThread *worker;
472 List_Handle listH;
473 Int status = RcmServer_S_SUCCESS;
476 Log_print1(Diags_ENTRY, "--> "FXNN": (obj=0x%x)", (IArg)obj);
478 Error_init(&eb);
480 /* initialize instance state */
481 obj->shutdown = FALSE;
482 obj->key = 0;
483 obj->jobId = 0xFFFF;
484 obj->run = NULL;
485 obj->serverQue = NULL;
486 obj->serverThread = NULL;
487 obj->fxnTabStatic.length = 0;
488 obj->fxnTabStatic.elem = NULL;
489 obj->poolMap0Len = 0;
490 obj->jobList = NULL;
493 /* initialize the function table */
494 for (i = 0; i < RcmServer_MAX_TABLES; i++) {
495 obj->fxnTab[i] = NULL;
496 }
498 /* initialize the worker pool map */
499 for (i = 0; i < RcmServer_POOL_MAP_LEN; i++) {
500 obj->poolMap[i] = NULL;
501 }
503 /* create the instance gate */
504 GateThread_construct(&obj->gate, NULL, &eb);
506 if (Error_check(&eb)) {
507 Log_error0(FXNN": could not create gate object");
508 status = RcmServer_E_FAIL;
509 goto leave;
510 }
512 /* create list for job objects */
513 #if defined(RCM_ti_ipc)
514 List_Params_init(&listP);
515 obj->jobList = List_create(&listP, &eb);
517 if (Error_check(&eb)) {
518 Log_error0(FXNN": could not create list object");
519 status = RcmServer_E_FAIL;
520 goto leave;
521 }
522 #elif defined(RCM_ti_syslink)
523 List_Params_init(&listP);
524 obj->jobList = List_create(&listP, NULL);
526 if (obj->jobList == NULL) {
527 Log_error0(FXNN": could not create list object");
528 status = RcmServer_E_FAIL;
529 goto leave;
530 }
531 #endif
533 /* create the static function table */
534 if (params->fxns.length > 0) {
535 obj->fxnTabStatic.length = params->fxns.length;
537 /* allocate static function table */
538 size = params->fxns.length * sizeof(RcmServer_FxnTabElem);
539 obj->fxnTabStatic.elem = xdc_runtime_Memory_alloc(
540 RcmServer_Module_heap(), size, sizeof(Ptr), &eb);
542 if (Error_check(&eb)) {
543 Log_error2(FXNN": out of memory: heap=0x%x, size=%u",
544 (IArg)RcmServer_Module_heap(), size);
545 status = RcmServer_E_NOMEMORY;
546 goto leave;
547 }
548 obj->fxnTabStatic.elem[0].name = NULL;
550 /* allocate a single block to store all name strings */
551 for (size = 0, i = 0; i < params->fxns.length; i++) {
552 size += _strlen(params->fxns.elem[i].name) + 1;
553 }
554 cp = xdc_runtime_Memory_alloc(
555 RcmServer_Module_heap(), size, sizeof(Ptr), &eb);
557 if (Error_check(&eb)) {
558 Log_error2(FXNN": out of memory: heap=0x%x, size=%u",
559 (IArg)RcmServer_Module_heap(), size);
560 status = RcmServer_E_NOMEMORY;
561 goto leave;
562 }
564 /* copy function table data into allocated memory blocks */
565 for (i = 0; i < params->fxns.length; i++) {
566 _strcpy(cp, params->fxns.elem[i].name);
567 obj->fxnTabStatic.elem[i].name = cp;
568 cp += (_strlen(params->fxns.elem[i].name) + 1);
569 obj->fxnTabStatic.elem[i].addr.fxn = params->fxns.elem[i].addr.fxn;
570 obj->fxnTabStatic.elem[i].key = 0;
571 }
573 /* hook up the static function table */
574 obj->fxnTab[0] = obj->fxnTabStatic.elem;
575 }
577 /* create static worker pools */
578 if ((params->workerPools.length + 1) > RcmServer_POOL_MAP_LEN) {
579 Log_error1(FXNN": Exceeded maximum number of worker pools =%d",
580 (IArg) (params->workerPools.length) );
581 status = RcmServer_E_NOMEMORY;
582 goto leave;
583 }
584 obj->poolMap0Len = params->workerPools.length + 1; /* workers + default */
586 /* allocate the static worker pool table */
587 size = obj->poolMap0Len * sizeof(RcmServer_ThreadPool);
588 obj->poolMap[0] = (RcmServer_ThreadPool *)xdc_runtime_Memory_alloc(
589 RcmServer_Module_heap(), size, sizeof(Ptr), &eb);
591 if (Error_check(&eb)) {
592 Log_error2(FXNN": out of memory: heap=0x%x, size=%u",
593 (IArg)RcmServer_Module_heap(), size);
594 status = RcmServer_E_NOMEMORY;
595 goto leave;
596 }
598 /* convenience alias */
599 poolAry = obj->poolMap[0];
601 /* allocate a single block to store all name strings */
602 /* Buffer format is: [SIZE][DPN][\0][WPN1][\0][WPN2][\0].... */
603 /* DPN = Default Pool Name, WPN = Worker Pool Name */
604 /* In case, DPN is NULL, format is: [SIZE][\0][WPN1][\0].... */
605 /* In case, WPN is NULL, format is: [SIZE][DPN][\0] */
606 size = sizeof(SizeT) /* block size */
607 + (params->defaultPool.name == NULL ? 1 :
608 _strlen(params->defaultPool.name) + 1);
610 for (i = 0; i < params->workerPools.length; i++) {
611 size += (params->workerPools.elem[i].name == NULL ? 0 :
612 _strlen(params->workerPools.elem[i].name) + 1);
613 }
614 cp = xdc_runtime_Memory_alloc(
615 RcmServer_Module_heap(), size, sizeof(Ptr), &eb);
617 if (Error_check(&eb)) {
618 Log_error2(FXNN": out of memory: heap=0x%x, size=%u",
619 (IArg)RcmServer_Module_heap(), size);
620 status = RcmServer_E_NOMEMORY;
621 goto leave;
622 }
624 *(SizeT *)cp = size;
625 cp += sizeof(SizeT);
627 /* initialize the default worker pool, poolAry[0] */
628 if (params->defaultPool.name != NULL) {
629 _strcpy(cp, params->defaultPool.name);
630 poolAry[0].name = cp;
631 cp += (_strlen(params->defaultPool.name) + 1);
632 }
633 else {
634 poolAry[0].name = cp;
635 *cp++ = '\0';
636 }
638 poolAry[0].count = params->defaultPool.count;
639 poolAry[0].priority = params->defaultPool.priority;
640 poolAry[0].osPriority = params->defaultPool.osPriority;
641 poolAry[0].stackSize = params->defaultPool.stackSize;
642 poolAry[0].stackSeg = NULL;
643 poolAry[0].sem = NULL;
645 List_construct(&(poolAry[0].threadList), NULL);
646 List_construct(&(poolAry[0].readyQueue), NULL);
648 SemThread_Params_init(&semThreadP);
649 semThreadP.mode = SemThread_Mode_COUNTING;
651 semThreadH = SemThread_create(0, &semThreadP, &eb);
653 if (Error_check(&eb)) {
654 Log_error0(FXNN": could not create semaphore");
655 status = RcmServer_E_FAIL;
656 goto leave;
657 }
659 poolAry[0].sem = SemThread_Handle_upCast(semThreadH);
661 /* initialize the static worker pools, poolAry[1..(n-1)] */
662 for (i = 0; i < params->workerPools.length; i++) {
663 if (params->workerPools.elem[i].name != NULL) {
664 _strcpy(cp, params->workerPools.elem[i].name);
665 poolAry[i+1].name = cp;
666 cp += (_strlen(params->workerPools.elem[i].name) + 1);
667 }
668 else {
669 poolAry[i+1].name = NULL;
670 }
672 poolAry[i+1].count = params->workerPools.elem[i].count;
673 poolAry[i+1].priority = params->workerPools.elem[i].priority;
674 poolAry[i+1].osPriority =params->workerPools.elem[i].osPriority;
675 poolAry[i+1].stackSize = params->workerPools.elem[i].stackSize;
676 poolAry[i+1].stackSeg = NULL;
678 List_construct(&(poolAry[i+1].threadList), NULL);
679 List_construct(&(poolAry[i+1].readyQueue), NULL);
681 SemThread_Params_init(&semThreadP);
682 semThreadP.mode = SemThread_Mode_COUNTING;
684 semThreadH = SemThread_create(0, &semThreadP, &eb);
686 if (Error_check(&eb)) {
687 Log_error0(FXNN": could not create semaphore");
688 status = RcmServer_E_FAIL;
689 goto leave;
690 }
692 poolAry[i+1].sem = SemThread_Handle_upCast(semThreadH);
693 }
695 /* create the worker threads in each static pool */
696 for (i = 0; i < obj->poolMap0Len; i++) {
697 for (j = 0; j < poolAry[i].count; j++) {
699 /* allocate worker thread object */
700 size = sizeof(RcmServer_WorkerThread);
701 worker = (RcmServer_WorkerThread *)xdc_runtime_Memory_alloc(
702 RcmServer_Module_heap(), size, sizeof(Ptr), &eb);
704 if (Error_check(&eb)) {
705 Log_error2(FXNN": out of memory: heap=0x%x, size=%u",
706 (IArg)RcmServer_Module_heap(), size);
707 status = RcmServer_E_NOMEMORY;
708 goto leave;
709 }
711 /* initialize worker thread object */
712 worker->jobId = RcmClient_DISCRETEJOBID;
713 worker->thread = NULL;
714 worker->terminate = FALSE;
715 worker->pool = &(poolAry[i]);
716 worker->server = obj;
718 /* add worker thread to worker pool */
719 listH = List_handle(&(poolAry[i].threadList));
720 List_putHead(listH, &(worker->elem));
722 /* create worker thread */
723 Thread_Params_init(&threadP);
724 threadP.arg = (IArg)worker;
725 threadP.priority = poolAry[i].priority;
726 threadP.osPriority = poolAry[i].osPriority;
727 threadP.stackSize = poolAry[i].stackSize;
728 threadP.instance->name = "RcmServer_workerThr";
730 worker->thread = Thread_create(
731 (Thread_RunFxn)(RcmServer_workerThrFxn_P), &threadP, &eb);
733 if (Error_check(&eb)) {
734 Log_error2(FXNN": could not create worker thread, "
735 "pool=%d, thread=%d", (IArg)i, (IArg)j);
736 status = RcmServer_E_FAIL;
737 goto leave;
738 }
739 }
740 }
742 /* create the semaphore used to release the server thread */
743 SemThread_Params_init(&semThreadP);
744 semThreadP.mode = SemThread_Mode_COUNTING;
746 obj->run = SemThread_create(0, &semThreadP, &eb);
748 if (Error_check(&eb)) {
749 Log_error0(FXNN": could not create semaphore");
750 status = RcmServer_E_FAIL;
751 goto leave;
752 }
754 /* create the message queue for inbound messages */
755 #if USE_MESSAGEQCOPY
756 obj->serverQue = MessageQCopy_create(MessageQCopy_ASSIGN_ANY, NULL, NULL,
757 &obj->localAddr);
758 #ifdef BIOS_ONLY_TEST
759 obj->dstProc = MultiProc_self();
760 #else
761 obj->dstProc = MultiProc_getId("HOST");
762 #endif
764 #else
765 MessageQ_Params_init(&mqParams);
766 obj->serverQue = MessageQ_create(name, &mqParams);
767 #endif
769 if (NULL == obj->serverQue) {
770 Log_error0(FXNN": could not create server message queue");
771 status = RcmServer_E_FAIL;
772 goto leave;
773 }
775 /* create the server thread */
776 Thread_Params_init(&threadP);
777 threadP.arg = (IArg)obj;
778 threadP.priority = params->priority;
779 threadP.osPriority = params->osPriority;
780 threadP.stackSize = params->stackSize;
781 threadP.instance->name = "RcmServer_serverThr";
783 obj->serverThread = Thread_create(
784 (Thread_RunFxn)(RcmServer_serverThrFxn_P), &threadP, &eb);
786 if (Error_check(&eb)) {
787 Log_error0(FXNN": could not create server thread");
788 status = RcmServer_E_FAIL;
789 goto leave;
790 }
793 leave:
794 Log_print1(Diags_EXIT, "<-- "FXNN": %d", (IArg)status);
795 return(status);
796 }
797 #undef FXNN
800 /*
801 * ======== RcmServer_delete ========
802 */
803 #define FXNN "RcmServer_delete"
804 Int RcmServer_delete(RcmServer_Handle *handlePtr)
805 {
806 RcmServer_Object *obj = (RcmServer_Object *)(*handlePtr);
807 Int status = RcmClient_S_SUCCESS;
810 Log_print1(Diags_ENTRY, "--> "FXNN": (handlePtr=0x%x)", (IArg)handlePtr);
812 /* finalize the object */
813 status = RcmServer_Instance_finalize_P(obj);
815 /* free the object memory */
816 Log_print1(Diags_LIFECYCLE, FXNN": instance delete: 0x%x", (IArg)obj);
818 xdc_runtime_Memory_free(RcmServer_Module_heap(),
819 (Ptr)obj, sizeof(RcmServer_Object));
820 *handlePtr = NULL;
822 Log_print1(Diags_EXIT, "<-- "FXNN": %d", (IArg)status);
823 return(status);
824 }
825 #undef FXNN
828 /*
829 * ======== RcmServer_destruct ========
830 */
831 #define FXNN "RcmServer_destruct"
832 Int RcmServer_destruct(RcmServer_Struct *structPtr)
833 {
834 RcmServer_Object *obj = (RcmServer_Object *)(structPtr);
835 Int status = RcmClient_S_SUCCESS;
838 Log_print1(Diags_ENTRY, "--> "FXNN": (structPtr=0x%x)", (IArg)structPtr);
839 Log_print1(Diags_LIFECYCLE, FXNN": instance destruct: 0x%x", (IArg)obj);
841 /* finalize the object */
842 status = RcmServer_Instance_finalize_P(obj);
844 Log_print1(Diags_EXIT, "<-- "FXNN": %d", (IArg)status);
845 return(status);
846 }
847 #undef FXNN
850 /*
851 * ======== RcmServer_Instance_finalize_P ========
852 */
853 #define FXNN "RcmServer_Instance_finalize_P"
854 Int RcmServer_Instance_finalize_P(RcmServer_Object *obj)
855 {
856 Int i, j;
857 Int size;
858 Char *cp;
859 UInt tabCount;
860 RcmServer_FxnTabElem *fdp;
861 Error_Block eb;
862 RcmServer_ThreadPool *poolAry;
863 RcmServer_WorkerThread *worker;
864 List_Elem *elem;
865 List_Handle listH;
866 List_Handle msgQueH;
867 RcmClient_Packet *packet;
868 #if USE_MESSAGEQCOPY == 0
869 MessageQ_Msg msgqMsg;
870 #endif
871 SemThread_Handle semThreadH;
872 RcmServer_JobStream *job;
873 Int rval;
874 Int status = RcmClient_S_SUCCESS;
876 Log_print1(Diags_ENTRY, "--> "FXNN": (obj=0x%x)", (IArg)obj);
878 /* must initialize the error block before using it */
879 Error_init(&eb);
881 /* block until server thread exits */
882 obj->shutdown = TRUE;
884 if (obj->serverThread != NULL) {
885 #if USE_MESSAGEQCOPY
886 MessageQCopy_unblock(obj->serverQue);
887 #else
888 MessageQ_unblock(obj->serverQue);
889 #endif
890 Thread_join(obj->serverThread, &eb);
892 if (Error_check(&eb)) {
893 Log_error0(FXNN": server thread did not exit properly");
894 status = RcmServer_E_FAIL;
895 goto leave;
896 }
897 }
899 /* delete any remaining job objects (there should not be any) */
900 while ((elem = List_get(obj->jobList)) != NULL) {
901 job = (RcmServer_JobStream *)elem;
903 /* return any remaining messages (there should not be any) */
904 msgQueH = List_handle(&job->msgQue);
906 while ((elem = List_get(msgQueH)) != NULL) {
907 packet = (RcmClient_Packet *)elem;
908 Log_warning2(
909 FXNN": returning unprocessed message, jobId=0x%x, packet=0x%x",
910 (IArg)job->jobId, (IArg)packet);
912 RcmServer_setStatusCode_I(packet, RcmServer_Status_Unprocessed);
913 #if USE_MESSAGEQCOPY
914 packet->hdr.type = OMX_RAW_MSG;
915 packet->hdr.len = PACKET_DATA_SIZE + packet->message.dataSize;
916 rval = MessageQCopy_send(obj->dstProc, obj->replyAddr,
917 obj->localAddr, (Ptr)&packet->hdr,
918 PACKET_HDR_SIZE + packet->message.dataSize);
919 #else
920 msgqMsg = &packet->msgqHeader;
921 rval = MessageQ_put(MessageQ_getReplyQueue(msgqMsg), msgqMsg);
922 #endif
923 if (rval < 0) {
924 Log_error1(FXNN": unknown ipc error, 0x%x", (IArg)rval);
925 }
926 }
928 /* finalize the job stream object */
929 List_destruct(&job->msgQue);
931 xdc_runtime_Memory_free(RcmServer_Module_heap(),
932 (Ptr)job, sizeof(RcmServer_JobStream));
933 }
934 List_delete(&(obj->jobList));
936 /* convenience alias */
937 poolAry = obj->poolMap[0];
939 /* free all the static pool resources */
940 for (i = 0; i < obj->poolMap0Len; i++) {
942 /* free all the worker thread objects */
943 listH = List_handle(&(poolAry[i].threadList));
945 /* mark each worker thread for termination */
946 elem = NULL;
947 while ((elem = List_next(listH, elem)) != NULL) {
948 worker = (RcmServer_WorkerThread *)elem;
949 worker->terminate = TRUE;
950 }
952 /* unblock each worker thread so it can terminate */
953 elem = NULL;
954 while ((elem = List_next(listH, elem)) != NULL) {
955 Semaphore_post(poolAry[i].sem, &eb);
957 if (Error_check(&eb)) {
958 Log_error0(FXNN": post failed on thread");
959 status = RcmServer_E_FAIL;
960 goto leave;
961 }
962 }
964 /* wait for each worker thread to terminate */
965 elem = NULL;
966 while ((elem = List_get(listH)) != NULL) {
967 worker = (RcmServer_WorkerThread *)elem;
969 Thread_join(worker->thread, &eb);
971 if (Error_check(&eb)) {
972 Log_error1(
973 FXNN": worker thread did not exit properly, thread=0x%x",
974 (IArg)worker->thread);
975 status = RcmServer_E_FAIL;
976 goto leave;
977 }
979 Thread_delete(&worker->thread);
981 /* free the worker thread object */
982 xdc_runtime_Memory_free(RcmServer_Module_heap(), (Ptr)worker,
983 sizeof(RcmServer_WorkerThread));
984 }
986 /* free up pool resources */
987 semThreadH = SemThread_Handle_downCast(poolAry[i].sem);
988 SemThread_delete(&semThreadH);
989 List_destruct(&(poolAry[i].threadList));
991 /* return any remaining messages on the readyQueue */
992 msgQueH = List_handle(&poolAry[i].readyQueue);
994 while ((elem = List_get(msgQueH)) != NULL) {
995 packet = (RcmClient_Packet *)elem;
996 Log_warning2(
997 FXNN": returning unprocessed message, msgId=0x%x, packet=0x%x",
998 (IArg)packet->msgId, (IArg)packet);
1000 RcmServer_setStatusCode_I(packet, RcmServer_Status_Unprocessed);
1001 #if USE_MESSAGEQCOPY
1002 packet->hdr.type = OMX_RAW_MSG;
1003 packet->hdr.len = PACKET_DATA_SIZE + packet->message.dataSize;
1004 rval = MessageQCopy_send(obj->dstProc, obj->replyAddr,
1005 obj->localAddr, (Ptr)&packet->hdr,
1006 PACKET_HDR_SIZE + packet->message.dataSize);
1007 #else
1008 msgqMsg = &packet->msgqHeader;
1009 rval = MessageQ_put(MessageQ_getReplyQueue(msgqMsg), msgqMsg);
1010 #endif
1011 if (rval < 0) {
1012 Log_error1(FXNN": unknown ipc error, 0x%x", (IArg)rval);
1013 }
1014 }
1016 List_destruct(&(poolAry[i].readyQueue));
1017 }
1019 /* free the name block for the static pools */
1020 if ((obj->poolMap[0] != NULL) && (obj->poolMap[0]->name != NULL)) {
1021 cp = obj->poolMap[0]->name;
1022 cp -= sizeof(SizeT);
1023 xdc_runtime_Memory_free(RcmServer_Module_heap(), (Ptr)cp, *(SizeT *)cp);
1024 }
1026 /* free the static worker pool table */
1027 if (obj->poolMap[0] != NULL) {
1028 xdc_runtime_Memory_free(RcmServer_Module_heap(), (Ptr)(obj->poolMap[0]),
1029 obj->poolMap0Len * sizeof(RcmServer_ThreadPool));
1030 obj->poolMap[0] = NULL;
1031 }
1033 #if 0
1034 /* free all dynamic worker pools */
1035 for (p = 1; p < RcmServer_POOL_MAP_LEN; p++) {
1036 if ((poolAry = obj->poolMap[p]) == NULL) {
1037 continue;
1038 }
1039 }
1040 #endif
1042 /* free up the dynamic function tables and any leftover name strings */
1043 for (i = 1; i < RcmServer_MAX_TABLES; i++) {
1044 if (obj->fxnTab[i] != NULL) {
1045 tabCount = (1 << (i + 4));
1046 for (j = 0; j < tabCount; j++) {
1047 if (((obj->fxnTab[i])+j)->name != NULL) {
1048 cp = ((obj->fxnTab[i])+j)->name;
1049 size = _strlen(cp) + 1;
1050 xdc_runtime_Memory_free(RcmServer_Module_heap(), cp, size);
1051 }
1052 }
1053 fdp = obj->fxnTab[i];
1054 size = tabCount * sizeof(RcmServer_FxnTabElem);
1055 xdc_runtime_Memory_free(RcmServer_Module_heap(), fdp, size);
1056 obj->fxnTab[i] = NULL;
1057 }
1058 }
1060 if (NULL != obj->serverThread) {
1061 Thread_delete(&obj->serverThread);
1062 }
1064 if (NULL != obj->serverQue) {
1065 #if USE_MESSAGEQCOPY
1066 MessageQCopy_delete(&obj->serverQue);
1067 #else
1068 MessageQ_delete(&obj->serverQue);
1069 #endif
1070 }
1072 if (NULL != obj->run) {
1073 SemThread_delete((SemThread_Handle *)(&obj->run));
1074 }
1076 /* free the name block for the static function table */
1077 if ((NULL != obj->fxnTabStatic.elem) &&
1078 (NULL != obj->fxnTabStatic.elem[0].name)) {
1079 for (size = 0, i = 0; i < obj->fxnTabStatic.length; i++) {
1080 size += _strlen(obj->fxnTabStatic.elem[i].name) + 1;
1081 }
1082 xdc_runtime_Memory_free(
1083 RcmServer_Module_heap(),
1084 obj->fxnTabStatic.elem[0].name, size);
1085 }
1087 /* free the static function table */
1088 if (NULL != obj->fxnTabStatic.elem) {
1089 xdc_runtime_Memory_free(RcmServer_Module_heap(),
1090 obj->fxnTabStatic.elem,
1091 obj->fxnTabStatic.length * sizeof(RcmServer_FxnTabElem));
1092 }
1094 /* destruct the instance gate */
1095 GateThread_destruct(&obj->gate);
1098 leave:
1099 Log_print1(Diags_EXIT, "<-- "FXNN": %d", (IArg)status);
1100 return(status);
1101 }
1102 #undef FXNN
1105 /*
1106 * ======== RcmServer_addSymbol ========
1107 */
1108 #define FXNN "RcmServer_addSymbol"
1109 Int RcmServer_addSymbol(RcmServer_Object *obj, String funcName,
1110 RcmServer_MsgFxn addr, UInt32 *index)
1111 {
1112 GateThread_Handle gateH;
1113 IArg key;
1114 Int len;
1115 UInt i, j;
1116 UInt tabCount;
1117 SizeT tabSize;
1118 UInt32 fxnIdx = 0xFFFF;
1119 RcmServer_FxnTabElem *slot = NULL;
1120 Error_Block eb;
1121 Int status = RcmServer_S_SUCCESS;
1124 Log_print3(Diags_ENTRY,
1125 "--> "FXNN": (obj=0x%x, name=0x%x, addr=0x%x)",
1126 (IArg)obj, (IArg)funcName, (IArg)addr);
1128 Error_init(&eb);
1130 /* protect the symbol table while changing it */
1131 gateH = GateThread_handle(&obj->gate);
1132 key = GateThread_enter(gateH);
1134 /* look for an empty slot to use */
1135 for (i = 1; i < RcmServer_MAX_TABLES; i++) {
1136 if (obj->fxnTab[i] != NULL) {
1137 for (j = 0; j < (1 << (i + 4)); j++) {
1138 if (((obj->fxnTab[i])+j)->addr.fxn == 0) {
1139 slot = (obj->fxnTab[i]) + j; // found empty slot
1140 break;
1141 }
1142 }
1143 }
1144 else {
1145 /* all previous tables are full, allocate a new table */
1146 tabCount = (1 << (i + 4));
1147 tabSize = tabCount * sizeof(RcmServer_FxnTabElem);
1148 obj->fxnTab[i] = (RcmServer_FxnTabElem *)xdc_runtime_Memory_alloc(
1149 RcmServer_Module_heap(), tabSize, sizeof(Ptr), &eb);
1151 if (Error_check(&eb)) {
1152 Log_error0(FXNN": unable to allocate new function table");
1153 obj->fxnTab[i] = NULL;
1154 status = RcmServer_E_NOMEMORY;
1155 goto leave;
1156 }
1158 /* initialize the new table */
1159 for (j = 0; j < tabCount; j++) {
1160 ((obj->fxnTab[i])+j)->addr.fxn = 0;
1161 ((obj->fxnTab[i])+j)->name = NULL;
1162 ((obj->fxnTab[i])+j)->key = 0;
1163 }
1165 /* use first slot in new table */
1166 j = 0;
1167 slot = (obj->fxnTab[i])+j;
1168 }
1170 /* if new slot found, break out of loop */
1171 if (slot != NULL) {
1172 break;
1173 }
1174 }
1176 /* insert new symbol into slot */
1177 if (slot != NULL) {
1178 slot->addr.fxn = addr;
1179 len = _strlen(funcName) + 1;
1180 slot->name = (String)xdc_runtime_Memory_alloc(
1181 RcmServer_Module_heap(), len, sizeof(Char *), &eb);
1183 if (Error_check(&eb)) {
1184 Log_error0(FXNN": unable to allocate function name");
1185 slot->name = NULL;
1186 status = RcmServer_E_NOMEMORY;
1187 goto leave;
1188 }
1190 _strcpy(slot->name, funcName);
1191 slot->key = RcmServer_getNextKey_P(obj);
1192 fxnIdx = (slot->key << _RCM_KeyShift) | (i << 12) | j;
1193 }
1195 /* error, no more room to add new symbol */
1196 else {
1197 Log_error0(FXNN": cannot add symbol, table is full");
1198 status = RcmServer_E_SYMBOLTABLEFULL;
1199 goto leave;
1200 }
1203 leave:
1204 GateThread_leave(gateH, key);
1206 /* on success, return new function index */
1207 if (status >= 0) {
1208 *index = fxnIdx;
1209 }
1210 Log_print1(Diags_EXIT, "<-- "FXNN": %d", (IArg)status);
1211 return(status);
1212 }
1213 #undef FXNN
1216 /*
1217 * ======== RcmServer_removeSymbol ========
1218 */
1219 #define FXNN "RcmServer_removeSymbol"
1220 Int RcmServer_removeSymbol(RcmServer_Object *obj, String name)
1221 {
1222 GateThread_Handle gateH;
1223 IArg key;
1224 UInt32 fxnIdx;
1225 UInt tabIdx, tabOff;
1226 RcmServer_FxnTabElem *slot;
1227 Int status = RcmServer_S_SUCCESS;
1230 Log_print2(Diags_ENTRY,
1231 "--> "FXNN": (obj=0x%x, name=0x%x)", (IArg)obj, (IArg)name);
1233 /* protect the symbol table while changing it */
1234 gateH = GateThread_handle(&obj->gate);
1235 key = GateThread_enter(gateH);
1237 /* find the symbol in the table */
1238 status = RcmServer_getSymIdx_P(obj, name, &fxnIdx);
1240 if (status < 0) {
1241 Log_error0(FXNN": given symbol not found");
1242 goto leave;
1243 }
1245 /* static symbols have bit-31 set, cannot remove these symbols */
1246 if (fxnIdx & 0x80000000) {
1247 Log_error0(FXNN": cannot remove static symbol");
1248 status = RcmServer_E_SYMBOLSTATIC;
1249 goto leave;
1250 }
1252 /* get slot pointer */
1253 tabIdx = (fxnIdx & 0xF000) >> 12;
1254 tabOff = (fxnIdx & 0xFFF);
1255 slot = (obj->fxnTab[tabIdx]) + tabOff;
1257 /* clear the table index */
1258 slot->addr.fxn = 0;
1259 if (slot->name != NULL) {
1260 xdc_runtime_Memory_free(
1261 RcmServer_Module_heap(), slot->name, _strlen(slot->name) + 1);
1262 slot->name = NULL;
1263 }
1264 slot->key = 0;
1266 leave:
1267 GateThread_leave(gateH, key);
1268 Log_print1(Diags_EXIT, "<-- "FXNN": %d", (IArg)status);
1269 return(status);
1270 }
1271 #undef FXNN
1274 /*
1275 * ======== RcmServer_start ========
1276 */
1277 #define FXNN "RcmServer_start"
1278 Int RcmServer_start(RcmServer_Object *obj)
1279 {
1280 Error_Block eb;
1281 Int status = RcmServer_S_SUCCESS;
1284 Log_print1(Diags_ENTRY, "--> "FXNN": (obj=0x%x)", (IArg)obj);
1286 Error_init(&eb);
1288 /* unblock the server thread */
1289 Semaphore_post(obj->run, &eb);
1291 if (Error_check(&eb)) {
1292 Log_error0(FXNN": semaphore post failed");
1293 status = RcmServer_E_FAIL;
1294 }
1296 Log_print1(Diags_EXIT, "<-- "FXNN": %d", (IArg)status);
1297 return(status);
1298 }
1299 #undef FXNN
1302 /*
1303 * ======== RcmServer_acqJobId_P ========
1304 */
1305 #define FXNN "RcmServer_acqJobId_P"
1306 Int RcmServer_acqJobId_P(RcmServer_Object *obj, UInt16 *jobIdPtr)
1307 {
1308 Error_Block eb;
1309 GateThread_Handle gateH;
1310 IArg key;
1311 Int count;
1312 UInt16 jobId;
1313 List_Elem *elem;
1314 RcmServer_JobStream *job;
1315 Int status = RcmServer_S_SUCCESS;
1318 Log_print2(Diags_ENTRY,
1319 "--> "FXNN": (obj=0x%x, jobIdPtr=0x%x)", (IArg)obj, (IArg)jobIdPtr);
1321 Error_init(&eb);
1322 gateH = GateThread_handle(&obj->gate);
1324 /* enter critical section */
1325 key = GateThread_enter(gateH);
1327 /* compute new job id */
1328 for (count = 0xFFFF; count > 0; count--) {
1330 /* generate a new job id */
1331 jobId = (obj->jobId == 0xFFFF ? obj->jobId = 1 : ++(obj->jobId));
1333 /* verify job id is not in use */
1334 elem = NULL;
1335 while ((elem = List_next(obj->jobList, elem)) != NULL) {
1336 job = (RcmServer_JobStream *)elem;
1337 if (jobId == job->jobId) {
1338 jobId = RcmClient_DISCRETEJOBID;
1339 break;
1340 }
1341 }
1343 if (jobId != RcmClient_DISCRETEJOBID) {
1344 break;
1345 }
1346 }
1348 /* check if job id was acquired */
1349 if (jobId == RcmClient_DISCRETEJOBID) {
1350 *jobIdPtr = RcmClient_DISCRETEJOBID;
1351 Log_error0(FXNN": no job id available");
1352 status = RcmServer_E_FAIL;
1353 GateThread_leave(gateH, key);
1354 goto leave;
1355 }
1357 /* create a new job steam object */
1358 job = xdc_runtime_Memory_alloc(RcmServer_Module_heap(),
1359 sizeof(RcmServer_JobStream), sizeof(Ptr), &eb);
1361 if (Error_check(&eb)) {
1362 Log_error2(FXNN": out of memory: heap=0x%x, size=%u",
1363 (IArg)RcmServer_Module_heap(), sizeof(RcmServer_JobStream));
1364 status = RcmServer_E_NOMEMORY;
1365 GateThread_leave(gateH, key);
1366 goto leave;
1367 }
1369 /* initialize new job stream object */
1370 job->jobId = jobId;
1371 job->empty = TRUE;
1372 List_construct(&(job->msgQue), NULL);
1374 /* put new job stream object at end of server list */
1375 List_put(obj->jobList, (List_Elem *)job);
1377 /* leave critical section */
1378 GateThread_leave(gateH, key);
1380 /* return new job id */
1381 *jobIdPtr = jobId;
1384 leave:
1385 Log_print1(Diags_EXIT, "<-- "FXNN": %d", (IArg)status);
1386 return(status);
1387 }
1388 #undef FXNN
1391 /*
1392 * ======== RcmServer_dispatch_P ========
1393 *
1394 * Return Value
1395 * < 0: error
1396 * 0: success, job stream queue
1397 * > 0: success, ready queue
1398 *
1399 * Pool id description
1400 *
1401 * Static Worker Pools
1402 * --------------------------------------------------------------------
1403 * 15 1 = static pool
1404 * 14:8 reserved
1405 * 7:0 offset: 0 - 255
1406 *
1407 * Dynamic Worker Pools
1408 * --------------------------------------------------------------------
1409 * 15 0 = dynamic pool
1410 * 14:7 key: 0 - 255
1411 * 6:5 index: 1 - 3
1412 * 4:0 offset: 0 - [7, 15, 31]
1413 */
1414 #define FXNN "RcmServer_dispatch_P"
1415 Int RcmServer_dispatch_P(RcmServer_Object *obj, RcmClient_Packet *packet)
1416 {
1417 GateThread_Handle gateH;
1418 IArg key;
1419 List_Elem *elem;
1420 List_Handle listH;
1421 RcmServer_ThreadPool *pool;
1422 UInt16 jobId;
1423 RcmServer_JobStream *job;
1424 Error_Block eb;
1425 Int status = RcmServer_S_SUCCESS;
1428 Log_print2(Diags_ENTRY,
1429 "--> "FXNN": (obj=0x%x, packet=0x%x)", (IArg)obj, (IArg)packet);
1431 Error_init(&eb);
1433 /* get the target pool id from the message */
1434 status = RcmServer_getPool_P(obj, packet, &pool);
1436 if (status < 0) {
1437 goto leave;
1438 }
1440 System_printf("Rcm dispatch: p:%d j:%d f:%d l:%d\n",
1441 packet->message.poolId, packet->message.jobId,
1442 packet->message.fxnIdx, packet->message.dataSize);
1444 /* discrete jobs go on the end of the ready queue */
1445 jobId = packet->message.jobId;
1447 if (jobId == RcmClient_DISCRETEJOBID) {
1448 listH = List_handle(&pool->readyQueue);
1449 List_put(listH, (List_Elem *)packet);
1451 /* dispatch a new worker thread */
1452 Semaphore_post(pool->sem, &eb);
1454 if (Error_check(&eb)) {
1455 Log_error0(FXNN": semaphore post failed");
1456 }
1457 }
1459 /* must be a job stream message */
1460 else {
1461 /* must protect job list while searching it */
1462 gateH = GateThread_handle(&obj->gate);
1463 key = GateThread_enter(gateH);
1465 /* find the job stream object in the list */
1466 elem = NULL;
1467 while ((elem = List_next(obj->jobList, elem)) != NULL) {
1468 job = (RcmServer_JobStream *)elem;
1469 if (job->jobId == jobId) {
1470 break;
1471 }
1472 }
1474 if (elem == NULL) {
1475 Log_error1(FXNN": failed to find jobId=%d", (IArg)jobId);
1476 status = RcmServer_E_JobIdNotFound;
1477 }
1479 /* if job object is empty, place message directly on ready queue */
1480 else if (job->empty) {
1481 job->empty = FALSE;
1482 listH = List_handle(&pool->readyQueue);
1483 List_put(listH, (List_Elem *)packet);
1485 /* dispatch a new worker thread */
1486 Semaphore_post(pool->sem, &eb);
1488 if (Error_check(&eb)) {
1489 Log_error0(FXNN": semaphore post failed");
1490 }
1491 }
1493 /* place message on job queue */
1494 else {
1495 listH = List_handle(&job->msgQue);
1496 List_put(listH, (List_Elem *)packet);
1497 }
1499 GateThread_leave(gateH, key);
1500 }
1503 leave:
1504 Log_print1(Diags_EXIT, "<-- "FXNN": %d", (IArg)status);
1505 return(status);
1506 }
1507 #undef FXNN
1510 /*
1511 * ======== RcmServer_execMsg_I ========
1512 */
1513 Int RcmServer_execMsg_I(RcmServer_Object *obj, RcmClient_Message *msg)
1514 {
1515 RcmServer_MsgFxn fxn;
1516 #if USE_MESSAGEQCOPY
1517 RcmServer_MsgCreateFxn createFxn = NULL;
1518 #endif
1519 Int status;
1521 status = RcmServer_getFxnAddr_P(obj, msg->fxnIdx, &fxn, &createFxn);
1523 if (status >= 0) {
1524 #if 0
1525 System_printf("RcmServer_execMsg_I: Calling fxnIdx: %d\n",
1526 (msg->fxnIdx & 0x0000FFFF));
1527 #endif
1528 #if USE_MESSAGEQCOPY
1529 if (createFxn) {
1530 msg->result = (*createFxn)(obj, msg->dataSize, msg->data);
1531 }
1532 else {
1533 msg->result = (*fxn)(msg->dataSize, msg->data);
1534 }
1535 #else
1536 msg->result = (*fxn)(msg->dataSize, msg->data);
1537 #endif
1538 }
1540 return(status);
1541 }
1544 /*
1545 * ======== RcmServer_getFxnAddr_P ========
1546 *
1547 * The function index (fxnIdx) uses the following format. Note that the
1548 * format differs for static vs. dynamic functions. All static functions
1549 * are in fxnTab[0], all dynamic functions are in fxnTab[1 - 8].
1550 *
1551 * Bits Description
1552 *
1553 * Static Function Index
1554 * --------------------------------------------------------------------
1555 * 31 static/dynamic function flag
1556 * 0 = dynamic function
1557 * 1 = static function
1558 * 30:20 reserved
1559 * 19:16 reserved
1560 * 15:0 offset: 0 - 65,535
1561 *
1562 * Dynamic Function Index
1563 * --------------------------------------------------------------------
1564 * 31 static/dynamic function flag
1565 * 0 = dynamic function
1566 * 1 = static function
1567 * 30:20 key
1568 * 19:16 reserved
1569 * 15:12 index: 1 - 8
1570 * 11:0 offset: 0 - [31, 63, 127, 255, 511, 1023, 2047, 4095]
1571 */
1572 #define FXNN "RcmServer_getFxnAddr_P"
1573 Int RcmServer_getFxnAddr_P(RcmServer_Object *obj, UInt32 fxnIdx,
1574 RcmServer_MsgFxn *addrPtr, RcmServer_MsgCreateFxn *createPtr)
1575 {
1576 UInt i, j;
1577 UInt16 key;
1578 RcmServer_FxnTabElem *slot;
1579 RcmServer_MsgFxn addr = NULL;
1580 RcmServer_MsgCreateFxn createAddr = NULL;
1581 Int status = RcmServer_S_SUCCESS;
1583 /* static functions have bit-31 set */
1584 if (fxnIdx & 0x80000000) {
1585 j = (fxnIdx & 0x0000FFFF);
1586 if (j < (obj->fxnTabStatic.length)) {
1588 /* fetch the function address from the table */
1589 slot = (obj->fxnTab[0])+j;
1590 if (j == 0) {
1591 createAddr = slot->addr.createFxn;
1592 }
1593 else {
1594 addr = slot->addr.fxn;
1595 }
1596 }
1597 else {
1598 Log_error1(FXNN": invalid function index 0x%x", (IArg)fxnIdx);
1599 status = RcmServer_E_InvalidFxnIdx;
1600 goto leave;
1601 }
1602 }
1603 /* must be a dynamic function */
1604 else {
1605 /* extract the key from the function index */
1606 key = (fxnIdx & _RCM_KeyMask) >> _RCM_KeyShift;
1608 i = (fxnIdx & 0xF000) >> 12;
1609 if ((i > 0) && (i < RcmServer_MAX_TABLES) && (obj->fxnTab[i] != NULL)) {
1611 /* fetch the function address from the table */
1612 j = (fxnIdx & 0x0FFF);
1613 slot = (obj->fxnTab[i])+j;
1614 addr = slot->addr.fxn;
1616 /* validate the key */
1617 if (key != slot->key) {
1618 Log_error1(FXNN": invalid function index 0x%x", (IArg)fxnIdx);
1619 status = RcmServer_E_InvalidFxnIdx;
1620 goto leave;
1621 }
1622 }
1623 else {
1624 Log_error1(FXNN": invalid function index 0x%x", (IArg)fxnIdx);
1625 status = RcmServer_E_InvalidFxnIdx;
1626 goto leave;
1627 }
1628 }
1630 leave:
1631 if (status >= 0) {
1632 if (j == 0) {
1633 *createPtr = createAddr;
1634 }
1635 else {
1636 *addrPtr = addr;
1637 }
1638 }
1639 return(status);
1640 }
1641 #undef FXNN
1644 /* * ======== RcmServer_getSymIdx_P ========
1645 *
1646 * Must have table gate before calling this function.
1647 */
1648 #define FXNN "RcmServer_getSymIdx_P"
1649 Int RcmServer_getSymIdx_P(RcmServer_Object *obj, String name, UInt32 *index)
1650 {
1651 UInt i, j, len;
1652 RcmServer_FxnTabElem *slot;
1653 UInt32 fxnIdx = 0xFFFFFFFF;
1654 Int status = RcmServer_S_SUCCESS;
1657 Log_print3(Diags_ENTRY,
1658 "--> "FXNN": (obj=0x%x, name=0x%x, index=0x%x)",
1659 (IArg)obj, (IArg)name, (IArg)index);
1661 /* search tables for given function name */
1662 for (i = 0; i < RcmServer_MAX_TABLES; i++) {
1663 if (obj->fxnTab[i] != NULL) {
1664 len = (i == 0) ? obj->fxnTabStatic.length : (1 << (i + 4));
1665 for (j = 0; j < len; j++) {
1666 slot = (obj->fxnTab[i]) + j;
1667 if ((((obj->fxnTab[i])+j)->name != NULL) &&
1668 (_strcmp(((obj->fxnTab[i])+j)->name, name) == 0)) {
1669 /* found function name */
1670 if (i == 0) {
1671 fxnIdx = 0x80000000 | j;
1672 } else {
1673 fxnIdx = (slot->key << _RCM_KeyShift) | (i << 12) | j;
1674 }
1675 break;
1676 }
1677 }
1678 }
1680 if (0xFFFFFFFF != fxnIdx) {
1681 break;
1682 }
1683 }
1685 /* log an error if the symbol was not found */
1686 if (fxnIdx == 0xFFFFFFFF) {
1687 Log_error0(FXNN": given symbol not found");
1688 status = RcmServer_E_SYMBOLNOTFOUND;
1689 }
1691 /* if success, return symbol index */
1692 if (status >= 0) {
1693 *index = fxnIdx;
1694 }
1696 Log_print1(Diags_EXIT, "<-- "FXNN": %d", (IArg)status);
1697 return(status);
1698 }
1699 #undef FXNN
1702 /*
1703 * ======== RcmServer_getNextKey_P ========
1704 */
1705 UInt16 RcmServer_getNextKey_P(RcmServer_Object *obj)
1706 {
1707 GateThread_Handle gateH;
1708 IArg gateKey;
1709 UInt16 key;
1712 gateH = GateThread_handle(&obj->gate);
1713 gateKey = GateThread_enter(gateH);
1715 if (obj->key <= 1) {
1716 obj->key = _RCM_KeyResetValue; /* don't use 0 as a key value */
1717 }
1718 else {
1719 (obj->key)--;
1720 }
1721 key = obj->key;
1723 GateThread_leave(gateH, gateKey);
1725 return(key);
1726 }
1729 /*
1730 * ======== RcmServer_getPool_P ========
1731 */
1732 #define FXNN "RcmServer_getPool_P"
1733 Int RcmServer_getPool_P(RcmServer_Object *obj,
1734 RcmClient_Packet *packet, RcmServer_ThreadPool **poolP)
1735 {
1736 UInt16 poolId;
1737 UInt16 offset;
1738 Int status = RcmServer_S_SUCCESS;
1741 poolId = packet->message.poolId;
1743 /* static pools have bit-15 set */
1744 if (poolId & 0x8000) {
1745 offset = (poolId & 0x00FF);
1746 if (offset < obj->poolMap0Len) {
1747 *poolP = &(obj->poolMap[0])[offset];
1748 }
1749 else {
1750 Log_error1(FXNN": pool id=0x%x not found", (IArg)poolId);
1751 *poolP = NULL;
1752 status = RcmServer_E_PoolIdNotFound;
1753 goto leave;
1754 }
1755 }
1757 leave:
1758 return(status);
1759 }
1760 #undef FXNN
1763 /*
1764 * ======== RcmServer_process_P ========
1765 */
1766 #define FXNN "RcmServer_process_P"
1767 Void RcmServer_process_P(RcmServer_Object *obj, RcmClient_Packet *packet)
1768 {
1769 String name;
1770 UInt32 fxnIdx;
1771 RcmServer_MsgFxn fxn;
1772 RcmClient_Message *rcmMsg;
1773 #if USE_MESSAGEQCOPY
1774 RcmServer_MsgCreateFxn createFxn = NULL;
1775 #endif
1776 #if USE_MESSAGEQCOPY == 0
1777 MessageQ_Msg msgqMsg;
1778 #endif
1779 UInt16 messageType;
1780 Error_Block eb;
1781 UInt16 jobId;
1782 Int rval;
1783 Int status = RcmServer_S_SUCCESS;
1786 Log_print2(Diags_ENTRY,
1787 "--> "FXNN": (obj=0x%x, packet=0x%x)", (IArg)obj, (IArg)packet);
1789 Error_init(&eb);
1791 /* decode the message */
1792 rcmMsg = &packet->message;
1793 #if USE_MESSAGEQCOPY == 0
1794 msgqMsg = &packet->msgqHeader;
1795 #endif
1796 Log_print1(Diags_INFO, FXNN": message desc=0x%x", (IArg)packet->desc);
1798 /* extract the message type from the packet descriptor field */
1799 messageType = (RcmClient_Desc_TYPE_MASK & packet->desc) >>
1800 RcmClient_Desc_TYPE_SHIFT;
1802 /* process the given message */
1803 switch (messageType) {
1805 case RcmClient_Desc_RCM_MSG:
1806 rval = RcmServer_execMsg_I(obj, rcmMsg);
1808 if (rval < 0) {
1809 switch (rval) {
1810 case RcmServer_E_InvalidFxnIdx:
1811 RcmServer_setStatusCode_I(
1812 packet, RcmServer_Status_INVALID_FXN);
1813 break;
1814 default:
1815 RcmServer_setStatusCode_I(
1816 packet, RcmServer_Status_Error);
1817 break;
1818 }
1819 }
1820 else if (rcmMsg->result < 0) {
1821 RcmServer_setStatusCode_I(packet, RcmServer_Status_MSG_FXN_ERR);
1822 }
1823 else {
1824 RcmServer_setStatusCode_I(packet, RcmServer_Status_SUCCESS);
1825 }
1827 #if USE_MESSAGEQCOPY
1828 #if 0
1829 System_printf("RcmServer_process_P: Sending reply from: %d to: %d\n",
1830 obj->localAddr, obj->replyAddr);
1831 #endif
1833 packet->hdr.type = OMX_RAW_MSG;
1834 packet->hdr.len = PACKET_DATA_SIZE + packet->message.dataSize;
1835 status = MessageQCopy_send(obj->dstProc, obj->replyAddr,
1836 obj->localAddr, (Ptr)&packet->hdr,
1837 PACKET_HDR_SIZE + packet->message.dataSize);
1838 #else
1839 status = MessageQ_put(MessageQ_getReplyQueue(msgqMsg), msgqMsg);
1840 #endif
1841 if (status < 0) {
1842 Log_error1(FXNN": unknown ipc error, 0x%x", (IArg)status);
1843 }
1844 break;
1846 case RcmClient_Desc_CMD:
1847 status = RcmServer_execMsg_I(obj, rcmMsg);
1849 /* if all went well, free the message */
1850 if ((status >= 0) && (rcmMsg->result >= 0)) {
1852 #if USE_MESSAGEQCOPY == 0
1853 status = MessageQ_free(msgqMsg);
1854 #endif
1855 if (status < 0) {
1856 Log_error1(
1857 FXNN": MessageQ_free returned error %d", (IArg)status);
1858 }
1859 }
1861 /* an error occurred, must return message to client */
1862 else {
1863 if (status < 0) {
1864 /* error trying to process the message */
1865 switch (status) {
1866 case RcmServer_E_InvalidFxnIdx:
1867 RcmServer_setStatusCode_I(
1868 packet, RcmServer_Status_INVALID_FXN);
1869 break;
1870 default:
1871 RcmServer_setStatusCode_I(
1872 packet, RcmServer_Status_Error);
1873 break;
1874 }
1875 }
1876 else {
1877 /* error in message function */
1878 RcmServer_setStatusCode_I(
1879 packet, RcmServer_Status_MSG_FXN_ERR);
1880 }
1882 /* send error message back to client */
1883 #if USE_MESSAGEQCOPY
1884 packet->hdr.type = OMX_RAW_MSG;
1885 packet->hdr.len = PACKET_DATA_SIZE + packet->message.dataSize;
1886 status = MessageQCopy_send(obj->dstProc, obj->replyAddr,
1887 obj->localAddr, (Ptr)&packet->hdr,
1888 PACKET_HDR_SIZE + packet->message.dataSize);
1889 #else
1890 status = MessageQ_put(MessageQ_getReplyQueue(msgqMsg), msgqMsg);
1891 #endif
1892 if (status < 0) {
1893 Log_error1(FXNN": unknown ipc error, 0x%x", (IArg)status);
1894 }
1895 }
1896 break;
1898 case RcmClient_Desc_DPC:
1899 rval = RcmServer_getFxnAddr_P(obj, rcmMsg->fxnIdx, &fxn,
1900 &createFxn);
1902 if (rval < 0) {
1903 RcmServer_setStatusCode_I(
1904 packet, RcmServer_Status_SYMBOL_NOT_FOUND);
1905 Error_init(&eb);
1906 }
1908 #if USE_MESSAGEQCOPY
1909 packet->hdr.type = OMX_RAW_MSG;
1910 packet->hdr.len = PACKET_DATA_SIZE + packet->message.dataSize;
1911 status = MessageQCopy_send(obj->dstProc, obj->replyAddr,
1912 obj->localAddr, (Ptr)&packet->hdr,
1913 PACKET_HDR_SIZE + packet->message.dataSize);
1914 #else
1915 status = MessageQ_put(MessageQ_getReplyQueue(msgqMsg), msgqMsg);
1916 #endif
1917 if (status < 0) {
1918 Log_error1(FXNN": unknown ipc error, 0x%x", (IArg)status);
1919 }
1921 /* invoke the function with a null context */
1922 #if USE_MESSAGEQCOPY
1923 if (createFxn) {
1924 (*createFxn)(obj, 0, NULL);
1925 }
1926 else {
1927 (*fxn)(0, NULL);
1928 }
1929 #else
1930 (*fxn)(0, NULL);
1931 #endif
1932 break;
1934 case RcmClient_Desc_SYM_ADD:
1935 break;
1937 case RcmClient_Desc_SYM_IDX:
1938 name = (String)rcmMsg->data;
1939 rval = RcmServer_getSymIdx_P(obj, name, &fxnIdx);
1941 if (rval < 0) {
1942 RcmServer_setStatusCode_I(
1943 packet, RcmServer_Status_SYMBOL_NOT_FOUND);
1944 }
1945 else {
1946 RcmServer_setStatusCode_I(packet, RcmServer_Status_SUCCESS);
1947 rcmMsg->data[0] = fxnIdx;
1948 rcmMsg->result = 0;
1949 }
1951 #if USE_MESSAGEQCOPY
1952 packet->hdr.type = OMX_RAW_MSG;
1953 packet->hdr.len = PACKET_DATA_SIZE + packet->message.dataSize;
1954 status = MessageQCopy_send(obj->dstProc, obj->replyAddr,
1955 obj->localAddr, (Ptr)&packet->hdr,
1956 PACKET_HDR_SIZE + packet->message.dataSize);
1957 #else
1958 status = MessageQ_put(MessageQ_getReplyQueue(msgqMsg), msgqMsg);
1959 #endif
1960 if (status < 0) {
1961 Log_error1(FXNN": unknown ipc error, 0x%x", (IArg)status);
1962 }
1963 break;
1965 case RcmClient_Desc_JOB_ACQ:
1966 rval = RcmServer_acqJobId_P(obj, &jobId);
1968 if (rval < 0) {
1969 RcmServer_setStatusCode_I(packet, RcmServer_Status_Error);
1970 }
1971 else {
1972 RcmServer_setStatusCode_I(packet, RcmServer_Status_SUCCESS);
1973 *(UInt16 *)(&rcmMsg->data[0]) = jobId;
1974 rcmMsg->result = 0;
1975 }
1977 #if USE_MESSAGEQCOPY
1978 packet->hdr.type = OMX_RAW_MSG;
1979 packet->hdr.len = PACKET_DATA_SIZE + packet->message.dataSize;
1980 status = MessageQCopy_send(obj->dstProc, obj->replyAddr,
1981 obj->localAddr, (Ptr)&packet->hdr,
1982 PACKET_HDR_SIZE + packet->message.dataSize);
1983 #else
1984 status = MessageQ_put(MessageQ_getReplyQueue(msgqMsg), msgqMsg);
1985 #endif
1986 if (status < 0) {
1987 Log_error1(FXNN": unknown ipc error, 0x%x", (IArg)status);
1988 }
1989 break;
1991 case RcmClient_Desc_JOB_REL:
1992 jobId = (UInt16)(rcmMsg->data[0]);
1993 rval = RcmServer_relJobId_P(obj, jobId);
1995 if (rval < 0) {
1996 switch (rval) {
1997 case RcmServer_E_JobIdNotFound:
1998 RcmServer_setStatusCode_I(
1999 packet, RcmServer_Status_JobNotFound);
2000 break;
2001 default:
2002 RcmServer_setStatusCode_I(
2003 packet, RcmServer_Status_Error);
2004 break;
2005 }
2006 rcmMsg->result = rval;
2007 }
2008 else {
2009 RcmServer_setStatusCode_I(packet, RcmServer_Status_SUCCESS);
2010 rcmMsg->result = 0;
2011 }
2013 #if USE_MESSAGEQCOPY
2014 packet->hdr.type = OMX_RAW_MSG;
2015 packet->hdr.len = PACKET_DATA_SIZE + packet->message.dataSize;
2016 status = MessageQCopy_send(obj->dstProc, obj->replyAddr,
2017 obj->localAddr, (Ptr)&packet->hdr,
2018 PACKET_HDR_SIZE + packet->message.dataSize);
2019 #else
2020 status = MessageQ_put(MessageQ_getReplyQueue(msgqMsg), msgqMsg);
2021 #endif
2022 if (status < 0) {
2023 Log_error1(FXNN": unknown ipc error, 0x%x", (IArg)status);
2024 }
2025 break;
2027 default:
2028 Log_error1(FXNN": unknown message type recieved, 0x%x",
2029 (IArg)messageType);
2030 break;
2031 }
2033 Log_print0(Diags_EXIT, "<-- "FXNN":");
2034 }
2035 #undef FXNN
2038 /*
2039 * ======== RcmServer_relJobId_P ========
2040 */
2041 #define FXNN "RcmServer_relJobId_P"
2042 Int RcmServer_relJobId_P(RcmServer_Object *obj, UInt16 jobId)
2043 {
2044 GateThread_Handle gateH;
2045 IArg key;
2046 List_Elem *elem;
2047 List_Handle msgQueH;
2048 RcmClient_Packet *packet;
2049 RcmServer_JobStream *job;
2050 #if USE_MESSAGEQCOPY == 0
2051 MessageQ_Msg msgqMsg;
2052 #endif
2053 Int rval;
2054 Int status = RcmServer_S_SUCCESS;
2057 Log_print2(Diags_ENTRY,
2058 "--> "FXNN": (obj=0x%x, jobId=0x%x)", (IArg)obj, (IArg)jobId);
2061 /* must protect job list while searching and modifying it */
2062 gateH = GateThread_handle(&obj->gate);
2063 key = GateThread_enter(gateH);
2065 /* find the job stream object in the list */
2066 elem = NULL;
2067 while ((elem = List_next(obj->jobList, elem)) != NULL) {
2068 job = (RcmServer_JobStream *)elem;
2070 /* remove the job stream object from the list */
2071 if (job->jobId == jobId) {
2072 List_remove(obj->jobList, elem);
2073 break;
2074 }
2075 }
2077 GateThread_leave(gateH, key);
2079 if (elem == NULL) {
2080 status = RcmServer_E_JobIdNotFound;
2081 Log_error1(FXNN": failed to find jobId=%d", (IArg)jobId);
2082 goto leave;
2083 }
2085 /* return any pending messages on the message queue */
2086 msgQueH = List_handle(&job->msgQue);
2088 while ((elem = List_get(msgQueH)) != NULL) {
2089 packet = (RcmClient_Packet *)elem;
2090 Log_warning2(
2091 FXNN": returning unprocessed message, jobId=0x%x, packet=0x%x",
2092 (IArg)jobId, (IArg)packet);
2094 RcmServer_setStatusCode_I(packet, RcmServer_Status_Unprocessed);
2096 #if USE_MESSAGEQCOPY
2097 packet->hdr.type = OMX_RAW_MSG;
2098 packet->hdr.len = PACKET_DATA_SIZE + packet->message.dataSize;
2099 rval = MessageQCopy_send(obj->dstProc, obj->replyAddr,
2100 obj->localAddr, (Ptr)&packet->hdr,
2101 PACKET_HDR_SIZE + packet->message.dataSize);
2102 #else
2103 msgqMsg = &packet->msgqHeader;
2104 rval = MessageQ_put(MessageQ_getReplyQueue(msgqMsg), msgqMsg);
2105 #endif
2106 if (rval < 0) {
2107 Log_error1(FXNN": unknown ipc error, 0x%x", (IArg)rval);
2108 }
2109 }
2111 /* finalize the job stream object */
2112 List_destruct(&job->msgQue);
2114 xdc_runtime_Memory_free(RcmServer_Module_heap(),
2115 (Ptr)job, sizeof(RcmServer_JobStream));
2118 leave:
2119 Log_print1(Diags_EXIT, "<-- "FXNN": %d", (IArg)status);
2120 return(status);
2121 }
2122 #undef FXNN
2125 /*
2126 * ======== RcmServer_serverThrFxn_P ========
2127 */
2128 #define FXNN "RcmServer_serverThrFxn_P"
2129 Void RcmServer_serverThrFxn_P(IArg arg)
2130 {
2131 Error_Block eb;
2132 RcmClient_Packet *packet;
2133 #if USE_MESSAGEQCOPY
2134 Char recvBuf[MSGBUFFERSIZE];
2135 UInt16 len;
2136 #else
2137 MessageQ_Msg msgqMsg = NULL;
2138 #endif
2139 Int rval;
2140 Bool running = TRUE;
2141 RcmServer_Object *obj = (RcmServer_Object *)arg;
2142 Int dataSize;
2144 #if USE_MESSAGEQCOPY
2145 packet = (RcmClient_Packet *)&recvBuf[0];
2146 #endif
2148 Log_print1(Diags_ENTRY, "--> "FXNN": (arg=0x%x)", arg);
2150 Error_init(&eb);
2152 /* wait until ready to run */
2153 Semaphore_pend(obj->run, Semaphore_FOREVER, &eb);
2155 if (Error_check(&eb)) {
2156 Log_error0(FXNN": Semaphore_pend failure in server thread");
2157 }
2159 /* main processing loop */
2160 while (running) {
2161 Log_print1(Diags_INFO,
2162 FXNN": waiting for message, thread=0x%x",
2163 (IArg)(obj->serverThread));
2165 /* block until message arrives */
2166 do {
2167 #if USE_MESSAGEQCOPY
2168 rval = MessageQCopy_recv(obj->serverQue, (Ptr)&packet->hdr, &len,
2169 &obj->replyAddr, MessageQCopy_FOREVER);
2170 #if 0
2171 System_printf("RcmServer_serverThrFxn_P: Received msg of len %d "
2172 "from: %d\n",
2173 len, obj->replyAddr);
2175 System_printf("hdr - t:%d f:%d l:%d\n", packet->hdr.type,
2176 packet->hdr.flags, packet->hdr.len);
2178 System_printf("pkt - d:%d m:%d\n", packet->desc, packet->msgId);
2179 #endif
2180 if (packet->hdr.type == OMX_DISC_REQ) {
2181 System_printf("RcmServer_serverThrFxn_P: Got OMX_DISCONNECT\n");
2182 }
2183 Assert_isTrue((len <= MSGBUFFERSIZE), NULL);
2184 Assert_isTrue((packet->hdr.type == OMX_RAW_MSG) ||
2185 (packet->hdr.type == OMX_DISC_REQ) , NULL);
2187 if ((rval < 0) && (rval != MessageQCopy_E_UNBLOCKED)) {
2188 #else
2189 rval = MessageQ_get(obj->serverQue, &msgqMsg, MessageQ_FOREVER);
2190 if ((rval < 0) && (rval != MessageQ_E_UNBLOCKED)) {
2191 #endif
2192 Log_error1(FXNN": ipc error 0x%x", (IArg)rval);
2193 /* keep running and hope for the best */
2194 }
2195 #if USE_MESSAGEQCOPY
2196 } while (FALSE);
2197 #else
2198 } while ((msgqMsg == NULL) && !obj->shutdown);
2199 #endif
2201 /* if shutdown, exit this thread */
2202 #if USE_MESSAGEQCOPY
2203 if (obj->shutdown || packet->hdr.type == OMX_DISC_REQ) {
2204 running = FALSE;
2205 Log_print1(Diags_INFO,
2206 FXNN": terminating, thread=0x%x", (IArg)(obj->serverThread));
2207 continue;
2208 }
2209 #else
2210 if (obj->shutdown) {
2211 running = FALSE;
2212 Log_print1(Diags_INFO,
2213 FXNN": terminating, thread=0x%x", (IArg)(obj->serverThread));
2214 if (msgqMsg == NULL ) {
2215 continue;
2216 }
2217 }
2218 #endif
2220 #if USE_MESSAGEQCOPY == 0
2221 packet = (RcmClient_Packet *)msgqMsg;
2222 #endif
2224 Log_print2(Diags_INFO,
2225 FXNN": message received, thread=0x%x packet=0x%x",
2226 (IArg)(obj->serverThread), (IArg)packet);
2228 if ((packet->message.poolId == RcmClient_DEFAULTPOOLID)
2229 && ((obj->poolMap[0])[0].count == 0)) {
2231 /* in-band (server thread) message processing */
2232 RcmServer_process_P(obj, packet);
2233 }
2234 else {
2235 /* out-of-band (worker thread) message processing */
2236 rval = RcmServer_dispatch_P(obj, packet);
2238 /* if error, message was not dispatched; must return to client */
2239 if (rval < 0) {
2240 switch (rval) {
2241 case RcmServer_E_JobIdNotFound:
2242 RcmServer_setStatusCode_I(
2243 packet, RcmServer_Status_JobNotFound);
2244 break;
2246 case RcmServer_E_PoolIdNotFound:
2247 RcmServer_setStatusCode_I(
2248 packet, RcmServer_Status_PoolNotFound);
2249 break;
2251 default:
2252 RcmServer_setStatusCode_I(
2253 packet, RcmServer_Status_Error);
2254 break;
2255 }
2256 packet->message.result = rval;
2258 /* return the message to the client */
2259 #if USE_MESSAGEQCOPY
2260 #if 0
2261 System_printf("RcmServer_serverThrFxn_P: "
2262 "Sending response from: %d to: %d\n",
2263 obj->localAddr, obj->replyAddr);
2264 System_printf("sending: %d + %d\n", PACKET_HDR_SIZE,
2265 packet->message.dataSize);
2266 #endif
2267 packet->hdr.type = OMX_RAW_MSG;
2268 if (rval < 0) {
2269 packet->hdr.len = sizeof(UInt32);
2270 packet->desc = 0x4142;
2271 packet->msgId = 0x0044;
2272 dataSize = sizeof(struct rpmsg_omx_hdr) + sizeof(UInt32);
2273 }
2274 else {
2275 packet->hdr.len = PACKET_DATA_SIZE +
2276 packet->message.dataSize;
2277 dataSize = PACKET_HDR_SIZE + packet->message.dataSize;
2278 }
2279 rval = MessageQCopy_send(obj->dstProc, obj->replyAddr,
2280 obj->localAddr, (Ptr)&packet->hdr, dataSize);
2281 #else
2282 rval = MessageQ_put(MessageQ_getReplyQueue(msgqMsg), msgqMsg);
2283 #endif
2284 if (rval < 0) {
2285 Log_error1(FXNN": unknown ipc error, 0x%x", (IArg)rval);
2286 }
2287 }
2288 }
2289 }
2291 System_printf("RcmServer_serverThrFxn_P: Exiting thread.\n");
2293 Log_print0(Diags_EXIT, "<-- "FXNN":");
2294 }
2295 #undef FXNN
2298 /*
2299 * ======== RcmServer_setStatusCode_I ========
2300 */
2301 Void RcmServer_setStatusCode_I(RcmClient_Packet *packet, UInt16 code)
2302 {
2304 /* code must be 0 - 15, it has to fit in a 4-bit field */
2305 Assert_isTrue((code < 16), NULL);
2307 packet->desc &= ~(RcmClient_Desc_TYPE_MASK);
2308 packet->desc |= ((code << RcmClient_Desc_TYPE_SHIFT)
2309 & RcmClient_Desc_TYPE_MASK);
2310 }
2313 /*
2314 * ======== RcmServer_workerThrFxn_P ========
2315 */
2316 #define FXNN "RcmServer_workerThrFxn_P"
2317 Void RcmServer_workerThrFxn_P(IArg arg)
2318 {
2319 Error_Block eb;
2320 RcmClient_Packet *packet;
2321 List_Elem *elem;
2322 List_Handle listH;
2323 List_Handle readyQueueH;
2324 UInt16 jobId;
2325 GateThread_Handle gateH;
2326 IArg key;
2327 RcmServer_ThreadPool *pool;
2328 RcmServer_JobStream *job;
2329 RcmServer_WorkerThread *obj;
2330 Bool running;
2331 Int rval;
2334 Log_print1(Diags_ENTRY, "--> "FXNN": (arg=0x%x)", arg);
2336 Error_init(&eb);
2337 obj = (RcmServer_WorkerThread *)arg;
2338 readyQueueH = List_handle(&obj->pool->readyQueue);
2339 packet = NULL;
2340 running = TRUE;
2342 /* main processing loop */
2343 while (running) {
2344 Log_print1(Diags_INFO,
2345 FXNN": waiting for job, thread=0x%x", (IArg)(obj->thread));
2347 /* if no current message, wait until signaled to run */
2348 if (packet == NULL) {
2349 Semaphore_pend(obj->pool->sem, Semaphore_FOREVER, &eb);
2351 if (Error_check(&eb)) {
2352 Log_error0(FXNN": semaphore pend failed");
2353 }
2354 }
2356 /* check if thread should terminate */
2357 if (obj->terminate) {
2358 running = FALSE;
2359 Log_print1(Diags_INFO,
2360 FXNN": terminating, thread=0x%x", (IArg)(obj->thread));
2361 continue;
2362 }
2364 /* get next message from ready queue */
2365 if (packet == NULL) {
2366 packet = (RcmClient_Packet *)List_get(readyQueueH);
2367 }
2369 if (packet == NULL) {
2370 Log_error1(FXNN": ready queue is empty, thread=0x%x",
2371 (IArg)(obj->thread));
2372 continue;
2373 }
2375 Log_print2(Diags_INFO, FXNN": job received, thread=0x%x packet=0x%x",
2376 (IArg)obj->thread, (IArg)packet);
2378 /* remember the message job id */
2379 jobId = packet->message.jobId;
2381 /* process the message */
2382 RcmServer_process_P(obj->server, packet);
2383 packet = NULL;
2385 /* If this worker thread just finished processing a job message,
2386 * queue up the next message for this job id. As an optimization,
2387 * if the message is addressed to this worker's pool, then don't
2388 * signal the semaphore, just get the next message from the queue
2389 * and processes it. This keeps the current thread running instead
2390 * of switching to another thread.
2391 */
2392 if (jobId != RcmClient_DISCRETEJOBID) {
2394 /* must protect job list while searching it */
2395 gateH = GateThread_handle(&obj->server->gate);
2396 key = GateThread_enter(gateH);
2398 /* find the job object in the list */
2399 elem = NULL;
2400 while ((elem = List_next(obj->server->jobList, elem)) != NULL) {
2401 job = (RcmServer_JobStream *)elem;
2402 if (job->jobId == jobId) {
2403 break;
2404 }
2405 }
2407 /* if job object not found, it is not an error */
2408 if (elem == NULL) {
2409 GateThread_leave(gateH, key);
2410 continue;
2411 }
2413 /* found the job object */
2414 listH = List_handle(&job->msgQue);
2416 /* get next job message and either process it or queue it */
2417 do {
2418 elem = List_get(listH);
2420 if (elem == NULL) {
2421 job->empty = TRUE; /* no more messages */
2422 break;
2423 }
2424 else {
2425 /* get target pool id */
2426 packet = (RcmClient_Packet *)elem;
2427 rval = RcmServer_getPool_P(obj->server, packet, &pool);
2429 /* if error, return the message to the client */
2430 if (rval < 0) {
2431 switch (rval) {
2432 case RcmServer_E_PoolIdNotFound:
2433 RcmServer_setStatusCode_I(
2434 packet, RcmServer_Status_PoolNotFound);
2435 break;
2437 default:
2438 RcmServer_setStatusCode_I(
2439 packet, RcmServer_Status_Error);
2440 break;
2441 }
2442 packet->message.result = rval;
2444 #if USE_MESSAGEQCOPY
2445 packet->hdr.type = OMX_RAW_MSG;
2446 packet->hdr.len = PACKET_DATA_SIZE + packet->message.dataSize;
2447 rval = MessageQCopy_send(
2448 (obj->server)->dstProc,
2449 (obj->server)->replyAddr,
2450 (obj->server)->localAddr, (Ptr)&packet->hdr,
2451 PACKET_HDR_SIZE + packet->message.dataSize);
2452 #else
2453 rval = MessageQ_put(
2454 MessageQ_getReplyQueue(&packet->msgqHeader),
2455 &packet->msgqHeader);
2456 #endif
2457 if (rval < 0) {
2458 Log_error1(
2459 FXNN": unknown ipc error, 0x%x", (IArg)rval);
2460 }
2461 }
2462 /* packet is valid, queue it in the corresponding pool's
2463 * ready queue */
2464 else {
2465 listH = List_handle(&pool->readyQueue);
2466 List_put(listH, elem);
2467 packet = NULL;
2468 Semaphore_post(pool->sem, &eb);
2470 if (Error_check(&eb)) {
2471 Log_error0(FXNN": semaphore post failed");
2472 }
2473 }
2475 /* loop around and wait to be run again */
2476 }
2478 } while (rval < 0);
2480 GateThread_leave(gateH, key);
2481 }
2482 } /* while (running) */
2484 Log_print0(Diags_EXIT, "<-- "FXNN":");
2485 }
2486 #undef FXNN
2489 /*
2490 * ======== RcmServer_getLocalAddress ========
2491 */
2492 UInt32 RcmServer_getLocalAddress(RcmServer_Object *obj)
2493 {
2494 return(obj->localAddr);
2495 }
2498 /*
2499 * ======== RcmServer_getRemoteAddress ========
2500 */
2501 UInt32 RcmServer_getRemoteAddress(RcmServer_Object *obj)
2502 {
2503 return(obj->replyAddr);
2504 }
2508 /*
2509 * ======== RcmServer_getRemoteProc ========
2510 */
2511 UInt16 RcmServer_getRemoteProc(RcmServer_Object *obj)
2512 {
2513 return(obj->dstProc);
2514 }