index 9d8b0005dc5a9a38ccd45eefbab8323747fac0c2..c1a58f08e31184fb634ea3868bfc7a92e945d564 100644 (file)
/*
- * Copyright (c) 2012-2015 Texas Instruments Incorporated - http://www.ti.com
+ * Copyright (c) 2012-2016 Texas Instruments Incorporated - http://www.ti.com
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
/* Internal stuff: */
#include <_NameServer.h>
-#include <_NameServerRemoteRpmsg.h>
+#include <ti/ipc/namesrv/_NameServerRemoteRpmsg.h>
/* Socket utils: */
#include <SocketFxns.h>
#include <_lad.h>
+#if !defined(EFD_SEMAPHORE)
+# define EFD_SEMAPHORE (1 << 0)
+#endif
+
#define MESSAGEQ_RPMSG_PORT 61
#define NAME_SERVER_RPMSG_ADDR 0
#define INVALIDSOCKET (-1)
+#define NameServer_Event_ACK (1 << 0)
+#define NameServer_Event_REFRESH (1 << 1)
+#define NameServer_Event_SHUTDOWN (1 << 2)
+
#if defined (__cplusplus)
extern "C" {
#endif
/* structure for NameServer module state */
typedef struct NameServer_ModuleObject {
CIRCLEQ_HEAD(dummy1, NameServer_Object) objList;
- Int32 refCount;
- int sendSock[MultiProc_MAXPROCESSORS];
- /* Sockets for sending to remote proc nameserver ports: */
- int recvSock[MultiProc_MAXPROCESSORS];
- /* Sockets for recving from remote proc nameserver ports: */
- pthread_t listener;
+ Int32 refCount;
+ struct {
+ Int refCount; /* attached reference count */
+ int sendSock; /* socket for sending */
+ int recvSock; /* socket for receiving */
+ } comm[MultiProc_MAXPROCESSORS];
+ pthread_t listener;
/* Listener thread for NameServer replies and requests. */
- int unblockFd;
- /* Event to post to exit listener. */
- int waitFd;
+ int unblockFd;
+ /* Event to wake up listener thread. */
+ int waitFd;
/* Event to post to NameServer_get. */
- NameServerMsg nsMsg;
+ NameServerRemote_Msg nsMsg;
/* NameServer Message cache. */
- NameServer_Params defInstParams;
+ NameServer_Params defInstParams;
/* Default instance paramters */
- pthread_mutex_t modGate;
+ pthread_mutex_t modGate;
+ pthread_mutex_t attachGate;
} NameServer_ModuleObject;
-#define CIRCLEQ_destruct(head) { \
- (head)->cqh_first = NULL; \
- (head)->cqh_last = NULL; \
-}
-
#define CIRCLEQ_elemClear(elem) { \
(elem)->cqe_next = (elem)->cqe_prev = (Void *)(elem); \
}
.defInstParams.checkExisting = TRUE,
.defInstParams.maxValueLen = 0u,
.defInstParams.maxNameLen = 16u,
-#if defined(IPC_BUILDOS_ANDROID)
+#if defined(IPC_BUILDOS_ANDROID) && (PLATFORM_SDK_VERSION < 23)
.modGate = PTHREAD_RECURSIVE_MUTEX_INITIALIZER,
#else
// only _NP (non-portable) type available in CG tools which we're using
.modGate = PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP,
#endif
+ .attachGate = PTHREAD_MUTEX_INITIALIZER,
.refCount = 0
};
return (hash);
}
-static void NameServerRemote_processMessage(NameServerMsg * msg, UInt16 procId)
+static Int NameServer_reattach(UInt16 procId)
+{
+ Int status = NameServer_S_SUCCESS;
+ UInt16 clId;
+ int sendSock = INVALIDSOCKET;
+ int recvSock = INVALIDSOCKET;
+ int err;
+
+ /* procId already validated in API layer */
+ clId = procId - MultiProc_getBaseIdOfCluster();
+
+ if (NameServer_module->comm[clId].refCount == 0) {
+ goto done;
+ }
+
+ LOG2("NameServer_reattach: --> procId=%d, refCount=%d\n",
+ procId, NameServer_module->comm[clId].refCount)
+
+ /* first create new sockets */
+ sendSock = socket(AF_RPMSG, SOCK_SEQPACKET, 0);
+ if (sendSock < 0) {
+ status = NameServer_E_FAIL;
+ LOG2("NameServer_reattach: socket failed: %d, %s\n", errno,
+ strerror(errno));
+ goto done;
+ }
+ LOG2("NameServer_reattach: created send socket: %d, procId %d\n", sendSock,
+ procId);
+
+ err = ConnectSocket(sendSock, procId, MESSAGEQ_RPMSG_PORT);
+ if (err < 0) {
+ status = NameServer_E_FAIL;
+ LOG3("NameServer_reattach: connect failed: procId=%d, errno=%d (%s)\n",
+ procId, errno, strerror(errno));
+ goto done;
+ }
+
+ /* create socket for receiving messages from remote processor */
+ recvSock = socket(AF_RPMSG, SOCK_SEQPACKET, 0);
+ if (recvSock < 0) {
+ status = NameServer_E_FAIL;
+ LOG2("NameServer_reattach: socket failed: %d, %s\n", errno,
+ strerror(errno));
+ goto done;
+ }
+
+ LOG2("NameServer_attach: created receive socket: %d, procId %d\n", recvSock,
+ procId);
+
+ err = SocketBindAddr(recvSock, procId, NAME_SERVER_RPMSG_ADDR);
+ if (err < 0) {
+ status = NameServer_E_FAIL;
+ LOG2("NameServer_attach: bind failed: %d, %s\n", errno,
+ strerror(errno));
+ goto done;
+ }
+
+ /* then close old sockets */
+ /* close the sending socket */
+ LOG1("NameServer_reattach: closing socket: %d\n",
+ NameServer_module->comm[clId].sendSock)
+ close(NameServer_module->comm[clId].sendSock);
+
+ /* close the receiving socket */
+ LOG1("NameServer_reattach: closing socket: %d\n",
+ NameServer_module->comm[clId].recvSock)
+ close(NameServer_module->comm[clId].recvSock);
+
+ /* assign new sockets */
+ NameServer_module->comm[clId].sendSock = sendSock;
+ NameServer_module->comm[clId].recvSock = recvSock;
+
+done:
+ if (status < 0) {
+ if (recvSock >= 0) {
+ LOG1(" closing receive socket: %d\n", recvSock)
+ close(recvSock);
+ }
+
+ if (sendSock >= 0) {
+ LOG1(" closing send socket: %d\n", sendSock)
+ close(sendSock);
+ }
+ }
+
+ LOG2("NameServer_reattach: <-- refCount=%d, status=%d\n",
+ NameServer_module->comm[clId].refCount, status)
+
+ return (status);
+}
+
+static void NameServerRemote_processMessage(NameServerRemote_Msg *msg,
+ UInt16 procId)
{
NameServer_Handle handle;
Int status = NameServer_E_FAIL;
int err;
uint64_t buf = 1;
- int waitFd = NameServer_module->waitFd;
UInt16 clusterId;
if (msg->request == NAMESERVER_REQUEST) {
@@ -271,16 +368,17 @@ static void NameServerRemote_processMessage(NameServerMsg * msg, UInt16 procId)
}
}
- LOG2("NameServer Response: instanceName: %s, name: %s,",
- (String)msg->instanceName, (String)msg->name)
/* set the request status */
if (status < 0) {
- LOG1(" Value not found, status: %d\n", status)
+ LOG3("NameServer Response: instance: %s, name: %s, value not "
+ "found, status: %d\n", (String)msg->instanceName,
+ (String)msg->name, status);
msg->requestStatus = 0;
}
else {
+ LOG3("NameServer Response: instance: %s, name: %s, value: 0x%x\n",
+ (String)msg->instanceName, (String)msg->name, msg->value);
msg->requestStatus = 1;
- LOG1(" Value: 0x%x\n", msg->value)
}
/* specify message as a response */
@@ -289,26 +387,27 @@ static void NameServerRemote_processMessage(NameServerMsg * msg, UInt16 procId)
/* send response message to remote processor */
clusterId = procId - MultiProc_getBaseIdOfCluster();
- err = send(NameServer_module->sendSock[clusterId], msg,
- sizeof(NameServerMsg), 0);
+ err = send(NameServer_module->comm[clusterId].sendSock, msg,
+ sizeof(NameServerRemote_Msg), 0);
if (err < 0) {
LOG2("NameServer: send failed: %d, %s\n", errno, strerror(errno))
}
}
else {
- LOG2("NameServer Reply: instanceName: %s, name: %s",
- (String)msg->instanceName, (String)msg->name)
- LOG1(", value: 0x%x\n", msg->value)
+ LOG3("NameServer Reply: instanceName: %s, name: %s, value: 0x%x\n",
+ (String)msg->instanceName, (String)msg->name, msg->value);
/* Save the response message. */
- memcpy(&NameServer_module->nsMsg, msg, sizeof(NameServerMsg));
+ memcpy(&NameServer_module->nsMsg, msg, sizeof(NameServerRemote_Msg));
/* Post the eventfd upon which NameServer_get() is waiting */
- write(waitFd, &buf, sizeof(uint64_t));
+ write(NameServer_module->waitFd, &buf, sizeof(uint64_t));
}
}
-
+/*
+ * ======== listener_cb ========
+ */
static void *listener_cb(void *arg)
{
fd_set rfds;
UInt16 procId;
struct sockaddr_rpmsg fromAddr;
unsigned int len;
- NameServerMsg msg;
- int byteCount;
- UInt16 numProcs = MultiProc_getNumProcsInCluster();
- UInt16 baseId = MultiProc_getBaseIdOfCluster();
- int sock;
+ NameServerRemote_Msg msg;
+ int nbytes;
+ UInt16 numProcs = MultiProc_getNumProcsInCluster();
+ UInt16 baseId = MultiProc_getBaseIdOfCluster();
+ int sock;
+ uint64_t event;
+ Bool run = TRUE;
+ Bool reconnect = FALSE;
LOG0("listener_cb: Entered Listener thread.\n")
for (i = 0, procId = baseId; i < numProcs; i++, procId++) {
if ((MultiProc_self() == procId)
- || (NameServer_module->recvSock[i] == INVALIDSOCKET)) {
+ || (NameServer_module->comm[i].recvSock == INVALIDSOCKET)) {
continue;
}
- sock = NameServer_module->recvSock[i];
+ sock = NameServer_module->comm[i].recvSock;
FD_SET(sock, &rfds);
- maxfd = MAX(sock, maxfd);
+ maxfd = sock > maxfd ? sock : maxfd;
}
- maxfd = maxfd + 1;
LOG2("NameServer: waiting for unblockFd: %d, and socks: maxfd: %d\n",
NameServer_module->unblockFd, maxfd)
- ret = select(maxfd, &rfds, NULL, NULL, NULL);
+
+ /* wait here until new data available */
+ ret = select(maxfd + 1, &rfds, NULL, NULL, NULL);
+
if (ret == -1) {
LOG0("listener_cb: select failed.")
break;
}
LOG0("NameServer: back from select()\n")
+ /* check all receive sockets for pending data */
for (i = 0, procId = baseId; i < numProcs; i++, procId++) {
if ((MultiProc_self() == procId)
- || (NameServer_module->recvSock[i] == INVALIDSOCKET)) {
+ || (NameServer_module->comm[i].recvSock == INVALIDSOCKET)) {
continue;
}
- sock = NameServer_module->recvSock[i];
+ sock = NameServer_module->comm[i].recvSock;
+
if (FD_ISSET(sock, &rfds)) {
LOG1("NameServer: Listener got NameServer message "
"from sock: %d!\n", sock);
memset(&fromAddr, 0, sizeof(fromAddr));
len = sizeof(fromAddr);
- byteCount = recvfrom(sock, &msg, sizeof(NameServerMsg), 0,
+ nbytes = recvfrom(sock, &msg, sizeof(NameServerRemote_Msg), 0,
(struct sockaddr *)&fromAddr, &len);
if (len != sizeof(fromAddr)) {
LOG1("recvfrom: got bad addr len (%d)\n", len)
break;
}
- if (byteCount < 0) {
+ if (nbytes < 0) {
LOG2("recvfrom failed: %s (%d)\n", strerror(errno), errno)
+ if (errno == ENOLINK) {
+ LOG0("Socket is no longer valid, MUST re-attach!\n");
+ reconnect = TRUE;
+ }
break;
}
else {
LOG1("listener_cb: recvfrom socket: fd: %d\n", sock)
- LOG2("\tReceived ns msg: byteCount: %d, from addr: %d, ",
- byteCount, fromAddr.addr)
- LOG1("from vproc: %d\n", fromAddr.vproc_id)
+ LOG3("\tReceived ns msg: nbytes: %d, from addr: %d, "
+ "from vproc: %d\n", nbytes, fromAddr.addr,
+ fromAddr.vproc_id);
NameServerRemote_processMessage(&msg, procId);
}
}
}
+
+ /* check for events */
if (FD_ISSET(NameServer_module->unblockFd, &rfds)) {
- /* We are told to unblock and exit: */
- LOG0("NameServer: Listener thread exiting\n")
- break;
+
+ read(NameServer_module->unblockFd, &event, sizeof(event));
+
+ if (event & NameServer_Event_SHUTDOWN) {
+ LOG0("NameServer: listener thread, event: SHUTDOWN\n")
+ event &= ~NameServer_Event_SHUTDOWN;
+ run = FALSE;
+ }
+ if (event & NameServer_Event_REFRESH) {
+ LOG0("NameServer: listener thread, event: REFRESH\n")
+ /* send ACK event */
+ event = NameServer_Event_ACK;
+ write(NameServer_module->waitFd, &event, sizeof(event));
+ }
+ }
+
+ if (reconnect) {
+ reconnect = FALSE;
+ /* grab lock to prevent users from attach/deattach while recovering */
+ pthread_mutex_lock(&NameServer_module->attachGate);
+ NameServer_reattach(procId);
+ pthread_mutex_unlock(&NameServer_module->attachGate);
}
- } while (1);
+
+ } while (run);
return ((void *)ret);
}
* =============================================================================
*/
-/* Function to setup the nameserver module. */
+/*
+ * ======== NameServer_setup ========
+ * Function to setup the name server module
+ */
Int NameServer_setup(Void)
{
Int status = NameServer_S_SUCCESS;
- int err;
- int sock;
int ret;
- int clId;
- UInt16 procId;
- UInt16 numProcs;
- UInt16 baseId;
+ Int i;
pthread_mutex_lock(&NameServer_module->modGate);
goto exit;
}
+ /* counter event object for passing commands to worker thread */
NameServer_module->unblockFd = eventfd(0, 0);
+
if (NameServer_module->unblockFd < 0) {
status = NameServer_E_FAIL;
LOG0("NameServer_setup: failed to create unblockFd.\n")
goto exit;
}
- NameServer_module->waitFd = eventfd(0, 0);
+ /* semaphore event object for acknowledging LAD command thread */
+ NameServer_module->waitFd = eventfd(0, EFD_SEMAPHORE);
+
if (NameServer_module->waitFd < 0) {
status = NameServer_E_FAIL;
LOG0("NameServer_setup: failed to create waitFd.\n")
goto exit;
}
- numProcs = MultiProc_getNumProcsInCluster();
- baseId = MultiProc_getBaseIdOfCluster();
-
- for (clId = 0, procId = baseId; clId < numProcs; clId++, procId++) {
- NameServer_module->sendSock[clId] = INVALIDSOCKET;
- NameServer_module->recvSock[clId] = INVALIDSOCKET;
-
- /* Only support NameServer to remote procs: */
- if (MultiProc_self() == procId) {
- continue;
- }
-
- /* Create the socket for sending messages to each remote proc: */
- sock = socket(AF_RPMSG, SOCK_SEQPACKET, 0);
- if (sock < 0) {
- status = NameServer_E_FAIL;
- LOG2("NameServer_setup: socket failed: %d, %s\n",
- errno, strerror(errno))
- }
- else {
- LOG1("NameServer_setup: created send socket: %d\n", sock)
- err = ConnectSocket(sock, procId, MESSAGEQ_RPMSG_PORT);
- if (err < 0) {
- status = NameServer_E_FAIL;
- LOG3("NameServer_setup: connect failed: procId=%d, "
- "errno=%d (%s)\n", procId, errno, strerror(errno))
-
- LOG1(" closing send socket: %d\n", sock)
- close(sock);
- }
- else {
- NameServer_module->sendSock[clId] = sock;
- }
- }
-
- /* Create the socket for recving messages from each remote proc: */
- sock = socket(AF_RPMSG, SOCK_SEQPACKET, 0);
- if (sock < 0) {
- status = NameServer_E_FAIL;
- LOG2("NameServer_setup: socket failed: %d, %s\n",
- errno, strerror(errno))
- }
- else {
- LOG1("NameServer_setup: created recv socket: %d\n", sock)
-
- err = SocketBindAddr(sock, procId, NAME_SERVER_RPMSG_ADDR);
- if (err < 0) {
- status = NameServer_E_FAIL;
- LOG2("NameServer_setup: bind failed: %d, %s\n",
- errno, strerror(errno))
-
- LOG1(" closing recv socket: %d\n", sock)
- close(sock);
- }
- else {
- NameServer_module->recvSock[clId] = sock;
- }
- }
+ for (i = 0; i < MultiProc_MAXPROCESSORS; i++) {
+ NameServer_module->comm[i].refCount = 0;
+ NameServer_module->comm[i].sendSock = INVALIDSOCKET;
+ NameServer_module->comm[i].recvSock = INVALIDSOCKET;
}
/* Construct the list object */
ret = pthread_create(&NameServer_module->listener, NULL, listener_cb, NULL);
if (ret) {
LOG1("NameServer_setup: can't spawn thread: %s\n", strerror(ret))
- LOG0("NameServer_setup: eventfd failed");
-
status = NameServer_E_FAIL;
}
- else {
- /* look for at least one good send/recv pair to indicate success */
- for (clId = 0; clId < numProcs; clId++) {
- if (NameServer_module->sendSock[clId] != INVALIDSOCKET &&
- NameServer_module->recvSock[clId] != INVALIDSOCKET) {
- status = NameServer_S_SUCCESS;
- break;
- }
- }
- }
exit:
LOG1("NameServer_setup: exiting, refCount=%d\n",
return (status);
}
-/*! Function to destroy the nameserver module. */
+/*
+ * ======== NameServer_destroy ========
+ * Function to destroy the name server module
+ */
Int NameServer_destroy(void)
{
- Int status = NameServer_S_SUCCESS;
- UInt16 numProcs;
- UInt16 baseId;
- UInt16 procId;
- int clId;
- int sock;
- uint64_t buf = 1;
+ Int status = NameServer_S_SUCCESS;
+ uint64_t event;
pthread_mutex_lock(&NameServer_module->modGate);
goto exit;
}
- numProcs = MultiProc_getNumProcsInCluster();
- baseId = MultiProc_getBaseIdOfCluster();
-
- LOG2("NameServer_destroy: numProcs=%d, baseId=%d\n", numProcs, baseId);
-
- for (clId = 0, procId = baseId; clId < numProcs; clId++, procId++) {
-
- /* Only support NameServer to remote procs: */
- if (MultiProc_self() == procId) {
- continue;
- }
-
- /* Close the socket: */
- sock = NameServer_module->sendSock[clId];
- if (sock != INVALIDSOCKET) {
- LOG1("NameServer_destroy: closing socket: %d\n", sock)
- close(sock);
- NameServer_module->sendSock[clId] = INVALIDSOCKET;
- }
- /* Close the socket: */
- sock = NameServer_module->recvSock[clId];
- if (sock != INVALIDSOCKET) {
- LOG1("NameServer_destroy: closing socket: %d\n", sock)
- close(sock);
- NameServer_module->recvSock[clId] = INVALIDSOCKET;
- }
- }
-
- CIRCLEQ_destruct(&NameServer_module->objList);
+ /* TODO: delete any remaining instances */
- /* Unblock the NameServer listener thread: */
- LOG0("NameServer_destroy: unblocking listener...\n")
- write(NameServer_module->unblockFd, &buf, sizeof(uint64_t));
+ /* shutdown the NameServer listener thread */
+ LOG0("NameServer_destroy: shutdown listener...\n")
+ event = NameServer_Event_SHUTDOWN;
+ write(NameServer_module->unblockFd, &event, sizeof(event));
/* Join: */
LOG0("NameServer_destroy: joining listener thread...\n")
return (handle);
}
-
-/* Function to delete a name server. */
-Int NameServer_delete(NameServer_Handle * handle)
+/*
+ * ======== NameServer_delete ========
+ * Delete a name server instance
+ */
+Int NameServer_delete(NameServer_Handle *handle)
{
Int status = NameServer_S_SUCCESS;
+ struct NameServer_Object *obj;
assert(handle != NULL);
assert(*handle != NULL);
- assert((*handle)->count == 0);
assert(NameServer_module->refCount != 0);
+ obj = *(struct NameServer_Object **)handle;
+
pthread_mutex_lock(&NameServer_module->modGate);
- (*handle)->refCount--;
- if ((*handle)->refCount != 0) {
+ obj->refCount--;
+ if (obj->refCount != 0) {
goto leave;
}
- if ((*handle)->count == 0) {
- CIRCLEQ_REMOVE(&NameServer_module->objList, *handle, elem);
+ /* delete each entry on the name list */
+ while (obj->nameList.cqh_first != (void *)&obj->nameList) {
+ NameServer_removeEntry(*handle, (Ptr)(obj->nameList.cqh_first));
+ }
- if ((*handle)->name != NULL) {
- free((*handle)->name);
- (*handle)->name = NULL;
- }
+ /* free the instance name */
+ if (obj->name != NULL) {
+ free(obj->name);
+ obj->name = NULL;
+ }
- CIRCLEQ_destruct(&(*handle)->nameList);
+ /* destroy the mutex */
+ pthread_mutex_destroy(&obj->gate);
- free((*handle));
- (*handle) = NULL;
- }
+ /* remove from objList */
+ CIRCLEQ_REMOVE(&NameServer_module->objList, obj, elem);
+
+ /* finally, free the instance object */
+ free(obj);
+
+ /* set the caller's handle to null */
+ (*handle) = NULL;
leave:
pthread_mutex_unlock(&NameServer_module->modGate);
/* Calculate the hash */
hash = stringHash(name);
+ pthread_mutex_lock(&handle->gate);
+
+ if (strlen(name) > handle->params.maxNameLen - 1) {
+ status = NameServer_E_INVALIDARG;
+ LOG0("NameServer_add: name length exceeded maximum!\n")
+ new_node = NULL;
+ goto exit;
+ }
+
if (len > handle->params.maxValueLen) {
status = NameServer_E_INVALIDARG;
LOG0("NameServer_add: value length exceeded maximum!\n")
goto exit;
}
- pthread_mutex_lock(&handle->gate);
-
/* Traverse the list to find duplicate check */
CIRCLEQ_traverse(node, &handle->nameList, NameServer_TableEntry_tag) {
/* Hash matches */
{
Int status = NameServer_S_SUCCESS;
struct NameServer_Object *obj = (struct NameServer_Object *)(handle);
- NameServerMsg nsMsg;
- NameServerMsg *replyMsg;
+ NameServerRemote_Msg nsMsg;
+ NameServerRemote_Msg *replyMsg;
fd_set rfds;
int ret = 0, sock, maxfd, waitFd;
struct timeval tv;
Bool done = FALSE;
UInt16 clusterId;
+ if (strlen(name) >= MAXNAMEINCHAR) {
+ LOG0("Name is too long in remote query\n");
+ return NameServer_E_NAMETOOLONG;
+ }
+
+ if (strlen(obj->name) >= MAXNAMEINCHAR) {
+ LOG0("Instance name is too long for remote query\n");
+ return NameServer_E_NAMETOOLONG;
+ }
+
/* Set Timeout to wait: */
tv.tv_sec = 0;
tv.tv_usec = NAMESERVER_GET_TIMEOUT;
/* Create request message and send to remote: */
clusterId = procId - MultiProc_getBaseIdOfCluster();
- sock = NameServer_module->sendSock[clusterId];
+ sock = NameServer_module->comm[clusterId].sendSock;
if (sock == INVALIDSOCKET) {
LOG1("NameServer_getRemote: no socket connection to processor %d\n",
procId);
strncpy((char *)nsMsg.instanceName, obj->name, strlen(obj->name) + 1);
strncpy((char *)nsMsg.name, name, strlen(name) + 1);
- LOG2("NameServer_getRemote: Requesting from procId %d, %s:",
- procId, (String)nsMsg.instanceName)
- LOG1("%s...\n", (String)nsMsg.name)
+ LOG3("NameServer_getRemote: requesting from procId %d, %s: %s\n",
+ procId, (String)nsMsg.instanceName, (String)nsMsg.name);
- err = send(sock, &nsMsg, sizeof(NameServerMsg), 0);
+ err = send(sock, &nsMsg, sizeof(NameServerRemote_Msg), 0);
if (err < 0) {
LOG2("NameServer_getRemote: send failed: %d, %s\n",
errno, strerror(errno))
FD_SET(waitFd, &rfds);
maxfd = waitFd + 1;
LOG1("NameServer_getRemote: pending on waitFd: %d\n", waitFd)
+
ret = select(maxfd, &rfds, NULL, NULL, &tv);
+
if (ret == -1) {
LOG0("NameServer_getRemote: select failed.")
status = NameServer_E_FAIL;
/* set the contents of value */
if (*len <= sizeof (Bits32)) {
*(UInt32 *)value = (UInt32)replyMsg->value;
- LOG2("NameServer_getRemote: Reply from: %d, %s:",
- procId, (String)replyMsg->instanceName)
- LOG2("%s, value: 0x%x...\n",
- (String)replyMsg->name, *(UInt32 *)value)
+ LOG4("NameServer_getRemote: Reply from: %d, %s: "
+ "name: %s, value: 0x%x\n", procId,
+ (String)replyMsg->instanceName,
+ (String)replyMsg->name, *(UInt32 *)value);
}
else {
memcpy(value, replyMsg->valueBuf, *len);
status = NameServer_getRemote(handle, name, value, len,
baseId + clusterId);
- if ((status >= 0) ||
- ((status < 0) && (status != NameServer_E_NOTFOUND) &&
- (status != NameServer_E_RESOURCE))) {
+ if (status >= 0) {
break;
}
}
procId[i]);
}
- if ((status >= 0) ||
- ((status < 0) && (status != NameServer_E_NOTFOUND) &&
- (status != NameServer_E_RESOURCE))) {
+ if (status >= 0) {
break;
}
@@ -1303,6 +1369,164 @@ Int NameServer_getLocalUInt32(NameServer_Handle handle, String name, Ptr value)
return (status);
}
+/*
+ * ======== NameServer_attach ========
+ */
+Int NameServer_attach(UInt16 procId)
+{
+ Int status = NameServer_S_SUCCESS;
+ int sock;
+ int err;
+ UInt16 clId;
+ uint64_t event;
+
+ /* procId already validated in API layer */
+ clId = procId - MultiProc_getBaseIdOfCluster();
+
+ pthread_mutex_lock(&NameServer_module->attachGate);
+
+ LOG2("NameServer_attach: --> procId=%d, refCount=%d\n",
+ procId, NameServer_module->comm[clId].refCount)
+
+ /* must reference count because we have multiple clients */
+ if (NameServer_module->comm[clId].refCount > 0) {
+ NameServer_module->comm[clId].refCount++;
+ goto done;
+ }
+
+ /* create socket for sending messages to remote processor */
+ sock = socket(AF_RPMSG, SOCK_SEQPACKET, 0);
+ if (sock < 0) {
+ status = NameServer_E_FAIL;
+ LOG2("NameServer_attach: socket failed: %d, %s\n", errno,
+ strerror(errno));
+ goto done;
+ }
+ NameServer_module->comm[clId].sendSock = sock;
+ LOG2("NameServer_attach: created send socket: %d, procId %d\n", sock,
+ procId);
+
+ err = ConnectSocket(sock, procId, MESSAGEQ_RPMSG_PORT);
+ if (err < 0) {
+ status = NameServer_E_FAIL;
+ LOG3("NameServer_attach: connect failed: procId=%d, errno=%d (%s)\n",
+ procId, errno, strerror(errno));
+ goto done;
+ }
+
+ /* create socket for receiving messages from remote processor */
+ sock = socket(AF_RPMSG, SOCK_SEQPACKET, 0);
+ if (sock < 0) {
+ status = NameServer_E_FAIL;
+ LOG2("NameServer_attach: socket failed: %d, %s\n", errno,
+ strerror(errno));
+ goto done;
+ }
+ NameServer_module->comm[clId].recvSock = sock;
+ LOG2("NameServer_attach: created receive socket: %d, procId %d\n", sock,
+ procId);
+
+ err = SocketBindAddr(sock, procId, NAME_SERVER_RPMSG_ADDR);
+ if (err < 0) {
+ status = NameServer_E_FAIL;
+ LOG2("NameServer_attach: bind failed: %d, %s\n", errno,
+ strerror(errno));
+ goto done;
+ }
+
+ /* getting here means we have successfully attached */
+ NameServer_module->comm[clId].refCount++;
+
+ pthread_mutex_unlock(&NameServer_module->attachGate);
+
+ /* tell the listener thread to add new receive sockets */
+ event = NameServer_Event_REFRESH;
+ write(NameServer_module->unblockFd, &event, sizeof(event));
+
+ /* wait for ACK event */
+ read(NameServer_module->waitFd, &event, sizeof(event));
+
+ pthread_mutex_lock(&NameServer_module->attachGate);
+
+done:
+ if (status < 0) {
+ sock = NameServer_module->comm[clId].recvSock;
+ if (sock != INVALIDSOCKET) {
+ LOG1(" closing receive socket: %d\n", sock)
+ close(sock);
+ NameServer_module->comm[clId].recvSock = INVALIDSOCKET;
+ }
+
+ sock = NameServer_module->comm[clId].sendSock;
+ if (sock != INVALIDSOCKET) {
+ LOG1(" closing send socket: %d\n", sock)
+ close(sock);
+ NameServer_module->comm[clId].sendSock = INVALIDSOCKET;
+ }
+ }
+
+ LOG2("NameServer_attach: <-- refCount=%d, status=%d\n",
+ NameServer_module->comm[clId].refCount, status)
+
+ pthread_mutex_unlock(&NameServer_module->attachGate);
+
+ return (status);
+}
+
+/*
+ * ======== NameServer_detach ========
+ */
+Int NameServer_detach(UInt16 procId)
+{
+ Int status = NameServer_S_SUCCESS;
+ UInt16 clId;
+ int sendSock;
+ int recvSock;
+ uint64_t event;
+
+ /* procId already validated in API layer */
+ clId = procId - MultiProc_getBaseIdOfCluster();
+
+ pthread_mutex_lock(&NameServer_module->attachGate);
+
+ LOG2("NameServer_detach: --> procId=%d, refCount=%d\n",
+ procId, NameServer_module->comm[clId].refCount)
+
+ /* decrement reference count regardless of outcome below */
+ if (--NameServer_module->comm[clId].refCount > 0) {
+ pthread_mutex_unlock(&NameServer_module->attachGate);
+ goto done;
+ }
+
+ /* remove sockets from active list */
+ sendSock = NameServer_module->comm[clId].sendSock;
+ NameServer_module->comm[clId].sendSock = INVALIDSOCKET;
+
+ recvSock = NameServer_module->comm[clId].recvSock;
+ NameServer_module->comm[clId].recvSock = INVALIDSOCKET;
+
+ pthread_mutex_unlock(&NameServer_module->attachGate);
+
+ /* tell the listener thread to remove old sockets */
+ event = NameServer_Event_REFRESH;
+ write(NameServer_module->unblockFd, &event, sizeof(event));
+
+ /* wait for ACK event */
+ read(NameServer_module->waitFd, &event, sizeof(event));
+
+ /* close the sending socket */
+ LOG1("NameServer_detach: closing socket: %d\n", sendSock)
+ close(sendSock);
+
+ /* close the receiving socket */
+ LOG1("NameServer_detach: closing socket: %d\n", recvSock)
+ close(recvSock);
+
+done:
+ LOG2("NameServer_detach: <-- refCount=%d, status=%d\n",
+ NameServer_module->comm[clId].refCount, status)
+ return (status);
+}
#if defined (__cplusplus)
}