]> Gitweb @ Texas Instruments - Open Source Git Repositories - git.TI.com/gitweb - tidl/tidl-api.git/blobdiff - examples/mobilenet_subgraph/main.cpp
Subgraph example: multi-threaded batch processing
[tidl/tidl-api.git] / examples / mobilenet_subgraph / main.cpp
index e4e499af67eb7a377cae2278b93674231e5fe654..8a77f6576eed068ecfc3b20d506c476b26ac3aa7 100644 (file)
@@ -1,5 +1,5 @@
 /******************************************************************************
- * Copyright (c) 2018, Texas Instruments Incorporated - http://www.ti.com/
+ * Copyright (c) 2019, Texas Instruments Incorporated - http://www.ti.com/
  *   All rights reserved.
  *
  *   Redistribution and use in source and binary forms, with or without
@@ -25,6 +25,7 @@
  *   ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
  *   THE POSSIBILITY OF SUCH DAMAGE.
  *****************************************************************************/
+
 #include <signal.h>
 #include <iostream>
 #include <iomanip>
@@ -50,6 +51,7 @@
 #include "../common/object_classes.h"
 #include "imgutil.h"
 #include "../common/video_utils.h"
+#include "thread_pool.h"
 
 #include "opencv2/core.hpp"
 #include "opencv2/imgproc.hpp"
@@ -70,13 +72,32 @@ const char *default_inputs[NUM_DEFAULT_INPUTS] =
     "../test/testvecs/input/objects/cat-pet-animal-domestic-104827.jpeg"
 };
 std::unique_ptr<ObjectClasses> object_classes;
+typedef struct {
+  float **inputs;
+  float **outputs;
+} UserData;
 
 bool RunConfiguration(cmdline_opts_t& opts);
 bool ReadFrame(const cmdline_opts_t& opts, VideoCapture &cap, float** inputs,
                int batch_size);
 bool WriteFrameOutput(float *out, const cmdline_opts_t& opts);
 void DisplayHelp();
+void SubgraphUserFunc(void *user_data);
 
+const int num_printed_outputs = 4;
+bool SkipOutputs(int i, int offset, bool &skip_outputs)
+{
+    if (skip_outputs)  return true;
+    if (i >= num_printed_outputs + offset)
+    {
+        if (! skip_outputs)
+        {
+            cout << "   ... skippping outputs ..." << endl;
+            skip_outputs = true;
+        }
+    }
+    return skip_outputs;
+}
 
 int main(int argc, char *argv[])
 {
@@ -180,37 +201,123 @@ bool RunConfiguration(cmdline_opts_t& opts)
         status = false;
     }
 
-    int batch_size = 8;
-    cout << "\n##### Batch size " << batch_size << " testing ######\n" << endl;
+    // If not doing multi-threaded processing, multiply by 2 or more
+    //     for a larger batch to amortize batch initilization/tear down cost
+    int preferred_batch_size = TidlGetPreferredBatchSize(1);
+    for (int multiple = 1; multiple <= 16; multiple *= 2)
+    {
+        int batch_size = preferred_batch_size * multiple;
+        cout << "\n##### Batch size " << batch_size << " testing ######\n"
+             << endl;
+        bool skip_outputs = false;
+        try
+        {
+            float **inputs  = new float *[batch_size];
+            float **outputs = new float *[batch_size];
+            for (int i = 0; i < batch_size; i++)
+            {
+                inputs[i]  = new float[1*3*224*224];
+                outputs[i] = new float[1001];
+            }
+
+            chrono::time_point<chrono::steady_clock> tloop0, tloop1;
+            tloop0 = chrono::steady_clock::now();
+
+            ReadFrame(opts, cap, inputs, batch_size);
+            TidlRunSubgraph(1, 0, batch_size, 1, 1, inputs, outputs);
+            for (int i = 0; i < batch_size; i++)
+            {
+                if (! SkipOutputs(i, 0, skip_outputs))
+                {
+                    cout << "Frame " << i << " of " << batch_size
+                         << " output:" << endl;
+                    WriteFrameOutput(outputs[i], opts);
+                }
+            }
+
+            tloop1 = chrono::steady_clock::now();
+            chrono::duration<float> elapsed = tloop1 - tloop0;
+            cout << "Batch size " << batch_size
+                 << " time: "
+                 << setw(6) << setprecision(4)
+                 << (elapsed.count() * 1000) << "ms, fps = "
+                 << setw(6) << setprecision(4)
+                 << (batch_size / elapsed.count())
+                 << endl;
+
+            for (int i = 0; i < batch_size; i++)
+            {
+                delete [] inputs[i];
+                delete [] outputs[i];
+            }
+            delete [] inputs;
+            delete [] outputs;
+        }
+        catch (tidl::Exception &e)
+        {
+            cerr << e.what() << endl;
+            status = false;
+        }
+    }
+
+    // This is to test the multithreaded inference with async/future
+    // async/future has slightly worse threading performance than
+    //     thread pool, however, it is much easier to program
+    cout << "\n##### Multithreaded inference testing (async/future) #####\n"
+         << endl;
+    int num_threads = TidlGetPreferredBatchSize(1) * 2;
+    int num_iters = 100;
     try
     {
-        float **inputs  = new float *[batch_size];
-        float **outputs = new float *[batch_size];
-        for (int i = 0; i < batch_size; i++)
+        float **inputs  = new float *[num_threads];
+        float **outputs = new float *[num_threads];
+        for (int i = 0; i < num_threads; i++)
         {
             inputs[i]  = new float[1*3*224*224];
             outputs[i] = new float[1001];
         }
+        vector<future<bool>> futures(num_threads);
+        bool skip_outputs = false;
 
         chrono::time_point<chrono::steady_clock> tloop0, tloop1;
         tloop0 = chrono::steady_clock::now();
 
-        ReadFrame(opts, cap, inputs, batch_size);
-        TidlRunSubgraph(1, 0, batch_size, 1, 1, inputs, outputs);
-        for (int i = 0; i < batch_size; i++)
+        for (int i = 0; i < num_iters + num_threads; i++)
         {
-            cout << "Frame " << i << " of " << batch_size << " output:" << endl;
-            WriteFrameOutput(outputs[i], opts);
+            int index = i % num_threads;
+            if (i >= num_threads)
+            {
+                if (futures[index].get())
+                {
+                    if (! SkipOutputs(i, num_threads, skip_outputs))
+                        WriteFrameOutput(outputs[index], opts);
+                }
+            }
+
+            if (i < num_iters)
+            {
+                ReadFrame(opts, cap, &inputs[index], 1);
+                futures[index] = std::async(std::launch::async,
+                              [inputs, outputs](int index) {
+                                  TidlRunSubgraph(1, 0, 1, 1, 1,
+                                              &inputs[index], &outputs[index]);
+                                   return true;
+                              },
+                                            index);
+            }
         }
 
         tloop1 = chrono::steady_clock::now();
         chrono::duration<float> elapsed = tloop1 - tloop0;
-        cout << "Batch size " << batch_size
-             << " time (including read/write/opencv/print/etc): "
+        cout << "Multithreaded (num_threads=" << num_threads
+             << ", batch_size=1) loop time (" << num_iters << " frames): "
              << setw(6) << setprecision(4)
-             << (elapsed.count() * 1000) << "ms" << endl;
+             << (elapsed.count() * 1000) << "ms, fps = "
+             << setw(6) << setprecision(4)
+             << (num_iters / elapsed.count())
+             << endl;
 
-        for (int i = 0; i < batch_size; i++)
+        for (int i = 0; i < num_threads; i++)
         {
             delete [] inputs[i];
             delete [] outputs[i];
@@ -224,53 +331,62 @@ bool RunConfiguration(cmdline_opts_t& opts)
         status = false;
     }
 
-    // This is only to test the multithreaded inference
-    // async/future may not be the most efficient multithreading method
-    // threading pool might have better performance
-    cout << "\n##### Multithreaded inference testing #####\n" << endl;
-    int num_threads = 8;
-    int num_iters = 8;
+    // This is to test the multithreaded inference with a thread pool
+    cout << "\n##### Multithreaded inference testing (thread pool) #####\n"
+         << endl;
     try
     {
         float **inputs  = new float *[num_threads];
         float **outputs = new float *[num_threads];
+        vector<UserData> v_data(num_threads);
         for (int i = 0; i < num_threads; i++)
         {
             inputs[i]  = new float[1*3*224*224];
             outputs[i] = new float[1001];
+            v_data[i].inputs  = &inputs[i];
+            v_data[i].outputs = &outputs[i];
         }
-        vector<future<bool>> futures(num_threads);
+        ThPool pool(num_threads, SubgraphUserFunc);
+        vector<int> th_ids(num_threads);
+        bool skip_outputs = false;
 
         chrono::time_point<chrono::steady_clock> tloop0, tloop1;
         tloop0 = chrono::steady_clock::now();
 
         for (int i = 0; i < num_iters + num_threads; i++)
         {
-          int index = i % num_threads;
-          if (i >= num_threads)
-          {
-            if (futures[index].get())
-              WriteFrameOutput(outputs[index], opts);
-          }
-
-          if (i < num_iters)
-          {
-            ReadFrame(opts, cap, &inputs[index], 1);
-            futures[index] = std::async(std::launch::async,
-                                        [inputs, outputs](int index) {
-               TidlRunSubgraph(1, 0, 1, 1, 1, &inputs[index], &outputs[index]);
-               return true;
-               },
-                                        index);
-          }
+            int index = i % num_threads;
+            if (i >= num_threads)
+            {
+                UserData *data = (UserData *) pool.Wait(th_ids[index]);
+                if (! SkipOutputs(i, num_threads, skip_outputs))
+                    WriteFrameOutput(data->outputs[0], opts);
+            }
+
+            if (i < num_iters)
+            {
+                ReadFrame(opts, cap, &inputs[index], 1);
+                th_ids[index] = pool.RunAsync(&v_data[index]);
+            }
         }
 
         tloop1 = chrono::steady_clock::now();
         chrono::duration<float> elapsed = tloop1 - tloop0;
         cout << "Multithreaded (num_threads=" << num_threads
-             << ") loop time (including read/write/opencv/print/etc): "
+             << ", batch_size=1) loop time (" << num_iters << " frames): "
+             << setw(6) << setprecision(4)
+             << (elapsed.count() * 1000) << "ms, fps = "
              << setw(6) << setprecision(4)
-             << (elapsed.count() * 1000) << "ms" << endl;
+             << (num_iters / elapsed.count())
+             << endl;
+
+        for (int i = 0; i < num_threads; i++)
+        {
+            delete [] inputs[i];
+            delete [] outputs[i];
+        }
+        delete [] inputs;
+        delete [] outputs;
     }
     catch (tidl::Exception &e)
     {
@@ -278,9 +394,89 @@ bool RunConfiguration(cmdline_opts_t& opts)
         status = false;
     }
 
+    num_threads = 2;
+    int batch_size  = preferred_batch_size;
+    // This is to test the multithreaded batch inference with async/future
+    // Ideally, batch_size * num_threads <= number of threads
+    cout << "\n##### Multithreaded batch inference testing (async/future)"
+         << " #####\n" << endl;
+    try
+    {
+        float **inputs  = new float *[num_threads * batch_size];
+        float **outputs = new float *[num_threads * batch_size];
+        for (int i = 0; i < num_threads * batch_size; i++)
+        {
+            inputs[i]  = new float[1*3*224*224];
+            outputs[i] = new float[1001];
+        }
+        vector<future<bool>> futures(num_threads);
+        bool skip_outputs = false;
+
+        chrono::time_point<chrono::steady_clock> tloop0, tloop1;
+        tloop0 = chrono::steady_clock::now();
+
+        for (int i = 0; i < num_iters/batch_size + num_threads; i++)
+        {
+            int index = i % num_threads;
+            if (i >= num_threads)
+            {
+                if (futures[index].get())
+                    if (! SkipOutputs(i*batch_size, num_threads*batch_size,
+                                      skip_outputs))
+                        for (int b = 0; b < batch_size; b++)
+                            WriteFrameOutput(outputs[index*batch_size+b], opts);
+            }
+
+            if (i < num_iters/batch_size)
+            {
+                ReadFrame(opts, cap, &inputs[index*batch_size], batch_size);
+                futures[index] = std::async(std::launch::async,
+                      [inputs, outputs, batch_size](int index) {
+                          TidlRunSubgraph(1, 0, batch_size, 1, 1,
+                                          &inputs[index*batch_size],
+                                          &outputs[index*batch_size]);
+                          return true;
+                      },
+                                            index);
+            }
+        }
+
+        tloop1 = chrono::steady_clock::now();
+        chrono::duration<float> elapsed = tloop1 - tloop0;
+        cout << "Multithreaded batch (num_threads=" << num_threads
+             << ", batch_size=" << batch_size
+             << ") loop time (" << num_iters << " frames): "
+             << setw(6) << setprecision(4)
+             << (elapsed.count() * 1000) << "ms, fps = "
+             << setw(6) << setprecision(4)
+             << (num_iters / elapsed.count())
+             << endl;
+
+        for (int i = 0; i < num_threads * batch_size; i++)
+        {
+            delete [] inputs[i];
+            delete [] outputs[i];
+        }
+        delete [] inputs;
+        delete [] outputs;
+    }
+    catch (tidl::Exception &e)
+    {
+        cerr << e.what() << endl;
+        status = false;
+    }
+
+
     return status;
 }
 
+void SubgraphUserFunc(void *user_data)
+{
+  UserData *data = (UserData *) user_data;
+  //printf("data inputs = %p, outputs = %p\n", data->inputs, data->outputs);
+  TidlRunSubgraph(1, 0, 1, 1, 1, data->inputs, data->outputs);
+  //printf("TidlRunSubgraph finished\n");
+}
 
 bool ReadFrame(const cmdline_opts_t& opts, VideoCapture &cap, float** inputs,
                int batch_size)