From febae548d16d90e43519c13999689e8e94c98ecf Mon Sep 17 00:00:00 2001
From: Yangqing Jia
Date: Thu, 3 Oct 2013 21:01:52 -0700
Subject: [PATCH] data layer: using pthread

---
 src/Makefile                      |   6 +-
 src/caffe/layers/data_layer.cpp   | 144 ++++++++++++++++++------------
 src/caffe/net.cpp                 |   1 +
 src/caffe/net.hpp                 |   2 +-
 src/caffe/optimization/solver.cpp |   2 +-
 src/caffe/vision_layers.hpp       |  10 +++
 src/programs/convert_dataset.cpp  |   8 +-
 7 files changed, 112 insertions(+), 61 deletions(-)

diff --git a/src/Makefile b/src/Makefile
index 31d225e9..1b04a459 100644
--- a/src/Makefile
+++ b/src/Makefile
@@ -39,7 +39,7 @@ MKL_LIB_DIR := $(MKL_DIR)/lib $(MKL_DIR)/lib/intel64
 INCLUDE_DIRS := . /usr/local/include $(CUDA_INCLUDE_DIR) $(MKL_INCLUDE_DIR)
 LIBRARY_DIRS := . /usr/local/lib $(CUDA_LIB_DIR) $(MKL_LIB_DIR)
 LIBRARIES := cuda cudart cublas protobuf glog mkl_rt mkl_intel_thread curand \
-    leveldb snappy opencv_core opencv_highgui
+    leveldb snappy opencv_core opencv_highgui pthread
 WARNINGS := -Wall
 
 CXXFLAGS += -fPIC $(foreach includedir,$(INCLUDE_DIRS),-I$(includedir))
@@ -64,10 +64,10 @@ runtest: test
 	for testbin in $(TEST_BINS); do $$testbin; done
 
 $(TEST_BINS): %.testbin : %.o
-	$(CXX) $< $(OBJS) $(GTEST_OBJ) -o $@ $(LDFLAGS) $(WARNINGS)
+	$(CXX) -pthread $< $(OBJS) $(GTEST_OBJ) -o $@ $(LDFLAGS) $(WARNINGS)
 
 $(PROGRAM_BINS): %.bin : %.o
-	$(CXX) $< $(OBJS) -o $@ $(LDFLAGS) $(WARNINGS)
+	$(CXX) -pthread $< $(OBJS) -o $@ $(LDFLAGS) $(WARNINGS)
 
 $(NAME): $(PROTO_GEN_CC) $(OBJS)
 	$(LINK) -shared $(OBJS) -o $(NAME)
diff --git a/src/caffe/layers/data_layer.cpp b/src/caffe/layers/data_layer.cpp
index 40e91de8..5b957701 100644
--- a/src/caffe/layers/data_layer.cpp
+++ b/src/caffe/layers/data_layer.cpp
@@ -2,6 +2,7 @@
 
 #include <stdint.h>
 #include <leveldb/db.h>
+#include <pthread.h>
 
 #include <string>
 #include <vector>
@@ -13,6 +14,62 @@ using std::string;
 
 namespace caffe {
 
+template <typename Dtype>
+void* DataLayerPrefetch(void* layer_pointer) {
+  DataLayer<Dtype>* layer = reinterpret_cast<DataLayer<Dtype>*>(layer_pointer);
+  Datum datum;
+  Dtype* top_data = layer->prefetch_data_->mutable_cpu_data();
+  Dtype* top_label = layer->prefetch_label_->mutable_cpu_data();
+  const Dtype scale = layer->layer_param_.scale();
+  const Dtype subtraction = layer->layer_param_.subtraction();
+  const int batchsize = layer->layer_param_.batchsize();
+  const int cropsize = layer->layer_param_.cropsize();
+  for (int itemid = 0; itemid < batchsize; ++itemid) {
+    // get a blob
+    datum.ParseFromString(layer->iter_->value().ToString());
+    const string& data = datum.data();
+    if (cropsize) {
+      CHECK(data.size()) << "Image cropping only support uint8 data";
+      int h_offset = rand() % (layer->datum_height_ - cropsize);
+      int w_offset = rand() % (layer->datum_width_ - cropsize);
+      for (int c = 0; c < layer->datum_channels_; ++c) {
+        for (int h = 0; h < cropsize; ++h) {
+          for (int w = 0; w < cropsize; ++w) {
+            top_data[((itemid * layer->datum_channels_ + c) * cropsize + h) * cropsize + w] =
+                static_cast<Dtype>((uint8_t)data[
+                    (c * layer->datum_height_ + h + h_offset) * layer->datum_width_
+                    + w + w_offset]
+                ) * scale - subtraction;
+          }
+        }
+      }
+    } else {
+      // we will prefer to use data() first, and then try float_data()
+      if (data.size()) {
+        for (int j = 0; j < layer->datum_size_; ++j) {
+          top_data[itemid * layer->datum_size_ + j] =
+              (static_cast<Dtype>((uint8_t)data[j]) * scale) - subtraction;
+        }
+      } else {
+        for (int j = 0; j < layer->datum_size_; ++j) {
+          top_data[itemid * layer->datum_size_ + j] =
+              (datum.float_data(j) * scale) - subtraction;
+        }
+      }
+    }
+    top_label[itemid] = datum.label();
+    // go to the next iter
+    layer->iter_->Next();
+    if (!layer->iter_->Valid()) {
+      // We have reached the end. Restart from the first.
+      LOG(INFO) << "Restarting data read from start.";
+      layer->iter_->SeekToFirst();
+    }
+  }
+  return NULL;  // the thread's return value is unused
+}
+
+
 template <typename Dtype>
 void DataLayer<Dtype>::SetUp(const vector<Blob<Dtype>*>& bottom,
     vector<Blob<Dtype>*>* top) {
@@ -38,16 +95,23 @@ void DataLayer<Dtype>::SetUp(const vector<Blob<Dtype>*>& bottom,
   if (cropsize > 0) {
     (*top)[0]->Reshape(
         this->layer_param_.batchsize(), datum.channels(), cropsize, cropsize);
+    prefetch_data_.reset(new Blob<Dtype>(
+        this->layer_param_.batchsize(), datum.channels(), cropsize, cropsize));
   } else {
     (*top)[0]->Reshape(
         this->layer_param_.batchsize(), datum.channels(), datum.height(),
         datum.width());
+    prefetch_data_.reset(new Blob<Dtype>(
+        this->layer_param_.batchsize(), datum.channels(), datum.height(),
+        datum.width()));
   }
   LOG(INFO) << "output data size: " << (*top)[0]->num() << ","
       << (*top)[0]->channels() << "," << (*top)[0]->height() << ","
       << (*top)[0]->width();
   // label
   (*top)[1]->Reshape(this->layer_param_.batchsize(), 1, 1, 1);
+  prefetch_label_.reset(
+      new Blob<Dtype>(this->layer_param_.batchsize(), 1, 1, 1));
   // datum size
   datum_channels_ = datum.channels();
   datum_height_ = datum.height();
@@ -55,71 +119,41 @@ void DataLayer<Dtype>::SetUp(const vector<Blob<Dtype>*>& bottom,
   datum_size_ = datum.channels() * datum.height() * datum.width();
   CHECK_GT(datum_height_, cropsize);
   CHECK_GT(datum_width_, cropsize);
+  // Now, start the prefetch thread.
+  //LOG(INFO) << "Initializing prefetch";
+  CHECK(!pthread_create(&thread_, NULL, DataLayerPrefetch<Dtype>, (void*)this))
+      << "Pthread execution failed.";
+  //LOG(INFO) << "Prefetch initialized.";
 }
 
 template <typename Dtype>
 void DataLayer<Dtype>::Forward_cpu(const vector<Blob<Dtype>*>& bottom,
     vector<Blob<Dtype>*>* top) {
-  Datum datum;
-  Dtype* top_data = (*top)[0]->mutable_cpu_data();
-  Dtype* top_label = (*top)[1]->mutable_cpu_data();
-  const Dtype scale = this->layer_param_.scale();
-  const Dtype subtraction = this->layer_param_.subtraction();
-  int cropsize = this->layer_param_.cropsize();
-  for (int itemid = 0; itemid < (*top)[0]->num(); ++itemid) {
-    // get a blob
-    datum.ParseFromString(iter_->value().ToString());
-    const string& data = datum.data();
-    if (cropsize) {
-      CHECK(data.size()) << "Image cropping only support uint8 data";
-      int h_offset = rand() % (datum_height_ - cropsize);
-      int w_offset = rand() % (datum_width_ - cropsize);
-      for (int c = 0; c < datum_channels_; ++c) {
-        for (int h = 0; h < cropsize; ++h) {
-          for (int w = 0; w < cropsize; ++w) {
-            top_data[((itemid * datum_channels_ + c) * cropsize + h) * cropsize + w] =
-                static_cast<Dtype>((uint8_t)data[
-                    (c * datum_height_ + h + h_offset) * datum_width_
-                    + w + w_offset]
-                ) * scale - subtraction;
-          }
-        }
-      }
-    } else {
-      // we will prefer to use data() first, and then try float_data()
-      if (data.size()) {
-        for (int j = 0; j < datum_size_; ++j) {
-          top_data[itemid * datum_size_ + j] =
-              (static_cast<Dtype>((uint8_t)data[j]) * scale) - subtraction;
-        }
-      } else {
-        for (int j = 0; j < datum_size_; ++j) {
-          top_data[itemid * datum_size_ + j] =
-              (datum.float_data(j) * scale) - subtraction;
-        }
-      }
-    }
-    top_label[itemid] = datum.label();
-    // go to the next iter
-    iter_->Next();
-    if (!iter_->Valid()) {
-      // We have reached the end. Restart from the first.
-      LOG(INFO) << "Restarting data read from start.";
-      iter_->SeekToFirst();
-    }
-  }
+  // First, join the thread
+  CHECK(!pthread_join(thread_, NULL)) << "Pthread joining failed.";
+  // Copy the data
+  memcpy((*top)[0]->mutable_cpu_data(), prefetch_data_->cpu_data(),
+      sizeof(Dtype) * prefetch_data_->count());
+  memcpy((*top)[1]->mutable_cpu_data(), prefetch_label_->cpu_data(),
+      sizeof(Dtype) * prefetch_label_->count());
+  // Start a new prefetch thread
+  CHECK(!pthread_create(&thread_, NULL, DataLayerPrefetch<Dtype>, (void*)this))
+      << "Pthread execution failed.";
 }
 
 template <typename Dtype>
 void DataLayer<Dtype>::Forward_gpu(const vector<Blob<Dtype>*>& bottom,
     vector<Blob<Dtype>*>* top) {
-  Forward_cpu(bottom, top);
-  // explicitly copy data to gpu - this is achieved by simply calling gpu_data
-  // functions.
-  // TODO(Yangqing): maybe we don't need this since data synchronization is
-  // simply done under the hood?
-  (*top)[0]->gpu_data();
-  (*top)[1]->gpu_data();
+  // First, join the thread
+  CHECK(!pthread_join(thread_, NULL)) << "Pthread joining failed.";
+  // Copy the data
+  CUDA_CHECK(cudaMemcpy((*top)[0]->mutable_gpu_data(), prefetch_data_->cpu_data(),
+      sizeof(Dtype) * prefetch_data_->count(), cudaMemcpyHostToDevice));
+  CUDA_CHECK(cudaMemcpy((*top)[1]->mutable_gpu_data(), prefetch_label_->cpu_data(),
+      sizeof(Dtype) * prefetch_label_->count(), cudaMemcpyHostToDevice));
+  // Start a new prefetch thread
+  CHECK(!pthread_create(&thread_, NULL, DataLayerPrefetch<Dtype>, (void*)this))
+      << "Pthread execution failed.";
 }
 
 // The backward operations are dummy - they do not carry any computation.
diff --git a/src/caffe/net.cpp b/src/caffe/net.cpp
index 2dd9b564..c0ccbb1a 100644
--- a/src/caffe/net.cpp
+++ b/src/caffe/net.cpp
@@ -106,6 +106,7 @@ const vector<Blob<Dtype>*>& Net<Dtype>::Forward(
     blobs_[net_input_blob_indices_[i]]->CopyFrom(*bottom[i]);
   }
   for (int i = 0; i < layers_.size(); ++i) {
+    //LOG(ERROR) << "Forwarding " << layer_names_[i];
     layers_[i]->Forward(bottom_vecs_[i], &top_vecs_[i]);
   }
   return net_output_blobs_;
diff --git a/src/caffe/net.hpp b/src/caffe/net.hpp
index 3ac5fb81..4b24c23a 100644
--- a/src/caffe/net.hpp
+++ b/src/caffe/net.hpp
@@ -30,7 +30,7 @@ class Net {
   // been provided during the forward pass.
   Dtype Backward();
 
-  Dtype ForwardBackWard(const vector<Blob<Dtype>* > & bottom) {
+  Dtype ForwardBackward(const vector<Blob<Dtype>* > & bottom) {
     Forward(bottom);
     return Backward();
   }
diff --git a/src/caffe/optimization/solver.cpp b/src/caffe/optimization/solver.cpp
index 2df872ed..a48408c6 100644
--- a/src/caffe/optimization/solver.cpp
+++ b/src/caffe/optimization/solver.cpp
@@ -25,7 +25,7 @@ void Solver<Dtype>::Solve(Net<Dtype>* net) {
   // should be given, and we will just provide dummy vecs.
   vector<Blob<Dtype>*> bottom_vec;
   while (iter_++ < param_.max_iter()) {
-    Dtype loss = net_->ForwardBackWard(bottom_vec);
+    Dtype loss = net_->ForwardBackward(bottom_vec);
     ComputeUpdateValue();
     net_->Update();
 
diff --git a/src/caffe/vision_layers.hpp b/src/caffe/vision_layers.hpp
index 23678bf4..6f943a8e 100644
--- a/src/caffe/vision_layers.hpp
+++ b/src/caffe/vision_layers.hpp
@@ -4,6 +4,7 @@
 #define CAFFE_VISION_LAYERS_HPP_
 
 #include <leveldb/db.h>
+#include <pthread.h>
 
 #include <vector>
 
@@ -232,8 +233,14 @@ class ConvolutionLayer : public Layer<Dtype> {
   int N_;
 };
 
+template <typename Dtype>
+void* DataLayerPrefetch(void* layer_pointer);
+
 template <typename Dtype>
 class DataLayer : public Layer<Dtype> {
+  // The function used to perform prefetching.
+  friend void* DataLayerPrefetch<Dtype>(void* layer_pointer);
+
  public:
   explicit DataLayer(const LayerParameter& param)
       : Layer<Dtype>(param) {}
@@ -256,6 +263,9 @@ class DataLayer : public Layer<Dtype> {
   int datum_height_;
   int datum_width_;
   int datum_size_;
+  pthread_t thread_;
+  shared_ptr<Blob<Dtype> > prefetch_data_;
+  shared_ptr<Blob<Dtype> > prefetch_label_;
 };
 
 
diff --git a/src/programs/convert_dataset.cpp b/src/programs/convert_dataset.cpp
index cb48cb34..53a1e29a 100644
--- a/src/programs/convert_dataset.cpp
+++ b/src/programs/convert_dataset.cpp
@@ -11,6 +11,7 @@
 
 #include <glog/logging.h>
 #include <leveldb/db.h>
+#include <leveldb/write_batch.h>
 
 #include <string>
 #include <fstream>
@@ -40,6 +41,7 @@ int main(int argc, char** argv) {
   leveldb::Options options;
   options.error_if_exists = true;
   options.create_if_missing = true;
+  options.write_buffer_size = 268435456;
   LOG(INFO) << "Opening leveldb " << argv[3];
   leveldb::Status status = leveldb::DB::Open(
       options, argv[3], &db);
@@ -51,6 +53,7 @@ int main(int argc, char** argv) {
   Datum datum;
   int count = 0;
   char key_cstr[100];
+  leveldb::WriteBatch* batch = new leveldb::WriteBatch();
   while (infile >> filename >> label) {
     ReadImageToDatum(root_folder + filename, label, &datum);
     sprintf(key_cstr, "%08d_%s", count, filename.c_str());
@@ -58,9 +61,12 @@ int main(int argc, char** argv) {
     string value;
     // get the value
     datum.SerializeToString(&value);
-    db->Put(leveldb::WriteOptions(), key, value);
+    batch->Put(key, value);
     if (++count % 1000 == 0) {
+      db->Write(leveldb::WriteOptions(), batch);
       LOG(ERROR) << "Processed " << count << " files.";
+      delete batch;
+      batch = new leveldb::WriteBatch();
     }
   }
 
-- 
2.39.2
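
Note: the core of this patch is double buffering with a single worker thread.
SetUp() launches one prefetch thread, and every Forward() joins it, copies the
finished batch out of prefetch_data_/prefetch_label_, then immediately launches
the next prefetch. Below is a minimal standalone sketch of that same
join/copy/respawn cycle; PrefetchBuffer and FillBatch are made-up stand-ins for
DataLayer and DataLayerPrefetch, not Caffe code.

#include <pthread.h>
#include <cstdio>
#include <vector>

struct PrefetchBuffer {
  std::vector<float> data;  // filled by the worker thread
  int epoch;                // fake "batch id" so the demo shows progress
};

// Worker: fills the buffer; plays the role of DataLayerPrefetch.
void* FillBatch(void* arg) {
  PrefetchBuffer* buf = static_cast<PrefetchBuffer*>(arg);
  for (size_t i = 0; i < buf->data.size(); ++i) {
    buf->data[i] = static_cast<float>(buf->epoch) + 0.5f;
  }
  return NULL;
}

int main() {
  PrefetchBuffer buf;
  buf.data.resize(4);
  buf.epoch = 0;
  pthread_t thread;
  // SetUp(): kick off the first prefetch before the first Forward().
  pthread_create(&thread, NULL, FillBatch, &buf);
  for (int step = 1; step <= 3; ++step) {
    // Forward(): wait for the in-flight batch...
    pthread_join(thread, NULL);
    // ...consume it (Caffe memcpy's it into the top blobs here)...
    std::printf("step %d consumed batch %d (data[0] = %g)\n",
                step, buf.epoch, buf.data[0]);
    // ...then immediately start prefetching the next batch.
    buf.epoch = step;
    pthread_create(&thread, NULL, FillBatch, &buf);
  }
  pthread_join(thread, NULL);  // don't leave the last worker running
  return 0;
}

Build with: g++ -pthread prefetch_sketch.cpp. The scheme is race-free only
because the consumer never touches the buffer while a worker is running: the
copy happens strictly between pthread_join and the next pthread_create, which
is exactly how Forward_cpu/Forward_gpu are ordered in the patch. On the leveldb
side, note that batched writes flush only every 1000 items; whatever remains in
the batch when the input loop ends still needs a final db->Write (and delete)
after the loop, which falls outside the hunk shown above.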