1 /*
2 * Copyright (c) 2012-2015 Texas Instruments Incorporated - http://www.ti.com
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
7 * are met:
8 *
9 * * Redistributions of source code must retain the above copyright
10 * notice, this list of conditions and the following disclaimer.
11 *
12 * * Redistributions in binary form must reproduce the above copyright
13 * notice, this list of conditions and the following disclaimer in the
14 * documentation and/or other materials provided with the distribution.
15 *
16 * * Neither the name of Texas Instruments Incorporated nor the names of
17 * its contributors may be used to endorse or promote products derived
18 * from this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
21 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
22 * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
23 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
24 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
25 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
26 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
27 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
28 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
29 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
30 * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 */
32 /* =============================================================================
33 * @file MessageQMultiMulti.c
34 *
35 * @brief Sample application for MessageQ module between MPU and Remote Proc
36 *
37 * ============================================================================
38 */
/* Standard headers */
#include <errno.h>
#include <pthread.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
47 /* IPC Headers */
48 #include <ti/ipc/Std.h>
49 #include <ti/ipc/Ipc.h>
50 #include <ti/ipc/MessageQ.h>
51 #include <ti/ipc/transports/TransportRpmsg.h>
53 /* App defines: these must match the values used on the remote processor side. */
54 #define MSGSIZE 64u /* Size passed to MessageQ_alloc() for every message */
55 #define HEAPID 0u /* Heap id passed to MessageQ_alloc() */
56 #define SLAVE_MESSAGEQNAME "SLAVE" /* Prefix of the remote queue name each thread opens */
57 #define HOST_MESSAGEQNAME "HOST" /* Prefix of the local queue name each thread creates */
59 /** ============================================================================
60 * Macros and types
61 * ============================================================================
62 */
64 #define NUM_LOOPS_DFLT 1000 /* numLoops when not given on the command line */
65 #define NUM_THREADS_DFLT 10 /* numThreads when not given on the command line */
66 #define MAX_NUM_THREADS 25 /* Size of main()'s thread table; upper bound for numThreads */
67 #define ONE_PROCESS_ONLY (-1) /* procNum value meaning "no process number was given" */
69 /** ============================================================================
70 * Globals
71 * ============================================================================
72 */
/*
 * Test configuration, written by main() while parsing the command line:
 *   numLoops   - number of message round trips each ping thread performs
 *   numThreads - number of ping threads main() launches
 *   procNum    - process number from the third argument, or ONE_PROCESS_ONLY;
 *                when given, main() uses it as the thread number
 */
73 static Int numLoops, numThreads, procNum;
/* Per-thread record kept by main() for every ping thread it launches. */
75 struct thread_info { /* One entry per launched thread */
76 pthread_t thread_id; /* ID returned by pthread_create(); later passed to pthread_join() */
77 int thread_num; /* Thread number; its address is the argument given to pingThreadFxn() */
78 };
/* Thread entry point, defined below. */
80 static void * pingThreadFxn(void *arg);
82 /** ============================================================================
83 * Functions
84 * ============================================================================
85 */
87 static Void * pingThreadFxn(void *arg)
88 {
89 Int threadNum = *(int *)arg;
90 Int32 status = 0;
91 MessageQ_Msg msg = NULL;
92 MessageQ_Params msgParams;
93 UInt16 i;
94 MessageQ_Handle handle;
95 MessageQ_QueueId queueId = MessageQ_INVALIDMESSAGEQ;
97 char remoteQueueName[64];
98 char hostQueueName[64];
100 printf ("Entered pingThreadFxn: %d\n", threadNum);
102 sprintf(remoteQueueName, "%s_%d%d", SLAVE_MESSAGEQNAME, threadNum, (threadNum % (MultiProc_getNumProcessors() - 1)) + 1);
103 sprintf(hostQueueName, "%s_%d", HOST_MESSAGEQNAME, threadNum );
105 /* Create the local Message Queue for receiving. */
106 MessageQ_Params_init (&msgParams);
107 handle = MessageQ_create (hostQueueName, &msgParams);
108 if (handle == NULL) {
109 printf ("Error in MessageQ_create\n");
110 goto exit;
111 }
112 else {
113 printf ("thread: %d, Local Message: %s, QId: 0x%x\n",
114 threadNum, hostQueueName, MessageQ_getQueueId(handle));
115 }
117 /* Poll until remote side has it's messageQ created before we send: */
118 do {
119 status = MessageQ_open (remoteQueueName, &queueId);
120 sleep (1);
121 } while (status == MessageQ_E_NOTFOUND);
122 if (status < 0) {
123 printf ("Error in MessageQ_open [0x%x]\n", status);
124 goto cleanup;
125 }
126 else {
127 printf ("thread: %d, Remote queue: %s, QId: 0x%x\n",
128 threadNum, remoteQueueName, queueId);
129 }
131 printf ("\nthread: %d: Exchanging messages with remote processor...\n",
132 threadNum);
133 for (i = 0 ; i < numLoops ; i++) {
134 /* Allocate message. */
135 msg = MessageQ_alloc (HEAPID, MSGSIZE);
136 if (msg == NULL) {
137 printf ("Error in MessageQ_alloc\n");
138 break;
139 }
141 MessageQ_setMsgId (msg, i);
143 /* Have the remote proc reply to this message queue */
144 MessageQ_setReplyQueue (handle, msg);
146 status = MessageQ_put (queueId, msg);
147 if (status < 0) {
148 printf ("Error in MessageQ_put [0x%x]\n", status);
149 break;
150 }
152 status = MessageQ_get(handle, &msg, MessageQ_FOREVER);
153 if (status < 0) {
154 printf ("Error in MessageQ_get [0x%x]\n", status);
155 break;
156 }
157 else {
158 /* Validate the returned message. */
159 if ((msg != NULL) && (MessageQ_getMsgId (msg) != i)) {
160 printf ("Data integrity failure!\n"
161 " Expected %d\n"
162 " Received %d\n",
163 i, MessageQ_getMsgId (msg));
164 break;
165 }
167 status = MessageQ_free (msg);
168 }
170 printf ("thread: %d: Exchanged %d msgs\n", threadNum, (i+1));
171 }
173 printf ("thread: %d: pingThreadFxn successfully completed!\n", threadNum);
175 MessageQ_close (&queueId);
177 cleanup:
178 /* Clean-up */
179 status = MessageQ_delete (&handle);
180 if (status < 0) {
181 printf ("Error in MessageQ_delete [0x%x]\n", status);
182 }
184 exit:
186 return ((void *)status);
187 }
189 int main (int argc, char ** argv)
190 {
191 struct thread_info threads[MAX_NUM_THREADS];
192 int ret,i;
193 Int32 status = 0;
195 /* Parse Args: */
196 numLoops = NUM_LOOPS_DFLT;
197 numThreads = NUM_THREADS_DFLT;
198 procNum = ONE_PROCESS_ONLY;
199 switch (argc) {
200 case 1:
201 /* use defaults */
202 break;
203 case 2:
204 numThreads = atoi(argv[1]);
205 break;
206 case 3:
207 numThreads = atoi(argv[1]);
208 numLoops = atoi(argv[2]);
209 break;
210 case 4:
211 /* We force numThreads = 1 if doing a multiProcess test: */
212 numThreads = 1;
213 numLoops = atoi(argv[2]);
214 procNum = atoi(argv[3]);
215 break;
216 default:
217 printf("Usage: %s [<numThreads>] [<numLoops>] [<Process #]>\n",
218 argv[0]);
219 printf("\tDefaults: numThreads: %d, numLoops: %d\n",
220 NUM_THREADS_DFLT, NUM_LOOPS_DFLT);
221 printf("\tMax Threads: %d\n", MAX_NUM_THREADS);
222 exit(0);
223 }
225 if (numThreads > MAX_NUM_THREADS) {
226 printf("Error: Maximum number of threads supported is %d\n",
227 MAX_NUM_THREADS);
228 exit(EXIT_FAILURE);
229 }
231 printf("Using numThreads: %d, numLoops: %d\n", numThreads, numLoops);
232 if (procNum != ONE_PROCESS_ONLY) {
233 printf("ProcNum: %d\n", procNum);
234 }
236 /* configure the transport factory */
237 Ipc_transportConfig(&TransportRpmsg_Factory);
239 /* IPC initialization */
240 status = Ipc_start();
242 if (status < 0) {
243 printf ("Ipc_start failed: status = 0x%x\n", status);
244 goto exit;
245 }
247 /* Launch multiple threads: */
248 for (i = 0; i < numThreads; i++) {
249 /* Create the test thread: */
250 printf ("creating pingThreadFxn: %d\n", i);
251 threads[i].thread_num = (procNum == ONE_PROCESS_ONLY)? i: procNum;
252 ret = pthread_create(&threads[i].thread_id, NULL, &pingThreadFxn,
253 &(threads[i].thread_num));
254 if (ret) {
255 printf("MessageQMulti: can't spawn thread: %d, %s\n",
256 i, strerror(ret));
257 }
258 }
260 /* Join all threads: */
261 for (i = 0; i < numThreads; i++) {
262 ret = pthread_join(threads[i].thread_id, NULL);
263 if (ret != 0) {
264 printf("MessageQMulti: failed to join thread: %d, %s\n",
265 threads[i].thread_num, strerror(ret));
266 }
267 printf("MessageQMulti: Joined with thread %d\n",threads[i].thread_num);
268 }
270 Ipc_stop();
272 exit:
273 return (status);
274 }