index 5921fe78325111b341ea28621cdc463a342bd9c5..945712ea3c8d090570e22f47d3db8657ba414a75 100644 (file)
--- a/linux/src/api/MessageQ.c
+++ b/linux/src/api/MessageQ.c
/*
- * Copyright (c) 2012-2013, Texas Instruments Incorporated
+ * Copyright (c) 2012-2015 Texas Instruments Incorporated - http://www.ti.com
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
/*
* @file MessageQ.c
*
- * @brief MessageQ module "client" implementation
+ * @brief MessageQ Linux implementation
*
* This implementation is geared for use in a "client/server" model, whereby
* system-wide data is maintained in a "server" component and process-
* connects and communicates with LAD for the server connection.
*/
-
/* Standard IPC header */
#include <ti/ipc/Std.h>
-/* Linux specific header files, replacing OSAL: */
-#include <pthread.h>
-
/* Module level headers */
#include <ti/ipc/NameServer.h>
#include <ti/ipc/MultiProc.h>
#include <_MultiProc.h>
+#define MessageQ_internal 1 /* must be defined before include file */
#include <ti/ipc/MessageQ.h>
#include <_MessageQ.h>
+#include <ti/ipc/interfaces/IHeap.h>
+#include <ti/ipc/interfaces/ITransport.h>
+#include <ti/ipc/interfaces/IMessageQTransport.h>
+#include <ti/ipc/interfaces/INetworkTransport.h>
/* Socket Headers */
#include <sys/select.h>
#include <sys/types.h>
#include <sys/param.h>
#include <sys/eventfd.h>
-#include <sys/socket.h>
+#include <sys/queue.h>
#include <errno.h>
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <unistd.h>
#include <assert.h>
-
-/* Socket Protocol Family */
-#include <net/rpmsg.h>
-
-/* Socket utils: */
-#include <SocketFxns.h>
+#include <pthread.h>
+#include <semaphore.h>
#include <ladclient.h>
#include <_lad.h>
*/
#define MessageQ_NAMESERVER "MessageQ"
-/*!
- * @brief Value of an invalid socket ID:
- */
-#define Transport_INVALIDSOCKET (0xFFFFFFFF)
+#define MessageQ_MAXTRANSPORTS 8
-/* More magic rpmsg port numbers: */
-#define MESSAGEQ_RPMSG_PORT 61
-#define MESSAGEQ_RPMSG_MAXSIZE 512
+#define MessageQ_GROWSIZE 32
/* Trace flag settings: */
#define TRACESHIFT 12
* =============================================================================
*/
+/* params structure evolution */
+typedef struct {
+ Void *synchronizer;
+} MessageQ_Params_Legacy;
+
+typedef struct {
+ Int __version;
+ Void *synchronizer;
+ MessageQ_QueueIndex queueIndex;
+} MessageQ_Params_Version2;
+
/* structure for MessageQ module state */
typedef struct MessageQ_ModuleObject {
- Int refCount;
- /*!< Reference count */
- NameServer_Handle nameServer;
- /*!< Handle to the local NameServer used for storing GP objects */
- pthread_mutex_t gate;
- /*!< Handle of gate to be used for local thread safety */
- MessageQ_Params defaultInstParams;
- /*!< Default instance creation parameters */
- int sock[MultiProc_MAXPROCESSORS];
- /*!< Sockets to for sending to each remote processor */
- int seqNum;
- /*!< Process-specific sequence number */
+ MessageQ_Handle *queues;
+ Int numQueues;
+ Int refCount;
+ NameServer_Handle nameServer;
+ pthread_mutex_t gate;
+ int seqNum;
+ pthread_mutex_t seqNumGate;
+ IMessageQTransport_Handle transports[MultiProc_MAXPROCESSORS][2];
+ ITransport_Handle transInst[MessageQ_MAXTRANSPORTS];
+ MessageQ_PutHookFxn putHookFxn;
+ Ptr *heaps;
+ Int numHeaps;
} MessageQ_ModuleObject;
+typedef struct MessageQ_CIRCLEQ_ENTRY {
+ CIRCLEQ_ENTRY(MessageQ_CIRCLEQ_ENTRY) elem;
+} MessageQ_CIRCLEQ_ENTRY;
+
/*!
* @brief Structure for the Handle for the MessageQ.
*/
typedef struct MessageQ_Object_tag {
- MessageQ_Params params;
- /*! Instance specific creation parameters */
- MessageQ_QueueId queue;
- /* Unique id */
- int fd[MultiProc_MAXPROCESSORS];
- /* File Descriptor to block on messages from remote processors. */
- int unblockFd;
- /* Write this fd to unblock the select() call in MessageQ _get() */
- void *serverHandle;
+ CIRCLEQ_HEAD(dummy2, MessageQ_CIRCLEQ_ENTRY) msgList;
+ pthread_mutex_t msgListGate;
+ MessageQ_Params params;
+ MessageQ_QueueId queue;
+ int unblocked;
+ void *serverHandle;
+ sem_t synchronizer;
} MessageQ_Object;
-static Bool verbose = FALSE;
-
+/* traces in this file are controlled via _MessageQ_verbose */
+Bool _MessageQ_verbose = FALSE;
+#define verbose _MessageQ_verbose
/* =============================================================================
* Globals
*/
static MessageQ_ModuleObject MessageQ_state =
{
- .refCount = 0,
- .nameServer = NULL,
+ .refCount = 0,
+ .nameServer = NULL,
+#if defined(IPC_BUILDOS_ANDROID) && (PLATFORM_SDK_VERSION < 23)
+ .gate = PTHREAD_RECURSIVE_MUTEX_INITIALIZER,
+#else
+ .gate = PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP,
+#endif
+ .seqNumGate = PTHREAD_MUTEX_INITIALIZER,
+ .putHookFxn = NULL,
+ .heaps = NULL,
+ .numHeaps = 0
};
/*!
*
* @brief Pointer to the MessageQ module state.
*/
-MessageQ_ModuleObject * MessageQ_module = &MessageQ_state;
+MessageQ_ModuleObject *MessageQ_module = &MessageQ_state;
+Void _MessageQ_grow(UInt16 queueIndex);
/* =============================================================================
- * Forward declarations of internal functions
+ * APIS
* =============================================================================
*/
-/* This is a helper function to initialize a message. */
-static Int transportCreateEndpoint(int * fd, UInt16 rprocId, UInt16 queueIndex);
-static Int transportCloseEndpoint(int fd);
-static Int transportGet(int sock, MessageQ_Msg * retMsg);
-static Int transportPut(MessageQ_Msg msg, UInt16 dstId, UInt16 dstProcId);
+Bool MessageQ_registerTransport(IMessageQTransport_Handle handle,
+ UInt16 rprocId, UInt priority)
+{
+ Int status = FALSE;
+ UInt16 clusterId;
-/* =============================================================================
- * APIS
- * =============================================================================
- */
-/* Function to get default configuration for the MessageQ module.
- *
+ if (handle == NULL) {
+ fprintf(stderr,
+ "MessageQ_registerTransport: invalid handle, must be non-NULL\n"
+ );
+
+ return status;
+ }
+
+ /* map procId to clusterId */
+ clusterId = rprocId - MultiProc_getBaseIdOfCluster();
+
+ if (clusterId >= MultiProc_MAXPROCESSORS) {
+ fprintf(stderr,
+ "MessageQ_registerTransport: invalid procId %d\n", rprocId);
+
+ return status;
+ }
+
+ if (MessageQ_module->transports[clusterId][priority] == NULL) {
+ MessageQ_module->transports[clusterId][priority] = handle;
+
+ status = TRUE;
+ }
+
+ return status;
+}
+
+Bool MessageQ_registerTransportId(UInt tid, ITransport_Handle inst)
+{
+ if (inst == NULL) {
+ fprintf(stderr, "MessageQ_registerTransportId: invalid NULL handle\n");
+
+ return MessageQ_E_INVALIDARG;
+ }
+
+ if (tid >= MessageQ_MAXTRANSPORTS) {
+ fprintf(stderr,
+ "MessageQ_unregisterNetTransport: invalid transport id %d, "
+ "must be < %d\n", tid, MessageQ_MAXTRANSPORTS);
+
+ return MessageQ_E_INVALIDARG;
+ }
+
+ if (MessageQ_module->transInst[tid] != NULL) {
+ fprintf(stderr,
+ "MessageQ_registerTransportId: transport id %d already "
+ "registered\n", tid);
+
+ return MessageQ_E_ALREADYEXISTS;
+ }
+
+ MessageQ_module->transInst[tid] = inst;
+
+ return MessageQ_S_SUCCESS;
+}
+
+Void MessageQ_unregisterTransport(UInt16 rprocId, UInt priority)
+{
+ UInt16 clusterId;
+
+ /* map procId to clusterId */
+ clusterId = rprocId - MultiProc_getBaseIdOfCluster();
+
+ if (clusterId >= MultiProc_MAXPROCESSORS) {
+ fprintf(stderr, "MessageQ_unregisterTransport: invalid rprocId %d\n",
+ rprocId);
+
+ return;
+ }
+
+ MessageQ_module->transports[clusterId][priority] = NULL;
+}
+
+Void MessageQ_unregisterTransportId(UInt tid)
+{
+ if (tid >= MessageQ_MAXTRANSPORTS) {
+ fprintf(stderr,
+ "MessageQ_unregisterTransportId: invalid transport id %d, "
+ "must be < %d\n", tid, MessageQ_MAXTRANSPORTS);
+
+ return;
+ }
+
+ MessageQ_module->transInst[tid] = NULL;
+}
+
+/*
+ * Function to get default configuration for the MessageQ module.
*/
-Void MessageQ_getConfig (MessageQ_Config * cfg)
+Void MessageQ_getConfig(MessageQ_Config *cfg)
{
Int status;
LAD_ClientHandle handle;
}
if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
- PRINTVERBOSE1("MessageQ_getConfig: no LAD response, status=%d\n", status)
+ PRINTVERBOSE1("MessageQ_getConfig: no LAD response, status=%d\n",
+ status)
return;
}
status = rsp.messageQGetConfig.status;
"MessageQ_getConfig: got LAD response for client %d, status=%d\n",
handle, status)
- memcpy(cfg, &rsp.messageQGetConfig.cfg, sizeof(*cfg));
+ memcpy(cfg, &rsp.messageQGetConfig.cfg, sizeof (*cfg));
return;
}
-/* Function to setup the MessageQ module. */
-Int MessageQ_setup (const MessageQ_Config * cfg)
+/*
+ * Function to setup the MessageQ module.
+ */
+Int MessageQ_setup(const MessageQ_Config *cfg)
{
- Int status;
+ Int status = MessageQ_S_SUCCESS;
LAD_ClientHandle handle;
struct LAD_CommandObj cmd;
union LAD_ResponseObj rsp;
+ Int pri;
Int i;
+ Int tid;
+
+ /* this entire function must be serialized */
+ pthread_mutex_lock(&MessageQ_module->gate);
+
+ /* ensure only first thread performs startup procedure */
+ if (++MessageQ_module->refCount > 1) {
+ PRINTVERBOSE1("MessageQ module has been already setup, refCount=%d\n",
+ MessageQ_module->refCount)
+ status = MessageQ_S_ALREADYSETUP;
+ goto exit;
+ }
handle = LAD_findHandle();
if (handle == LAD_MAXNUMCLIENTS) {
- PRINTVERBOSE1(
- "MessageQ_setup: can't find connection to daemon for pid %d\n",
- getpid())
-
- return MessageQ_E_RESOURCE;
+ PRINTVERBOSE1("MessageQ_setup: can't find connection to daemon for "
+ "pid %d\n", getpid())
+ status = MessageQ_E_RESOURCE;
+ goto exit;
}
cmd.cmd = LAD_MESSAGEQ_SETUP;
memcpy(&cmd.args.messageQSetup.cfg, cfg, sizeof(*cfg));
if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
- PRINTVERBOSE1(
- "MessageQ_setup: sending LAD command failed, status=%d\n", status)
- return MessageQ_E_FAIL;
+ PRINTVERBOSE1("MessageQ_setup: sending LAD command failed, "
+ "status=%d\n", status)
+ status = MessageQ_E_FAIL;
+ goto exit;
}
if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
PRINTVERBOSE1("MessageQ_setup: no LAD response, status=%d\n", status)
- return(status);
+ status = MessageQ_E_FAIL;
+ goto exit;
}
status = rsp.setup.status;
- PRINTVERBOSE2(
- "MessageQ_setup: got LAD response for client %d, status=%d\n",
- handle, status)
+ PRINTVERBOSE2("MessageQ_setup: LAD response for client %d, status=%d\n",
+ handle, status)
- MessageQ_module->nameServer = rsp.setup.nameServerHandle;
MessageQ_module->seqNum = 0;
+ MessageQ_module->nameServer = rsp.setup.nameServerHandle;
+ MessageQ_module->numQueues = cfg->maxRuntimeEntries;
+ MessageQ_module->queues = calloc(cfg->maxRuntimeEntries,
+ sizeof(MessageQ_Handle));
+ MessageQ_module->numHeaps = cfg->numHeaps;
+ MessageQ_module->heaps = calloc(cfg->numHeaps, sizeof(Ptr));
- /* Create a default local gate. */
- pthread_mutex_init (&(MessageQ_module->gate), NULL);
-
- /* Clear sockets array. */
for (i = 0; i < MultiProc_MAXPROCESSORS; i++) {
- MessageQ_module->sock[i] = Transport_INVALIDSOCKET;
+ for (pri = 0; pri < 2; pri++) {
+ MessageQ_module->transports[i][pri] = NULL;
+ }
+ }
+
+ for (tid = 0; tid < MessageQ_MAXTRANSPORTS; tid++) {
+ MessageQ_module->transInst[tid] = NULL;
}
+exit:
+ /* if error, must decrement reference count */
+ if (status < 0) {
+ MessageQ_module->refCount--;
+ }
- return status;
+ pthread_mutex_unlock(&MessageQ_module->gate);
+
+ return (status);
}
/*
- * Function to destroy the MessageQ module.
- * Destroys socket/protocol maps; sockets themselves should have been
- * destroyed in MessageQ_delete() and MessageQ_detach() calls.
+ * MessageQ_destroy - destroy the MessageQ module.
*/
-Int MessageQ_destroy (void)
+Int MessageQ_destroy(void)
{
- Int status;
+ Int status = MessageQ_S_SUCCESS;
LAD_ClientHandle handle;
struct LAD_CommandObj cmd;
union LAD_ResponseObj rsp;
+ int i;
+
+ /* this entire function must be serialized */
+ pthread_mutex_lock(&MessageQ_module->gate);
+
+ /* ensure only last thread does the work */
+ if (--MessageQ_module->refCount > 0) {
+ goto exit;
+ }
+
+ /* ensure all registered heaps have been unregistered */
+ for (i = 0; i < MessageQ_module->numHeaps; i++) {
+ if (MessageQ_module->heaps[i] != NULL) {
+ PRINTVERBOSE1("MessageQ_destroy: Warning: found heapId=%d", i);
+ }
+ }
+ free(MessageQ_module->heaps);
+ MessageQ_module->heaps = NULL;
handle = LAD_findHandle();
if (handle == LAD_MAXNUMCLIENTS) {
- PRINTVERBOSE1(
- "MessageQ_destroy: can't find connection to daemon for pid %d\n",
- getpid())
-
- return MessageQ_E_RESOURCE;
+ PRINTVERBOSE1("MessageQ_destroy: can't find connection to daemon "
+ "for pid %d\n", getpid())
+ status = MessageQ_E_RESOURCE;
+ goto exit;
}
cmd.cmd = LAD_MESSAGEQ_DESTROY;
cmd.clientId = handle;
if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
- PRINTVERBOSE1(
- "MessageQ_destroy: sending LAD command failed, status=%d\n", status)
- return MessageQ_E_FAIL;
+ PRINTVERBOSE1("MessageQ_destroy: sending LAD command failed, "
+ "status=%d\n", status)
+ status = MessageQ_E_FAIL;
+ goto exit;
}
if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
PRINTVERBOSE1("MessageQ_destroy: no LAD response, status=%d\n", status)
- return(status);
+ status = MessageQ_E_FAIL;
+ goto exit;
}
status = rsp.status;
- PRINTVERBOSE2(
- "MessageQ_destroy: got LAD response for client %d, status=%d\n",
- handle, status)
+ PRINTVERBOSE2("MessageQ_destroy: got LAD response for client %d, "
+ "status=%d\n", handle, status)
- return status;
+exit:
+ pthread_mutex_unlock(&MessageQ_module->gate);
+
+ return (status);
}
-/* Function to initialize the parameters for the MessageQ instance. */
-Void MessageQ_Params_init (MessageQ_Params * params)
+/*
+ * ======== MessageQ_Params_init ========
+ * Legacy implementation.
+ */
+Void MessageQ_Params_init(MessageQ_Params *params)
{
- memcpy (params, &(MessageQ_module->defaultInstParams),
- sizeof (MessageQ_Params));
+ ((MessageQ_Params_Legacy *)params)->synchronizer = NULL;
+}
- return;
+/*
+ * ======== MessageQ_Params_init__S ========
+ * New implementation which is version aware.
+ */
+Void MessageQ_Params_init__S(MessageQ_Params *params, Int version)
+{
+ MessageQ_Params_Version2 *params2;
+
+ switch (version) {
+
+ case MessageQ_Params_VERSION_2:
+ params2 = (MessageQ_Params_Version2 *)params;
+ params2->__version = MessageQ_Params_VERSION_2;
+ params2->synchronizer = NULL;
+ params2->queueIndex = MessageQ_ANY;
+ break;
+
+ default:
+ assert(FALSE);
+ break;
+ }
}
/*
- * Function to create a MessageQ object for receiving.
- *
- * Create a socket and bind the source address (local ProcId/MessageQ ID) in
- * order to get messages dispatched to this messageQ.
+ * ======== MessageQ_create ========
*/
-MessageQ_Handle MessageQ_create (String name, const MessageQ_Params * params)
+MessageQ_Handle MessageQ_create(String name, const MessageQ_Params *pp)
{
- Int status = MessageQ_S_SUCCESS;
- MessageQ_Object * obj = NULL;
- UInt16 queueIndex = 0u;
- UInt16 procId;
- UInt16 rprocId;
+ Int status;
+ MessageQ_Object *obj = NULL;
+ IMessageQTransport_Handle transport;
+ INetworkTransport_Handle netTrans;
+ ITransport_Handle baseTrans;
+ UInt16 queueIndex;
+ UInt16 clusterId;
+ Int tid;
+ Int priority;
LAD_ClientHandle handle;
struct LAD_CommandObj cmd;
union LAD_ResponseObj rsp;
+ MessageQ_Params ps;
+
+ MessageQ_Params_init__S(&ps, MessageQ_Params_VERSION);
+
+ /* copy the given params into the current params structure */
+ if (pp != NULL) {
+
+ /* snoop the params pointer to see if it's a legacy structure */
+ if ((pp->__version == 0) || (pp->__version > 100)) {
+ ps.synchronizer = ((MessageQ_Params_Legacy *)pp)->synchronizer;
+ }
+
+ /* not legacy structure, use params version field */
+ else if (pp->__version == MessageQ_Params_VERSION_2) {
+ ps.__version = ((MessageQ_Params_Version2 *)pp)->__version;
+ ps.synchronizer = ((MessageQ_Params_Version2 *)pp)->synchronizer;
+ ps.queueIndex = ((MessageQ_Params_Version2 *)pp)->queueIndex;
+ }
+ else {
+ assert(FALSE);
+ }
+ }
handle = LAD_findHandle();
if (handle == LAD_MAXNUMCLIENTS) {
cmd.cmd = LAD_MESSAGEQ_CREATE;
cmd.clientId = handle;
- strncpy(cmd.args.messageQCreate.name, name,
- LAD_MESSAGEQCREATEMAXNAMELEN - 1);
- cmd.args.messageQCreate.name[LAD_MESSAGEQCREATEMAXNAMELEN - 1] = '\0';
- if (params) {
- memcpy(&cmd.args.messageQCreate.params, params, sizeof(*params));
+
+ if (name == NULL) {
+ cmd.args.messageQCreate.name[0] = '\0';
+ }
+ else {
+ strncpy(cmd.args.messageQCreate.name, name,
+ LAD_MESSAGEQCREATEMAXNAMELEN - 1);
+ cmd.args.messageQCreate.name[LAD_MESSAGEQCREATEMAXNAMELEN - 1] = '\0';
}
+ memcpy(&cmd.args.messageQCreate.params, &ps, sizeof(ps));
+
if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
PRINTVERBOSE1(
"MessageQ_create: sending LAD command failed, status=%d\n", status)
@@ -386,68 +584,129 @@ MessageQ_Handle MessageQ_create (String name, const MessageQ_Params * params)
/* Create the generic obj */
obj = (MessageQ_Object *)calloc(1, sizeof (MessageQ_Object));
- if (params != NULL) {
- /* Populate the params member */
- memcpy((Ptr) &obj->params, (Ptr)params, sizeof (MessageQ_Params));
- }
+ /* Populate the params member */
+ memcpy(&obj->params, &ps, sizeof(ps));
+
- procId = MultiProc_self();
- queueIndex = (MessageQ_QueueIndex)rsp.messageQCreate.queueId;
obj->queue = rsp.messageQCreate.queueId;
obj->serverHandle = rsp.messageQCreate.serverHandle;
+ pthread_mutex_init(&obj->msgListGate, NULL);
+ CIRCLEQ_INIT(&obj->msgList);
+ if (sem_init(&obj->synchronizer, 0, 0) < 0) {
+ PRINTVERBOSE1(
+ "MessageQ_create: failed to create synchronizer (errno %d)\n", errno)
- /*
- * Create a set of communication endpoints (one per each remote proc),
- * and return the socket as target for MessageQ_put() calls, and as
- * a file descriptor to close during MessageQ_delete().
- */
- for (rprocId = 0; rprocId < MultiProc_getNumProcessors(); rprocId++) {
- obj->fd[rprocId] = Transport_INVALIDSOCKET;
- if (procId == rprocId) {
- /* Skip creating an endpoint for ourself. */
- continue;
- }
+ MessageQ_delete((MessageQ_Handle *)&obj);
+
+ return NULL;
+ }
- PRINTVERBOSE3("MessageQ_create: creating endpoint for: %s, rprocId: %d, queueIndex: %d\n", name, rprocId, queueIndex)
+ /* lad returns the queue port # (queueIndex + PORT_OFFSET) */
+ queueIndex = MessageQ_getQueueIndex(rsp.messageQCreate.queueId);
- status = transportCreateEndpoint(&obj->fd[rprocId], rprocId,
- queueIndex);
- if (status < 0) {
- goto cleanup;
+ PRINTVERBOSE2("MessageQ_create: creating endpoints for '%s' "
+ "queueIndex %d\n", (name == NULL) ? "NULL" : name , queueIndex)
+
+ pthread_mutex_lock(&MessageQ_module->gate);
+
+ for (clusterId = 0; clusterId < MultiProc_MAXPROCESSORS; clusterId++) {
+ for (priority = 0; priority < 2; priority++) {
+ transport = MessageQ_module->transports[clusterId][priority];
+ if (transport) {
+ /* need to check return and do something if error */
+ IMessageQTransport_bind((Void *)transport, obj->queue);
+ }
}
}
- /*
- * Now, to support MessageQ_unblock() functionality, create an event object.
- * Writing to this event will unblock the select() call in MessageQ_get().
+ for (tid = 1; tid < MessageQ_MAXTRANSPORTS; tid++) {
+ baseTrans = MessageQ_module->transInst[tid];
+
+ if (baseTrans != NULL) {
+ switch (ITransport_itype(baseTrans)) {
+ case INetworkTransport_TypeId:
+ netTrans = INetworkTransport_downCast(baseTrans);
+ INetworkTransport_bind((void *)netTrans, obj->queue);
+ break;
+
+ default:
+ /* error */
+ fprintf(stderr,
+ "MessageQ_create: Error: transport id %d is an "
+ "unsupported transport type.\n", tid);
+ break;
+ }
+ }
+ }
+
+ /* LAD's MessageQ module can grow, we need to grow as well */
+ if (queueIndex >= MessageQ_module->numQueues) {
+ _MessageQ_grow(queueIndex);
+ }
+
+ /* No need to "allocate" slot since the queueIndex returned by
+ * LAD is guaranteed to be unique.
*/
- obj->unblockFd = eventfd(0, 0);
- if (obj->unblockFd == -1) {
- printf ("MessageQ_create: eventfd creation failed: %d, %s\n",
- errno, strerror(errno));
- status = MessageQ_E_FAIL;
+ MessageQ_module->queues[queueIndex] = (MessageQ_Handle)obj;
+
+ pthread_mutex_unlock(&MessageQ_module->gate);
+
+ /* send announce message to LAD, indicating we are ready to receive msgs */
+ cmd.cmd = LAD_MESSAGEQ_ANNOUNCE;
+ cmd.clientId = handle;
+
+ if (name == NULL) {
+ cmd.args.messageQAnnounce.name[0] = '\0';
+ }
+ else {
+ strncpy(cmd.args.messageQAnnounce.name, name,
+ LAD_MESSAGEQCREATEMAXNAMELEN - 1);
+ cmd.args.messageQAnnounce.name[LAD_MESSAGEQCREATEMAXNAMELEN - 1] = '\0';
}
-cleanup:
- /* Cleanup if fail: */
- if (status < 0) {
- MessageQ_delete((MessageQ_Handle *)&obj);
+ cmd.args.messageQAnnounce.serverHandle = obj->serverHandle;
+
+ if ((status = LAD_putCommand(&cmd)) != LAD_SUCCESS) {
+ PRINTVERBOSE1(
+ "MessageQ_create: sending LAD command failed, status=%d\n", status)
+ goto exit;
+ }
+
+ if ((status = LAD_getResponse(handle, &rsp)) != LAD_SUCCESS) {
+ PRINTVERBOSE1("MessageQ_create: no LAD response, status=%d\n", status)
+ goto exit;
+ }
+ status = rsp.messageQAnnounce.status;
+
+ PRINTVERBOSE2(
+ "MessageQ_create: got LAD response for client %d, status=%d\n",
+ handle, status)
+
+ if (status == -1) {
+ PRINTVERBOSE1(
+ "MessageQ_create: MessageQ server operation failed, status=%d\n",
+ status)
}
- return ((MessageQ_Handle) obj);
+exit:
+ return (MessageQ_Handle)obj;
}
/*
- * Function to delete a MessageQ object for a specific slave processor.
- *
- * Deletes the socket associated with this MessageQ object.
+ * ======== MessageQ_delete ========
*/
-Int MessageQ_delete (MessageQ_Handle * handlePtr)
+Int MessageQ_delete(MessageQ_Handle *handlePtr)
{
- Int status = MessageQ_S_SUCCESS;
- MessageQ_Object * obj = NULL;
- UInt16 rprocId;
- LAD_ClientHandle handle;
+ MessageQ_Object *obj;
+ IMessageQTransport_Handle transport;
+ INetworkTransport_Handle netTrans;
+ ITransport_Handle baseTrans;
+ Int status = MessageQ_S_SUCCESS;
+ UInt16 queueIndex;
+ UInt16 clusterId;
+ Int tid;
+ Int priority;
+ LAD_ClientHandle handle;
struct LAD_CommandObj cmd;
union LAD_ResponseObj rsp;
return MessageQ_E_FAIL;
}
- obj = (MessageQ_Object *) (*handlePtr);
+ obj = (MessageQ_Object *)(*handlePtr);
cmd.cmd = LAD_MESSAGEQ_DELETE;
cmd.clientId = handle;
"MessageQ_delete: got LAD response for client %d, status=%d\n",
handle, status)
+ pthread_mutex_lock(&MessageQ_module->gate);
- /* Close the event used for MessageQ_unblock(): */
- close(obj->unblockFd);
+ for (clusterId = 0; clusterId < MultiProc_MAXPROCESSORS; clusterId++) {
+ for (priority = 0; priority < 2; priority++) {
+ transport = MessageQ_module->transports[clusterId][priority];
+ if (transport) {
+ IMessageQTransport_unbind((Void *)transport, obj->queue);
+ }
+ }
+ }
- /* Close the communication endpoint: */
- for (rprocId = 0; rprocId < MultiProc_getNumProcessors(); rprocId++) {
- if (obj->fd[rprocId] != Transport_INVALIDSOCKET) {
- status = transportCloseEndpoint(obj->fd[rprocId]);
+ for (tid = 1; tid < MessageQ_MAXTRANSPORTS; tid++) {
+ baseTrans = MessageQ_module->transInst[tid];
+
+ if (baseTrans != NULL) {
+ switch (ITransport_itype(baseTrans)) {
+ case INetworkTransport_TypeId:
+ netTrans = INetworkTransport_downCast(baseTrans);
+ INetworkTransport_unbind((void *)netTrans, obj->queue);
+ break;
+
+ default:
+ /* error */
+ fprintf(stderr,
+ "MessageQ_create: Error: transport id %d is an "
+ "unsupported transport type.\n", tid);
+ break;
+ }
}
}
- /* Now free the obj */
- free (obj);
+ /* extract the queue index from the queueId */
+ queueIndex = MessageQ_getQueueIndex(obj->queue);
+ MessageQ_module->queues[queueIndex] = NULL;
+
+ pthread_mutex_unlock(&MessageQ_module->gate);
+
+ free(obj);
*handlePtr = NULL;
- return (status);
+ return status;
}
/*
- * Opens an instance of MessageQ for sending.
- *
- * We need not create a socket here; the sockets for all remote processors
- * were created during MessageQ_attach(), and will be
- * retrieved during MessageQ_put().
+ * ======== MessageQ_open ========
+ * Acquire a queueId for use in sending messages to the queue
*/
-Int MessageQ_open (String name, MessageQ_QueueId * queueId)
+Int MessageQ_open(String name, MessageQ_QueueId *queueId)
{
Int status = MessageQ_S_SUCCESS;
- status = NameServer_getUInt32 (MessageQ_module->nameServer,
- name, queueId, NULL);
+ status = NameServer_getUInt32(MessageQ_module->nameServer,
+ name, queueId, NULL);
if (status == NameServer_E_NOTFOUND) {
- /* Set return queue ID to invalid. */
+ /* Set return queue ID to invalid */
*queueId = MessageQ_INVALIDMESSAGEQ;
status = MessageQ_E_NOTFOUND;
}
else if (status >= 0) {
- /* Override with a MessageQ status code. */
+ /* Override with a MessageQ status code */
status = MessageQ_S_SUCCESS;
}
else {
- /* Set return queue ID to invalid. */
+ /* Set return queue ID to invalid */
*queueId = MessageQ_INVALIDMESSAGEQ;
- /* Override with a MessageQ status code. */
+
+ /* Override with a MessageQ status code */
if (status == NameServer_E_TIMEOUT) {
status = MessageQ_E_TIMEOUT;
}
}
}
- return (status);
+ return status;
}
-/* Closes previously opened instance of MessageQ module. */
-Int MessageQ_close (MessageQ_QueueId * queueId)
+/*
+ * ======== MessageQ_openQueueId ========
+ */
+MessageQ_QueueId MessageQ_openQueueId(UInt16 queueIndex, UInt16 procId)
+{
+ MessageQ_QueueIndex queuePort;
+ MessageQ_QueueId queueId;
+
+ /* queue port is embedded in the queueId */
+ queuePort = queueIndex + MessageQ_PORTOFFSET;
+ queueId = ((MessageQ_QueueId)(procId) << 16) | queuePort;
+
+ return (queueId);
+}
+
+/*
+ * ======== MessageQ_close ========
+ * Closes previously opened instance of MessageQ
+ */
+Int MessageQ_close(MessageQ_QueueId *queueId)
{
Int32 status = MessageQ_S_SUCCESS;
/* Nothing more to be done for closing the MessageQ. */
*queueId = MessageQ_INVALIDMESSAGEQ;
- return (status);
+ return status;
}
/*
- * Place a message onto a message queue.
- *
- * Calls TransportShm_put(), which handles the sending of the message using the
- * appropriate kernel interface (socket, device ioctl) call for the remote
- * procId encoded in the queueId argument.
+ * ======== MessageQ_put ========
+ * Deliver the given message, either locally or to the transport
*
+ * If the destination is a local queue, deliver the message. Otherwise,
+ * pass the message to a transport for delivery. The transport handles
+ * the sending of the message using the appropriate interface (socket,
+ * device ioctl, etc.).
*/
-Int MessageQ_put (MessageQ_QueueId queueId, MessageQ_Msg msg)
+Int MessageQ_put(MessageQ_QueueId queueId, MessageQ_Msg msg)
{
- Int status;
- UInt16 dstProcId = (UInt16)(queueId >> 16);
- UInt16 queueIndex = (MessageQ_QueueIndex)(queueId & 0x0000ffff);
+ Int status = MessageQ_S_SUCCESS;
+ MessageQ_Object *obj;
+ UInt16 dstProcId;
+ UInt16 queueIndex;
+ UInt16 queuePort;
+ ITransport_Handle baseTrans;
+ IMessageQTransport_Handle msgTrans;
+ INetworkTransport_Handle netTrans;
+ Int priority;
+ UInt tid;
+ UInt16 clusterId;
+ Bool delivered;
+
+ /* extract destination address from the given queueId */
+ dstProcId = (UInt16)(queueId >> 16);
+ queuePort = (MessageQ_QueueIndex)(queueId & 0x0000ffff);
+
+ /* write the destination address into the message header */
+ msg->dstId = queuePort;
+ msg->dstProc= dstProcId;
+
+ /* invoke the hook function after addressing the message */
+ if (MessageQ_module->putHookFxn != NULL) {
+ MessageQ_module->putHookFxn(queueId, msg);
+ }
- msg->dstId = queueIndex;
- msg->dstProc = dstProcId;
+ /* For an outbound message: If message destination is on this
+ * processor, then check if the destination queue is in this
+ * process (thread-to-thread messaging).
+ *
+ * For an inbound message: Check if destination queue is in this
+ * process (process-to-process messaging).
+ */
+ if (dstProcId == MultiProc_self()) {
+ queueIndex = queuePort - MessageQ_PORTOFFSET;
+
+ if (queueIndex < MessageQ_module->numQueues) {
+ obj = (MessageQ_Object *)MessageQ_module->queues[queueIndex];
+
+ if (obj != NULL) {
+ /* deliver message to queue */
+ pthread_mutex_lock(&obj->msgListGate);
+ CIRCLEQ_INSERT_TAIL(&obj->msgList,
+ (MessageQ_CIRCLEQ_ENTRY *)msg, elem);
+ pthread_mutex_unlock(&obj->msgListGate);
+ sem_post(&obj->synchronizer);
+ goto done;
+ }
+ }
+ /* If we get here, then we have failed to deliver a local message. */
+ status = MessageQ_E_FAIL;
+ goto done;
+ }
+
+ /* Getting here implies the message is outbound. Must give it to
+ * either the primary or secondary transport for delivery. Start
+ * by extracting the transport ID from the message header.
+ */
+ tid = MessageQ_getTransportId(msg);
+
+ if (tid >= MessageQ_MAXTRANSPORTS) {
+ fprintf(stderr,
+ "MessageQ_put: Error: transport id %d too big, must be < %d\n",
+ tid, MessageQ_MAXTRANSPORTS);
+ status = MessageQ_E_FAIL;
+ goto done;
+ }
+
+ /* if transportId is set, use secondary transport for message delivery */
+ if (tid != 0) {
+ baseTrans = MessageQ_module->transInst[tid];
+
+ if (baseTrans == NULL) {
+ fprintf(stderr, "MessageQ_put: Error: transport is null\n");
+ status = MessageQ_E_FAIL;
+ goto done;
+ }
+
+ /* downcast instance pointer to transport interface */
+ switch (ITransport_itype(baseTrans)) {
+ case INetworkTransport_TypeId:
+ netTrans = INetworkTransport_downCast(baseTrans);
+ delivered = INetworkTransport_put(netTrans, (Ptr)msg);
+ status = (delivered ? MessageQ_S_SUCCESS :
+ (errno == ESHUTDOWN ? MessageQ_E_SHUTDOWN :
+ MessageQ_E_FAIL));
+ break;
+
+ default:
+ /* error */
+ fprintf(stderr, "MessageQ_put: Error: transport id %d is an "
+ "unsupported transport type\n", tid);
+ status = MessageQ_E_FAIL;
+ break;
+ }
+ }
+ else {
+ /* use primary transport for delivery */
+ priority = MessageQ_getMsgPri(msg);
+ clusterId = dstProcId - MultiProc_getBaseIdOfCluster();
+
+ /* primary transport can only be used for intra-cluster delivery */
+ if (clusterId > MultiProc_getNumProcsInCluster()) {
+ fprintf(stderr,
+ "MessageQ_put: Error: destination procId=%d is not "
+ "in cluster. Must specify a transportId.\n", dstProcId);
+ status = MessageQ_E_FAIL;
+ goto done;
+ }
- status = transportPut(msg, queueIndex, dstProcId);
+ msgTrans = MessageQ_module->transports[clusterId][priority];
+ if (msgTrans) {
+ delivered = IMessageQTransport_put(msgTrans, (Ptr)msg);
+ }
+ else {
+ delivered = MessageQ_E_FAIL;
+ }
+ status = (delivered ? MessageQ_S_SUCCESS :
+ (errno == ESHUTDOWN ? MessageQ_E_SHUTDOWN : MessageQ_E_FAIL));
+ }
+done:
return (status);
}
/*
- * Gets a message for a message queue and blocks if the queue is empty.
- * If a message is present, it returns it. Otherwise it blocks
- * waiting for a message to arrive.
- * When a message is returned, it is owned by the caller.
- *
- * We block using select() on the receiving socket's file descriptor, then
- * get the waiting message via the socket API recvfrom().
- * We use the socket stored in the messageQ object via a previous call to
- * MessageQ_create().
+ * MessageQ_get - gets a message for a message queue and blocks if
+ * the queue is empty.
*
+ * If a message is present, it returns it. Otherwise it blocks
+ * waiting for a message to arrive.
+ * When a message is returned, it is owned by the caller.
*/
-Int MessageQ_get (MessageQ_Handle handle, MessageQ_Msg * msg ,UInt timeout)
+Int MessageQ_get(MessageQ_Handle handle, MessageQ_Msg *msg, UInt timeout)
{
+ MessageQ_Object * obj = (MessageQ_Object *)handle;
Int status = MessageQ_S_SUCCESS;
- Int tmpStatus;
- MessageQ_Object * obj = (MessageQ_Object *) handle;
- int retval;
- int nfds;
- fd_set rfds;
- struct timeval tv;
- void *timevalPtr;
- UInt16 rprocId;
- int maxfd = 0;
-
- /* Wait (with timeout) and retreive message from socket: */
- FD_ZERO(&rfds);
- for (rprocId = 0; rprocId < MultiProc_getNumProcessors(); rprocId++) {
- if (rprocId == MultiProc_self()) {
- continue;
- }
- maxfd = MAX(maxfd, obj->fd[rprocId]);
- FD_SET(obj->fd[rprocId], &rfds);
- }
+ struct timespec ts;
+ struct timeval tv;
- /* Wait also on the event fd, which may be written by MessageQ_unblock(): */
- FD_SET(obj->unblockFd, &rfds);
+#if 0
+/*
+ * Optimization here to get a message without going in to the sem
+ * operation, but the sem count will not be maintained properly.
+ */
+ pthread_mutex_lock(&obj->msgListGate);
+
+ if (obj->msgList.cqh_first != &obj->msgList) {
+ *msg = (MessageQ_Msg)obj->msglist.cqh_first;
+ CIRCLEQ_REMOVE(&obj->msgList, *msg, reserved0);
+
+ pthread_mutex_unlock(&obj->msgListGate);
+ }
+ else {
+ pthread_mutex_unlock(&obj->msgListGate);
+ }
+#endif
if (timeout == MessageQ_FOREVER) {
- timevalPtr = NULL;
+ sem_wait(&obj->synchronizer);
}
else {
- /* Timeout given in msec: convert: */
- tv.tv_sec = timeout / 1000;
- tv.tv_usec = (timeout % 1000) * 1000;
- timevalPtr = &tv;
- }
- /* Add one to last fd created: */
- nfds = MAX(maxfd, obj->unblockFd) + 1;
-
- retval = select(nfds, &rfds, NULL, NULL, timevalPtr);
- if (retval) {
- if (FD_ISSET(obj->unblockFd, &rfds)) {
- /*
- * Our event was signalled by MessageQ_unblock().
- *
- * This is typically done during a shutdown sequence, where
- * the intention of the client would be to ignore (i.e. not fetch)
- * any pending messages in the transport's queue.
- * Thus, we shall not check for nor return any messages.
- */
- *msg = NULL;
- status = MessageQ_E_UNBLOCKED;
+ /* add timeout (microseconds) to current time of day */
+ gettimeofday(&tv, NULL);
+ tv.tv_sec += timeout / 1000000;
+ tv.tv_usec += timeout % 1000000;
+
+ if (tv.tv_usec >= 1000000) {
+ tv.tv_sec++;
+ tv.tv_usec -= 1000000;
}
- else {
- for (rprocId = 0; rprocId < MultiProc_getNumProcessors();
- rprocId++) {
- if (rprocId == MultiProc_self()) {
- continue;
- }
- if (FD_ISSET(obj->fd[rprocId], &rfds)) {
- /* Our transport's fd was signalled: Get the message: */
- tmpStatus = transportGet(obj->fd[rprocId], msg);
- if (tmpStatus < 0) {
- printf ("MessageQ_get: tranposrtshm_get failed.");
- status = MessageQ_E_FAIL;
- }
- }
+
+ /* set absolute timeout value */
+ ts.tv_sec = tv.tv_sec;
+ ts.tv_nsec = tv.tv_usec * 1000; /* convert to nanoseconds */
+
+ if (sem_timedwait(&obj->synchronizer, &ts) < 0) {
+ if (errno == ETIMEDOUT) {
+ PRINTVERBOSE0("MessageQ_get: operation timed out\n")
+ return (MessageQ_E_TIMEOUT);
+ }
+ else {
+ PRINTVERBOSE0("MessageQ_get: sem_timedwait error\n")
+ return (MessageQ_E_FAIL);
}
}
}
- else if (retval == 0) {
- *msg = NULL;
- status = MessageQ_E_TIMEOUT;
+
+ if (obj->unblocked) {
+ return obj->unblocked;
}
- return (status);
+ pthread_mutex_lock(&obj->msgListGate);
+
+ *msg = (MessageQ_Msg)obj->msgList.cqh_first;
+ CIRCLEQ_REMOVE(&obj->msgList, (MessageQ_CIRCLEQ_ENTRY *)*msg, elem);
+
+ pthread_mutex_unlock(&obj->msgListGate);
+
+ return status;
}
/*
*
* TBD: Implement as a socket ioctl, using getsockopt(). Return -1 for now.
*/
-Int MessageQ_count (MessageQ_Handle handle)
+Int MessageQ_count(MessageQ_Handle handle)
{
Int count = -1;
#if 0
&count, &optlen);
#endif
- return (count);
+ return count;
}
-/* Initializes a message not obtained from MessageQ_alloc. */
-Void MessageQ_staticMsgInit (MessageQ_Msg msg, UInt32 size)
+/*
+ * Initializes a message not obtained from MessageQ_alloc.
+ */
+Void MessageQ_staticMsgInit(MessageQ_Msg msg, UInt32 size)
{
/* Fill in the fields of the message */
- MessageQ_msgInit (msg);
- msg->heapId = MessageQ_STATICMSG;
+ MessageQ_msgInit(msg);
+ msg->heapId = MessageQ_STATICMSG;
msg->msgSize = size;
}
/*
- * Allocate a message and initialize the needed fields (note some
- * of the fields in the header are set via other APIs or in the
- * MessageQ_put function,
+ * Allocate a message and initialize the needed fields (note some
+ * of the fields in the header are set via other APIs or in the
+ * MessageQ_put function,
*/
-MessageQ_Msg MessageQ_alloc (UInt16 heapId, UInt32 size)
+MessageQ_Msg MessageQ_alloc(UInt16 heapId, UInt32 size)
{
- MessageQ_Msg msg = NULL;
+ IHeap_Handle heap;
+ MessageQ_Msg msg;
- /*
- * heapId not used for local alloc (as this is over a copy transport), but
- * we need to send to other side as heapId is used in BIOS transport:
- */
- msg = (MessageQ_Msg)calloc (1, size);
- MessageQ_msgInit (msg);
+ if (heapId > (MessageQ_module->numHeaps - 1)) {
+ PRINTVERBOSE1("MessageQ_alloc: Error: heapId (%d) too large", heapId);
+ return (NULL);
+ }
+ else if (MessageQ_module->heaps[heapId] == NULL) {
+ PRINTVERBOSE1("MessageQ_alloc: Error: heapId (%d) not registered",
+ heapId);
+ return (NULL);
+ }
+ else {
+ heap = (IHeap_Handle)MessageQ_module->heaps[heapId];
+ }
+
+ msg = IHeap_alloc(heap, size);
+
+ if (msg == NULL) {
+ return (NULL);
+ }
+
+ MessageQ_msgInit(msg);
msg->msgSize = size;
- msg->heapId = heapId;
+ msg->heapId = heapId;
- return msg;
+ return (msg);
}
-/* Frees the message back to the heap that was used to allocate it. */
-Int MessageQ_free (MessageQ_Msg msg)
+/*
+ * Frees the message back to the heap that was used to allocate it.
+ */
+Int MessageQ_free(MessageQ_Msg msg)
{
- UInt32 status = MessageQ_S_SUCCESS;
+ UInt32 status = MessageQ_S_SUCCESS;
+ IHeap_Handle heap;
- /* Check to ensure this was not allocated by user: */
- if (msg->heapId == MessageQ_STATICMSG) {
- status = MessageQ_E_CANNOTFREESTATICMSG;
+ /* ensure this was not allocated by user */
+ if (msg->heapId == MessageQ_STATICMSG) {
+ status = MessageQ_E_CANNOTFREESTATICMSG;
+ }
+ else if (msg->heapId > (MessageQ_module->numHeaps - 1)) {
+ status = MessageQ_E_INVALIDARG;
+ }
+ else if (MessageQ_module->heaps[msg->heapId] == NULL) {
+ status = MessageQ_E_NOTFOUND;
}
else {
- free (msg);
+ heap = (IHeap_Handle)MessageQ_module->heaps[msg->heapId];
+ IHeap_free(heap, (void *)msg);
}
- return status;
+ return (status);
}
-/* Register a heap with MessageQ. */
-Int MessageQ_registerHeap (Ptr heap, UInt16 heapId)
+/*
+ * ======== MessageQ_registerHeap ========
+ */
+Int MessageQ_registerHeap(Ptr heap, UInt16 heapId)
{
- Int status = MessageQ_S_SUCCESS;
+ Int status = MessageQ_S_SUCCESS;
- /* Do nothing, as this uses a copy transport: */
+ pthread_mutex_lock(&MessageQ_module->gate);
- return status;
+ if (heapId > (MessageQ_module->numHeaps - 1)) {
+ status = MessageQ_E_INVALIDARG;
+ }
+ else if (MessageQ_module->heaps[heapId] != NULL) {
+ status = MessageQ_E_ALREADYEXISTS;
+ }
+ else {
+ MessageQ_module->heaps[heapId] = heap;
+ }
+
+ pthread_mutex_unlock(&MessageQ_module->gate);
+
+ return (status);
}
-/* Unregister a heap with MessageQ. */
-Int MessageQ_unregisterHeap (UInt16 heapId)
+/*
+ * ======== MessageQ_unregisterHeap ========
+ */
+Int MessageQ_unregisterHeap(UInt16 heapId)
{
- Int status = MessageQ_S_SUCCESS;
+ Int status = MessageQ_S_SUCCESS;
- /* Do nothing, as this uses a copy transport: */
+ pthread_mutex_lock(&MessageQ_module->gate);
- return status;
+ if (heapId > (MessageQ_module->numHeaps - 1)) {
+ status = MessageQ_E_INVALIDARG;
+ }
+ else if (MessageQ_module->heaps[heapId] == NULL) {
+ status = MessageQ_E_NOTFOUND;
+ }
+ else {
+ MessageQ_module->heaps[heapId] = NULL;
+ }
+
+ pthread_mutex_unlock(&MessageQ_module->gate);
+
+ return (status);
}
/* Unblocks a MessageQ */
-Void MessageQ_unblock (MessageQ_Handle handle)
+Void MessageQ_unblock(MessageQ_Handle handle)
{
- MessageQ_Object * obj = (MessageQ_Object *) handle;
- uint64_t buf = 1;
- int numBytes;
+ MessageQ_Object *obj = (MessageQ_Object *)handle;
- /* Write 8 bytes to awaken any threads blocked on this messageQ: */
- numBytes = write(obj->unblockFd, &buf, sizeof(buf));
+ obj->unblocked = MessageQ_E_UNBLOCKED;
+ sem_post(&obj->synchronizer);
}
-/* Embeds a source message queue into a message. */
-Void MessageQ_setReplyQueue (MessageQ_Handle handle, MessageQ_Msg msg)
+/* Unblocks a MessageQ that's been shutdown due to transport failure */
+Void MessageQ_shutdown(MessageQ_Handle handle)
{
- MessageQ_Object * obj = (MessageQ_Object *) handle;
+ MessageQ_Object *obj = (MessageQ_Object *)handle;
+
+ obj->unblocked = MessageQ_E_SHUTDOWN;
+ sem_post(&obj->synchronizer);
+}
- msg->replyId = (UInt16)(obj->queue);
+/* Embeds a source message queue into a message */
+Void MessageQ_setReplyQueue(MessageQ_Handle handle, MessageQ_Msg msg)
+{
+ MessageQ_Object *obj = (MessageQ_Object *)handle;
+
+ msg->replyId = (UInt16)(obj->queue);
msg->replyProc = (UInt16)(obj->queue >> 16);
}
/* Returns the QueueId associated with the handle. */
-MessageQ_QueueId MessageQ_getQueueId (MessageQ_Handle handle)
+MessageQ_QueueId MessageQ_getQueueId(MessageQ_Handle handle)
{
- MessageQ_Object * obj = (MessageQ_Object *) handle;
- UInt32 queueId;
+ MessageQ_Object *obj = (MessageQ_Object *) handle;
+ UInt32 queueId;
queueId = (obj->queue);
return queueId;
}
+/* Returns the local handle associated with queueId. */
+MessageQ_Handle MessageQ_getLocalHandle(MessageQ_QueueId queueId)
+{
+ MessageQ_Object *obj;
+ MessageQ_QueueIndex queueIndex;
+ UInt16 procId;
+
+ procId = MessageQ_getProcId(queueId);
+ if (procId != MultiProc_self()) {
+ return NULL;
+ }
+
+ queueIndex = MessageQ_getQueueIndex(queueId);
+ obj = (MessageQ_Object *)MessageQ_module->queues[queueIndex];
+
+ return (MessageQ_Handle)obj;
+}
+
/* Sets the tracing of a message */
-Void MessageQ_setMsgTrace (MessageQ_Msg msg, Bool traceFlag)
+Void MessageQ_setMsgTrace(MessageQ_Msg msg, Bool traceFlag)
{
msg->flags = (msg->flags & ~TRACEMASK) | (traceFlag << TRACESHIFT);
}
* The MessageQ module itself does not use any shared memory but the
* underlying transport may use some shared memory.
*/
-SizeT MessageQ_sharedMemReq (Ptr sharedAddr)
+SizeT MessageQ_sharedMemReq(Ptr sharedAddr)
{
SizeT memReq = 0u;
/* Do nothing, as this is a copy transport. */
- return (memReq);
-}
-
-/*
- * Create a socket for this remote proc, and attempt to connect.
- *
- * Only creates a socket if one does not already exist for this procId.
- *
- * Note: remoteProcId may be MultiProc_Self() for loopback case.
- */
-Int MessageQ_attach (UInt16 remoteProcId, Ptr sharedAddr)
-{
- Int status = MessageQ_S_SUCCESS;
- int sock;
-
- PRINTVERBOSE1("MessageQ_attach: remoteProcId: %d\n", remoteProcId)
-
- if (remoteProcId >= MultiProc_MAXPROCESSORS) {
- status = MessageQ_E_INVALIDPROCID;
- goto exit;
- }
-
- pthread_mutex_lock (&(MessageQ_module->gate));
-
- /* Only create a socket if one doesn't exist: */
- if (MessageQ_module->sock[remoteProcId] == Transport_INVALIDSOCKET) {
- /* Create the socket for sending messages to the remote proc: */
- sock = socket(AF_RPMSG, SOCK_SEQPACKET, 0);
- if (sock < 0) {
- status = MessageQ_E_FAIL;
- printf ("MessageQ_attach: socket failed: %d, %s\n",
- errno, strerror(errno));
- }
- else {
- PRINTVERBOSE1("MessageQ_attach: created send socket: %d\n", sock)
- MessageQ_module->sock[remoteProcId] = sock;
- /* Attempt to connect: */
- ConnectSocket(sock, remoteProcId, MESSAGEQ_RPMSG_PORT);
- }
- }
- else {
- status = MessageQ_E_ALREADYEXISTS;
- }
-
- pthread_mutex_unlock (&(MessageQ_module->gate));
-
-exit:
- return (status);
-}
-
-/*
- * Close the socket for this remote proc.
- *
- */
-Int MessageQ_detach (UInt16 remoteProcId)
-{
- Int status = MessageQ_S_SUCCESS;
- int sock;
-
- if (remoteProcId >= MultiProc_MAXPROCESSORS) {
- status = MessageQ_E_INVALIDPROCID;
- goto exit;
- }
-
- pthread_mutex_lock (&(MessageQ_module->gate));
-
- sock = MessageQ_module->sock[remoteProcId];
- if (close (sock)) {
- status = MessageQ_E_OSFAILURE;
- printf ("MessageQ_detach: close failed: %d, %s\n",
- errno, strerror(errno));
- }
- else {
- PRINTVERBOSE1("MessageQ_detach: closed socket: %d\n", sock)
- MessageQ_module->sock[remoteProcId] = Transport_INVALIDSOCKET;
- }
-
- pthread_mutex_unlock (&(MessageQ_module->gate));
-
-exit:
- return (status);
+ return memReq;
}
/*
* This is a helper function to initialize a message.
*/
-Void MessageQ_msgInit (MessageQ_Msg msg)
+Void MessageQ_msgInit(MessageQ_Msg msg)
{
#if 0
Int status = MessageQ_S_SUCCESS;
handle = LAD_findHandle();
if (handle == LAD_MAXNUMCLIENTS) {
PRINTVERBOSE1(
- "MessageQ_setup: can't find connection to daemon for pid %d\n",
+ "MessageQ_msgInit: can't find connection to daemon for pid %d\n",
getpid())
return;
"MessageQ_msgInit: got LAD response for client %d, status=%d\n",
handle, status)
- memcpy(msg, &rsp.msgInit.msg, sizeof(*msg));
+ memcpy(msg, &rsp.msgInit.msg, sizeof (*msg));
#else
msg->reserved0 = 0; /* We set this to distinguish from NameServerMsg */
msg->replyId = (UInt16)MessageQ_INVALIDMESSAGEQ;
msg->flags = MessageQ_HEADERVERSION | MessageQ_NORMALPRI;
msg->srcProc = MultiProc_self();
- pthread_mutex_lock(&(MessageQ_module->gate));
+ pthread_mutex_lock(&MessageQ_module->seqNumGate);
msg->seqNum = MessageQ_module->seqNum++;
- pthread_mutex_unlock(&(MessageQ_module->gate));
+ pthread_mutex_unlock(&MessageQ_module->seqNumGate);
#endif
}
/*
- * =============================================================================
- * Transport: Fxns kept here until need for a transport layer is realized.
- * =============================================================================
- */
-/*
- * ======== transportCreateEndpoint ========
+ * ======== _MessageQ_grow ========
+ * Increase module's queues array to accommodate queueIndex from LAD
*
- * Create a communication endpoint to receive messages.
+ * Note: this function takes the queue index value (i.e. without the
+ * port offset).
*/
-static Int transportCreateEndpoint(int * fd, UInt16 rprocId, UInt16 queueIndex)
+Void _MessageQ_grow(UInt16 queueIndex)
{
- Int status = MessageQ_S_SUCCESS;
- int err;
-
- /* Create the socket to receive messages for this messageQ. */
- *fd = socket(AF_RPMSG, SOCK_SEQPACKET, 0);
- if (*fd < 0) {
- status = MessageQ_E_FAIL;
- printf ("transportCreateEndpoint: socket call failed: %d, %s\n",
- errno, strerror(errno));
- goto exit;
- }
+ MessageQ_Handle *queues;
+ MessageQ_Handle *oldQueues;
+ UInt oldSize;
- PRINTVERBOSE1("transportCreateEndpoint: created socket: fd: %d\n", *fd)
+ pthread_mutex_lock(&MessageQ_module->gate);
- err = SocketBindAddr(*fd, rprocId, (UInt32)queueIndex);
- if (err < 0) {
- status = MessageQ_E_FAIL;
- printf ("transportCreateEndpoint: bind failed: %d, %s\n",
- errno, strerror(errno));
- }
+ oldSize = MessageQ_module->numQueues * sizeof (MessageQ_Handle);
-exit:
- return (status);
-}
+ queues = calloc(queueIndex + MessageQ_GROWSIZE, sizeof(MessageQ_Handle));
+ memcpy(queues, MessageQ_module->queues, oldSize);
-/*
- * ======== transportCloseEndpoint ========
- *
- * Close the communication endpoint.
- */
-static Int transportCloseEndpoint(int fd)
-{
- Int status = MessageQ_S_SUCCESS;
+ oldQueues = MessageQ_module->queues;
+ MessageQ_module->queues = queues;
+ MessageQ_module->numQueues = queueIndex + MessageQ_GROWSIZE;
- PRINTVERBOSE1("transportCloseEndpoint: closing socket: %d\n", fd)
+ pthread_mutex_unlock(&MessageQ_module->gate);
- /* Stop communication to this socket: */
- close(fd);
+ free(oldQueues);
- return (status);
+ return;
}
/*
- * ======== transportGet ========
- * Retrieve a message waiting in the socket's queue.
-*/
-static Int transportGet(int sock, MessageQ_Msg * retMsg)
+ * ======== MessageQ_bind ========
+ * Bind all existing message queues to the given processor
+ *
+ * Note: This function is a hack to work around the driver.
+ *
+ * The Linux rpmsgproto driver requires a socket for each
+ * message queue and remote processor tuple.
+ *
+ * socket --> (queue, processor)
+ *
+ * Therefore, each time a new remote processor is started, all
+ * existing message queues need to create a socket for the new
+ * processor.
+ *
+ * The driver should not have this requirement. One socket per
+ * message queue should be sufficient to uniquely identify the
+ * endpoint to the driver.
+ */
+Void MessageQ_bind(UInt16 procId)
{
- Int status = MessageQ_S_SUCCESS;
- MessageQ_Msg msg;
- struct sockaddr_rpmsg fromAddr; // [Socket address of sender]
- unsigned int len;
- int byteCount;
+ int q;
+ int clusterId;
+ int priority;
+ MessageQ_Handle handle;
+ MessageQ_QueueId queue;
+ IMessageQTransport_Handle transport;
- /*
- * We have no way of peeking to see what message size we'll get, so we
- * allocate a message of max size to receive contents from the rpmsg socket
- * (currently, a copy transport)
- */
- msg = MessageQ_alloc (0, MESSAGEQ_RPMSG_MAXSIZE);
- if (!msg) {
- status = MessageQ_E_MEMORY;
- goto exit;
- }
+ clusterId = procId - MultiProc_getBaseIdOfCluster();
+ pthread_mutex_lock(&MessageQ_module->gate);
- memset(&fromAddr, 0, sizeof(fromAddr));
- len = sizeof(fromAddr);
-
- byteCount = recvfrom(sock, msg, MESSAGEQ_RPMSG_MAXSIZE, 0,
- (struct sockaddr *)&fromAddr, &len);
- if (len != sizeof(fromAddr)) {
- printf("recvfrom: got bad addr len (%d)\n", len);
- status = MessageQ_E_FAIL;
- goto exit;
- }
- if (byteCount < 0) {
- printf("recvfrom failed: %s (%d)\n", strerror(errno), errno);
- status = MessageQ_E_FAIL;
- goto exit;
- }
- else {
- /* Update the allocated message size (even though this may waste space
- * when the actual message is smaller than the maximum rpmsg size,
- * the message will be freed soon anyway, and it avoids an extra copy).
- */
- msg->msgSize = byteCount;
+ for (q = 0; q < MessageQ_module->numQueues; q++) {
- /*
- * If the message received was statically allocated, reset the
- * heapId, so the app can free it.
- */
- if (msg->heapId == MessageQ_STATICMSG) {
- msg->heapId = 0; /* for a copy transport, heap id is 0. */
- }
- }
+ if ((handle = MessageQ_module->queues[q]) == NULL) {
+ continue;
+ }
- PRINTVERBOSE1("transportGet: recvfrom socket: fd: %d\n", sock)
- PRINTVERBOSE3("\tReceived a msg: byteCount: %d, rpmsg addr: %d, rpmsg proc: %d\n", byteCount, fromAddr.addr, fromAddr.vproc_id)
- PRINTVERBOSE2("\tMessage Id: %d, Message size: %d\n", msg->msgId, msg->msgSize)
+ queue = ((MessageQ_Object *)handle)->queue;
- *retMsg = msg;
+ for (priority = 0; priority < 2; priority++) {
+ transport = MessageQ_module->transports[clusterId][priority];
+ if (transport != NULL) {
+ IMessageQTransport_bind((Void *)transport, queue);
+ }
+ }
+ }
-exit:
- return (status);
+ pthread_mutex_unlock(&MessageQ_module->gate);
}
/*
- * ======== transportPut ========
+ * ======== MessageQ_unbind ========
+ * Unbind all existing message queues from the given processor
*
- * Calls the socket API sendto() on the socket associated with
- * with this destination procID.
- * Currently, both local and remote messages are sent via the Socket ABI, so
- * no local object lists are maintained here.
-*/
-static Int transportPut(MessageQ_Msg msg, UInt16 dstId, UInt16 dstProcId)
+ * Hack: see MessageQ_bind.
+ */
+Void MessageQ_unbind(UInt16 procId)
{
- Int status = MessageQ_S_SUCCESS;
- int sock;
- int err;
+ int q;
+ int clusterId;
+ int priority;
+ MessageQ_Handle handle;
+ MessageQ_QueueId queue;
+ IMessageQTransport_Handle transport;
- /*
- * Retrieve the socket for the AF_SYSLINK protocol associated with this
- * transport.
- */
- sock = MessageQ_module->sock[dstProcId];
+ pthread_mutex_lock(&MessageQ_module->gate);
- PRINTVERBOSE2("Sending msgId: %d via sock: %d\n", msg->msgId, sock)
+ for (q = 0; q < MessageQ_module->numQueues; q++) {
- err = send(sock, msg, msg->msgSize, 0);
- if (err < 0) {
- printf ("transportPut: send failed: %d, %s\n",
- errno, strerror(errno));
- status = MessageQ_E_FAIL;
- }
+ if ((handle = MessageQ_module->queues[q]) == NULL) {
+ continue;
+ }
- /*
- * Free the message, as this is a copy transport, we maintain MessageQ
- * semantics.
- */
- MessageQ_free (msg);
+ queue = ((MessageQ_Object *)handle)->queue;
+ clusterId = procId - MultiProc_getBaseIdOfCluster();
- return (status);
+ for (priority = 0; priority < 2; priority++) {
+ transport = MessageQ_module->transports[clusterId][priority];
+ if (transport != NULL) {
+ IMessageQTransport_unbind((Void *)transport, queue);
+ }
+ }
+ }
+
+ pthread_mutex_unlock(&MessageQ_module->gate);
}