/* * Copyright (c) 2012-2014, Texas Instruments Incorporated * All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions * are met: * * * Redistributions of source code must retain the above copyright * notice, this list of conditions and the following disclaimer. * * * Redistributions in binary form must reproduce the above copyright * notice, this list of conditions and the following disclaimer in the * documentation and/or other materials provided with the distribution. * * * Neither the name of Texas Instruments Incorporated nor the names of * its contributors may be used to endorse or promote products derived * from this software without specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */ /* * @file MessageQ.c * * @brief MessageQ Linux implementation * * This implementation is geared for use in a "client/server" model, whereby * system-wide data is maintained in a "server" component and process- * specific data is handled here. At the moment, this implementation * connects and communicates with LAD for the server connection. 
*/ /* Standard IPC header */ #include /* Module level headers */ #include #include #include <_MultiProc.h> #include #include <_MessageQ.h> #include #include #include /* Socket Headers */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include /* Socket Protocol Family */ #include #include #include <_lad.h> /* ============================================================================= * Macros/Constants * ============================================================================= */ /*! * @brief Name of the reserved NameServer used for MessageQ. */ #define MessageQ_NAMESERVER "MessageQ" #define MessageQ_MAXTRANSPORTS 8 #define MessageQ_GROWSIZE 32 /* Trace flag settings: */ #define TRACESHIFT 12 #define TRACEMASK 0x1000 /* Define BENCHMARK to quiet key MessageQ APIs: */ //#define BENCHMARK /* ============================================================================= * Structures & Enums * ============================================================================= */ /* structure for MessageQ module state */ typedef struct MessageQ_ModuleObject { MessageQ_Handle *queues; Int numQueues; Int refCount; NameServer_Handle nameServer; pthread_mutex_t gate; MessageQ_Params defaultInstParams; int seqNum; IMessageQTransport_Handle transports[MultiProc_MAXPROCESSORS][2]; INetworkTransport_Handle transInst[MessageQ_MAXTRANSPORTS]; } MessageQ_ModuleObject; typedef struct MessageQ_CIRCLEQ_ENTRY { CIRCLEQ_ENTRY(MessageQ_CIRCLEQ_ENTRY) elem; } MessageQ_CIRCLEQ_ENTRY; /*! * @brief Structure for the Handle for the MessageQ. 
*/ typedef struct MessageQ_Object_tag { CIRCLEQ_HEAD(dummy2, MessageQ_CIRCLEQ_ENTRY) msgList; MessageQ_Params params; MessageQ_QueueId queue; int unblocked; void *serverHandle; sem_t synchronizer; } MessageQ_Object; /* traces in this file are controlled via _MessageQ_verbose */ Bool _MessageQ_verbose = FALSE; #define verbose _MessageQ_verbose /* ============================================================================= * Globals * ============================================================================= */ static MessageQ_ModuleObject MessageQ_state = { .refCount = 0, .nameServer = NULL, }; /*! * @var MessageQ_module * * @brief Pointer to the MessageQ module state. */ MessageQ_ModuleObject *MessageQ_module = &MessageQ_state; Void _MessageQ_grow(UInt16 queueIndex); /* ============================================================================= * APIS * ============================================================================= */ Bool MessageQ_registerTransport(IMessageQTransport_Handle handle, UInt16 rprocId, UInt priority) { Int status = FALSE; if (handle == NULL) { printf("MessageQ_registerTransport: invalid handle, must be non-NULL\n" ); return status; } if (rprocId >= MultiProc_MAXPROCESSORS) { printf("MessageQ_registerTransport: invalid procId %d\n", rprocId); return status; } if (MessageQ_module->transports[rprocId][priority] == NULL) { MessageQ_module->transports[rprocId][priority] = handle; status = TRUE; } return status; } Bool MessageQ_registerTransportId(UInt tid, ITransport_Handle inst) { if (inst == NULL) { printf("MessageQ_registerTransportId: invalid NULL handle\n"); return MessageQ_E_INVALIDARG; } if (tid >= MessageQ_MAXTRANSPORTS) { printf("MessageQ_unregisterNetTransport: invalid transport id %d, must be < %d\n", tid, MessageQ_MAXTRANSPORTS); return MessageQ_E_INVALIDARG; } if (MessageQ_module->transInst[tid] != NULL) { printf("MessageQ_registerTransportId: transport id %d already registered\n", tid); return MessageQ_E_ALREADYEXISTS; } 
MessageQ_module->transInst[tid] = (INetworkTransport_Handle)inst; return MessageQ_S_SUCCESS; } Void MessageQ_unregisterTransport(UInt16 rprocId, UInt priority) { if (rprocId >= MultiProc_MAXPROCESSORS) { printf("MessageQ_registerTransport: invalid rprocId %d\n", rprocId); return; } MessageQ_module->transports[rprocId][priority] = NULL; } Void MessageQ_unregisterTransportId(UInt tid) { if (tid >= MessageQ_MAXTRANSPORTS) { printf("MessageQ_unregisterTransportId: invalid transport id %d, must be < %d\n", tid, MessageQ_MAXTRANSPORTS); return; } MessageQ_module->transInst[tid] = NULL; } /* * Function to get default configuration for the MessageQ module. */ Void MessageQ_getConfig(MessageQ_Config *cfg) { Int status; LAD_ClientHandle handle; struct LAD_CommandObj cmd; union LAD_ResponseObj rsp; assert (cfg != NULL); handle = LAD_findHandle(); if (handle == LAD_MAXNUMCLIENTS) { PRINTVERBOSE1( "MessageQ_getConfig: can't find connection to daemon for pid %d\n", getpid()) return; } cmd.cmd = LAD_MESSAGEQ_GETCONFIG; cmd.clientId = handle; if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) { PRINTVERBOSE1( "MessageQ_getConfig: sending LAD command failed, status=%d\n", status) return; } if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) { PRINTVERBOSE1("MessageQ_getConfig: no LAD response, status=%d\n", status) return; } status = rsp.messageQGetConfig.status; PRINTVERBOSE2( "MessageQ_getConfig: got LAD response for client %d, status=%d\n", handle, status) memcpy(cfg, &rsp.messageQGetConfig.cfg, sizeof (*cfg)); return; } /* * Function to setup the MessageQ module. 
*/
/*
 * Only the first caller performs the setup; later callers just bump the
 * reference count and get MessageQ_S_ALREADYSETUP.
 *
 * The first caller sends the configuration to LAD, records the NameServer
 * handle from the response, allocates the local queues[] table with
 * cfg->maxRuntimeEntries slots and clears every transport registration.
 *
 * Returns the status reported by LAD on success, MessageQ_E_INVALIDARG for
 * a NULL cfg, MessageQ_E_RESOURCE when no daemon connection exists and
 * MessageQ_E_FAIL for allocation or LAD communication failures.  On any
 * failure the reference count taken by this call is given back so that a
 * later MessageQ_setup() can try again.
 */
Int MessageQ_setup(const MessageQ_Config *cfg)
{
    Int status;
    Int refCount;
    LAD_ClientHandle handle;
    struct LAD_CommandObj cmd;
    union LAD_ResponseObj rsp;
    MessageQ_Handle *queues = NULL;
    Int pri;
    Int rprocId;
    Int tid;

    if (cfg == NULL) {
        return MessageQ_E_INVALIDARG;
    }

    pthread_mutex_lock(&MessageQ_module->gate);
    refCount = ++MessageQ_module->refCount;
    pthread_mutex_unlock(&MessageQ_module->gate);

    if (refCount > 1) {
        PRINTVERBOSE1("MessageQ module has been already setup, refCount=%d\n",
                      refCount)
        return MessageQ_S_ALREADYSETUP;
    }

    handle = LAD_findHandle();
    if (handle == LAD_MAXNUMCLIENTS) {
        PRINTVERBOSE1(
          "MessageQ_setup: can't find connection to daemon for pid %d\n",
           getpid())
        status = MessageQ_E_RESOURCE;
        goto fail;
    }

    /* allocate before talking to LAD so an OOM needs no daemon-side undo */
    queues = calloc(cfg->maxRuntimeEntries, sizeof (MessageQ_Handle));
    if (queues == NULL && cfg->maxRuntimeEntries != 0) {
        PRINTVERBOSE0("MessageQ_setup: out of memory\n")
        status = MessageQ_E_FAIL;
        goto fail;
    }

    cmd.cmd = LAD_MESSAGEQ_SETUP;
    cmd.clientId = handle;
    memcpy(&cmd.args.messageQSetup.cfg, cfg, sizeof (*cfg));

    if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
        PRINTVERBOSE1(
          "MessageQ_setup: sending LAD command failed, status=%d\n", status)
        status = MessageQ_E_FAIL;
        goto fail;
    }

    if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
        PRINTVERBOSE1("MessageQ_setup: no LAD response, status=%d\n", status)
        /* report a MessageQ code, not the raw LAD status */
        status = MessageQ_E_FAIL;
        goto fail;
    }
    status = rsp.setup.status;

    PRINTVERBOSE2(
      "MessageQ_setup: got LAD response for client %d, status=%d\n",
      handle, status)

    if (status < 0) {
        /* the daemon refused the setup: do not pretend we are set up */
        goto fail;
    }

    MessageQ_module->seqNum = 0;
    MessageQ_module->nameServer = rsp.setup.nameServerHandle;
    MessageQ_module->numQueues = cfg->maxRuntimeEntries;
    MessageQ_module->queues = queues;

    /*
     * NOTE(review): the gate is (re)initialised here although it has
     * already been locked above while still in its static zero state;
     * kept in place to preserve the existing initialisation order.
     */
    pthread_mutex_init(&MessageQ_module->gate, NULL);

    for (rprocId = 0; rprocId < MultiProc_MAXPROCESSORS; rprocId++) {
        for (pri = 0; pri < 2; pri++) {
            MessageQ_module->transports[rprocId][pri] = NULL;
        }
    }

    for (tid = 0; tid < MessageQ_MAXTRANSPORTS; tid++) {
        MessageQ_module->transInst[tid] = NULL;
    }

    return status;

fail:
    free(queues);

    /* give back the reference taken at the top of this call */
    pthread_mutex_lock(&MessageQ_module->gate);
    MessageQ_module->refCount--;
    pthread_mutex_unlock(&MessageQ_module->gate);

    return status;
}

/*
 * MessageQ_destroy - destroy the MessageQ module.
*/
/*
 * Drops one module reference.  Only the call that releases the last
 * reference sends LAD_MESSAGEQ_DESTROY to the daemon; earlier calls return
 * MessageQ_S_SUCCESS without contacting it, mirroring MessageQ_setup(),
 * which only contacts the daemon for the first reference.
 *
 * Returns the status reported by LAD, MessageQ_E_RESOURCE when no daemon
 * connection exists, or MessageQ_E_FAIL when the LAD exchange fails.
 *
 * The local queues[] table is intentionally not released here, so a
 * MessageQ_delete() issued after MessageQ_destroy() still finds it.
 */
Int MessageQ_destroy(void)
{
    Int status;
    Int remaining;
    LAD_ClientHandle handle;
    struct LAD_CommandObj cmd;
    union LAD_ResponseObj rsp;

    handle = LAD_findHandle();
    if (handle == LAD_MAXNUMCLIENTS) {
        PRINTVERBOSE1(
          "MessageQ_destroy: can't find connection to daemon for pid %d\n",
           getpid())
        return MessageQ_E_RESOURCE;
    }

    pthread_mutex_lock(&MessageQ_module->gate);
    if (MessageQ_module->refCount > 0) {
        MessageQ_module->refCount--;
    }
    remaining = MessageQ_module->refCount;
    pthread_mutex_unlock(&MessageQ_module->gate);

    if (remaining > 0) {
        /* other users of the module remain; nothing to tear down yet */
        return MessageQ_S_SUCCESS;
    }

    cmd.cmd = LAD_MESSAGEQ_DESTROY;
    cmd.clientId = handle;

    if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
        PRINTVERBOSE1(
          "MessageQ_destroy: sending LAD command failed, status=%d\n", status)
        return MessageQ_E_FAIL;
    }

    if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
        PRINTVERBOSE1("MessageQ_destroy: no LAD response, status=%d\n", status)
        /* report a MessageQ code, not the raw LAD status */
        return MessageQ_E_FAIL;
    }
    status = rsp.status;

    PRINTVERBOSE2(
      "MessageQ_destroy: got LAD response for client %d, status=%d\n",
      handle, status)

    return status;
}

/*
 * Function to initialize the parameters for the MessageQ instance.
 *
 * Copies the module's defaultInstParams into *params.
 */
Void MessageQ_Params_init(MessageQ_Params *params)
{
    memcpy(params, &MessageQ_module->defaultInstParams,
           sizeof (MessageQ_Params));
}

/*
 * MessageQ_create - create a MessageQ object for receiving.
*/ MessageQ_Handle MessageQ_create(String name, const MessageQ_Params *params) { Int status; MessageQ_Object *obj = NULL; IMessageQTransport_Handle transport; INetworkTransport_Handle transInst; UInt16 queueIndex; UInt16 rprocId; Int tid; Int priority; LAD_ClientHandle handle; struct LAD_CommandObj cmd; union LAD_ResponseObj rsp; handle = LAD_findHandle(); if (handle == LAD_MAXNUMCLIENTS) { PRINTVERBOSE1( "MessageQ_create: can't find connection to daemon for pid %d\n", getpid()) return NULL; } cmd.cmd = LAD_MESSAGEQ_CREATE; cmd.clientId = handle; if (name == NULL) { cmd.args.messageQCreate.name[0] = '\0'; } else { strncpy(cmd.args.messageQCreate.name, name, LAD_MESSAGEQCREATEMAXNAMELEN - 1); cmd.args.messageQCreate.name[LAD_MESSAGEQCREATEMAXNAMELEN - 1] = '\0'; } if (params) { memcpy(&cmd.args.messageQCreate.params, params, sizeof (*params)); } if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) { PRINTVERBOSE1( "MessageQ_create: sending LAD command failed, status=%d\n", status) return NULL; } if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) { PRINTVERBOSE1("MessageQ_create: no LAD response, status=%d\n", status) return NULL; } status = rsp.messageQCreate.status; PRINTVERBOSE2( "MessageQ_create: got LAD response for client %d, status=%d\n", handle, status) if (status == -1) { PRINTVERBOSE1( "MessageQ_create: MessageQ server operation failed, status=%d\n", status) return NULL; } /* Create the generic obj */ obj = (MessageQ_Object *)calloc(1, sizeof (MessageQ_Object)); if (params != NULL) { /* Populate the params member */ memcpy((Ptr) &obj->params, (Ptr)params, sizeof (MessageQ_Params)); } queueIndex = (MessageQ_QueueIndex)(rsp.messageQCreate.queueId & 0x0000ffff); obj->queue = rsp.messageQCreate.queueId; obj->serverHandle = rsp.messageQCreate.serverHandle; CIRCLEQ_INIT(&obj->msgList); if (sem_init(&obj->synchronizer, 0, 0) < 0) { PRINTVERBOSE1( "MessageQ_create: failed to create synchronizer (errno %d)\n", errno) MessageQ_delete((MessageQ_Handle 
*)&obj); return NULL; } PRINTVERBOSE2("MessageQ_create: creating endpoints for '%s' queueIndex %d\n", name, queueIndex) for (rprocId = 0; rprocId < MultiProc_MAXPROCESSORS; rprocId++) { for (priority = 0; priority < 2; priority++) { transport = MessageQ_module->transports[rprocId][priority]; if (transport) { /* need to check return and do something if error */ IMessageQTransport_bind((Void *)transport, obj->queue); } } } for (tid = 1; tid < MessageQ_MAXTRANSPORTS; tid++) { transInst = MessageQ_module->transInst[tid]; if (transInst) { /* need to check return and do something if error */ INetworkTransport_bind((Void *)transInst, obj->queue); } } /* * Since LAD's MessageQ_module can grow, we need to be able to grow as well */ if (queueIndex >= MessageQ_module->numQueues) { _MessageQ_grow(queueIndex); } /* * No need to "allocate" slot since the queueIndex returned by * LAD is guaranteed to be unique. */ MessageQ_module->queues[queueIndex] = (MessageQ_Handle)obj; return (MessageQ_Handle)obj; } /* * MessageQ_delete - delete a MessageQ object. 
*/
/*
 * Deletes the queue on the LAD side, unbinds it from every registered
 * transport, removes it from the module's queues[] table and frees the
 * local object.  *handlePtr is set to NULL once the object is freed.
 *
 * Returns the status reported by LAD, MessageQ_E_INVALIDARG when handlePtr
 * or *handlePtr is NULL, or MessageQ_E_FAIL when the daemon cannot be
 * reached.  In the failure cases the object is left untouched.
 */
Int MessageQ_delete(MessageQ_Handle *handlePtr)
{
    MessageQ_Object          *obj;
    IMessageQTransport_Handle transport;
    INetworkTransport_Handle  transInst;
    Int                       status = MessageQ_S_SUCCESS;
    UInt16                    queueIndex;
    UInt16                    rprocId;
    Int                       tid;
    Int                       priority;
    LAD_ClientHandle          handle;
    struct LAD_CommandObj     cmd;
    union LAD_ResponseObj     rsp;

    if (handlePtr == NULL || *handlePtr == NULL) {
        return MessageQ_E_INVALIDARG;
    }

    handle = LAD_findHandle();
    if (handle == LAD_MAXNUMCLIENTS) {
        PRINTVERBOSE1(
          "MessageQ_delete: can't find connection to daemon for pid %d\n",
           getpid())
        return MessageQ_E_FAIL;
    }

    obj = (MessageQ_Object *)(*handlePtr);

    cmd.cmd = LAD_MESSAGEQ_DELETE;
    cmd.clientId = handle;
    cmd.args.messageQDelete.serverHandle = obj->serverHandle;

    if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
        PRINTVERBOSE1(
          "MessageQ_delete: sending LAD command failed, status=%d\n", status)
        return MessageQ_E_FAIL;
    }

    if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
        PRINTVERBOSE1("MessageQ_delete: no LAD response, status=%d\n", status)
        return MessageQ_E_FAIL;
    }
    status = rsp.messageQDelete.status;

    PRINTVERBOSE2(
      "MessageQ_delete: got LAD response for client %d, status=%d\n",
      handle, status)

    for (rprocId = 0; rprocId < MultiProc_MAXPROCESSORS; rprocId++) {
        for (priority = 0; priority < 2; priority++) {
            transport = MessageQ_module->transports[rprocId][priority];
            if (transport) {
                IMessageQTransport_unbind((Void *)transport, obj->queue);
            }
        }
    }

    /* mirrors MessageQ_create(), which also scans ids starting at 1 */
    for (tid = 1; tid < MessageQ_MAXTRANSPORTS; tid++) {
        transInst = MessageQ_module->transInst[tid];
        if (transInst) {
            INetworkTransport_unbind((Void *)transInst, obj->queue);
        }
    }

    queueIndex = (MessageQ_QueueIndex)(obj->queue & 0x0000ffff);

    /*
     * Only clear a slot that exists and still refers to this object; the
     * index comes from the queue id and the table may be smaller.
     */
    pthread_mutex_lock(&MessageQ_module->gate);
    if (MessageQ_module->queues != NULL &&
        queueIndex < MessageQ_module->numQueues &&
        MessageQ_module->queues[queueIndex] == (MessageQ_Handle)obj) {
        MessageQ_module->queues[queueIndex] = NULL;
    }
    pthread_mutex_unlock(&MessageQ_module->gate);

    sem_destroy(&obj->synchronizer);
    free(obj);
    *handlePtr = NULL;

    return status;
}

/*
 * MessageQ_open - Opens an instance of MessageQ for sending.
*/
/*
 * Looks `name` up in the module's NameServer and stores the resulting
 * queue id in *queueId.
 *
 * Returns MessageQ_S_SUCCESS when the lookup succeeded.  Otherwise
 * *queueId is set to MessageQ_INVALIDMESSAGEQ and the result is
 * MessageQ_E_NOTFOUND, MessageQ_E_TIMEOUT or MessageQ_E_FAIL depending on
 * the NameServer status.
 */
Int MessageQ_open(String name, MessageQ_QueueId *queueId)
{
    Int nsStatus;

    nsStatus = NameServer_getUInt32(MessageQ_module->nameServer,
                                    name, queueId, NULL);

    if (nsStatus == NameServer_E_NOTFOUND) {
        /* Set return queue ID to invalid */
        *queueId = MessageQ_INVALIDMESSAGEQ;
        return MessageQ_E_NOTFOUND;
    }

    if (nsStatus >= 0) {
        /* Override with a MessageQ status code */
        return MessageQ_S_SUCCESS;
    }

    /* Any other failure: invalidate the id and map to a MessageQ code */
    *queueId = MessageQ_INVALIDMESSAGEQ;

    return (nsStatus == NameServer_E_TIMEOUT) ? MessageQ_E_TIMEOUT
                                              : MessageQ_E_FAIL;
}

/*
 * MessageQ_close - Closes previously opened instance of MessageQ.
 *
 * Only invalidates the caller's queue id; always returns
 * MessageQ_S_SUCCESS.
 */
Int MessageQ_close(MessageQ_QueueId *queueId)
{
    /* Nothing more to be done for closing the MessageQ. */
    *queueId = MessageQ_INVALIDMESSAGEQ;

    return MessageQ_S_SUCCESS;
}

/*
 * MessageQ_put - place a message onto a message queue.
 *
 * Calls transport's put(), which handles the sending of the message using the
 * appropriate kernel interface (socket, device ioctl) call for the remote
 * procId encoded in the queueId argument.
*
 */
/*
 * The destination processor is the upper 16 bits of queueId and the queue
 * index the lower 16 bits; both are written into the message header.
 *
 *   - Remote destination, transport id 0: the message goes to the primary
 *     transport registered for that processor and message priority.
 *   - Remote destination, non-zero transport id: the message goes to the
 *     secondary transport registered under that id.
 *   - Local destination: the message is appended to the queue's list and
 *     the queue's semaphore is posted.
 *
 * Returns MessageQ_S_SUCCESS, or MessageQ_E_FAIL when the destination
 * processor, transport or local queue does not exist.
 */
Int MessageQ_put(MessageQ_QueueId queueId, MessageQ_Msg msg)
{
    /* number of priority slots per processor in transports[][] */
    const UInt numPri = sizeof (MessageQ_module->transports[0]) /
                        sizeof (MessageQ_module->transports[0][0]);
    MessageQ_Object          *obj = NULL;
    UInt16                    dstProcId  = (UInt16)(queueId >> 16);
    UInt16                    queueIndex =
                                  (MessageQ_QueueIndex)(queueId & 0x0000ffff);
    Int                       status = MessageQ_S_SUCCESS;
    ITransport_Handle         transport;
    IMessageQTransport_Handle msgTrans;
    INetworkTransport_Handle  netTrans;
    UInt                      priority;
    UInt                      tid;

    msg->dstId   = queueIndex;
    msg->dstProc = dstProcId;

    if (dstProcId == MultiProc_self()) {
        /* It is a local MessageQ: look the queue up and enqueue under the gate */
        pthread_mutex_lock(&MessageQ_module->gate);

        if (MessageQ_module->queues != NULL &&
            queueIndex < MessageQ_module->numQueues) {
            obj = (MessageQ_Object *)MessageQ_module->queues[queueIndex];
        }
        if (obj != NULL) {
            CIRCLEQ_INSERT_TAIL(&obj->msgList,
                                (MessageQ_CIRCLEQ_ENTRY *)msg, elem);
        }

        pthread_mutex_unlock(&MessageQ_module->gate);

        if (obj == NULL) {
            printf("MessageQ_put: no local queue at index %d\n", queueIndex);
            return MessageQ_E_FAIL;
        }

        sem_post(&obj->synchronizer);

        return status;
    }

    tid = MessageQ_getTransportId(msg);

    if (tid == 0) {
        /* use primary transport */
        if (dstProcId >= MultiProc_MAXPROCESSORS) {
            printf("MessageQ_put: invalid destination procId %d\n", dstProcId);
            return MessageQ_E_FAIL;
        }

        /*
         * transports[][] only has numPri slots per processor, so fold the
         * message priority onto them instead of indexing past the row.
         * NOTE(review): confirm this is the desired mapping for priority
         * values >= numPri.
         */
        priority = (UInt)MessageQ_getMsgPri(msg) % numPri;

        msgTrans = MessageQ_module->transports[dstProcId][priority];
        if (msgTrans == NULL) {
            printf("MessageQ_put: no transport registered for procId %d, "
                   "priority %u\n", dstProcId, priority);
            return MessageQ_E_FAIL;
        }

        IMessageQTransport_put(msgTrans, (Ptr)msg);

        return status;
    }

    if (tid >= MessageQ_MAXTRANSPORTS) {
        printf("MessageQ_put: transport id %u too big, must be < %d\n",
               tid, MessageQ_MAXTRANSPORTS);
        return MessageQ_E_FAIL;
    }

    /* use secondary transport */
    netTrans = MessageQ_module->transInst[tid];
    if (netTrans == NULL) {
        printf("MessageQ_put: transport id %u is not registered\n", tid);
        return MessageQ_E_FAIL;
    }

    transport = INetworkTransport_upCast(netTrans);

    /* downcast instance pointer to transport interface */
    switch (ITransport_itype(transport)) {
        case INetworkTransport_TypeId:
            INetworkTransport_put(netTrans, (Ptr)msg);
            break;

        default:
            /* error */
            printf("MessageQ_put: transport id %u is an unsupported transport type\n",
                   tid);
            status = MessageQ_E_FAIL;
            break;
    }

    return status;
}

/*
 * MessageQ_get - gets a message for a message queue and blocks if
 * the queue is empty.
 *
 * If a message is present, it returns it.  Otherwise it blocks
 * waiting for a message to arrive.
 * When a message is returned, it is owned by the caller.
*/ Int MessageQ_get(MessageQ_Handle handle, MessageQ_Msg *msg, UInt timeout) { MessageQ_Object * obj = (MessageQ_Object *)handle; Int status = MessageQ_S_SUCCESS; struct timespec ts; struct timeval tv; #if 0 /* * Optimization here to get a message without going in to the sem * operation, but the sem count will not be maintained properly. */ pthread_mutex_lock(&MessageQ_module->gate); if (obj->msgList.cqh_first != &obj->msgList) { *msg = (MessageQ_Msg)obj->msglist.cqh_first; CIRCLEQ_REMOVE(&obj->msgList, *msg, reserved0); pthread_mutex_unlock(&MessageQ_module->gate); } else { pthread_mutex_unlock(&MessageQ_module->gate); } #endif if (timeout == MessageQ_FOREVER) { sem_wait(&obj->synchronizer); } else { gettimeofday(&tv, NULL); ts.tv_sec = tv.tv_sec; ts.tv_nsec = (tv.tv_usec + timeout) * 1000; if (sem_timedwait(&obj->synchronizer, &ts) < 0) { if (errno == ETIMEDOUT) { PRINTVERBOSE0("MessageQ_get: operation timed out\n") return MessageQ_E_TIMEOUT; } } } if (obj->unblocked) { return MessageQ_E_UNBLOCKED; } pthread_mutex_lock(&MessageQ_module->gate); *msg = (MessageQ_Msg)obj->msgList.cqh_first; CIRCLEQ_REMOVE(&obj->msgList, (MessageQ_CIRCLEQ_ENTRY *)*msg, elem); pthread_mutex_unlock(&MessageQ_module->gate); return status; } /* * Return a count of the number of messages in the queue * * TBD: Implement as a socket ioctl, using getsockopt(). Return -1 for now. */ Int MessageQ_count(MessageQ_Handle handle) { Int count = -1; #if 0 MessageQ_Object * obj = (MessageQ_Object *) handle; socklen_t optlen; /* * TBD: Need to find a way to implement (if anyone uses it!), and * push down into transport.. */ /* * 2nd arg to getsockopt should be transport independent, but using * SSKPROTO_SHMFIFO for now: */ getsockopt(obj->fd, SSKPROTO_SHMFIFO, SSKGETOPT_GETMSGQCOUNT, &count, &optlen); #endif return count; } /* * Initializes a message not obtained from MessageQ_alloc. 
*/ Void MessageQ_staticMsgInit(MessageQ_Msg msg, UInt32 size) { /* Fill in the fields of the message */ MessageQ_msgInit(msg); msg->heapId = MessageQ_STATICMSG; msg->msgSize = size; } /* * Allocate a message and initialize the needed fields (note some * of the fields in the header are set via other APIs or in the * MessageQ_put function, */ MessageQ_Msg MessageQ_alloc(UInt16 heapId, UInt32 size) { MessageQ_Msg msg; /* * heapId not used for local alloc (as this is over a copy transport), but * we need to send to other side as heapId is used in BIOS transport. */ msg = (MessageQ_Msg)calloc(1, size); MessageQ_msgInit(msg); msg->msgSize = size; msg->heapId = heapId; return msg; } /* * Frees the message back to the heap that was used to allocate it. */ Int MessageQ_free(MessageQ_Msg msg) { UInt32 status = MessageQ_S_SUCCESS; /* Check to ensure this was not allocated by user: */ if (msg->heapId == MessageQ_STATICMSG) { status = MessageQ_E_CANNOTFREESTATICMSG; } else { free(msg); } return status; } /* Register a heap with MessageQ. */ Int MessageQ_registerHeap(Ptr heap, UInt16 heapId) { Int status = MessageQ_S_SUCCESS; /* Do nothing, as this uses a copy transport */ return status; } /* Unregister a heap with MessageQ. */ Int MessageQ_unregisterHeap(UInt16 heapId) { Int status = MessageQ_S_SUCCESS; /* Do nothing, as this uses a copy transport */ return status; } /* Unblocks a MessageQ */ Void MessageQ_unblock(MessageQ_Handle handle) { MessageQ_Object *obj = (MessageQ_Object *)handle; obj->unblocked = TRUE; sem_post(&obj->synchronizer); } /* Embeds a source message queue into a message */ Void MessageQ_setReplyQueue(MessageQ_Handle handle, MessageQ_Msg msg) { MessageQ_Object *obj = (MessageQ_Object *)handle; msg->replyId = (UInt16)(obj->queue); msg->replyProc = (UInt16)(obj->queue >> 16); } /* Returns the QueueId associated with the handle. 
*/ MessageQ_QueueId MessageQ_getQueueId(MessageQ_Handle handle) { MessageQ_Object *obj = (MessageQ_Object *) handle; UInt32 queueId; queueId = (obj->queue); return queueId; } /* Sets the tracing of a message */ Void MessageQ_setMsgTrace(MessageQ_Msg msg, Bool traceFlag) { msg->flags = (msg->flags & ~TRACEMASK) | (traceFlag << TRACESHIFT); } /* * Returns the amount of shared memory used by one transport instance. * * The MessageQ module itself does not use any shared memory but the * underlying transport may use some shared memory. */ SizeT MessageQ_sharedMemReq(Ptr sharedAddr) { SizeT memReq = 0u; /* Do nothing, as this is a copy transport. */ return memReq; } /* * This is a helper function to initialize a message. */ Void MessageQ_msgInit(MessageQ_Msg msg) { #if 0 Int status = MessageQ_S_SUCCESS; LAD_ClientHandle handle; struct LAD_CommandObj cmd; union LAD_ResponseObj rsp; handle = LAD_findHandle(); if (handle == LAD_MAXNUMCLIENTS) { PRINTVERBOSE1( "MessageQ_msgInit: can't find connection to daemon for pid %d\n", getpid()) return; } cmd.cmd = LAD_MESSAGEQ_MSGINIT; cmd.clientId = handle; if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) { PRINTVERBOSE1( "MessageQ_msgInit: sending LAD command failed, status=%d\n", status) return; } if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) { PRINTVERBOSE1("MessageQ_msgInit: no LAD response, status=%d\n", status) return; } status = rsp.msgInit.status; PRINTVERBOSE2( "MessageQ_msgInit: got LAD response for client %d, status=%d\n", handle, status) memcpy(msg, &rsp.msgInit.msg, sizeof (*msg)); #else msg->reserved0 = 0; /* We set this to distinguish from NameServerMsg */ msg->replyId = (UInt16)MessageQ_INVALIDMESSAGEQ; msg->msgId = MessageQ_INVALIDMSGID; msg->dstId = (UInt16)MessageQ_INVALIDMESSAGEQ; msg->flags = MessageQ_HEADERVERSION | MessageQ_NORMALPRI; msg->srcProc = MultiProc_self(); pthread_mutex_lock(&MessageQ_module->gate); msg->seqNum = MessageQ_module->seqNum++; 
pthread_mutex_unlock(&MessageQ_module->gate); #endif } /* * Grow module's queues[] array to accommodate queueIndex from LAD */ Void _MessageQ_grow(UInt16 queueIndex) { MessageQ_Handle *queues; MessageQ_Handle *oldQueues; UInt oldSize; oldSize = MessageQ_module->numQueues * sizeof (MessageQ_Handle); queues = calloc(queueIndex + MessageQ_GROWSIZE, sizeof (MessageQ_Handle)); memcpy(queues, MessageQ_module->queues, oldSize); oldQueues = MessageQ_module->queues; MessageQ_module->queues = queues; MessageQ_module->numQueues = queueIndex + MessageQ_GROWSIZE; free(oldQueues); return; }