1 /******************************************************************************
2 * Copyright (c) 2018 Texas Instruments Incorporated - http://www.ti.com/
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 * * Redistributions of source code must retain the above copyright
8 * notice, this list of conditions and the following disclaimer.
9 * * Redistributions in binary form must reproduce the above copyright
10 * notice, this list of conditions and the following disclaimer in the
11 * documentation and/or other materials provided with the distribution.
12 * * Neither the name of Texas Instruments Incorporated nor the
13 * names of its contributors may be used to endorse or promote products
14 * derived from this software without specific prior written permission.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
26 * THE POSSIBILITY OF SUCH DAMAGE.
27 *****************************************************************************/
29 #include <assert.h>
30 #include <mutex>
31 #include <condition_variable>
32 #include <chrono>
33 #include "device_arginfo.h"
34 #include "execution_object_pipeline.h"
36 using namespace tidl;
//! @brief Private implementation of ExecutionObjectPipeline (pimpl idiom).
//! Chains multiple ExecutionObjects (layersGroups) so that the i-th EO's
//! output buffer feeds the (i+1)-th EO's input buffer.
class ExecutionObjectPipeline::Impl
{
    public:
        //! @brief Construct from EOs with consecutive layersGroupIds;
        //! allocates the intermediate I/O buffers (see Initialize()).
        Impl(std::vector<ExecutionObject*> &eos);
        ~Impl();

        //! Replace the user-visible first-input / last-output buffer wrappers
        void SetInputOutputBuffer(const ArgInfo &in, const ArgInfo &out);
        //! Reset per-frame state and start processing on the first EO
        bool RunAsyncStart();
        //! Finish the current EO and start the next one, if any
        bool RunAsyncNext();
        //! Block until the pipeline has finished the current frame
        bool Wait();

        // Trace related
        void WriteLayerOutputsToFile(const std::string& filename_prefix) const;
        const LayerOutput* GetOutputFromLayer(uint32_t layer_index,
                                              uint32_t output_index) const;
        const LayerOutputs* GetOutputsFromAllLayers() const;

        //! for pipelined execution
        std::vector<ExecutionObject*> eos_m;
        //! n EOs use n+1 buffers: i-th EO reads iobufs_m[i], writes iobufs_m[i+1]
        std::vector<IODeviceArgInfo*> iobufs_m;

        //! per-EO times, tracked here because an EO may be shared across EOPs
        std::vector<float> eo_device_time_m;
        std::vector<float> eo_host_time_m;

        //! "+"-joined device names of all EOs in the pipeline
        std::string device_name_m;

        //! current frame index
        int frame_idx_m;

        //! current execution object index
        uint32_t curr_eo_idx_m;

        // device and host time tracking: pipeline start to finish
        float device_time_m;
        float host_time_m;

    private:
        //! @brief Initialize ExecutionObjectPipeline with given
        //! ExecutionObjects: check consecutive layersGroup, allocate memory
        void Initialize();

        // flag, mutex and cond var for signaling completion and waiting
        bool has_work_m, is_processed_m;
        std::mutex mutex_m;
        std::condition_variable cv_m;

        // host time tracking: pipeline start to finish
        std::chrono::time_point<std::chrono::steady_clock> start_m;
};
87 ExecutionObjectPipeline::ExecutionObjectPipeline(
88 std::vector<ExecutionObject*> eos)
89 {
90 pimpl_m = std::unique_ptr<Impl> { new Impl(eos) };
91 }
93 ExecutionObjectPipeline::Impl::Impl(std::vector<ExecutionObject *> &eos) :
94 eos_m(eos), has_work_m(false), is_processed_m(false)
95 {
96 Initialize();
97 }
// Pointer to implementation idiom: https://herbsutter.com/gotw/_100/:
// Both unique_ptr and shared_ptr can be instantiated with an incomplete type
// unique_ptr's destructor requires a complete type in order to invoke delete
// Defaulting the destructor here (where Impl is complete) satisfies that.
ExecutionObjectPipeline::~ExecutionObjectPipeline() = default;
104 char* ExecutionObjectPipeline::GetInputBufferPtr() const
105 {
106 return static_cast<char *>(pimpl_m->iobufs_m.front()->GetArg().ptr());
107 }
109 uint32_t ExecutionObjectPipeline::GetNumExecutionObjects() const
110 {
111 return pimpl_m->eos_m.size();
112 }
114 size_t ExecutionObjectPipeline::GetInputBufferSizeInBytes() const
115 {
116 return pimpl_m->eos_m.front()->GetInputBufferSizeInBytes();
117 }
119 char* ExecutionObjectPipeline::GetOutputBufferPtr() const
120 {
121 return static_cast<char *>(pimpl_m->iobufs_m.back()->GetArg().ptr());
122 }
124 size_t ExecutionObjectPipeline::GetOutputBufferSizeInBytes() const
125 {
126 return pimpl_m->eos_m.back()->GetOutputBufferSizeInBytes();
127 }
//! @brief Set the user-provided input and output buffers for the pipeline.
//! @param in  input buffer; must be non-null and at least
//!            GetInputBufferSizeInBytes() bytes
//! @param out output buffer; must be non-null and at least
//!            GetOutputBufferSizeInBytes() bytes
void ExecutionObjectPipeline::SetInputOutputBuffer(const ArgInfo& in,
                                                   const ArgInfo& out)
{
    assert(in.ptr() != nullptr && in.size() >= GetInputBufferSizeInBytes());
    assert(out.ptr() != nullptr && out.size() >= GetOutputBufferSizeInBytes());
    pimpl_m->SetInputOutputBuffer(in, out);
}
137 void ExecutionObjectPipeline::SetFrameIndex(int idx)
138 {
139 pimpl_m->frame_idx_m = idx;
140 }
142 int ExecutionObjectPipeline::GetFrameIndex() const
143 {
144 return pimpl_m->frame_idx_m;
145 }
147 bool ExecutionObjectPipeline::ProcessFrameStartAsync()
148 {
149 assert(GetInputBufferPtr() != nullptr && GetOutputBufferPtr() != nullptr);
150 bool st = pimpl_m->RunAsyncStart();
151 if (st)
152 st = pimpl_m->eos_m[0]->AddCallback(ExecutionObject::CallType::PROCESS,
153 this);
154 return st;
155 }
157 bool ExecutionObjectPipeline::ProcessFrameWait()
158 {
159 return pimpl_m->Wait();
160 }
162 void CallbackWrapper(void *user_data)
163 {
164 ((ExecutionObjectPipeline *) user_data)->RunAsyncNext();
165 }
167 void ExecutionObjectPipeline::RunAsyncNext()
168 {
169 bool has_next = pimpl_m->RunAsyncNext();
170 if (has_next)
171 pimpl_m->eos_m[pimpl_m->curr_eo_idx_m]->AddCallback(
172 ExecutionObject::CallType::PROCESS, this);
173 }
175 float ExecutionObjectPipeline::GetProcessTimeInMilliSeconds() const
176 {
177 return pimpl_m->device_time_m;
178 }
180 float ExecutionObjectPipeline::GetHostProcessTimeInMilliSeconds() const
181 {
182 return pimpl_m->host_time_m;
183 }
185 float ExecutionObjectPipeline::GetProcessTimeInMilliSeconds(
186 uint32_t eo_index) const
187 {
188 assert(eo_index < pimpl_m->eos_m.size());
189 return pimpl_m->eo_device_time_m[eo_index];
190 }
192 float ExecutionObjectPipeline::GetHostProcessTimeInMilliSeconds(
193 uint32_t eo_index) const
194 {
195 assert(eo_index < pimpl_m->eos_m.size());
196 return pimpl_m->eo_host_time_m[eo_index];
197 }
199 const std::string& ExecutionObjectPipeline::GetDeviceName() const
200 {
201 return pimpl_m->device_name_m;
202 }
204 void
205 ExecutionObjectPipeline::WriteLayerOutputsToFile(
206 const std::string& filename_prefix) const
207 {
208 pimpl_m->WriteLayerOutputsToFile(filename_prefix);
209 }
211 const LayerOutput*
212 ExecutionObjectPipeline::GetOutputFromLayer(uint32_t layer_index,
213 uint32_t output_index) const
214 {
215 return pimpl_m->GetOutputFromLayer(layer_index, output_index);
216 }
218 const LayerOutputs*
219 ExecutionObjectPipeline::GetOutputsFromAllLayers() const
220 {
221 return pimpl_m->GetOutputsFromAllLayers();
222 }
225 /// Impl methods start here
228 static
229 void* AllocateMem(size_t size)
230 {
231 if (size == 0) return nullptr;
232 void *ptr = malloc(size);
233 if (ptr == nullptr)
234 throw Exception("Out of memory, ExecutionObjectPipeline malloc failed",
235 __FILE__, __FUNCTION__, __LINE__);
236 return ptr;
237 }
239 void ExecutionObjectPipeline::Impl::Initialize()
240 {
241 // Check consecutive layersGroups to form a pipeline
242 int prev_group = 0;
243 for (auto eo : eos_m)
244 {
245 int group = eo->GetLayersGroupId();
246 if (prev_group != 0 && group != prev_group + 1)
247 throw Exception(
248 "Non-consecutive layersGroupIds in ExecutionObjectPipeline",
249 __FILE__, __FUNCTION__, __LINE__);
250 prev_group = group;
251 }
253 for (auto eo : eos_m)
254 device_name_m += eo->GetDeviceName() + "+";
255 device_name_m.resize(device_name_m.size() - 1);
257 // Allocate input and output memory for EOs/layersGroups
258 // Note that i-th EO's output buffer is the same as (i+1)-th EO's input
259 // So, if n EOs, then (n+1) buffers: b EO b EO b EO b ... EO b
260 // User must set the first input buffer and the last output buffer
261 size_t size;
262 ArgInfo in(nullptr, 0);
263 iobufs_m.push_back(new IODeviceArgInfo(in));
264 for (auto eo : eos_m)
265 {
266 if (eo != eos_m.back())
267 size = eo->GetOutputBufferSizeInBytes();
268 else
269 size = 0;
271 void *ptr = AllocateMem(size);
272 ArgInfo out(ptr, size);
273 iobufs_m.push_back(new IODeviceArgInfo(out));
274 }
276 // Record keeping for each EO's device time and host time
277 // because EO could be shared by another EOP
278 eo_device_time_m.resize(eos_m.size());
279 eo_host_time_m.resize(eos_m.size());
280 }
282 ExecutionObjectPipeline::Impl::~Impl()
283 {
284 int num_iobufs = iobufs_m.size();
285 for (int i = 0; i < num_iobufs; i++)
286 {
287 if (! (i == 0 || i == num_iobufs-1))
288 free(iobufs_m[i]->GetArg().ptr());
289 delete iobufs_m[i];
290 }
291 }
293 void ExecutionObjectPipeline::Impl::SetInputOutputBuffer(const ArgInfo &in,
294 const ArgInfo &out)
295 {
296 delete iobufs_m.front();
297 delete iobufs_m.back();
298 iobufs_m.front() = new IODeviceArgInfo(in);
299 iobufs_m.back() = new IODeviceArgInfo(out);
300 }
302 bool ExecutionObjectPipeline::Impl::RunAsyncStart()
303 {
304 has_work_m = true;
305 is_processed_m = false;
306 device_time_m = 0.0f;
307 host_time_m = 0.0f;
308 curr_eo_idx_m = 0;
309 eos_m[0]->AcquireLock();
310 start_m = std::chrono::steady_clock::now();
311 eos_m[0]->SetInputOutputBuffer(iobufs_m[0], iobufs_m[1]);
312 return eos_m[0]->ProcessFrameStartAsync();
313 }
315 // returns true if we have more EOs to execute
316 bool ExecutionObjectPipeline::Impl::RunAsyncNext()
317 {
318 eos_m[curr_eo_idx_m]->ProcessFrameWait();
319 // need to capture EO's device/host time before we release its lock
320 eo_device_time_m[curr_eo_idx_m] = eos_m[curr_eo_idx_m]->
321 GetProcessTimeInMilliSeconds();
322 eo_host_time_m[curr_eo_idx_m] = eos_m[curr_eo_idx_m]->
323 GetHostProcessTimeInMilliSeconds();
324 device_time_m += eo_device_time_m[curr_eo_idx_m];
325 eos_m[curr_eo_idx_m]->ReleaseLock();
326 curr_eo_idx_m += 1;
327 if (curr_eo_idx_m < eos_m.size())
328 {
329 eos_m[curr_eo_idx_m]->AcquireLock();
330 eos_m[curr_eo_idx_m]->SetInputOutputBuffer(iobufs_m[curr_eo_idx_m],
331 iobufs_m[curr_eo_idx_m+1]);
332 eos_m[curr_eo_idx_m]->ProcessFrameStartAsync();
333 return true;
334 }
335 else
336 {
337 std::chrono::duration<float> elapsed = std::chrono::steady_clock::now()
338 - start_m;
339 host_time_m = elapsed.count() * 1000; // seconds to milliseconds
340 is_processed_m = true;
341 cv_m.notify_all();
342 return false;
343 }
344 }
346 bool ExecutionObjectPipeline::Impl::Wait()
347 {
348 if (! has_work_m) return false;
350 std::unique_lock<std::mutex> lock(mutex_m);
351 cv_m.wait(lock, [this]{ return this->is_processed_m; });
352 has_work_m = false;
353 return true;
354 }
356 void
357 ExecutionObjectPipeline::Impl::WriteLayerOutputsToFile(
358 const std::string& filename_prefix) const
359 {
360 for (auto eo : eos_m)
361 eo->WriteLayerOutputsToFile(filename_prefix);
362 }
364 const LayerOutput*
365 ExecutionObjectPipeline::Impl::GetOutputFromLayer(uint32_t layer_index,
366 uint32_t output_index) const
367 {
368 const LayerOutput* lo = nullptr;
369 for (auto eo : eos_m)
370 {
371 lo = eo->GetOutputFromLayer(layer_index, output_index);
372 if (lo != nullptr) break;
373 }
374 return lo;
375 }
377 const LayerOutputs*
378 ExecutionObjectPipeline::Impl::GetOutputsFromAllLayers() const
379 {
380 LayerOutputs *all = new LayerOutputs;
381 for (auto eo : eos_m)
382 {
383 LayerOutputs *los = const_cast<LayerOutputs *>(
384 eo->GetOutputsFromAllLayers());
385 for (auto& lo : *los)
386 all->push_back(std::unique_ptr<const LayerOutput>{ lo.release() });
387 delete los;
388 }
389 return all;
390 }