X-Git-Url: https://git.ti.com/gitweb?p=ipc%2Fipcdev.git;a=blobdiff_plain;f=linux%2Fsrc%2Fapi%2FMessageQ.c;h=c2930ad44c1b351fe4163ef298f93518142f524a;hp=5921fe78325111b341ea28621cdc463a342bd9c5;hb=7610394a7901b753ea4f2ba938ce96a42964420b;hpb=29fe97980aeba0e5914edbd126f47b569a20691a diff --git a/linux/src/api/MessageQ.c b/linux/src/api/MessageQ.c index 5921fe7..c2930ad 100644 --- a/linux/src/api/MessageQ.c +++ b/linux/src/api/MessageQ.c @@ -221,7 +221,7 @@ Void MessageQ_getConfig (MessageQ_Config * cfg) } /* Function to setup the MessageQ module. */ -Int MessageQ_setup (const MessageQ_Config * cfg) +Int MessageQ_setup(const MessageQ_Config * cfg) { Int status; LAD_ClientHandle handle; @@ -266,10 +266,9 @@ Int MessageQ_setup (const MessageQ_Config * cfg) /* Clear sockets array. */ for (i = 0; i < MultiProc_MAXPROCESSORS; i++) { - MessageQ_module->sock[i] = Transport_INVALIDSOCKET; + MessageQ_module->sock[i] = Transport_INVALIDSOCKET; } - return status; } @@ -333,7 +332,7 @@ Void MessageQ_Params_init (MessageQ_Params * params) */ MessageQ_Handle MessageQ_create (String name, const MessageQ_Params * params) { - Int status = MessageQ_S_SUCCESS; + Int status; MessageQ_Object * obj = NULL; UInt16 queueIndex = 0u; UInt16 procId; @@ -413,7 +412,7 @@ MessageQ_Handle MessageQ_create (String name, const MessageQ_Params * params) status = transportCreateEndpoint(&obj->fd[rprocId], rprocId, queueIndex); if (status < 0) { - goto cleanup; + obj->fd[rprocId] = Transport_INVALIDSOCKET; } } @@ -425,13 +424,20 @@ MessageQ_Handle MessageQ_create (String name, const MessageQ_Params * params) if (obj->unblockFd == -1) { printf ("MessageQ_create: eventfd creation failed: %d, %s\n", errno, strerror(errno)); - status = MessageQ_E_FAIL; + MessageQ_delete((MessageQ_Handle *)&obj); } + else { + int endpointFound = 0; -cleanup: - /* Cleanup if fail: */ - if (status < 0) { - MessageQ_delete((MessageQ_Handle *)&obj); + for (rprocId = 0; rprocId < MultiProc_getNumProcessors(); rprocId++) { + if (obj->fd[rprocId] != Transport_INVALIDSOCKET) { + endpointFound = 1; + } + } + if (!endpointFound) { + printf("MessageQ_create: no transport endpoints found, deleting\n"); + MessageQ_delete((MessageQ_Handle *)&obj); + } } return ((MessageQ_Handle) obj); @@ -585,6 +591,7 @@ Int MessageQ_put (MessageQ_QueueId queueId, MessageQ_Msg msg) */ Int MessageQ_get (MessageQ_Handle handle, MessageQ_Msg * msg ,UInt timeout) { + static int last = 0; Int status = MessageQ_S_SUCCESS; Int tmpStatus; MessageQ_Object * obj = (MessageQ_Object *) handle; @@ -595,11 +602,14 @@ Int MessageQ_get (MessageQ_Handle handle, MessageQ_Msg * msg ,UInt timeout) void *timevalPtr; UInt16 rprocId; int maxfd = 0; + int selfId; + int nProcessors; /* Wait (with timeout) and retreive message from socket: */ FD_ZERO(&rfds); for (rprocId = 0; rprocId < MultiProc_getNumProcessors(); rprocId++) { - if (rprocId == MultiProc_self()) { + if (rprocId == MultiProc_self() || + obj->fd[rprocId] == Transport_INVALIDSOCKET) { continue; } maxfd = MAX(maxfd, obj->fd[rprocId]); @@ -636,20 +646,30 @@ Int MessageQ_get (MessageQ_Handle handle, MessageQ_Msg * msg ,UInt timeout) status = MessageQ_E_UNBLOCKED; } else { - for (rprocId = 0; rprocId < MultiProc_getNumProcessors(); - rprocId++) { - if (rprocId == MultiProc_self()) { - continue; - } - if (FD_ISSET(obj->fd[rprocId], &rfds)) { - /* Our transport's fd was signalled: Get the message: */ - tmpStatus = transportGet(obj->fd[rprocId], msg); - if (tmpStatus < 0) { - printf ("MessageQ_get: tranposrtshm_get failed."); - status = MessageQ_E_FAIL; - } + /* start where we last left off */ + rprocId = last; + + selfId = MultiProc_self(); + nProcessors = MultiProc_getNumProcessors(); + + do { + if (rprocId != selfId && + obj->fd[rprocId] != Transport_INVALIDSOCKET) { + + if (FD_ISSET(obj->fd[rprocId], &rfds)) { + /* Our transport's fd was signalled: Get the message */ + tmpStatus = transportGet(obj->fd[rprocId], msg); + if (tmpStatus < 0) { + printf ("MessageQ_get: tranposrtshm_get failed."); + status = MessageQ_E_FAIL; + } + + last = (rprocId + 1) % nProcessors; + break; + } } - } + rprocId = (rprocId + 1) % nProcessors; + } while (rprocId != last); } } else if (retval == 0) { @@ -840,7 +860,13 @@ Int MessageQ_attach (UInt16 remoteProcId, Ptr sharedAddr) PRINTVERBOSE1("MessageQ_attach: created send socket: %d\n", sock) MessageQ_module->sock[remoteProcId] = sock; /* Attempt to connect: */ - ConnectSocket(sock, remoteProcId, MESSAGEQ_RPMSG_PORT); + status = ConnectSocket(sock, remoteProcId, MESSAGEQ_RPMSG_PORT); + if (status < 0) { + status = MessageQ_E_RESOURCE; + /* don't hard-printf since this is no longer fatal */ + PRINTVERBOSE1("MessageQ_attach: ConnectSocket(remoteProcId:%d) failed\n", + remoteProcId); + } } } else { @@ -849,6 +875,10 @@ Int MessageQ_attach (UInt16 remoteProcId, Ptr sharedAddr) pthread_mutex_unlock (&(MessageQ_module->gate)); + if (status == MessageQ_E_RESOURCE) { + MessageQ_detach(remoteProcId); + } + exit: return (status); } @@ -870,14 +900,16 @@ Int MessageQ_detach (UInt16 remoteProcId) pthread_mutex_lock (&(MessageQ_module->gate)); sock = MessageQ_module->sock[remoteProcId]; - if (close (sock)) { - status = MessageQ_E_OSFAILURE; - printf ("MessageQ_detach: close failed: %d, %s\n", - errno, strerror(errno)); - } - else { - PRINTVERBOSE1("MessageQ_detach: closed socket: %d\n", sock) - MessageQ_module->sock[remoteProcId] = Transport_INVALIDSOCKET; + if (sock != Transport_INVALIDSOCKET) { + if (close(sock)) { + status = MessageQ_E_OSFAILURE; + printf("MessageQ_detach: close failed: %d, %s\n", + errno, strerror(errno)); + } + else { + PRINTVERBOSE1("MessageQ_detach: closed socket: %d\n", sock) + MessageQ_module->sock[remoteProcId] = Transport_INVALIDSOCKET; + } } pthread_mutex_unlock (&(MessageQ_module->gate)); @@ -900,7 +932,7 @@ Void MessageQ_msgInit (MessageQ_Msg msg) handle = LAD_findHandle(); if (handle == LAD_MAXNUMCLIENTS) { PRINTVERBOSE1( - "MessageQ_setup: can't find connection to daemon for pid %d\n", + "MessageQ_msgInit: can't find connection to daemon for pid %d\n", getpid()) return; @@ -969,8 +1001,9 @@ static Int transportCreateEndpoint(int * fd, UInt16 rprocId, UInt16 queueIndex) err = SocketBindAddr(*fd, rprocId, (UInt32)queueIndex); if (err < 0) { status = MessageQ_E_FAIL; - printf ("transportCreateEndpoint: bind failed: %d, %s\n", - errno, strerror(errno)); + /* don't hard-printf since this is no longer fatal */ + PRINTVERBOSE2("transportCreateEndpoint: bind failed: %d, %s\n", + errno, strerror(errno)); } exit: