index 74cba7199e6554725f0c095e525e4ed99b49a08e..8c30aa60ba0226682ddb7463a62e9ed5f0e95415 100644 (file)
/* Internal stuff: */
#include <_NameServer.h>
-#include <_NameServerRemoteRpmsg.h>
+#include <ti/ipc/namesrv/_NameServerRemoteRpmsg.h>
/* Socket utils: */
#include <SocketFxns.h>
#define INVALIDSOCKET (-1)
+#define NameServer_Event_ACK (1 << 0)
+#define NameServer_Event_REFRESH (1 << 1)
+#define NameServer_Event_SHUTDOWN (1 << 2)
+
#if defined (__cplusplus)
extern "C" {
#endif
/* structure for NameServer module state */
typedef struct NameServer_ModuleObject {
CIRCLEQ_HEAD(dummy1, NameServer_Object) objList;
- Int32 refCount;
- int sendSock[MultiProc_MAXPROCESSORS];
- /* Sockets for sending to remote proc nameserver ports: */
- int recvSock[MultiProc_MAXPROCESSORS];
- /* Sockets for recving from remote proc nameserver ports: */
- pthread_t listener;
+ Int32 refCount;
+ struct {
+ Int refCount; /* attached reference count */
+ int sendSock; /* socket for sending */
+ int recvSock; /* socket for receiving */
+ } comm[MultiProc_MAXPROCESSORS];
+ pthread_t listener;
/* Listener thread for NameServer replies and requests. */
- int unblockFd;
- /* Event to post to exit listener. */
- int waitFd;
+ int unblockFd;
+ /* Event to wake up listener thread. */
+ int waitFd;
/* Event to post to NameServer_get. */
- NameServerMsg nsMsg;
+ NameServerRemote_Msg nsMsg;
/* NameServer Message cache. */
- NameServer_Params defInstParams;
+ NameServer_Params defInstParams;
/* Default instance paramters */
- pthread_mutex_t modGate;
+ pthread_mutex_t modGate;
} NameServer_ModuleObject;
-#define CIRCLEQ_destruct(head) { \
- (head)->cqh_first = NULL; \
- (head)->cqh_last = NULL; \
-}
-
#define CIRCLEQ_elemClear(elem) { \
(elem)->cqe_next = (elem)->cqe_prev = (Void *)(elem); \
}
return (hash);
}
-static void NameServerRemote_processMessage(NameServerMsg * msg, UInt16 procId)
+static void NameServerRemote_processMessage(NameServerRemote_Msg *msg,
+ UInt16 procId)
{
NameServer_Handle handle;
Int status = NameServer_E_FAIL;
int err;
uint64_t buf = 1;
- int waitFd = NameServer_module->waitFd;
UInt16 clusterId;
if (msg->request == NAMESERVER_REQUEST) {
@@ -289,26 +289,27 @@ static void NameServerRemote_processMessage(NameServerMsg * msg, UInt16 procId)
/* send response message to remote processor */
clusterId = procId - MultiProc_getBaseIdOfCluster();
- err = send(NameServer_module->sendSock[clusterId], msg,
- sizeof(NameServerMsg), 0);
+ err = send(NameServer_module->comm[clusterId].sendSock, msg,
+ sizeof(NameServerRemote_Msg), 0);
if (err < 0) {
LOG2("NameServer: send failed: %d, %s\n", errno, strerror(errno))
}
}
else {
- LOG2("NameServer Reply: instanceName: %s, name: %s",
- (String)msg->instanceName, (String)msg->name)
- LOG1(", value: 0x%x\n", msg->value)
+ LOG3("NameServer Reply: instanceName: %s, name: %s, value: 0x%x\n",
+ (String)msg->instanceName, (String)msg->name, msg->value);
/* Save the response message. */
- memcpy(&NameServer_module->nsMsg, msg, sizeof(NameServerMsg));
+ memcpy(&NameServer_module->nsMsg, msg, sizeof(NameServerRemote_Msg));
/* Post the eventfd upon which NameServer_get() is waiting */
- write(waitFd, &buf, sizeof(uint64_t));
+ write(NameServer_module->waitFd, &buf, sizeof(uint64_t));
}
}
-
+/*
+ * ======== listener_cb ========
+ */
static void *listener_cb(void *arg)
{
fd_set rfds;
UInt16 procId;
struct sockaddr_rpmsg fromAddr;
unsigned int len;
- NameServerMsg msg;
- int byteCount;
- UInt16 numProcs = MultiProc_getNumProcsInCluster();
- UInt16 baseId = MultiProc_getBaseIdOfCluster();
- int sock;
+ NameServerRemote_Msg msg;
+ int nbytes;
+ UInt16 numProcs = MultiProc_getNumProcsInCluster();
+ UInt16 baseId = MultiProc_getBaseIdOfCluster();
+ int sock;
+ uint64_t event;
+ Bool run = TRUE;
LOG0("listener_cb: Entered Listener thread.\n")
for (i = 0, procId = baseId; i < numProcs; i++, procId++) {
if ((MultiProc_self() == procId)
- || (NameServer_module->recvSock[i] == INVALIDSOCKET)) {
+ || (NameServer_module->comm[i].recvSock == INVALIDSOCKET)) {
continue;
}
- sock = NameServer_module->recvSock[i];
+ sock = NameServer_module->comm[i].recvSock;
FD_SET(sock, &rfds);
- maxfd = MAX(sock, maxfd);
+ maxfd = sock > maxfd ? sock : maxfd;
}
- maxfd = maxfd + 1;
LOG2("NameServer: waiting for unblockFd: %d, and socks: maxfd: %d\n",
NameServer_module->unblockFd, maxfd)
- ret = select(maxfd, &rfds, NULL, NULL, NULL);
+
+ /* wait here until new data available */
+ ret = select(maxfd + 1, &rfds, NULL, NULL, NULL);
+
if (ret == -1) {
LOG0("listener_cb: select failed.")
break;
}
LOG0("NameServer: back from select()\n")
+ /* check all receive sockets for pending data */
for (i = 0, procId = baseId; i < numProcs; i++, procId++) {
if ((MultiProc_self() == procId)
- || (NameServer_module->recvSock[i] == INVALIDSOCKET)) {
+ || (NameServer_module->comm[i].recvSock == INVALIDSOCKET)) {
continue;
}
- sock = NameServer_module->recvSock[i];
+ sock = NameServer_module->comm[i].recvSock;
+
if (FD_ISSET(sock, &rfds)) {
LOG1("NameServer: Listener got NameServer message "
"from sock: %d!\n", sock);
memset(&fromAddr, 0, sizeof(fromAddr));
len = sizeof(fromAddr);
- byteCount = recvfrom(sock, &msg, sizeof(NameServerMsg), 0,
+ nbytes = recvfrom(sock, &msg, sizeof(NameServerRemote_Msg), 0,
(struct sockaddr *)&fromAddr, &len);
if (len != sizeof(fromAddr)) {
LOG1("recvfrom: got bad addr len (%d)\n", len)
break;
}
- if (byteCount < 0) {
+ if (nbytes < 0) {
LOG2("recvfrom failed: %s (%d)\n", strerror(errno), errno)
break;
}
else {
LOG1("listener_cb: recvfrom socket: fd: %d\n", sock)
- LOG2("\tReceived ns msg: byteCount: %d, from addr: %d, ",
- byteCount, fromAddr.addr)
+ LOG2("\tReceived ns msg: nbytes: %d, from addr: %d, ",
+ nbytes, fromAddr.addr)
LOG1("from vproc: %d\n", fromAddr.vproc_id)
NameServerRemote_processMessage(&msg, procId);
}
}
}
+
+ /* check for events */
if (FD_ISSET(NameServer_module->unblockFd, &rfds)) {
- /* We are told to unblock and exit: */
- LOG0("NameServer: Listener thread exiting\n")
- break;
+
+ read(NameServer_module->unblockFd, &event, sizeof(event));
+
+ if (event & NameServer_Event_SHUTDOWN) {
+ LOG0("NameServer: listener thread, event: SHUTDOWN\n")
+ event &= ~NameServer_Event_SHUTDOWN;
+ run = FALSE;
+ }
+ if (event & NameServer_Event_REFRESH) {
+ LOG0("NameServer: listener thread, event: REFRESH\n")
+ /* send ACK event */
+ event = NameServer_Event_ACK;
+ write(NameServer_module->waitFd, &event, sizeof(event));
+ }
}
- } while (1);
+
+ } while (run);
return ((void *)ret);
}
* =============================================================================
*/
-/* Function to setup the nameserver module. */
+/*
+ * ======== NameServer_setup ========
+ * Function to setup the name server module
+ */
Int NameServer_setup(Void)
{
Int status = NameServer_S_SUCCESS;
- int err;
- int sock;
int ret;
- int clId;
- UInt16 procId;
- UInt16 numProcs;
- UInt16 baseId;
+ Int i;
pthread_mutex_lock(&NameServer_module->modGate);
goto exit;
}
+ /* counter event object for passing commands to worker thread */
NameServer_module->unblockFd = eventfd(0, 0);
+
if (NameServer_module->unblockFd < 0) {
status = NameServer_E_FAIL;
LOG0("NameServer_setup: failed to create unblockFd.\n")
goto exit;
}
- NameServer_module->waitFd = eventfd(0, 0);
+ /* semaphore event object for acknowledging LAD command thread */
+ NameServer_module->waitFd = eventfd(0, EFD_SEMAPHORE);
+
if (NameServer_module->waitFd < 0) {
status = NameServer_E_FAIL;
LOG0("NameServer_setup: failed to create waitFd.\n")
goto exit;
}
- numProcs = MultiProc_getNumProcsInCluster();
- baseId = MultiProc_getBaseIdOfCluster();
-
- for (clId = 0, procId = baseId; clId < numProcs; clId++, procId++) {
- NameServer_module->sendSock[clId] = INVALIDSOCKET;
- NameServer_module->recvSock[clId] = INVALIDSOCKET;
-
- /* Only support NameServer to remote procs: */
- if (MultiProc_self() == procId) {
- continue;
- }
-
- /* Create the socket for sending messages to each remote proc: */
- sock = socket(AF_RPMSG, SOCK_SEQPACKET, 0);
- if (sock < 0) {
- status = NameServer_E_FAIL;
- LOG2("NameServer_setup: socket failed: %d, %s\n",
- errno, strerror(errno))
- }
- else {
- LOG2("NameServer_setup: created send socket: %d, procId %d\n",
- sock, procId)
- err = ConnectSocket(sock, procId, MESSAGEQ_RPMSG_PORT);
- if (err < 0) {
- status = NameServer_E_FAIL;
- LOG3("NameServer_setup: connect failed: procId=%d, "
- "errno=%d (%s)\n", procId, errno, strerror(errno))
-
- LOG1(" closing send socket: %d\n", sock)
- close(sock);
- }
- else {
- NameServer_module->sendSock[clId] = sock;
- }
- }
-
- /* Create the socket for recving messages from each remote proc: */
- sock = socket(AF_RPMSG, SOCK_SEQPACKET, 0);
- if (sock < 0) {
- status = NameServer_E_FAIL;
- LOG2("NameServer_setup: socket failed: %d, %s\n",
- errno, strerror(errno))
- }
- else {
- LOG2("NameServer_setup: created recv socket: %d, procId %d\n",
- sock, procId)
- err = SocketBindAddr(sock, procId, NAME_SERVER_RPMSG_ADDR);
- if (err < 0) {
- status = NameServer_E_FAIL;
- LOG2("NameServer_setup: bind failed: %d, %s\n",
- errno, strerror(errno))
-
- LOG1(" closing recv socket: %d\n", sock)
- close(sock);
- }
- else {
- NameServer_module->recvSock[clId] = sock;
- }
- }
+ for (i = 0; i < MultiProc_MAXPROCESSORS; i++) {
+ NameServer_module->comm[i].refCount = 0;
+ NameServer_module->comm[i].sendSock = INVALIDSOCKET;
+ NameServer_module->comm[i].recvSock = INVALIDSOCKET;
}
/* Construct the list object */
ret = pthread_create(&NameServer_module->listener, NULL, listener_cb, NULL);
if (ret) {
LOG1("NameServer_setup: can't spawn thread: %s\n", strerror(ret))
- LOG0("NameServer_setup: eventfd failed");
-
status = NameServer_E_FAIL;
}
- else {
- /* look for at least one good send/recv pair to indicate success */
- for (clId = 0; clId < numProcs; clId++) {
- if (NameServer_module->sendSock[clId] != INVALIDSOCKET &&
- NameServer_module->recvSock[clId] != INVALIDSOCKET) {
- status = NameServer_S_SUCCESS;
- break;
- }
- }
- }
exit:
LOG1("NameServer_setup: exiting, refCount=%d\n",
return (status);
}
-/*! Function to destroy the nameserver module. */
+/*
+ * ======== NameServer_destroy ========
+ * Function to destroy the name server module
+ */
Int NameServer_destroy(void)
{
- Int status = NameServer_S_SUCCESS;
- UInt16 numProcs;
- UInt16 baseId;
- UInt16 procId;
- int clId;
- int sock;
- uint64_t buf = 1;
+ Int status = NameServer_S_SUCCESS;
+ uint64_t event;
pthread_mutex_lock(&NameServer_module->modGate);
goto exit;
}
- numProcs = MultiProc_getNumProcsInCluster();
- baseId = MultiProc_getBaseIdOfCluster();
-
- LOG2("NameServer_destroy: numProcs=%d, baseId=%d\n", numProcs, baseId);
+ /* TODO: delete any remaining instances */
- for (clId = 0, procId = baseId; clId < numProcs; clId++, procId++) {
-
- /* Only support NameServer to remote procs: */
- if (MultiProc_self() == procId) {
- continue;
- }
-
- /* Close the socket: */
- sock = NameServer_module->sendSock[clId];
- if (sock != INVALIDSOCKET) {
- LOG1("NameServer_destroy: closing socket: %d\n", sock)
- close(sock);
- NameServer_module->sendSock[clId] = INVALIDSOCKET;
- }
- /* Close the socket: */
- sock = NameServer_module->recvSock[clId];
- if (sock != INVALIDSOCKET) {
- LOG1("NameServer_destroy: closing socket: %d\n", sock)
- close(sock);
- NameServer_module->recvSock[clId] = INVALIDSOCKET;
- }
- }
-
- CIRCLEQ_destruct(&NameServer_module->objList);
-
- /* Unblock the NameServer listener thread: */
- LOG0("NameServer_destroy: unblocking listener...\n")
- write(NameServer_module->unblockFd, &buf, sizeof(uint64_t));
+ /* shutdown the NameServer listener thread */
+ LOG0("NameServer_destroy: shutdown listener...\n")
+ event = NameServer_Event_SHUTDOWN;
+ write(NameServer_module->unblockFd, &event, sizeof(event));
/* Join: */
LOG0("NameServer_destroy: joining listener thread...\n")
return (handle);
}
-
-/* Function to delete a name server. */
-Int NameServer_delete(NameServer_Handle * handle)
+/*
+ * ======== NameServer_delete ========
+ * Delete a name server instance
+ */
+Int NameServer_delete(NameServer_Handle *handle)
{
Int status = NameServer_S_SUCCESS;
+ struct NameServer_Object *obj;
assert(handle != NULL);
assert(*handle != NULL);
- assert((*handle)->count == 0);
assert(NameServer_module->refCount != 0);
+ obj = *(struct NameServer_Object **)handle;
+
pthread_mutex_lock(&NameServer_module->modGate);
- (*handle)->refCount--;
- if ((*handle)->refCount != 0) {
+ obj->refCount--;
+ if (obj->refCount != 0) {
goto leave;
}
- if ((*handle)->count == 0) {
- CIRCLEQ_REMOVE(&NameServer_module->objList, *handle, elem);
+ /* delete each entry on the name list */
+ while (obj->nameList.cqh_first != (void *)&obj->nameList) {
+ NameServer_removeEntry(*handle, (Ptr)(obj->nameList.cqh_first));
+ }
- if ((*handle)->name != NULL) {
- free((*handle)->name);
- (*handle)->name = NULL;
- }
+ /* free the instance name */
+ if (obj->name != NULL) {
+ free(obj->name);
+ obj->name = NULL;
+ }
- CIRCLEQ_destruct(&(*handle)->nameList);
+ /* destroy the mutex */
+ pthread_mutex_destroy(&obj->gate);
- free((*handle));
- (*handle) = NULL;
- }
+ /* finally, free the instance object */
+ free(obj);
+
+ /* set the caller's handle to null */
+ (*handle) = NULL;
leave:
pthread_mutex_unlock(&NameServer_module->modGate);
/* Calculate the hash */
hash = stringHash(name);
+ pthread_mutex_lock(&handle->gate);
+
+ if (strlen(name) > handle->params.maxNameLen - 1) {
+ status = NameServer_E_INVALIDARG;
+ LOG0("NameServer_add: name length exceeded maximum!\n")
+ new_node = NULL;
+ goto exit;
+ }
+
if (len > handle->params.maxValueLen) {
status = NameServer_E_INVALIDARG;
LOG0("NameServer_add: value length exceeded maximum!\n")
goto exit;
}
- pthread_mutex_lock(&handle->gate);
-
/* Traverse the list to find duplicate check */
CIRCLEQ_traverse(node, &handle->nameList, NameServer_TableEntry_tag) {
/* Hash matches */
{
Int status = NameServer_S_SUCCESS;
struct NameServer_Object *obj = (struct NameServer_Object *)(handle);
- NameServerMsg nsMsg;
- NameServerMsg *replyMsg;
+ NameServerRemote_Msg nsMsg;
+ NameServerRemote_Msg *replyMsg;
fd_set rfds;
int ret = 0, sock, maxfd, waitFd;
struct timeval tv;
Bool done = FALSE;
UInt16 clusterId;
+ if (strlen(name) >= MAXNAMEINCHAR) {
+ LOG0("Name is too long in remote query\n");
+ return NameServer_E_NAMETOOLONG;
+ }
+
+ if (strlen(obj->name) >= MAXNAMEINCHAR) {
+ LOG0("Instance name is too long for remote query\n");
+ return NameServer_E_NAMETOOLONG;
+ }
+
/* Set Timeout to wait: */
tv.tv_sec = 0;
tv.tv_usec = NAMESERVER_GET_TIMEOUT;
/* Create request message and send to remote: */
clusterId = procId - MultiProc_getBaseIdOfCluster();
- sock = NameServer_module->sendSock[clusterId];
+ sock = NameServer_module->comm[clusterId].sendSock;
if (sock == INVALIDSOCKET) {
LOG1("NameServer_getRemote: no socket connection to processor %d\n",
procId);
procId, (String)nsMsg.instanceName)
LOG1("%s...\n", (String)nsMsg.name)
- err = send(sock, &nsMsg, sizeof(NameServerMsg), 0);
+ err = send(sock, &nsMsg, sizeof(NameServerRemote_Msg), 0);
if (err < 0) {
LOG2("NameServer_getRemote: send failed: %d, %s\n",
errno, strerror(errno))
FD_SET(waitFd, &rfds);
maxfd = waitFd + 1;
LOG1("NameServer_getRemote: pending on waitFd: %d\n", waitFd)
+
ret = select(maxfd, &rfds, NULL, NULL, &tv);
+
if (ret == -1) {
LOG0("NameServer_getRemote: select failed.")
status = NameServer_E_FAIL;
@@ -1304,6 +1260,142 @@ Int NameServer_getLocalUInt32(NameServer_Handle handle, String name, Ptr value)
return (status);
}
+/*
+ * ======== NameServer_attach ========
+ */
+Int NameServer_attach(UInt16 procId)
+{
+ Int status = NameServer_S_SUCCESS;
+ int sock;
+ int err;
+ UInt16 clId;
+ uint64_t event;
+
+ /* procId already validated in API layer */
+ clId = procId - MultiProc_getBaseIdOfCluster();
+
+ /* must reference count because we have multiple clients */
+ if (NameServer_module->comm[clId].refCount > 0) {
+ NameServer_module->comm[clId].refCount++;
+ goto done;
+ }
+
+ /* create socket for sending messages to remote processor */
+ sock = socket(AF_RPMSG, SOCK_SEQPACKET, 0);
+ if (sock < 0) {
+ status = NameServer_E_FAIL;
+ LOG2("NameServer_attach: socket failed: %d, %s\n", errno,
+ strerror(errno));
+ goto done;
+ }
+ NameServer_module->comm[clId].sendSock = sock;
+ LOG2("NameServer_attach: created send socket: %d, procId %d\n", sock,
+ procId);
+
+ err = ConnectSocket(sock, procId, MESSAGEQ_RPMSG_PORT);
+ if (err < 0) {
+ status = NameServer_E_FAIL;
+ LOG3("NameServer_attach: connect failed: procId=%d, errno=%d (%s)\n",
+ procId, errno, strerror(errno));
+ goto done;
+ }
+
+ /* create socket for receiving messages from remote processor */
+ sock = socket(AF_RPMSG, SOCK_SEQPACKET, 0);
+ if (sock < 0) {
+ status = NameServer_E_FAIL;
+ LOG2("NameServer_attach: socket failed: %d, %s\n", errno,
+ strerror(errno));
+ goto done;
+ }
+ NameServer_module->comm[clId].recvSock = sock;
+ LOG2("NameServer_attach: created receive socket: %d, procId %d\n", sock,
+ procId);
+
+ err = SocketBindAddr(sock, procId, NAME_SERVER_RPMSG_ADDR);
+ if (err < 0) {
+ status = NameServer_E_FAIL;
+ LOG2("NameServer_attach: bind failed: %d, %s\n", errno,
+ strerror(errno));
+ goto done;
+ }
+
+ /* getting here means we have successfully attached */
+ NameServer_module->comm[clId].refCount++;
+
+ /* tell the listener thread to add new receive sockets */
+ event = NameServer_Event_REFRESH;
+ write(NameServer_module->unblockFd, &event, sizeof(event));
+
+ /* wait for ACK event */
+ read(NameServer_module->waitFd, &event, sizeof(event));
+
+done:
+ if (status < 0) {
+ sock = NameServer_module->comm[clId].recvSock;
+ if (sock != INVALIDSOCKET) {
+ LOG1(" closing receive socket: %d\n", sock)
+ close(sock);
+ NameServer_module->comm[clId].recvSock = INVALIDSOCKET;
+ }
+
+ sock = NameServer_module->comm[clId].sendSock;
+ if (sock != INVALIDSOCKET) {
+ LOG1(" closing send socket: %d\n", sock)
+ close(sock);
+ NameServer_module->comm[clId].sendSock = INVALIDSOCKET;
+ }
+ }
+
+ return (status);
+}
+
+/*
+ * ======== NameServer_detach ========
+ */
+Int NameServer_detach(UInt16 procId)
+{
+ Int status = NameServer_S_SUCCESS;
+ UInt16 clId;
+ int sendSock;
+ int recvSock;
+ uint64_t event;
+
+ /* procId already validated in API layer */
+ clId = procId - MultiProc_getBaseIdOfCluster();
+
+ if (--NameServer_module->comm[clId].refCount > 0) {
+ goto done;
+ }
+
+ /* remove sockets from active list */
+ sendSock = NameServer_module->comm[clId].sendSock;
+ NameServer_module->comm[clId].sendSock = INVALIDSOCKET;
+
+ recvSock = NameServer_module->comm[clId].recvSock;
+ NameServer_module->comm[clId].recvSock = INVALIDSOCKET;
+
+ /* tell the listener thread to remove old sockets */
+ event = NameServer_Event_REFRESH;
+ write(NameServer_module->unblockFd, &event, sizeof(event));
+
+ /* wait for ACK event */
+ read(NameServer_module->waitFd, &event, sizeof(event));
+
+ /* close the sending socket */
+ LOG1("NameServer_destroy: closing socket: %d\n", sendSock)
+ close(sendSock);
+
+ /* close the receiving socket */
+ LOG1("NameServer_destroy: closing socket: %d\n", recvSock)
+ close(recvSock);
+
+ /* decrement the reference count */
+ NameServer_module->comm[clId].refCount--;
+
+done:
+ return (status);
+}
#if defined (__cplusplus)
}