index 827af575bd152447f5627250bb0eab09a98167a9..c2930ad44c1b351fe4163ef298f93518142f524a 100644 (file)
--- a/linux/src/api/MessageQ.c
+++ b/linux/src/api/MessageQ.c
*/
Int MessageQ_get (MessageQ_Handle handle, MessageQ_Msg * msg ,UInt timeout)
{
*/
Int MessageQ_get (MessageQ_Handle handle, MessageQ_Msg * msg ,UInt timeout)
{
+ static int last = 0;
Int status = MessageQ_S_SUCCESS;
Int tmpStatus;
MessageQ_Object * obj = (MessageQ_Object *) handle;
Int status = MessageQ_S_SUCCESS;
Int tmpStatus;
MessageQ_Object * obj = (MessageQ_Object *) handle;
void *timevalPtr;
UInt16 rprocId;
int maxfd = 0;
void *timevalPtr;
UInt16 rprocId;
int maxfd = 0;
+ int selfId;
+ int nProcessors;
/* Wait (with timeout) and retreive message from socket: */
FD_ZERO(&rfds);
/* Wait (with timeout) and retreive message from socket: */
FD_ZERO(&rfds);
status = MessageQ_E_UNBLOCKED;
}
else {
status = MessageQ_E_UNBLOCKED;
}
else {
- for (rprocId = 0; rprocId < MultiProc_getNumProcessors();
- rprocId++) {
- if (rprocId == MultiProc_self() ||
- obj->fd[rprocId] == Transport_INVALIDSOCKET) {
- continue;
+ /* start where we last left off */
+ rprocId = last;
+
+ selfId = MultiProc_self();
+ nProcessors = MultiProc_getNumProcessors();
+
+ do {
+ if (rprocId != selfId &&
+ obj->fd[rprocId] != Transport_INVALIDSOCKET) {
+
+ if (FD_ISSET(obj->fd[rprocId], &rfds)) {
+ /* Our transport's fd was signalled: Get the message */
+ tmpStatus = transportGet(obj->fd[rprocId], msg);
+ if (tmpStatus < 0) {
+ printf ("MessageQ_get: tranposrtshm_get failed.");
+ status = MessageQ_E_FAIL;
+ }
+
+ last = (rprocId + 1) % nProcessors;
+ break;
+ }
}
}
- if (FD_ISSET(obj->fd[rprocId], &rfds)) {
- /* Our transport's fd was signalled: Get the message: */
- tmpStatus = transportGet(obj->fd[rprocId], msg);
- if (tmpStatus < 0) {
- printf ("MessageQ_get: tranposrtshm_get failed.");
- status = MessageQ_E_FAIL;
- }
- }
- }
+ rprocId = (rprocId + 1) % nProcessors;
+ } while (rprocId != last);
}
}
else if (retval == 0) {
}
}
else if (retval == 0) {