index 5921fe78325111b341ea28621cdc463a342bd9c5..c2930ad44c1b351fe4163ef298f93518142f524a 100644 (file)
--- a/linux/src/api/MessageQ.c
+++ b/linux/src/api/MessageQ.c
}
/* Function to setup the MessageQ module. */
-Int MessageQ_setup (const MessageQ_Config * cfg)
+Int MessageQ_setup(const MessageQ_Config * cfg)
{
Int status;
LAD_ClientHandle handle;
/* Clear sockets array. */
for (i = 0; i < MultiProc_MAXPROCESSORS; i++) {
- MessageQ_module->sock[i] = Transport_INVALIDSOCKET;
+ MessageQ_module->sock[i] = Transport_INVALIDSOCKET;
}
-
return status;
}
*/
MessageQ_Handle MessageQ_create (String name, const MessageQ_Params * params)
{
- Int status = MessageQ_S_SUCCESS;
+ Int status;
MessageQ_Object * obj = NULL;
UInt16 queueIndex = 0u;
UInt16 procId;
status = transportCreateEndpoint(&obj->fd[rprocId], rprocId,
queueIndex);
if (status < 0) {
- goto cleanup;
+ obj->fd[rprocId] = Transport_INVALIDSOCKET;
}
}
if (obj->unblockFd == -1) {
printf ("MessageQ_create: eventfd creation failed: %d, %s\n",
errno, strerror(errno));
- status = MessageQ_E_FAIL;
+ MessageQ_delete((MessageQ_Handle *)&obj);
}
+ else {
+ int endpointFound = 0;
-cleanup:
- /* Cleanup if fail: */
- if (status < 0) {
- MessageQ_delete((MessageQ_Handle *)&obj);
+ for (rprocId = 0; rprocId < MultiProc_getNumProcessors(); rprocId++) {
+ if (obj->fd[rprocId] != Transport_INVALIDSOCKET) {
+ endpointFound = 1;
+ }
+ }
+ if (!endpointFound) {
+ printf("MessageQ_create: no transport endpoints found, deleting\n");
+ MessageQ_delete((MessageQ_Handle *)&obj);
+ }
}
return ((MessageQ_Handle) obj);
*/
Int MessageQ_get (MessageQ_Handle handle, MessageQ_Msg * msg ,UInt timeout)
{
+ static int last = 0;
Int status = MessageQ_S_SUCCESS;
Int tmpStatus;
MessageQ_Object * obj = (MessageQ_Object *) handle;
void *timevalPtr;
UInt16 rprocId;
int maxfd = 0;
+ int selfId;
+ int nProcessors;
/* Wait (with timeout) and retreive message from socket: */
FD_ZERO(&rfds);
for (rprocId = 0; rprocId < MultiProc_getNumProcessors(); rprocId++) {
- if (rprocId == MultiProc_self()) {
+ if (rprocId == MultiProc_self() ||
+ obj->fd[rprocId] == Transport_INVALIDSOCKET) {
continue;
}
maxfd = MAX(maxfd, obj->fd[rprocId]);
status = MessageQ_E_UNBLOCKED;
}
else {
- for (rprocId = 0; rprocId < MultiProc_getNumProcessors();
- rprocId++) {
- if (rprocId == MultiProc_self()) {
- continue;
- }
- if (FD_ISSET(obj->fd[rprocId], &rfds)) {
- /* Our transport's fd was signalled: Get the message: */
- tmpStatus = transportGet(obj->fd[rprocId], msg);
- if (tmpStatus < 0) {
- printf ("MessageQ_get: tranposrtshm_get failed.");
- status = MessageQ_E_FAIL;
- }
+ /* start where we last left off */
+ rprocId = last;
+
+ selfId = MultiProc_self();
+ nProcessors = MultiProc_getNumProcessors();
+
+ do {
+ if (rprocId != selfId &&
+ obj->fd[rprocId] != Transport_INVALIDSOCKET) {
+
+ if (FD_ISSET(obj->fd[rprocId], &rfds)) {
+ /* Our transport's fd was signalled: Get the message */
+ tmpStatus = transportGet(obj->fd[rprocId], msg);
+ if (tmpStatus < 0) {
+ printf ("MessageQ_get: tranposrtshm_get failed.");
+ status = MessageQ_E_FAIL;
+ }
+
+ last = (rprocId + 1) % nProcessors;
+ break;
+ }
}
- }
+ rprocId = (rprocId + 1) % nProcessors;
+ } while (rprocId != last);
}
}
else if (retval == 0) {
PRINTVERBOSE1("MessageQ_attach: created send socket: %d\n", sock)
MessageQ_module->sock[remoteProcId] = sock;
/* Attempt to connect: */
- ConnectSocket(sock, remoteProcId, MESSAGEQ_RPMSG_PORT);
+ status = ConnectSocket(sock, remoteProcId, MESSAGEQ_RPMSG_PORT);
+ if (status < 0) {
+ status = MessageQ_E_RESOURCE;
+ /* don't hard-printf since this is no longer fatal */
+ PRINTVERBOSE1("MessageQ_attach: ConnectSocket(remoteProcId:%d) failed\n",
+ remoteProcId);
+ }
}
}
else {
pthread_mutex_unlock (&(MessageQ_module->gate));
+ if (status == MessageQ_E_RESOURCE) {
+ MessageQ_detach(remoteProcId);
+ }
+
exit:
return (status);
}
pthread_mutex_lock (&(MessageQ_module->gate));
sock = MessageQ_module->sock[remoteProcId];
- if (close (sock)) {
- status = MessageQ_E_OSFAILURE;
- printf ("MessageQ_detach: close failed: %d, %s\n",
- errno, strerror(errno));
- }
- else {
- PRINTVERBOSE1("MessageQ_detach: closed socket: %d\n", sock)
- MessageQ_module->sock[remoteProcId] = Transport_INVALIDSOCKET;
+ if (sock != Transport_INVALIDSOCKET) {
+ if (close(sock)) {
+ status = MessageQ_E_OSFAILURE;
+ printf("MessageQ_detach: close failed: %d, %s\n",
+ errno, strerror(errno));
+ }
+ else {
+ PRINTVERBOSE1("MessageQ_detach: closed socket: %d\n", sock)
+ MessageQ_module->sock[remoteProcId] = Transport_INVALIDSOCKET;
+ }
}
pthread_mutex_unlock (&(MessageQ_module->gate));
handle = LAD_findHandle();
if (handle == LAD_MAXNUMCLIENTS) {
PRINTVERBOSE1(
- "MessageQ_setup: can't find connection to daemon for pid %d\n",
+ "MessageQ_msgInit: can't find connection to daemon for pid %d\n",
getpid())
return;
@@ -969,8 +1001,9 @@ static Int transportCreateEndpoint(int * fd, UInt16 rprocId, UInt16 queueIndex)
err = SocketBindAddr(*fd, rprocId, (UInt32)queueIndex);
if (err < 0) {
status = MessageQ_E_FAIL;
- printf ("transportCreateEndpoint: bind failed: %d, %s\n",
- errno, strerror(errno));
+ /* don't hard-printf since this is no longer fatal */
+ PRINTVERBOSE2("transportCreateEndpoint: bind failed: %d, %s\n",
+ errno, strerror(errno));
}
exit: