1 /*
2 * Copyright (c) 2011-2015 Texas Instruments Incorporated - http://www.ti.com
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
7 * are met:
8 *
9 * * Redistributions of source code must retain the above copyright
10 * notice, this list of conditions and the following disclaimer.
11 *
12 * * Redistributions in binary form must reproduce the above copyright
13 * notice, this list of conditions and the following disclaimer in the
14 * documentation and/or other materials provided with the distribution.
15 *
16 * * Neither the name of Texas Instruments Incorporated nor the names of
17 * its contributors may be used to endorse or promote products derived
18 * from this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
21 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
22 * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
23 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
24 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
25 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
26 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
27 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
28 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
29 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
30 * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 */
32 /** ============================================================================
33 * @file RPMessage.c
34 *
35 * @brief A simple copy-based MessageQ, to work with Linux virtio_rp_msg.
36 *
37 * Notes:
38 * - The logic in the functions for sending (_put()) and receiving _swiFxn()
39 * depend on the role (host or slave) the processor is playing in the
40 * asymmetric virtio I/O.
41 * - The host always adds *available* buffers to send/receive, while the slave
42 * always adds *used* buffers to send/receive.
43 * - The logic is summarized below:
44 *
45 * Host:
46 * - Prime vq_host with avail bufs, and kick vq_host so slave can send.
47 * - To send a buffer to the slave processor:
48 * allocate a tx buffer, or get_used_buf(vq_slave);
49 * >> copy data into buf <<
50 * add_avail_buf(vq_slave);
51 * kick(vq_slave);
52 * - To receive buffer from slave processor:
53 * get_used_buf(vq_host);
54 * >> empty data from buf <<
55 * add_avail_buf(vq_host);
56 * kick(vq_host);
57 *
58 * Slave:
59 * - To receive buffer from the host:
60 * get_avail_buf(vq_slave);
61 * >> empty data from buf <<
62 * add_used_buf(vq_slave);
63 * kick(vq_slave);
64 * - To send buffer to the host:
65 * get_avail_buf(vq_host);
66 * >> copy data into buf <<
67 * add_used_buf(vq_host);
68 * kick(vq_host);
69 *
70 * ============================================================================
71 */
73 /* this define must precede inclusion of any xdc header file */
74 #define Registry_CURDESC ti_ipc_rpmsg_RPMessage__Desc
75 #define MODULE_NAME "ti.ipc.rpmsg.RPMessage"
77 #include <xdc/std.h>
78 #include <string.h>
80 #include <xdc/runtime/System.h>
81 #include <xdc/runtime/Assert.h>
82 #include <xdc/runtime/Memory.h>
83 #include <xdc/runtime/Registry.h>
84 #include <xdc/runtime/Log.h>
85 #include <xdc/runtime/Diags.h>
87 #include <ti/sysbios/BIOS.h>
88 #include <ti/sysbios/knl/Swi.h>
89 #include <ti/sysbios/knl/Semaphore.h>
90 #include <ti/sysbios/heaps/HeapBuf.h>
91 #include <ti/sysbios/gates/GateHwi.h>
93 #include <ti/sdo/utils/List.h>
94 #include <ti/ipc/MultiProc.h>
96 #include <ti/ipc/rpmsg/RPMessage.h>
98 #include "_VirtQueue.h"
100 /* TBD: VirtQueue.h needs to somehow get factored out of family directory .*/
101 #if defined(OMAPL138)
102 #include <ti/ipc/family/omapl138/VirtQueue.h>
103 #elif defined(TCI6614)
104 #include <ti/ipc/family/tci6614/VirtQueue.h>
105 #elif defined(TCI6638)
106 #include <ti/ipc/family/tci6638/VirtQueue.h>
107 #elif defined(OMAP5)
108 #include <ti/ipc/family/omap54xx/VirtQueue.h>
109 #elif defined(VAYU)
110 #include <ti/ipc/family/vayu/VirtQueue.h>
111 #else
112 #error unknown processor!
113 #endif
115 /* =============================================================================
116 * Structures & Enums
117 * =============================================================================
118 */
120 /* Various arbitrary limits: */
121 #define MAXMESSAGEQOBJECTS 256
123 /* The RPMessage Object */
124 typedef struct RPMessage_Object {
125 UInt32 queueId; /* Unique id (procId | queueIndex) */
126 Semaphore_Handle semHandle; /* I/O Completion */
127 RPMessage_callback cb; /* RPMessage Callback */
128 UArg arg; /* Callback argument */
129 List_Handle queue; /* Queue of pending messages */
130 Bool unblocked; /* Use with signal to unblock _receive() */
131 } RPMessage_Object;
133 /* Module_State */
134 typedef struct RPMessage_Module {
135 /* Instance gate: */
136 GateHwi_Handle gateH;
137 /* Array of messageQObjects in the system: */
138 struct RPMessage_Object *msgqObjects[MAXMESSAGEQOBJECTS];
139 /* Heap from which to allocate free messages for copying: */
140 HeapBuf_Handle heap;
141 } RPMessage_Module;
143 /* Message Header: Must match mp_msg_hdr in virtio_rp_msg.h on Linux side. */
144 typedef struct RPMessage_MsgHeader {
145 Bits32 srcAddr; /* source endpoint addr */
146 Bits32 dstAddr; /* destination endpoint addr */
147 Bits32 reserved; /* reserved */
148 Bits16 dataLen; /* data length */
149 Bits16 flags; /* bitmask of different flags */
150 UInt8 payload[]; /* Data payload */
151 } RPMessage_MsgHeader;
153 typedef RPMessage_MsgHeader *RPMessage_Msg;
155 /* Element to hold payload copied onto receiver's queue. */
156 typedef struct Queue_elem {
157 List_Elem elem; /* Allow list linking. */
158 UInt len; /* Length of data */
159 UInt32 src; /* Src address/endpt of the msg */
160 Char data[]; /* payload begins here */
161 } Queue_elem;
163 /* Combine transport related objects into a struct for future migration: */
164 typedef struct RPMessage_Transport {
165 Swi_Handle swiHandle;
166 VirtQueue_Handle virtQueue_toHost;
167 VirtQueue_Handle virtQueue_fromHost;
168 Semaphore_Handle semHandle_toHost;
169 } RPMessage_Transport;
171 /* generated in RPMessage.xs: module$use() */
172 extern const HeapBuf_Handle ti_ipc_rpmsg_RPMessage_heap;
174 /* generated in template file RPMessage.xdt */
175 extern const UInt ti_ipc_rpmsg_RPMessage_messageBufferSize;
177 /* module diags mask */
178 Registry_Desc Registry_CURDESC;
180 static RPMessage_Module module;
181 static RPMessage_Transport transport;
183 /* Module ref count: */
184 static Int curInit = 0;
186 /*
187 * ======== RPMessage_swiFxn ========
188 */
189 #define FXNN "RPMessage_swiFxn"
190 static Void RPMessage_swiFxn(UArg arg0, UArg arg1)
191 {
192 Int16 token;
193 RPMessage_Msg msg;
194 UInt16 dstProc = MultiProc_self();
195 Bool usedBufAdded = FALSE;
196 int len;
198 Log_print0(Diags_ENTRY, "--> "FXNN);
200 /* Process all available buffers: */
201 while ((token = VirtQueue_getAvailBuf(transport.virtQueue_fromHost,
202 (Void **)&msg, &len)) >= 0) {
204 Log_print3(Diags_INFO, FXNN": Received msg from: 0x%x, "
205 "to: 0x%x, dataLen: %d",
206 (IArg)msg->srcAddr, (IArg)msg->dstAddr, (IArg)msg->dataLen);
208 /* Pass to destination queue (on this proc), or callback: */
209 RPMessage_send(dstProc, msg->dstAddr, msg->srcAddr,
210 (Ptr)msg->payload, msg->dataLen);
212 VirtQueue_addUsedBuf(transport.virtQueue_fromHost, token,
213 RPMSG_BUF_SIZE);
214 usedBufAdded = TRUE;
215 }
217 if (usedBufAdded) {
218 /* Tell host we've processed the buffers: */
219 VirtQueue_kick(transport.virtQueue_fromHost);
220 }
221 Log_print0(Diags_EXIT, "<-- "FXNN);
222 }
223 #undef FXNN
226 #define FXNN "callback_availBufReady"
227 static Void callback_availBufReady(VirtQueue_Handle vq)
228 {
230 if (vq == transport.virtQueue_fromHost) {
231 /* Post a SWI to process all incoming messages */
232 Log_print0(Diags_INFO, FXNN": virtQueue_fromHost kicked");
233 Swi_post(transport.swiHandle);
234 }
235 else if (vq == transport.virtQueue_toHost) {
236 /* Note: We normally post nothing for transport.virtQueue_toHost,
237 * unless we were starved for buffers, and we turned on notifications.
238 */
239 Semaphore_post(transport.semHandle_toHost);
240 Log_print0(Diags_INFO, FXNN": virtQueue_toHost kicked");
241 }
242 }
243 #undef FXNN
245 /* =============================================================================
246 * RPMessage Functions:
247 * =============================================================================
248 */
250 /*
251 * ======== MessasgeQCopy_init ========
252 *
253 *
254 */
255 #define FXNN "RPMessage_init"
256 Void RPMessage_init(UInt16 remoteProcId)
257 {
258 GateHwi_Params gatePrms;
259 Semaphore_Params semParams;
260 int i;
261 Registry_Result result;
262 Bool isHost;
263 VirtQueue_Params vqParams;
265 if (curInit++) {
266 Log_print1(Diags_ENTRY, "--> "FXNN": (remoteProcId=%d)",
267 (IArg)remoteProcId);
268 goto exit; /* module already initialized */
269 }
271 /* register with xdc.runtime to get a diags mask */
272 result = Registry_addModule(&Registry_CURDESC, MODULE_NAME);
273 Assert_isTrue(result == Registry_SUCCESS, (Assert_Id)NULL);
275 /* Log should be after the Registry_CURDESC is initialized */
276 Log_print1(Diags_ENTRY, "--> "FXNN": (remoteProcId=%d)",
277 (IArg)remoteProcId);
279 /* Gate to protect module object and lists: */
280 GateHwi_Params_init(&gatePrms);
281 module.gateH = GateHwi_create(&gatePrms, NULL);
283 /* Initialize Module State: */
284 for (i = 0; i < MAXMESSAGEQOBJECTS; i++) {
285 module.msgqObjects[i] = NULL;
286 }
288 /* store handle to heap created at config time */
289 module.heap = ti_ipc_rpmsg_RPMessage_heap;
291 Semaphore_Params_init(&semParams);
292 semParams.mode = Semaphore_Mode_BINARY;
293 transport.semHandle_toHost = Semaphore_create(0, &semParams, NULL);
295 isHost = (MultiProc_self() == MultiProc_getId("HOST"));
297 /* Initialize Transport related objects: */
299 VirtQueue_Params_init(&vqParams);
300 if (isHost) {
301 /* We don't handle this case currently! Host would need to prime vq. */
302 Assert_isTrue(FALSE, NULL);
303 }
304 else {
305 #if defined(OMAP5) || defined(VAYU)
306 vqParams.callback = callback_availBufReady;
307 #else
308 vqParams.callback = (xdc_Fxn)callback_availBufReady;
309 #endif
310 }
312 /*
313 * Create a pair VirtQueues (one for sending, one for receiving).
314 * Note: First one gets an even, second gets odd vq ID.
315 */
316 vqParams.vqId = ID_SELF_TO_HOST;
317 transport.virtQueue_toHost = (Ptr)VirtQueue_create(remoteProcId,
318 &vqParams, NULL);
319 vqParams.vqId = ID_HOST_TO_SELF;
320 transport.virtQueue_fromHost = (Ptr)VirtQueue_create(remoteProcId,
321 &vqParams, NULL);
323 /* Plug Vring Interrupts, and wait for host ready to recv kick: */
324 VirtQueue_startup(remoteProcId, isHost);
326 /* construct the Swi to process incoming messages: */
327 transport.swiHandle = Swi_create(RPMessage_swiFxn, NULL, NULL);
329 exit:
330 Log_print0(Diags_EXIT, "<-- "FXNN);
331 }
332 #undef FXNN
334 /*
335 * ======== MessasgeQCopy_finalize ========
336 */
337 #define FXNN "RPMessage_finalize"
338 Void RPMessage_finalize()
339 {
340 Log_print0(Diags_ENTRY, "--> "FXNN);
342 if (!curInit || --curInit) {
343 goto exit; /* module still in use, or uninitialized */
344 }
346 /* Tear down Module */
347 Swi_delete(&(transport.swiHandle));
349 GateHwi_delete(&module.gateH);
351 exit:
352 Log_print0(Diags_EXIT, "<-- "FXNN);
353 }
354 #undef FXNN
356 /*
357 * ======== RPMessage_create ========
358 */
359 #define FXNN "RPMessage_create"
360 RPMessage_Handle RPMessage_create(UInt32 reserved,
361 RPMessage_callback cb,
362 UArg arg,
363 UInt32 * endpoint)
364 {
365 RPMessage_Object *obj = NULL;
366 Bool found = FALSE;
367 Int i;
368 UInt16 queueIndex = 0;
369 IArg key;
371 Log_print4(Diags_ENTRY, "--> "FXNN": "
372 "(reserved=%d, cb=0x%x, arg=0x%x, endpoint=0x%x)",
373 (IArg)reserved, (IArg)cb, (IArg)arg, (IArg)endpoint);
375 Assert_isTrue((curInit > 0) , NULL);
377 key = GateHwi_enter(module.gateH);
379 if (reserved == RPMessage_ASSIGN_ANY) {
380 /* Search the array for a free slot above reserved: */
381 for (i = RPMessage_MAX_RESERVED_ENDPOINT + 1;
382 (i < MAXMESSAGEQOBJECTS) && (found == FALSE) ; i++) {
383 if (module.msgqObjects[i] == NULL) {
384 queueIndex = i;
385 found = TRUE;
386 break;
387 }
388 }
389 }
390 else if ((queueIndex = reserved) <= RPMessage_MAX_RESERVED_ENDPOINT) {
391 if (module.msgqObjects[queueIndex] == NULL) {
392 found = TRUE;
393 }
394 }
396 if (found) {
397 obj = Memory_alloc(NULL, sizeof(RPMessage_Object), 0, NULL);
398 if (obj != NULL) {
399 if (cb) {
400 /* Store callback and it's arg instead of semaphore: */
401 obj->cb = cb;
402 obj->arg= arg;
403 }
404 else {
405 obj->cb = NULL;
407 /* Allocate a semaphore to signal when messages received: */
408 obj->semHandle = Semaphore_create(0, NULL, NULL);
410 /* Create our queue of to be received messages: */
411 obj->queue = List_create(NULL, NULL);
412 }
414 /* Store our endpoint, and object: */
415 obj->queueId = queueIndex;
416 module.msgqObjects[queueIndex] = obj;
418 /* See RPMessage_unblock() */
419 obj->unblocked = FALSE;
421 *endpoint = queueIndex;
422 Log_print1(Diags_LIFECYCLE, FXNN": endPt created: %d",
423 (IArg)queueIndex);
424 }
425 }
427 GateHwi_leave(module.gateH, key);
429 Log_print1(Diags_EXIT, "<-- "FXNN": 0x%x", (IArg)obj);
430 return (obj);
431 }
432 #undef FXNN
434 /*
435 * ======== RPMessage_delete ========
436 */
437 #define FXNN "RPMessage_delete"
438 Int RPMessage_delete(RPMessage_Handle *handlePtr)
439 {
440 Int status = RPMessage_S_SUCCESS;
441 RPMessage_Object *obj;
442 Queue_elem *payload;
443 IArg key;
445 Log_print1(Diags_ENTRY, "--> "FXNN": (handlePtr=0x%x)", (IArg)handlePtr);
447 Assert_isTrue((curInit > 0) , NULL);
449 if (handlePtr && (obj = (RPMessage_Object *)(*handlePtr))) {
451 if (obj->cb) {
452 obj->cb = NULL;
453 obj->arg= NULL;
454 }
455 else {
456 Semaphore_delete(&(obj->semHandle));
458 /* Free/discard all queued message buffers: */
459 while ((payload = (Queue_elem *)List_get(obj->queue)) != NULL) {
460 HeapBuf_free(module.heap, (Ptr)payload,
461 ti_ipc_rpmsg_RPMessage_messageBufferSize);
462 }
464 List_delete(&(obj->queue));
465 }
467 /* Null out our slot: */
468 key = GateHwi_enter(module.gateH);
469 module.msgqObjects[obj->queueId] = NULL;
470 GateHwi_leave(module.gateH, key);
472 Log_print1(Diags_LIFECYCLE, FXNN": endPt deleted: %d",
473 (IArg)obj->queueId);
475 /* Now free the obj */
476 Memory_free(NULL, obj, sizeof(RPMessage_Object));
478 *handlePtr = NULL;
479 }
481 Log_print1(Diags_EXIT, "<-- "FXNN": %d", (IArg)status);
482 return(status);
483 }
484 #undef FXNN
486 /*
487 * ======== RPMessage_recv ========
488 */
489 #define FXNN "RPMessage_recv"
490 Int RPMessage_recv(RPMessage_Handle handle, Ptr data, UInt16 *len,
491 UInt32 *rplyEndpt, UInt timeout)
492 {
493 Int status = RPMessage_S_SUCCESS;
494 RPMessage_Object *obj = (RPMessage_Object *)handle;
495 Bool semStatus;
496 Queue_elem *payload;
498 Log_print5(Diags_ENTRY, "--> "FXNN": (handle=0x%x, data=0x%x, len=0x%x,"
499 "rplyEndpt=0x%x, timeout=%d)", (IArg)handle, (IArg)data,
500 (IArg)len, (IArg)rplyEndpt, (IArg)timeout);
502 Assert_isTrue((curInit > 0) , NULL);
503 /* A callback was set: client should not be calling this fxn! */
504 Assert_isTrue((!obj->cb), NULL);
506 /* Check vring for pending messages before we block: */
507 Swi_post(transport.swiHandle);
509 /* Block until notified. */
510 semStatus = Semaphore_pend(obj->semHandle, timeout);
512 if (semStatus == FALSE) {
513 status = RPMessage_E_TIMEOUT;
514 Log_print0(Diags_STATUS, FXNN": Sem pend timeout!");
515 }
516 else if (obj->unblocked) {
517 status = RPMessage_E_UNBLOCKED;
518 }
519 else {
520 payload = (Queue_elem *)List_get(obj->queue);
521 Assert_isTrue((payload), NULL);
522 }
524 if (status == RPMessage_S_SUCCESS) {
525 /* Now, copy payload to client and free our internal msg */
526 memcpy(data, payload->data, payload->len);
527 *len = payload->len;
528 *rplyEndpt = payload->src;
530 HeapBuf_free(module.heap, (Ptr)payload,
531 (payload->len + sizeof(Queue_elem)));
532 }
534 Log_print1(Diags_EXIT, "<-- "FXNN": %d", (IArg)status);
535 return (status);
536 }
537 #undef FXNN
539 /*
540 * ======== RPMessage_send ========
541 */
542 #define FXNN "RPMessage_send"
543 Int RPMessage_send(UInt16 dstProc,
544 UInt32 dstEndpt,
545 UInt32 srcEndpt,
546 Ptr data,
547 UInt16 len)
548 {
549 Int status = RPMessage_S_SUCCESS;
550 RPMessage_Object *obj;
551 Int16 token = 0;
552 RPMessage_Msg msg;
553 Queue_elem *payload;
554 UInt size;
555 IArg key;
556 int length;
558 Log_print5(Diags_ENTRY, "--> "FXNN": (dstProc=%d, dstEndpt=%d, "
559 "srcEndpt=%d, data=0x%x, len=%d", (IArg)dstProc, (IArg)dstEndpt,
560 (IArg)srcEndpt, (IArg)data, (IArg)len);
562 Assert_isTrue((curInit > 0) , NULL);
564 if (dstProc != MultiProc_self()) {
565 /* Send to remote processor: */
566 do {
567 token = VirtQueue_getAvailBuf(transport.virtQueue_toHost,
568 (Void **)&msg, &length);
569 } while (token < 0 && Semaphore_pend(transport.semHandle_toHost,
570 BIOS_WAIT_FOREVER));
571 if (token >= 0) {
572 /* Copy the payload and set message header: */
573 memcpy(msg->payload, data, len);
574 msg->dataLen = len;
575 msg->dstAddr = dstEndpt;
576 msg->srcAddr = srcEndpt;
577 msg->flags = 0;
578 msg->reserved = 0;
580 VirtQueue_addUsedBuf(transport.virtQueue_toHost, token,
581 RPMSG_BUF_SIZE);
582 VirtQueue_kick(transport.virtQueue_toHost);
583 }
584 else {
585 status = RPMessage_E_FAIL;
586 Log_print0(Diags_STATUS, FXNN": getAvailBuf failed!");
587 }
588 }
589 else {
590 /* Put on a Message queue on this processor: */
592 /* Protect from RPMessage_delete */
593 key = GateHwi_enter(module.gateH);
594 obj = module.msgqObjects[dstEndpt];
595 GateHwi_leave(module.gateH, key);
597 if (obj == NULL) {
598 Log_print1(Diags_STATUS, FXNN": no object for endpoint: %d",
599 (IArg)dstEndpt);
600 status = RPMessage_E_NOENDPT;
601 return status;
602 }
604 /* If callback registered, call it: */
605 if (obj->cb) {
606 Log_print2(Diags_INFO, FXNN": calling callback with data len: "
607 "%d, from: %d", len, srcEndpt);
608 obj->cb(obj, obj->arg, data, len, srcEndpt);
609 }
610 else {
611 /* else, put on a Message queue on this processor: */
612 /* Allocate a buffer to copy the payload: */
613 size = len + sizeof(Queue_elem);
615 /* HeapBuf_alloc() is non-blocking, so needs protection: */
616 key = GateHwi_enter(module.gateH);
617 payload = (Queue_elem *)HeapBuf_alloc(module.heap, size, 0, NULL);
618 GateHwi_leave(module.gateH, key);
620 if (payload != NULL) {
621 memcpy(payload->data, data, len);
622 payload->len = len;
623 payload->src = srcEndpt;
625 /* Put on the endpoint's queue and signal: */
626 List_put(obj->queue, (List_Elem *)payload);
627 Semaphore_post(obj->semHandle);
628 }
629 else {
630 status = RPMessage_E_MEMORY;
631 Log_print0(Diags_STATUS, FXNN": HeapBuf_alloc failed!");
632 }
633 }
634 }
636 Log_print1(Diags_EXIT, "<-- "FXNN": %d", (IArg)status);
637 return (status);
638 }
639 #undef FXNN
641 /*
642 * ======== RPMessage_unblock ========
643 */
644 #define FXNN "RPMessage_unblock"
645 Void RPMessage_unblock(RPMessage_Handle handle)
646 {
647 RPMessage_Object *obj = (RPMessage_Object *)handle;
649 Log_print1(Diags_ENTRY, "--> "FXNN": (handle=0x%x)", (IArg)handle);
651 Assert_isTrue((!obj->cb), NULL);
652 /* Set instance to 'unblocked' state, and post */
653 obj->unblocked = TRUE;
654 Semaphore_post(obj->semHandle);
655 Log_print0(Diags_EXIT, "<-- "FXNN);
656 }
657 #undef FXNN