Merge remote-tracking branch 'origin/3.36' into ipc-next
[ipc/ipcdev.git] / linux / src / api / MessageQ.c
1 /*
2  * Copyright (c) 2012-2015 Texas Instruments Incorporated - http://www.ti.com
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions
7  * are met:
8  *
9  * *  Redistributions of source code must retain the above copyright
10  *    notice, this list of conditions and the following disclaimer.
11  *
12  * *  Redistributions in binary form must reproduce the above copyright
13  *    notice, this list of conditions and the following disclaimer in the
14  *    documentation and/or other materials provided with the distribution.
15  *
16  * *  Neither the name of Texas Instruments Incorporated nor the names of
17  *    its contributors may be used to endorse or promote products derived
18  *    from this software without specific prior written permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
21  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
22  * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
23  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
24  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
25  * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
26  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
27  * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
28  * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
29  * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
30  * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  */
32 /*
33  *  @file   MessageQ.c
34  *
35  *  @brief  MessageQ Linux implementation
36  *
37  *  This implementation is geared for use in a "client/server" model, whereby
38  *  system-wide data is maintained in a "server" component and process-
39  *  specific data is handled here.  At the moment, this implementation
40  *  connects and communicates with LAD for the server connection.
41  */
43 /* Standard IPC header */
44 #include <ti/ipc/Std.h>
46 /* Module level headers */
47 #include <ti/ipc/NameServer.h>
48 #include <ti/ipc/MultiProc.h>
49 #include <_MultiProc.h>
50 #define MessageQ_internal 1     /* must be defined before include file */
51 #include <ti/ipc/MessageQ.h>
52 #include <_MessageQ.h>
53 #include <ti/ipc/interfaces/ITransport.h>
54 #include <ti/ipc/interfaces/IMessageQTransport.h>
55 #include <ti/ipc/interfaces/INetworkTransport.h>
57 /* Socket Headers */
58 #include <sys/select.h>
59 #include <sys/time.h>
60 #include <sys/types.h>
61 #include <sys/param.h>
62 #include <sys/eventfd.h>
63 #include <sys/queue.h>
64 #include <errno.h>
65 #include <stdio.h>
66 #include <string.h>
67 #include <stdlib.h>
68 #include <unistd.h>
69 #include <assert.h>
70 #include <pthread.h>
71 #include <semaphore.h>
73 #include <ladclient.h>
74 #include <_lad.h>
76 /* =============================================================================
77  * Macros/Constants
78  * =============================================================================
79  */
81 /*!
82  *  @brief  Name of the reserved NameServer used for MessageQ.
83  */
84 #define MessageQ_NAMESERVER  "MessageQ"
86 #define MessageQ_MAXTRANSPORTS 8
88 #define MessageQ_GROWSIZE 32
90 /* Trace flag settings: */
91 #define TRACESHIFT    12
92 #define TRACEMASK     0x1000
94 /* Define BENCHMARK to quiet key MessageQ APIs: */
95 //#define BENCHMARK
97 /* =============================================================================
98  * Structures & Enums
99  * =============================================================================
100  */
102 /* params structure evolution */
103 typedef struct {
104     Void *synchronizer;
105 } MessageQ_Params_Legacy;
107 typedef struct {
108     Int __version;
109     Void *synchronizer;
110     MessageQ_QueueIndex queueIndex;
111 } MessageQ_Params_Version2;
113 /* structure for MessageQ module state */
114 typedef struct MessageQ_ModuleObject {
115     MessageQ_Handle           *queues;
116     Int                       numQueues;
117     Int                       refCount;
118     NameServer_Handle         nameServer;
119     pthread_mutex_t           gate;
120     int                       seqNum;
121     pthread_mutex_t           seqNumGate;
122     IMessageQTransport_Handle transports[MultiProc_MAXPROCESSORS][2];
123     ITransport_Handle         transInst[MessageQ_MAXTRANSPORTS];
124     MessageQ_PutHookFxn       putHookFxn;
125 } MessageQ_ModuleObject;
127 typedef struct MessageQ_CIRCLEQ_ENTRY {
128      CIRCLEQ_ENTRY(MessageQ_CIRCLEQ_ENTRY) elem;
129 } MessageQ_CIRCLEQ_ENTRY;
131 /*!
132  *  @brief  Structure for the Handle for the MessageQ.
133  */
134 typedef struct MessageQ_Object_tag {
135     CIRCLEQ_HEAD(dummy2, MessageQ_CIRCLEQ_ENTRY) msgList;
136     pthread_mutex_t              msgListGate;
137     MessageQ_Params              params;
138     MessageQ_QueueId             queue;
139     int                          unblocked;
140     void                         *serverHandle;
141     sem_t                        synchronizer;
142 } MessageQ_Object;
144 /* traces in this file are controlled via _MessageQ_verbose */
145 Bool _MessageQ_verbose = FALSE;
146 #define verbose _MessageQ_verbose
148 /* =============================================================================
149  *  Globals
150  * =============================================================================
151  */
152 static MessageQ_ModuleObject MessageQ_state =
154     .refCount   = 0,
155     .nameServer = NULL,
156 #if defined(IPC_BUILDOS_ANDROID)
157     .gate       = PTHREAD_RECURSIVE_MUTEX_INITIALIZER,
158 #else
159     .gate       = PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP,
160 #endif
161     .seqNumGate = PTHREAD_MUTEX_INITIALIZER,
162     .putHookFxn = NULL
163 };
165 /*!
166  *  @var    MessageQ_module
167  *
168  *  @brief  Pointer to the MessageQ module state.
169  */
170 MessageQ_ModuleObject *MessageQ_module = &MessageQ_state;
172 Void _MessageQ_grow(UInt16 queueIndex);
174 /* =============================================================================
175  * APIS
176  * =============================================================================
177  */
179 Bool MessageQ_registerTransport(IMessageQTransport_Handle handle,
180                                 UInt16 rprocId, UInt priority)
182     Int status = FALSE;
183     UInt16 clusterId;
185     if (handle == NULL) {
186         fprintf(stderr,
187                 "MessageQ_registerTransport: invalid handle, must be non-NULL\n"
188                );
190         return status;
191     }
193     /* map procId to clusterId */
194     clusterId = rprocId - MultiProc_getBaseIdOfCluster();
196     if (clusterId >= MultiProc_MAXPROCESSORS) {
197         fprintf(stderr,
198                 "MessageQ_registerTransport: invalid procId %d\n", rprocId);
200         return status;
201     }
203     if (MessageQ_module->transports[clusterId][priority] == NULL) {
204         MessageQ_module->transports[clusterId][priority] = handle;
206         status = TRUE;
207     }
209     return status;
212 Bool MessageQ_registerTransportId(UInt tid, ITransport_Handle inst)
214     if (inst == NULL) {
215         fprintf(stderr, "MessageQ_registerTransportId: invalid NULL handle\n");
217         return MessageQ_E_INVALIDARG;
218     }
220     if (tid >= MessageQ_MAXTRANSPORTS) {
221         fprintf(stderr,
222                 "MessageQ_unregisterNetTransport: invalid transport id %d, "
223                 "must be < %d\n", tid, MessageQ_MAXTRANSPORTS);
225         return MessageQ_E_INVALIDARG;
226     }
228     if (MessageQ_module->transInst[tid] != NULL) {
229         fprintf(stderr,
230                 "MessageQ_registerTransportId: transport id %d already "
231                 "registered\n", tid);
233         return MessageQ_E_ALREADYEXISTS;
234     }
236     MessageQ_module->transInst[tid] = inst;
238     return MessageQ_S_SUCCESS;
241 Void MessageQ_unregisterTransport(UInt16 rprocId, UInt priority)
243     UInt16 clusterId;
245     /* map procId to clusterId */
246     clusterId = rprocId - MultiProc_getBaseIdOfCluster();
248     if (clusterId >= MultiProc_MAXPROCESSORS) {
249         fprintf(stderr, "MessageQ_unregisterTransport: invalid rprocId %d\n",
250                 rprocId);
252         return;
253     }
255     MessageQ_module->transports[clusterId][priority] = NULL;
258 Void MessageQ_unregisterTransportId(UInt tid)
260     if (tid >= MessageQ_MAXTRANSPORTS) {
261         fprintf(stderr,
262                 "MessageQ_unregisterTransportId: invalid transport id %d, "
263                 "must be < %d\n", tid, MessageQ_MAXTRANSPORTS);
265         return;
266     }
268     MessageQ_module->transInst[tid] = NULL;
271 /*
272  * Function to get default configuration for the MessageQ module.
273  */
274 Void MessageQ_getConfig(MessageQ_Config *cfg)
276     Int status;
277     LAD_ClientHandle handle;
278     struct LAD_CommandObj cmd;
279     union LAD_ResponseObj rsp;
281     assert (cfg != NULL);
283     handle = LAD_findHandle();
284     if (handle == LAD_MAXNUMCLIENTS) {
285         PRINTVERBOSE1(
286           "MessageQ_getConfig: can't find connection to daemon for pid %d\n",
287            getpid())
289         return;
290     }
292     cmd.cmd = LAD_MESSAGEQ_GETCONFIG;
293     cmd.clientId = handle;
295     if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
296         PRINTVERBOSE1(
297           "MessageQ_getConfig: sending LAD command failed, status=%d\n", status)
298         return;
299     }
301     if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
302         PRINTVERBOSE1("MessageQ_getConfig: no LAD response, status=%d\n",
303                       status)
304         return;
305     }
306     status = rsp.messageQGetConfig.status;
308     PRINTVERBOSE2(
309       "MessageQ_getConfig: got LAD response for client %d, status=%d\n",
310       handle, status)
312     memcpy(cfg, &rsp.messageQGetConfig.cfg, sizeof (*cfg));
314     return;
317 /*
318  *  Function to setup the MessageQ module.
319  */
320 Int MessageQ_setup(const MessageQ_Config *cfg)
322     Int status = MessageQ_S_SUCCESS;
323     LAD_ClientHandle handle;
324     struct LAD_CommandObj cmd;
325     union LAD_ResponseObj rsp;
326     Int pri;
327     Int i;
328     Int tid;
330     /* this entire function must be serialized */
331     pthread_mutex_lock(&MessageQ_module->gate);
333     /* ensure only first thread performs startup procedure */
334     if (++MessageQ_module->refCount > 1) {
335         PRINTVERBOSE1("MessageQ module has been already setup, refCount=%d\n",
336                 MessageQ_module->refCount)
337         status = MessageQ_S_ALREADYSETUP;
338         goto exit;
339     }
341     handle = LAD_findHandle();
342     if (handle == LAD_MAXNUMCLIENTS) {
343         PRINTVERBOSE1("MessageQ_setup: can't find connection to daemon for "
344                 "pid %d\n", getpid())
345         status = MessageQ_E_RESOURCE;
346         goto exit;
347     }
349     cmd.cmd = LAD_MESSAGEQ_SETUP;
350     cmd.clientId = handle;
351     memcpy(&cmd.args.messageQSetup.cfg, cfg, sizeof(*cfg));
353     if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
354         PRINTVERBOSE1("MessageQ_setup: sending LAD command failed, "
355                 "status=%d\n", status)
356         status = MessageQ_E_FAIL;
357         goto exit;
358     }
360     if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
361         PRINTVERBOSE1("MessageQ_setup: no LAD response, status=%d\n", status)
362         status = MessageQ_E_FAIL;
363         goto exit;
364     }
365     status = rsp.setup.status;
367     PRINTVERBOSE2("MessageQ_setup: LAD response for client %d, status=%d\n",
368             handle, status)
370     MessageQ_module->seqNum = 0;
371     MessageQ_module->nameServer = rsp.setup.nameServerHandle;
372     MessageQ_module->numQueues = cfg->maxRuntimeEntries;
373     MessageQ_module->queues = calloc(cfg->maxRuntimeEntries,
374             sizeof(MessageQ_Handle));
376     for (i = 0; i < MultiProc_MAXPROCESSORS; i++) {
377         for (pri = 0; pri < 2; pri++) {
378             MessageQ_module->transports[i][pri] = NULL;
379         }
380     }
382     for (tid = 0; tid < MessageQ_MAXTRANSPORTS; tid++) {
383         MessageQ_module->transInst[tid] = NULL;
384     }
386 exit:
387     /* if error, must decrement reference count */
388     if (status < 0) {
389         MessageQ_module->refCount--;
390     }
392     pthread_mutex_unlock(&MessageQ_module->gate);
394     return (status);
397 /*
398  *  MessageQ_destroy - destroy the MessageQ module.
399  */
400 Int MessageQ_destroy(void)
402     Int status = MessageQ_S_SUCCESS;
403     LAD_ClientHandle handle;
404     struct LAD_CommandObj cmd;
405     union LAD_ResponseObj rsp;
407     /* this entire function must be serialized */
408     pthread_mutex_lock(&MessageQ_module->gate);
410     /* ensure only last thread does the work */
411     if (--MessageQ_module->refCount > 0) {
412         goto exit;
413     }
415     handle = LAD_findHandle();
416     if (handle == LAD_MAXNUMCLIENTS) {
417         PRINTVERBOSE1("MessageQ_destroy: can't find connection to daemon "
418                 "for pid %d\n", getpid())
419         status =  MessageQ_E_RESOURCE;
420         goto exit;
421     }
423     cmd.cmd = LAD_MESSAGEQ_DESTROY;
424     cmd.clientId = handle;
426     if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
427         PRINTVERBOSE1("MessageQ_destroy: sending LAD command failed, "
428                 "status=%d\n", status)
429         status = MessageQ_E_FAIL;
430         goto exit;
431     }
433     if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
434         PRINTVERBOSE1("MessageQ_destroy: no LAD response, status=%d\n", status)
435         status = MessageQ_E_FAIL;
436         goto exit;
437     }
438     status = rsp.status;
440     PRINTVERBOSE2("MessageQ_destroy: got LAD response for client %d, "
441             "status=%d\n", handle, status)
443 exit:
444     pthread_mutex_unlock(&MessageQ_module->gate);
446     return (status);
449 /*
450  *  ======== MessageQ_Params_init ========
451  *  Legacy implementation.
452  */
453 Void MessageQ_Params_init(MessageQ_Params *params)
455     ((MessageQ_Params_Legacy *)params)->synchronizer = NULL;
458 /*
459  *  ======== MessageQ_Params_init__S ========
460  *  New implementation which is version aware.
461  */
462 Void MessageQ_Params_init__S(MessageQ_Params *params, Int version)
464     MessageQ_Params_Version2 *params2;
466     switch (version) {
468         case MessageQ_Params_VERSION_2:
469             params2 = (MessageQ_Params_Version2 *)params;
470             params2->__version = MessageQ_Params_VERSION_2;
471             params2->synchronizer = NULL;
472             params2->queueIndex = MessageQ_ANY;
473             break;
475         default:
476             assert(FALSE);
477             break;
478     }
481 /*
482  *  ======== MessageQ_create ========
483  */
484 MessageQ_Handle MessageQ_create(String name, const MessageQ_Params *pp)
486     Int                   status;
487     MessageQ_Object      *obj = NULL;
488     IMessageQTransport_Handle transport;
489     INetworkTransport_Handle netTrans;
490     ITransport_Handle     baseTrans;
491     UInt16                queueIndex;
492     UInt16                clusterId;
493     Int                   tid;
494     Int                   priority;
495     LAD_ClientHandle      handle;
496     struct LAD_CommandObj cmd;
497     union LAD_ResponseObj rsp;
498     MessageQ_Params ps;
500     MessageQ_Params_init__S(&ps, MessageQ_Params_VERSION);
502     /* copy the given params into the current params structure */
503     if (pp != NULL) {
505         /* snoop the params pointer to see if it's a legacy structure */
506         if ((pp->__version == 0) || (pp->__version > 100)) {
507             ps.synchronizer = ((MessageQ_Params_Legacy *)pp)->synchronizer;
508         }
510         /* not legacy structure, use params version field */
511         else if (pp->__version == MessageQ_Params_VERSION_2) {
512             ps.__version = ((MessageQ_Params_Version2 *)pp)->__version;
513             ps.synchronizer = ((MessageQ_Params_Version2 *)pp)->synchronizer;
514             ps.queueIndex = ((MessageQ_Params_Version2 *)pp)->queueIndex;
515         }
516         else {
517             assert(FALSE);
518         }
519     }
521     handle = LAD_findHandle();
522     if (handle == LAD_MAXNUMCLIENTS) {
523         PRINTVERBOSE1(
524           "MessageQ_create: can't find connection to daemon for pid %d\n",
525            getpid())
527         return NULL;
528     }
530     cmd.cmd = LAD_MESSAGEQ_CREATE;
531     cmd.clientId = handle;
533     if (name == NULL) {
534         cmd.args.messageQCreate.name[0] = '\0';
535     }
536     else {
537         strncpy(cmd.args.messageQCreate.name, name,
538                 LAD_MESSAGEQCREATEMAXNAMELEN - 1);
539         cmd.args.messageQCreate.name[LAD_MESSAGEQCREATEMAXNAMELEN - 1] = '\0';
540     }
542     memcpy(&cmd.args.messageQCreate.params, &ps, sizeof(ps));
544     if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
545         PRINTVERBOSE1(
546           "MessageQ_create: sending LAD command failed, status=%d\n", status)
547         return NULL;
548     }
550     if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
551         PRINTVERBOSE1("MessageQ_create: no LAD response, status=%d\n", status)
552         return NULL;
553     }
554     status = rsp.messageQCreate.status;
556     PRINTVERBOSE2(
557       "MessageQ_create: got LAD response for client %d, status=%d\n",
558       handle, status)
560     if (status == -1) {
561        PRINTVERBOSE1(
562           "MessageQ_create: MessageQ server operation failed, status=%d\n",
563           status)
564        return NULL;
565     }
567     /* Create the generic obj */
568     obj = (MessageQ_Object *)calloc(1, sizeof (MessageQ_Object));
570    /* Populate the params member */
571     memcpy(&obj->params, &ps, sizeof(ps));
574     obj->queue = rsp.messageQCreate.queueId;
575     obj->serverHandle = rsp.messageQCreate.serverHandle;
576     pthread_mutex_init(&obj->msgListGate, NULL);
577     CIRCLEQ_INIT(&obj->msgList);
578     if (sem_init(&obj->synchronizer, 0, 0) < 0) {
579         PRINTVERBOSE1(
580           "MessageQ_create: failed to create synchronizer (errno %d)\n", errno)
582         MessageQ_delete((MessageQ_Handle *)&obj);
584         return NULL;
585     }
587     /* lad returns the queue port # (queueIndex + PORT_OFFSET) */
588     queueIndex = MessageQ_getQueueIndex(rsp.messageQCreate.queueId);
590     PRINTVERBOSE2("MessageQ_create: creating endpoints for '%s' "
591             "queueIndex %d\n", (name == NULL) ? "NULL" : name , queueIndex)
593     pthread_mutex_lock(&MessageQ_module->gate);
595     for (clusterId = 0; clusterId < MultiProc_MAXPROCESSORS; clusterId++) {
596         for (priority = 0; priority < 2; priority++) {
597             transport = MessageQ_module->transports[clusterId][priority];
598             if (transport) {
599                 /* need to check return and do something if error */
600                 IMessageQTransport_bind((Void *)transport, obj->queue);
601             }
602         }
603     }
605     for (tid = 1; tid < MessageQ_MAXTRANSPORTS; tid++) {
606         baseTrans = MessageQ_module->transInst[tid];
608         if (baseTrans != NULL) {
609             switch (ITransport_itype(baseTrans)) {
610                 case INetworkTransport_TypeId:
611                     netTrans = INetworkTransport_downCast(baseTrans);
612                     INetworkTransport_bind((void *)netTrans, obj->queue);
613                     break;
615                 default:
616                     /* error */
617                     fprintf(stderr,
618                             "MessageQ_create: Error: transport id %d is an "
619                             "unsupported transport type.\n", tid);
620                     break;
621             }
622         }
623     }
625     /* LAD's MessageQ module can grow, we need to grow as well */
626     if (queueIndex >= MessageQ_module->numQueues) {
627         _MessageQ_grow(queueIndex);
628     }
630     /*  No need to "allocate" slot since the queueIndex returned by
631      *  LAD is guaranteed to be unique.
632      */
633     MessageQ_module->queues[queueIndex] = (MessageQ_Handle)obj;
635     pthread_mutex_unlock(&MessageQ_module->gate);
637     return (MessageQ_Handle)obj;
640 /*
641  *  ======== MessageQ_delete ========
642  */
643 Int MessageQ_delete(MessageQ_Handle *handlePtr)
645     MessageQ_Object *obj;
646     IMessageQTransport_Handle transport;
647     INetworkTransport_Handle  netTrans;
648     ITransport_Handle         baseTrans;
649     Int              status = MessageQ_S_SUCCESS;
650     UInt16           queueIndex;
651     UInt16                clusterId;
652     Int                   tid;
653     Int                   priority;
654     LAD_ClientHandle handle;
655     struct LAD_CommandObj cmd;
656     union LAD_ResponseObj rsp;
658     handle = LAD_findHandle();
659     if (handle == LAD_MAXNUMCLIENTS) {
660         PRINTVERBOSE1(
661           "MessageQ_delete: can't find connection to daemon for pid %d\n",
662            getpid())
664         return MessageQ_E_FAIL;
665     }
667     obj = (MessageQ_Object *)(*handlePtr);
669     cmd.cmd = LAD_MESSAGEQ_DELETE;
670     cmd.clientId = handle;
671     cmd.args.messageQDelete.serverHandle = obj->serverHandle;
673     if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
674         PRINTVERBOSE1(
675           "MessageQ_delete: sending LAD command failed, status=%d\n", status)
676         return MessageQ_E_FAIL;
677     }
679     if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
680         PRINTVERBOSE1("MessageQ_delete: no LAD response, status=%d\n", status)
681         return MessageQ_E_FAIL;
682     }
683     status = rsp.messageQDelete.status;
685     PRINTVERBOSE2(
686       "MessageQ_delete: got LAD response for client %d, status=%d\n",
687       handle, status)
689     pthread_mutex_lock(&MessageQ_module->gate);
691     for (clusterId = 0; clusterId < MultiProc_MAXPROCESSORS; clusterId++) {
692         for (priority = 0; priority < 2; priority++) {
693             transport = MessageQ_module->transports[clusterId][priority];
694             if (transport) {
695                 IMessageQTransport_unbind((Void *)transport, obj->queue);
696             }
697         }
698     }
700     for (tid = 1; tid < MessageQ_MAXTRANSPORTS; tid++) {
701         baseTrans = MessageQ_module->transInst[tid];
703         if (baseTrans != NULL) {
704             switch (ITransport_itype(baseTrans)) {
705                 case INetworkTransport_TypeId:
706                     netTrans = INetworkTransport_downCast(baseTrans);
707                     INetworkTransport_unbind((void *)netTrans, obj->queue);
708                     break;
710                 default:
711                     /* error */
712                     fprintf(stderr,
713                             "MessageQ_create: Error: transport id %d is an "
714                             "unsupported transport type.\n", tid);
715                     break;
716             }
717         }
718     }
720     /* extract the queue index from the queueId */
721     queueIndex = MessageQ_getQueueIndex(obj->queue);
722     MessageQ_module->queues[queueIndex] = NULL;
724     pthread_mutex_unlock(&MessageQ_module->gate);
726     free(obj);
727     *handlePtr = NULL;
729     return status;
732 /*
733  *  ======== MessageQ_open ========
734  *  Acquire a queueId for use in sending messages to the queue
735  */
736 Int MessageQ_open(String name, MessageQ_QueueId *queueId)
738     Int status = MessageQ_S_SUCCESS;
740     status = NameServer_getUInt32(MessageQ_module->nameServer,
741                                   name, queueId, NULL);
743     if (status == NameServer_E_NOTFOUND) {
744         /* Set return queue ID to invalid */
745         *queueId = MessageQ_INVALIDMESSAGEQ;
746         status = MessageQ_E_NOTFOUND;
747     }
748     else if (status >= 0) {
749         /* Override with a MessageQ status code */
750         status = MessageQ_S_SUCCESS;
751     }
752     else {
753         /* Set return queue ID to invalid */
754         *queueId = MessageQ_INVALIDMESSAGEQ;
756         /* Override with a MessageQ status code */
757         if (status == NameServer_E_TIMEOUT) {
758             status = MessageQ_E_TIMEOUT;
759         }
760         else {
761             status = MessageQ_E_FAIL;
762         }
763     }
765     return status;
768 /*
769  *  ======== MessageQ_openQueueId ========
770  */
771 MessageQ_QueueId MessageQ_openQueueId(UInt16 queueIndex, UInt16 procId)
773     MessageQ_QueueIndex queuePort;
774     MessageQ_QueueId queueId;
776     /* queue port is embedded in the queueId */
777     queuePort = queueIndex + MessageQ_PORTOFFSET;
778     queueId = ((MessageQ_QueueId)(procId) << 16) | queuePort;
780     return (queueId);
783 /*
784  *  ======== MessageQ_close ========
785  *  Closes previously opened instance of MessageQ
786  */
787 Int MessageQ_close(MessageQ_QueueId *queueId)
789     Int32 status = MessageQ_S_SUCCESS;
791     /* Nothing more to be done for closing the MessageQ. */
792     *queueId = MessageQ_INVALIDMESSAGEQ;
794     return status;
797 /*
798  *  ======== MessageQ_put ========
799  *  Deliver the given message, either locally or to the transport
800  *
801  *  If the destination is a local queue, deliver the message. Otherwise,
802  *  pass the message to a transport for delivery. The transport handles
803  *  the sending of the message using the appropriate interface (socket,
804  *  device ioctl, etc.).
805  */
806 Int MessageQ_put(MessageQ_QueueId queueId, MessageQ_Msg msg)
808     Int status = MessageQ_S_SUCCESS;
809     MessageQ_Object *obj;
810     UInt16 dstProcId;
811     UInt16 queueIndex;
812     UInt16 queuePort;
813     ITransport_Handle baseTrans;
814     IMessageQTransport_Handle msgTrans;
815     INetworkTransport_Handle netTrans;
816     Int priority;
817     UInt tid;
818     UInt16 clusterId;
819     Bool delivered;
821     /* extract destination address from the given queueId */
822     dstProcId  = (UInt16)(queueId >> 16);
823     queuePort = (MessageQ_QueueIndex)(queueId & 0x0000ffff);
825     /* write the destination address into the message header */
826     msg->dstId = queuePort;
827     msg->dstProc= dstProcId;
829     /* invoke the hook function after addressing the message */
830     if (MessageQ_module->putHookFxn != NULL) {
831         MessageQ_module->putHookFxn(queueId, msg);
832     }
834     /*  For an outbound message: If message destination is on this
835      *  processor, then check if the destination queue is in this
836      *  process (thread-to-thread messaging).
837      *
838      *  For an inbound message: Check if destination queue is in this
839      *  process (process-to-process messaging).
840      */
841     if (dstProcId == MultiProc_self()) {
842         queueIndex = queuePort - MessageQ_PORTOFFSET;
844         if (queueIndex < MessageQ_module->numQueues) {
845             obj = (MessageQ_Object *)MessageQ_module->queues[queueIndex];
847             if (obj != NULL) {
848                 /* deliver message to queue */
849                 pthread_mutex_lock(&obj->msgListGate);
850                 CIRCLEQ_INSERT_TAIL(&obj->msgList,
851                         (MessageQ_CIRCLEQ_ENTRY *)msg, elem);
852                 pthread_mutex_unlock(&obj->msgListGate);
853                 sem_post(&obj->synchronizer);
854                 goto done;
855             }
856         }
857     }
859     /*  Getting here implies the message is outbound. Must give it to
860      *  either the primary or secondary transport for delivery. Start
861      *  by extracting the transport ID from the message header.
862      */
863     tid = MessageQ_getTransportId(msg);
865     if (tid >= MessageQ_MAXTRANSPORTS) {
866         fprintf(stderr,
867                 "MessageQ_put: Error: transport id %d too big, must be < %d\n",
868                 tid, MessageQ_MAXTRANSPORTS);
869         status = MessageQ_E_FAIL;
870         goto done;
871     }
873     /* if transportId is set, use secondary transport for message delivery */
874     if (tid != 0) {
875         baseTrans = MessageQ_module->transInst[tid];
877         if (baseTrans == NULL) {
878             fprintf(stderr, "MessageQ_put: Error: transport is null\n");
879             status = MessageQ_E_FAIL;
880             goto done;
881         }
883         /* downcast instance pointer to transport interface */
884         switch (ITransport_itype(baseTrans)) {
885             case INetworkTransport_TypeId:
886                 netTrans = INetworkTransport_downCast(baseTrans);
887                 delivered = INetworkTransport_put(netTrans, (Ptr)msg);
888                 status = (delivered ? MessageQ_S_SUCCESS :
889                           (errno == ESHUTDOWN ? MessageQ_E_SHUTDOWN :
890                            MessageQ_E_FAIL));
891                 break;
893             default:
894                 /* error */
895                 fprintf(stderr, "MessageQ_put: Error: transport id %d is an "
896                         "unsupported transport type\n", tid);
897                 status = MessageQ_E_FAIL;
898                 break;
899         }
900     }
901     else {
902         /* use primary transport for delivery */
903         priority = MessageQ_getMsgPri(msg);
904         clusterId = dstProcId - MultiProc_getBaseIdOfCluster();
906         /* primary transport can only be used for intra-cluster delivery */
907         if (clusterId > MultiProc_getNumProcsInCluster()) {
908             fprintf(stderr,
909                     "MessageQ_put: Error: destination procId=%d is not "
910                     "in cluster. Must specify a transportId.\n", dstProcId);
911             status =  MessageQ_E_FAIL;
912             goto done;
913         }
915         msgTrans = MessageQ_module->transports[clusterId][priority];
916         delivered = IMessageQTransport_put(msgTrans, (Ptr)msg);
917         status = (delivered ? MessageQ_S_SUCCESS :
918                   (errno == ESHUTDOWN ? MessageQ_E_SHUTDOWN : MessageQ_E_FAIL));
919     }
921 done:
922     return (status);
925 /*
926  *  MessageQ_get - gets a message for a message queue and blocks if
927  *  the queue is empty.
928  *
929  *  If a message is present, it returns it.  Otherwise it blocks
930  *  waiting for a message to arrive.
931  *  When a message is returned, it is owned by the caller.
932  */
933 Int MessageQ_get(MessageQ_Handle handle, MessageQ_Msg *msg, UInt timeout)
935     MessageQ_Object * obj = (MessageQ_Object *)handle;
936     Int     status = MessageQ_S_SUCCESS;
937     struct timespec ts;
938     struct timeval tv;
940 #if 0
941 /*
942  * Optimization here to get a message without going in to the sem
943  * operation, but the sem count will not be maintained properly.
944  */
945     pthread_mutex_lock(&obj->msgListGate);
947     if (obj->msgList.cqh_first != &obj->msgList) {
948         *msg = (MessageQ_Msg)obj->msglist.cqh_first;
949         CIRCLEQ_REMOVE(&obj->msgList, *msg, reserved0);
951         pthread_mutex_unlock(&obj->msgListGate);
952     }
953     else {
954         pthread_mutex_unlock(&obj->msgListGate);
955     }
956 #endif
958     if (timeout == MessageQ_FOREVER) {
959         sem_wait(&obj->synchronizer);
960     }
961     else {
962         /* add timeout (microseconds) to current time of day */
963         gettimeofday(&tv, NULL);
964         tv.tv_sec += timeout / 1000000;
965         tv.tv_usec += timeout % 1000000;
967         if (tv.tv_usec >= 1000000) {
968               tv.tv_sec++;
969               tv.tv_usec -= 1000000;
970         }
972         /* set absolute timeout value */
973         ts.tv_sec = tv.tv_sec;
974         ts.tv_nsec = tv.tv_usec * 1000; /* convert to nanoseconds */
976         if (sem_timedwait(&obj->synchronizer, &ts) < 0) {
977             if (errno == ETIMEDOUT) {
978                 PRINTVERBOSE0("MessageQ_get: operation timed out\n")
979                 return (MessageQ_E_TIMEOUT);
980             }
981             else {
982                 PRINTVERBOSE0("MessageQ_get: sem_timedwait error\n")
983                 return (MessageQ_E_FAIL);
984             }
985         }
986     }
988     if (obj->unblocked) {
989         return obj->unblocked;
990     }
992     pthread_mutex_lock(&obj->msgListGate);
994     *msg = (MessageQ_Msg)obj->msgList.cqh_first;
995     CIRCLEQ_REMOVE(&obj->msgList, (MessageQ_CIRCLEQ_ENTRY *)*msg, elem);
997     pthread_mutex_unlock(&obj->msgListGate);
999     return status;
1002 /*
1003  * Return a count of the number of messages in the queue
1004  *
1005  * TBD: Implement as a socket ioctl, using getsockopt().  Return -1 for now.
1006  */
1007 Int MessageQ_count(MessageQ_Handle handle)
1009     Int               count = -1;
1010 #if 0
1011     MessageQ_Object * obj   = (MessageQ_Object *) handle;
1012     socklen_t         optlen;
1014     /*
1015      * TBD: Need to find a way to implement (if anyone uses it!), and
1016      * push down into transport..
1017      */
1019     /*
1020      * 2nd arg to getsockopt should be transport independent, but using
1021      *  SSKPROTO_SHMFIFO for now:
1022      */
1023     getsockopt(obj->fd, SSKPROTO_SHMFIFO, SSKGETOPT_GETMSGQCOUNT,
1024                  &count, &optlen);
1025 #endif
1027     return count;
1030 /*
1031  *  Initializes a message not obtained from MessageQ_alloc.
1032  */
1033 Void MessageQ_staticMsgInit(MessageQ_Msg msg, UInt32 size)
1035     /* Fill in the fields of the message */
1036     MessageQ_msgInit(msg);
1037     msg->heapId = MessageQ_STATICMSG;
1038     msg->msgSize = size;
1041 /*
1042  *  Allocate a message and initialize the needed fields (note some
1043  *  of the fields in the header are set via other APIs or in the
1044  *  MessageQ_put function,
1045  */
1046 MessageQ_Msg MessageQ_alloc(UInt16 heapId, UInt32 size)
1048     MessageQ_Msg msg;
1050     /*
1051      * heapId not used for local alloc (as this is over a copy transport), but
1052      * we need to send to other side as heapId is used in BIOS transport.
1053      */
1054     msg = (MessageQ_Msg)calloc(1, size);
1055     MessageQ_msgInit(msg);
1056     msg->msgSize = size;
1057     msg->heapId = heapId;
1059     return msg;
1062 /*
1063  *  Frees the message back to the heap that was used to allocate it.
1064  */
1065 Int MessageQ_free(MessageQ_Msg msg)
1067     UInt32 status = MessageQ_S_SUCCESS;
1069     /* Check to ensure this was not allocated by user: */
1070     if (msg->heapId == MessageQ_STATICMSG) {
1071         status = MessageQ_E_CANNOTFREESTATICMSG;
1072     }
1073     else {
1074         free(msg);
1075     }
1077     return status;
1080 /* Register a heap with MessageQ. */
1081 Int MessageQ_registerHeap(Ptr heap, UInt16 heapId)
1083     Int status = MessageQ_S_SUCCESS;
1085     /* Do nothing, as this uses a copy transport */
1087     return status;
1090 /* Unregister a heap with MessageQ. */
1091 Int MessageQ_unregisterHeap(UInt16 heapId)
1093     Int status = MessageQ_S_SUCCESS;
1095     /* Do nothing, as this uses a copy transport */
1097     return status;
1100 /* Unblocks a MessageQ */
1101 Void MessageQ_unblock(MessageQ_Handle handle)
1103     MessageQ_Object *obj = (MessageQ_Object *)handle;
1105     obj->unblocked = MessageQ_E_UNBLOCKED;
1106     sem_post(&obj->synchronizer);
1109 /* Unblocks a MessageQ that's been shutdown due to transport failure */
1110 Void MessageQ_shutdown(MessageQ_Handle handle)
1112     MessageQ_Object *obj = (MessageQ_Object *)handle;
1114     obj->unblocked = MessageQ_E_SHUTDOWN;
1115     sem_post(&obj->synchronizer);
1118 /* Embeds a source message queue into a message */
1119 Void MessageQ_setReplyQueue(MessageQ_Handle handle, MessageQ_Msg msg)
1121     MessageQ_Object *obj = (MessageQ_Object *)handle;
1123     msg->replyId = (UInt16)(obj->queue);
1124     msg->replyProc = (UInt16)(obj->queue >> 16);
1127 /* Returns the QueueId associated with the handle. */
1128 MessageQ_QueueId MessageQ_getQueueId(MessageQ_Handle handle)
1130     MessageQ_Object *obj = (MessageQ_Object *) handle;
1131     UInt32 queueId;
1133     queueId = (obj->queue);
1135     return queueId;
1138 /* Returns the local handle associated with queueId. */
1139 MessageQ_Handle MessageQ_getLocalHandle(MessageQ_QueueId queueId)
1141     MessageQ_Object *obj;
1142     MessageQ_QueueIndex queueIndex;
1143     UInt16 procId;
1145     procId = MessageQ_getProcId(queueId);
1146     if (procId != MultiProc_self()) {
1147         return NULL;
1148     }
1150     queueIndex = MessageQ_getQueueIndex(queueId);
1151     obj = (MessageQ_Object *)MessageQ_module->queues[queueIndex];
1153     return (MessageQ_Handle)obj;
1156 /* Sets the tracing of a message */
1157 Void MessageQ_setMsgTrace(MessageQ_Msg msg, Bool traceFlag)
1159     msg->flags = (msg->flags & ~TRACEMASK) |   (traceFlag << TRACESHIFT);
1162 /*
1163  *  Returns the amount of shared memory used by one transport instance.
1164  *
1165  *  The MessageQ module itself does not use any shared memory but the
1166  *  underlying transport may use some shared memory.
1167  */
1168 SizeT MessageQ_sharedMemReq(Ptr sharedAddr)
1170     SizeT memReq = 0u;
1172     /* Do nothing, as this is a copy transport. */
1174     return memReq;
1177 /*
1178  * This is a helper function to initialize a message.
1179  */
1180 Void MessageQ_msgInit(MessageQ_Msg msg)
1182 #if 0
1183     Int                 status    = MessageQ_S_SUCCESS;
1184     LAD_ClientHandle handle;
1185     struct LAD_CommandObj cmd;
1186     union LAD_ResponseObj rsp;
1188     handle = LAD_findHandle();
1189     if (handle == LAD_MAXNUMCLIENTS) {
1190         PRINTVERBOSE1(
1191           "MessageQ_msgInit: can't find connection to daemon for pid %d\n",
1192            getpid())
1194         return;
1195     }
1197     cmd.cmd = LAD_MESSAGEQ_MSGINIT;
1198     cmd.clientId = handle;
1200     if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
1201         PRINTVERBOSE1(
1202           "MessageQ_msgInit: sending LAD command failed, status=%d\n", status)
1203         return;
1204     }
1206     if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
1207         PRINTVERBOSE1("MessageQ_msgInit: no LAD response, status=%d\n", status)
1208         return;
1209     }
1210     status = rsp.msgInit.status;
1212     PRINTVERBOSE2(
1213       "MessageQ_msgInit: got LAD response for client %d, status=%d\n",
1214       handle, status)
1216     memcpy(msg, &rsp.msgInit.msg, sizeof (*msg));
1217 #else
1218     msg->reserved0 = 0;  /* We set this to distinguish from NameServerMsg */
1219     msg->replyId   = (UInt16)MessageQ_INVALIDMESSAGEQ;
1220     msg->msgId     = MessageQ_INVALIDMSGID;
1221     msg->dstId     = (UInt16)MessageQ_INVALIDMESSAGEQ;
1222     msg->flags     = MessageQ_HEADERVERSION | MessageQ_NORMALPRI;
1223     msg->srcProc   = MultiProc_self();
1225     pthread_mutex_lock(&MessageQ_module->seqNumGate);
1226     msg->seqNum  = MessageQ_module->seqNum++;
1227     pthread_mutex_unlock(&MessageQ_module->seqNumGate);
1228 #endif
1231 /*
1232  *  ======== _MessageQ_grow ========
1233  *  Increase module's queues array to accommodate queueIndex from LAD
1234  *
1235  *  Note: this function takes the queue index value (i.e. without the
1236  *  port offset).
1237  */
1238 Void _MessageQ_grow(UInt16 queueIndex)
1240     MessageQ_Handle *queues;
1241     MessageQ_Handle *oldQueues;
1242     UInt oldSize;
1244     pthread_mutex_lock(&MessageQ_module->gate);
1246     oldSize = MessageQ_module->numQueues * sizeof (MessageQ_Handle);
1248     queues = calloc(queueIndex + MessageQ_GROWSIZE, sizeof(MessageQ_Handle));
1249     memcpy(queues, MessageQ_module->queues, oldSize);
1251     oldQueues = MessageQ_module->queues;
1252     MessageQ_module->queues = queues;
1253     MessageQ_module->numQueues = queueIndex + MessageQ_GROWSIZE;
1255     pthread_mutex_unlock(&MessageQ_module->gate);
1257     free(oldQueues);
1259     return;
1262 /*
1263  *  ======== MessageQ_bind ========
1264  *  Bind all existing message queues to the given processor
1265  *
1266  *  Note: This function is a hack to work around the driver.
1267  *
1268  *  The Linux rpmsgproto driver requires a socket for each
1269  *  message queue and remote processor tuple.
1270  *
1271  *      socket --> (queue, processor)
1272  *
1273  *  Therefore, each time a new remote processor is started, all
1274  *  existing message queues need to create a socket for the new
1275  *  processor.
1276  *
1277  *  The driver should not have this requirement. One socket per
1278  *  message queue should be sufficient to uniquely identify the
1279  *  endpoint to the driver.
1280  */
1281 Void MessageQ_bind(UInt16 procId)
1283     int q;
1284     int clusterId;
1285     int priority;
1286     MessageQ_Handle handle;
1287     MessageQ_QueueId queue;
1288     IMessageQTransport_Handle transport;
1290     clusterId = procId - MultiProc_getBaseIdOfCluster();
1291     pthread_mutex_lock(&MessageQ_module->gate);
1293     for (q = 0; q < MessageQ_module->numQueues; q++) {
1295         if ((handle = MessageQ_module->queues[q]) == NULL) {
1296             continue;
1297         }
1299         queue = ((MessageQ_Object *)handle)->queue;
1301         for (priority = 0; priority < 2; priority++) {
1302             transport = MessageQ_module->transports[clusterId][priority];
1303             if (transport != NULL) {
1304                 IMessageQTransport_bind((Void *)transport, queue);
1305             }
1306         }
1307     }
1309     pthread_mutex_unlock(&MessageQ_module->gate);
1312 /*
1313  *  ======== MessageQ_unbind ========
1314  *  Unbind all existing message queues from the given processor
1315  *
1316  *  Hack: see MessageQ_bind.
1317  */
1318 Void MessageQ_unbind(UInt16 procId)
1320     int q;
1321     int clusterId;
1322     int priority;
1323     MessageQ_Handle handle;
1324     MessageQ_QueueId queue;
1325     IMessageQTransport_Handle transport;
1327     pthread_mutex_lock(&MessageQ_module->gate);
1329     for (q = 0; q < MessageQ_module->numQueues; q++) {
1331         if ((handle = MessageQ_module->queues[q]) == NULL) {
1332             continue;
1333         }
1335         queue = ((MessageQ_Object *)handle)->queue;
1336         clusterId = procId - MultiProc_getBaseIdOfCluster();
1338         for (priority = 0; priority < 2; priority++) {
1339             transport = MessageQ_module->transports[clusterId][priority];
1340             if (transport != NULL) {
1341                 IMessageQTransport_unbind((Void *)transport, queue);
1342             }
1343         }
1344     }
1346     pthread_mutex_unlock(&MessageQ_module->gate);