1 #include "caffe/data_transformer.hpp"
2 #include "caffe/layer.hpp"
3 #include "caffe/layers/data_layer.hpp"
4 #include "caffe/parallel.hpp"
6 namespace caffe {
8 template<typename Ftype, typename Btype>
9 DataLayer<Ftype, Btype>::DataLayer(const LayerParameter& param, size_t solver_rank)
10 : BasePrefetchingDataLayer<Ftype, Btype>(param, solver_rank),
11 cache_(param.data_param().cache()),
12 shuffle_(param.data_param().shuffle()) {
13 sample_only_.store(this->auto_mode_);
14 init_offsets();
15 datum_encoded_ = false;
16 }
18 template<typename Ftype, typename Btype>
19 void
20 DataLayer<Ftype, Btype>::init_offsets() {
21 CHECK_EQ(this->transf_num_, this->threads_num());
22 CHECK_LE(parser_offsets_.size(), this->transf_num_);
23 CHECK_LE(queue_ids_.size(), this->transf_num_);
24 parser_offsets_.resize(this->transf_num_);
25 random_vectors_.resize(this->transf_num_);
26 queue_ids_.resize(this->transf_num_);
27 for (size_t i = 0; i < this->transf_num_; ++i) {
28 parser_offsets_[i] = 0;
29 queue_ids_[i] = i * this->parsers_num_;
30 if (!random_vectors_[i]) {
31 random_vectors_[i] = make_shared<TBlob<unsigned int>>();
32 }
33 }
34 }
template<typename Ftype, typename Btype>
DataLayer<Ftype, Btype>::~DataLayer() {
  // Join the internal prefetch/transform threads before any members they
  // reference are destroyed.
  this->StopInternalThread();
}
// Chooses the number of parser and transformer threads and starts prefetching.
// In auto mode the counts are derived from the GPU memory left after all data
// and parameter blobs were allocated; otherwise the preconfigured counts are
// kept. Idempotent: guarded by layer_inititialized_flag_.
template<typename Ftype, typename Btype>
void
DataLayer<Ftype, Btype>::InitializePrefetch() {
  if (layer_inititialized_flag_.is_set()) {
    return;
  }
  const bool auto_mode = this->auto_mode_;
  if (auto_mode) {
    // Here we try to optimize memory split between prefetching and convolution.
    // All data and parameter blobs are allocated at this moment.
    // Now let's find out what's left...
    Net* pnet = this->parent_net();
    const size_t batch_bytes = pnet->prefetch_bytes<Ftype, Btype>();
    const size_t gpu_bytes = Caffe::min_avail_device_memory();
    // How many prefetched batches fit into the remaining device memory.
    const size_t batches_fit = gpu_bytes / batch_bytes;
    size_t parsers_num = this->parsers_num_;
    size_t transf_num = this->threads_num();
    if (this->is_gpu_transform()) {
      // in this mode memory demand is O(n) high
      size_t max_parsers_num = 2;
      const size_t max_transf_num = 4;
      // Encoded datums decode on the fly and need a smaller safety ratio.
      float ratio = datum_encoded_ ? 3.F : 4.F;
      const float fit = std::min(float(max_parsers_num * max_transf_num),
          std::floor(batches_fit / ratio) - 1.F);
      // Split the budget roughly evenly: parsers ~ sqrt(fit), capped.
      parsers_num = std::min(max_parsers_num, std::max(1UL,
          static_cast<size_t>(std::sqrt(fit))));
      if (cache_ && parsers_num > 1UL) {
        LOG(INFO) << this->print_current_device() << " Reduced parser threads count from "
                  << parsers_num << " to 1 because cache is used";
        parsers_num = 1UL;
      }
      // Transformers take whatever remains of the budget per parser.
      transf_num = std::min(max_transf_num, std::max(transf_num,
          static_cast<size_t>(std::lround(fit / parsers_num))));
      // Prefer 1 parser + max transformers over lopsided combinations.
      if (parsers_num > 1 && transf_num == max_transf_num - 1) {
        parsers_num = 1;
        transf_num = max_transf_num;
      }
      if (parsers_num == 2 && transf_num == 2) {
        parsers_num = 1;
        transf_num = max_transf_num;
      }
    } else {
      // in this mode memory demand is O(1)
      if (batches_fit > 0) {
        parsers_num = cache_ ? 1 : 3;
        transf_num = 4;
      }
    }

    // Restart worker threads with the new layout and resize the queues
    // before delegating the actual prefetch start to the base class.
    this->RestartAllThreads(transf_num, true, false, Caffe::next_seed());
    this->transf_num_ = this->threads_num();
    this->parsers_num_ = parsers_num;
    this->queues_num_ = this->transf_num_ * this->parsers_num_;
    this->batch_transformer_->ResizeQueues(this->queues_num_);
    BasePrefetchingDataLayer<Ftype, Btype>::InitializePrefetch();
    if (this->parsers_num_ > 1) {
      parser_offsets_[0]++;  // 0th already processed
    }
    this->auto_mode_ = false;
    layer_inititialized_flag_.set();
    this->go();  // kick off new threads if any
  }

  CHECK_EQ(this->threads_num(), this->transf_num_);
  LOG(INFO) << this->print_current_device() << " Parser threads: "
            << this->parsers_num_ << (auto_mode ? " (auto)" : "");
  LOG(INFO) << this->print_current_device() << " Transformer threads: "
            << this->transf_num_ << (auto_mode ? " (auto)" : "");
  layer_inititialized_flag_.set();
}
112 template<typename Ftype, typename Btype>
113 size_t DataLayer<Ftype, Btype>::queue_id(size_t thread_id) const {
114 const size_t qid = queue_ids_[thread_id] + parser_offsets_[thread_id];
115 parser_offsets_[thread_id]++;
116 if (parser_offsets_[thread_id] >= this->parsers_num_) {
117 parser_offsets_[thread_id] = 0UL;
118 queue_ids_[thread_id] += this->parsers_num_ * this->threads_num();
119 }
120 return qid % this->queues_num_;
121 };
123 template<typename Ftype, typename Btype>
124 void
125 DataLayer<Ftype, Btype>::DataLayerSetUp(const vector<Blob*>& bottom, const vector<Blob*>& top) {
126 const LayerParameter& param = this->layer_param();
127 const int batch_size = param.data_param().batch_size();
128 const bool cache = cache_ && this->phase_ == TRAIN;
129 const bool shuffle = cache && shuffle_ && this->phase_ == TRAIN;
131 if (this->auto_mode_) {
132 if (!sample_reader_) {
133 sample_reader_ = std::make_shared<DataReader<Datum>>(param, Caffe::solver_count(),
134 this->rank_,
135 this->parsers_num_,
136 this->threads_num(),
137 batch_size,
138 true,
139 false,
140 cache,
141 shuffle,
142 false);
143 } else if (!reader_) {
144 reader_ = std::make_shared<DataReader<Datum>>(param,
145 Caffe::solver_count(),
146 this->rank_,
147 this->parsers_num_,
148 this->threads_num(),
149 batch_size,
150 false,
151 true,
152 cache,
153 shuffle,
154 this->phase_ == TRAIN);
155 }
156 } else if (!reader_) {
157 reader_ = std::make_shared<DataReader<Datum>>(param,
158 Caffe::solver_count(),
159 this->rank_,
160 this->parsers_num_,
161 this->threads_num(),
162 batch_size,
163 false,
164 false,
165 cache,
166 shuffle,
167 this->phase_ == TRAIN);
168 start_reading();
169 }
170 // Read a data point, and use it to initialize the top blob.
171 shared_ptr<Datum> sample_datum = sample_only_ ? sample_reader_->sample() : reader_->sample();
172 datum_encoded_ = sample_datum->encoded();
173 this->ResizeQueues();
174 init_offsets();
176 // Reshape top[0] and prefetch_data according to the batch_size.
177 // Note: all these reshapings here in load_batch are needed only in case of
178 // different datum shapes coming from database.
179 Packing packing = NHWC; // OpenCV
180 vector<int> top_shape = this->bdt(0)->Transform(sample_datum.get(), nullptr, 0, packing);
181 top_shape[0] = batch_size;
182 top[0]->Reshape(top_shape);
184 if (this->is_gpu_transform()) {
185 CHECK(Caffe::mode() == Caffe::GPU);
186 LOG(INFO) << this->print_current_device() << " Transform on GPU enabled";
187 tmp_gpu_buffer_.resize(this->threads_num());
188 for (int i = 0; i < this->tmp_gpu_buffer_.size(); ++i) {
189 this->tmp_gpu_buffer_[i] = make_shared<GPUMemory::Workspace>();
190 }
191 }
192 // label
193 vector<int> label_shape(1, batch_size);
194 if (this->output_labels_) {
195 vector<int> label_shape(1, batch_size);
196 top[1]->Reshape(label_shape);
197 }
198 this->batch_transformer_->reshape(top_shape, label_shape, this->is_gpu_transform());
199 LOG(INFO) << this->print_current_device() << " Output data size: "
200 << top[0]->num() << ", "
201 << top[0]->channels() << ", "
202 << top[0]->height() << ", "
203 << top[0]->width();
204 }
// Fills one Batch with batch_size datums popped from this thread's reader
// queue. Two paths: CPU transform (datum transformed directly into the batch
// buffer) and GPU transform (raw/decoded bytes staged through a pinned host
// buffer, copied to a temporary GPU buffer, then transformed in one GPU call).
template<typename Ftype, typename Btype>
void DataLayer<Ftype, Btype>::load_batch(Batch* batch, int thread_id, size_t queue_id) {
  const bool sample_only = sample_only_.load();
  // Reshape according to the first datum of each batch
  // on single input batches allows for inputs of varying dimension.
  const int batch_size = this->layer_param_.data_param().batch_size();

  // During auto-mode sampling every thread reads queue 0 of the sample
  // reader; afterwards each thread uses its assigned queue of the real one.
  const size_t qid = sample_only ? 0UL : queue_id;
  DataReader<Datum>* reader = sample_only ? sample_reader_.get() : reader_.get();
  shared_ptr<Datum> init_datum = reader->full_peek(qid);
  CHECK(init_datum);
  const bool use_gpu_transform = this->is_gpu_transform();
  Packing packing = NHWC;  // OpenCV
  // Use data_transformer to infer the expected blob shape from datum.
  vector<int> top_shape =
      this->bdt(thread_id)->Transform(init_datum.get(), nullptr, 0, packing);
  // Reshape batch according to the batch_size.
  top_shape[0] = batch_size;
  if (top_shape != batch->data_->shape()) {
    batch->data_->Reshape(top_shape);
  }
  int init_datum_height = init_datum->height();
  int init_datum_width = init_datum->width();
  // 1 = force color, -1 = force gray, 0 = keep as stored.
  const int color_mode = this->transform_param_.force_color() ?
      1 : (this->transform_param_.force_gray() ? -1 : 0);
  size_t datum_sizeof_element = 0UL;
  int datum_len = top_shape[1] * top_shape[2] * top_shape[3];
  size_t datum_size = 0UL;
  const char *src_ptr = nullptr;
  vector<char> src_buf;
  cv::Mat img;
  if (use_gpu_transform) {
    if (init_datum->encoded()) {
      // Decode the first datum on CPU to learn the decoded geometry; all
      // datums in a batch are assumed to share it (checked below for raw).
      DecodeDatumToCVMat(*init_datum, color_mode, img, false, false);
      datum_len = img.channels() * img.rows * img.cols;
      datum_sizeof_element = sizeof(char);
      init_datum_height = img.rows;
      init_datum_width = img.cols;
    } else {
      datum_len = init_datum->channels() * init_datum->height() * init_datum->width();
      CHECK_GT(datum_len, 0);
      const string &datum_data = init_datum->data();
      if (datum_data.empty()) {
        // No byte payload: the datum stores float_data instead.
        CHECK_LE(sizeof(float), sizeof(Ftype));
        datum_sizeof_element = sizeof(float);
      } else {
        CHECK_LE(sizeof(uint8_t), sizeof(Ftype));
        CHECK_EQ(datum_len, datum_data.size());
        datum_sizeof_element = sizeof(uint8_t);
      }
    }

    // 3 random values per image, consumed by the GPU-side transform.
    vector<int> random_vec_shape(1, batch_size * 3);
    random_vectors_[thread_id]->Reshape(random_vec_shape);
    datum_size = datum_len * datum_sizeof_element;
    src_buf.resize(datum_size);
  }
  if (this->output_labels_) {
    batch->label_->Reshape(vector<int>(1, batch_size));
  }
  Ftype* top_label = this->output_labels_ ?
      batch->label_->template mutable_cpu_data_c<Ftype>(false) : nullptr;

  void* dst_gptr = nullptr;
  Btype* dst_cptr = nullptr;
  if (use_gpu_transform) {
    // Staging buffer holds the whole untransformed (non-cropped) batch.
    size_t buffer_size = top_shape[0] * top_shape[1] * init_datum_height * init_datum_width;
    tmp_gpu_buffer_[thread_id]->safe_reserve(buffer_size);
    dst_gptr = tmp_gpu_buffer_[thread_id]->data();
  } else {
    dst_cptr = batch->data_->template mutable_cpu_data_c<Btype>(false);
  }

  size_t current_batch_id = 0UL;
  const size_t buf_len = batch->data_->offset(1);
  for (size_t entry = 0; entry < batch_size; ++entry) {
    shared_ptr<Datum> datum = reader->full_pop(qid, "Waiting for datum");
    // Slot inside the batch is derived from the global record id.
    size_t item_id = datum->record_id() % batch_size;
    if (item_id == 0UL) {
      current_batch_id = datum->record_id() / batch_size;
    }
    // Copy label.
    if (top_label != nullptr) {
      top_label[item_id] = datum->label();
    }

    if (use_gpu_transform) {
      cudaStream_t stream = Caffe::thread_stream(Caffe::GPU_TRANSF_GROUP);
      if (datum->encoded()) {
        DecodeDatumToSignedBuf(*datum, color_mode, src_buf.data(), datum_size, false);
      } else {
        CHECK_EQ(datum_len, datum->channels() * datum->height() * datum->width())
            << "Datum size can't vary in the same batch";
        src_ptr = datum->data().size() > 0 ?
            &datum->data().front() :
            reinterpret_cast<const char*>(&datum->float_data().Get(0));
        // NOLINT_NEXT_LINE(caffe/alt_fn)
        std::memcpy(src_buf.data(), src_ptr, datum_size);
      }
      // Stage this item into the GPU buffer; synchronize so src_buf can be
      // reused on the next iteration.
      CUDA_CHECK(cudaMemcpyAsync(static_cast<char*>(dst_gptr) + item_id * datum_size,
          src_buf.data(), datum_size, cudaMemcpyHostToDevice, stream));
      CUDA_CHECK(cudaStreamSynchronize(stream));
      this->bdt(thread_id)->Fill3Randoms(&random_vectors_[thread_id]->
          mutable_cpu_data()[item_id * 3]);
    } else {
      // Get data offset for this datum to hand off to transform thread
      const size_t offset = batch->data_->offset(item_id);
      CHECK_EQ(0, offset % buf_len);
#if defined(USE_CUDNN)
      vector<int> shape = this->bdt(thread_id)->Transform(datum.get(), dst_cptr + offset,
          buf_len, packing, false);
#else
      // Without cuDNN the net expects NCHW: transform into a temporary,
      // then transpose (or plain-copy if already NCHW).
      vector<Btype> tmp(top_shape[1] * top_shape[2] * top_shape[3]);
      CHECK_EQ(buf_len, tmp.size());
      vector<int> shape = this->bdt(thread_id)->Transform(datum.get(), tmp.data(), buf_len,
          packing, false);
      if (packing == NHWC) {
        hwc2chw(top_shape[1], top_shape[3], top_shape[2], tmp.data(), dst_cptr + offset);
        packing = NCHW;
      } else {
        // NOLINT_NEXT_LINE(caffe/alt_fn)
        memcpy(dst_cptr + offset, tmp.data(), buf_len * sizeof(Btype));
      }
#endif
      CHECK_EQ(top_shape[1], shape[1]) << "Number of channels can't vary in the same batch";
      CHECK_EQ(top_shape[2], shape[2]) << "Image height can't vary in the same batch";
      CHECK_EQ(top_shape[3], shape[3]) << "Image width can't vary in the same batch";
    }
    // Return the datum's slot to the reader's free queue.
    reader->free_push(qid, datum);
  }

  if (use_gpu_transform) {
    // One fused GPU call transforms the whole staged batch into the output.
    this->fdt(thread_id)->TransformGPU(top_shape[0], top_shape[1],
        init_datum_height,  // non-crop
        init_datum_width,  // non-crop
        datum_sizeof_element,
        dst_gptr,
        batch->data_->template mutable_gpu_data_c<Ftype>(false),
        random_vectors_[thread_id]->gpu_data(), true);
    packing = NCHW;
  }

  batch->set_data_packing(packing);
  batch->set_id(current_batch_id);
  // First real batch loaded: leave sample-only mode for good.
  sample_only_.store(false);
}
// Instantiate the forward/backward type combinations and register the layer
// under the "Data" type name in the layer factory.
INSTANTIATE_CLASS_FB(DataLayer);
REGISTER_LAYER_CLASS_R(Data);
356 } // namespace caffe