]> Gitweb @ Texas Instruments - Open Source Git Repositories - git.TI.com/gitweb - jacinto-ai/caffe-jacinto.git/commitdiff
data layer: using pthread
authorYangqing Jia <jiayq84@gmail.com>
Fri, 4 Oct 2013 04:01:52 +0000 (21:01 -0700)
committerYangqing Jia <jiayq84@gmail.com>
Fri, 4 Oct 2013 04:01:52 +0000 (21:01 -0700)
src/Makefile
src/caffe/layers/data_layer.cpp
src/caffe/net.cpp
src/caffe/net.hpp
src/caffe/optimization/solver.cpp
src/caffe/vision_layers.hpp
src/programs/convert_dataset.cpp

index 31d225e946c8e18572b4c9070b0f3d623a4bcfb3..1b04a459a88058d005b33927ba08bfb12ec5301d 100644 (file)
@@ -39,7 +39,7 @@ MKL_LIB_DIR := $(MKL_DIR)/lib $(MKL_DIR)/lib/intel64
 INCLUDE_DIRS := . /usr/local/include $(CUDA_INCLUDE_DIR) $(MKL_INCLUDE_DIR)
 LIBRARY_DIRS := . /usr/local/lib $(CUDA_LIB_DIR) $(MKL_LIB_DIR)
 LIBRARIES := cuda cudart cublas protobuf glog mkl_rt mkl_intel_thread curand \
-               leveldb snappy opencv_core opencv_highgui
+               leveldb snappy opencv_core opencv_highgui pthread
 WARNINGS := -Wall
 
 CXXFLAGS += -fPIC $(foreach includedir,$(INCLUDE_DIRS),-I$(includedir))
@@ -64,10 +64,10 @@ runtest: test
        for testbin in $(TEST_BINS); do $$testbin; done
 
 $(TEST_BINS): %.testbin : %.o
-       $(CXX) $< $(OBJS) $(GTEST_OBJ) -o $@ $(LDFLAGS) $(WARNINGS)
+       $(CXX) -pthread $< $(OBJS) $(GTEST_OBJ) -o $@ $(LDFLAGS) $(WARNINGS)
 
 $(PROGRAM_BINS): %.bin : %.o
-       $(CXX) $< $(OBJS) -o $@ $(LDFLAGS) $(WARNINGS)
+       $(CXX) -pthread $< $(OBJS) -o $@ $(LDFLAGS) $(WARNINGS)
 
 $(NAME): $(PROTO_GEN_CC) $(OBJS)
        $(LINK) -shared $(OBJS) -o $(NAME)
index 40e91de84c0bc7abd4efdb01f14a72225fdef836..5b957701ce7d9904e1ec42df0d705a19d10498d2 100644 (file)
@@ -2,6 +2,7 @@
 
 #include <stdint.h>
 #include <leveldb/db.h>
+#include <pthread.h>
 
 #include <string>
 #include <vector>
@@ -13,6 +14,61 @@ using std::string;
 
 namespace caffe {
 
+template <typename Dtype>
+void* DataLayerPrefetch(void* layer_pointer) {
+  DataLayer<Dtype>* layer = reinterpret_cast<DataLayer<Dtype>*>(layer_pointer);
+  Datum datum;
+  Dtype* top_data = layer->prefetch_data_->mutable_cpu_data();
+  Dtype* top_label = layer->prefetch_label_->mutable_cpu_data();
+  const Dtype scale = layer->layer_param_.scale();
+  const Dtype subtraction = layer->layer_param_.subtraction();
+  const int batchsize = layer->layer_param_.batchsize();
+  const int cropsize = layer->layer_param_.cropsize();
+  for (int itemid = 0; itemid < batchsize; ++itemid) {
+    // get a blob
+    datum.ParseFromString(layer->iter_->value().ToString());
+    const string& data = datum.data();
+    if (cropsize) {
+      CHECK(data.size()) << "Image cropping only support uint8 data";
+      int h_offset = rand() % (layer->datum_height_ - cropsize);
+      int w_offset = rand() % (layer->datum_width_ - cropsize);
+      for (int c = 0; c < layer->datum_channels_; ++c) {
+        for (int h = 0; h < cropsize; ++h) {
+          for (int w = 0; w < cropsize; ++w) {
+            top_data[((itemid * layer->datum_channels_ + c) * cropsize + h) * cropsize + w] =
+                static_cast<Dtype>((uint8_t)data[
+                    (c * layer->datum_height_ + h + h_offset) * layer->datum_width_
+                    + w + w_offset]
+                ) * scale - subtraction;
+          }
+        }
+      }
+    } else {
+      // we will prefer to use data() first, and then try float_data()
+      if (data.size()) {
+        for (int j = 0; j < layer->datum_size_; ++j) {
+          top_data[itemid * layer->datum_size_ + j] =
+              (static_cast<Dtype>((uint8_t)data[j]) * scale) - subtraction;
+        }
+      } else {
+        for (int j = 0; j < layer->datum_size_; ++j) {
+          top_data[itemid * layer->datum_size_ + j] =
+              (datum.float_data(j) * scale) - subtraction;
+        }
+      }
+    }
+    top_label[itemid] = datum.label();
+    // go to the next iter
+    layer->iter_->Next();
+    if (!layer->iter_->Valid()) {
+      // We have reached the end. Restart from the first.
+      LOG(INFO) << "Restarting data read from start.";
+      layer->iter_->SeekToFirst();
+    }
+  }
+}
+
+
 template <typename Dtype>
 void DataLayer<Dtype>::SetUp(const vector<Blob<Dtype>*>& bottom,
       vector<Blob<Dtype>*>* top) {
@@ -38,16 +94,23 @@ void DataLayer<Dtype>::SetUp(const vector<Blob<Dtype>*>& bottom,
   if (cropsize > 0) {
     (*top)[0]->Reshape(
         this->layer_param_.batchsize(), datum.channels(), cropsize, cropsize);
+    prefetch_data_.reset(new Blob<Dtype>(
+        this->layer_param_.batchsize(), datum.channels(), cropsize, cropsize));
   } else {
     (*top)[0]->Reshape(
         this->layer_param_.batchsize(), datum.channels(), datum.height(),
         datum.width());
+    prefetch_data_.reset(new Blob<Dtype>(
+        this->layer_param_.batchsize(), datum.channels(), datum.height(),
+        datum.width()));
   }
   LOG(INFO) << "output data size: " << (*top)[0]->num() << ","
       << (*top)[0]->channels() << "," << (*top)[0]->height() << ","
       << (*top)[0]->width();
   // label
   (*top)[1]->Reshape(this->layer_param_.batchsize(), 1, 1, 1);
+  prefetch_label_.reset(
+      new Blob<Dtype>(this->layer_param_.batchsize(), 1, 1, 1));
   // datum size
   datum_channels_ = datum.channels();
   datum_height_ = datum.height();
@@ -55,71 +118,41 @@ void DataLayer<Dtype>::SetUp(const vector<Blob<Dtype>*>& bottom,
   datum_size_ = datum.channels() * datum.height() * datum.width();
   CHECK_GT(datum_height_, cropsize);
   CHECK_GT(datum_width_, cropsize);
+  // Now, start the prefetch thread.
+  //LOG(INFO) << "Initializing prefetch";
+  CHECK(!pthread_create(&thread_, NULL, DataLayerPrefetch<Dtype>, (void*)this))
+      << "Pthread execution failed.";
+  //LOG(INFO) << "Prefetch initialized.";
 }
 
 template <typename Dtype>
 void DataLayer<Dtype>::Forward_cpu(const vector<Blob<Dtype>*>& bottom,
       vector<Blob<Dtype>*>* top) {
-  Datum datum;
-  Dtype* top_data = (*top)[0]->mutable_cpu_data();
-  Dtype* top_label = (*top)[1]->mutable_cpu_data();
-  const Dtype scale = this->layer_param_.scale();
-  const Dtype subtraction = this->layer_param_.subtraction();
-  int cropsize = this->layer_param_.cropsize();
-  for (int itemid = 0; itemid < (*top)[0]->num(); ++itemid) {
-    // get a blob
-    datum.ParseFromString(iter_->value().ToString());
-    const string& data = datum.data();
-    if (cropsize) {
-      CHECK(data.size()) << "Image cropping only support uint8 data";
-      int h_offset = rand() % (datum_height_ - cropsize);
-      int w_offset = rand() % (datum_width_ - cropsize);
-      for (int c = 0; c < datum_channels_; ++c) {
-        for (int h = 0; h < cropsize; ++h) {
-          for (int w = 0; w < cropsize; ++w) {
-            top_data[((itemid * datum_channels_ + c) * cropsize + h) * cropsize + w] =
-                static_cast<Dtype>((uint8_t)data[
-                    (c * datum_height_ + h + h_offset) * datum_width_
-                    + w + w_offset]
-                ) * scale - subtraction;
-          }
-        }
-      }
-    } else {
-      // we will prefer to use data() first, and then try float_data()
-      if (data.size()) {
-        for (int j = 0; j < datum_size_; ++j) {
-          top_data[itemid * datum_size_ + j] =
-              (static_cast<Dtype>((uint8_t)data[j]) * scale) - subtraction;
-        }
-      } else {
-        for (int j = 0; j < datum_size_; ++j) {
-          top_data[itemid * datum_size_ + j] =
-              (datum.float_data(j) * scale) - subtraction;
-        }
-      }
-    }
-    top_label[itemid] = datum.label();
-    // go to the next iter
-    iter_->Next();
-    if (!iter_->Valid()) {
-      // We have reached the end. Restart from the first.
-      LOG(INFO) << "Restarting data read from start.";
-      iter_->SeekToFirst();
-    }
-  }
+  // First, join the thread
+  CHECK(!pthread_join(thread_, NULL)) << "Pthread joining failed.";
+  // Copy the data
+  memcpy((*top)[0]->mutable_cpu_data(), prefetch_data_->cpu_data(),
+      sizeof(Dtype) * prefetch_data_->count());
+  memcpy((*top)[1]->mutable_cpu_data(), prefetch_label_->cpu_data(),
+      sizeof(Dtype) * prefetch_label_->count());
+  // Start a new prefetch thread
+  CHECK(!pthread_create(&thread_, NULL, DataLayerPrefetch<Dtype>, (void*)this))
+      << "Pthread execution failed.";
 }
 
 template <typename Dtype>
 void DataLayer<Dtype>::Forward_gpu(const vector<Blob<Dtype>*>& bottom,
       vector<Blob<Dtype>*>* top) {
-  Forward_cpu(bottom, top);
-  // explicitly copy data to gpu - this is achieved by simply calling gpu_data
-  // functions.
-  // TODO(Yangqing): maybe we don't need this since data synchronization is
-  // simply done under the hood?
-  (*top)[0]->gpu_data();
-  (*top)[1]->gpu_data();
+  // First, join the thread
+  CHECK(!pthread_join(thread_, NULL)) << "Pthread joining failed.";
+  // Copy the data
+  CUDA_CHECK(cudaMemcpy((*top)[0]->mutable_gpu_data(), prefetch_data_->cpu_data(),
+      sizeof(Dtype) * prefetch_data_->count(), cudaMemcpyHostToDevice));
+  CUDA_CHECK(cudaMemcpy((*top)[1]->mutable_gpu_data(), prefetch_label_->cpu_data(),
+      sizeof(Dtype) * prefetch_label_->count(), cudaMemcpyHostToDevice));
+  // Start a new prefetch thread
+  CHECK(!pthread_create(&thread_, NULL, DataLayerPrefetch<Dtype>, (void*)this))
+      << "Pthread execution failed.";
 }
 
 // The backward operations are dummy - they do not carry any computation.
index 2dd9b56422a9e8daadaa17a0507b1f3d74000778..c0ccbb1a92bf642ef0774144490b267d8e568d4f 100644 (file)
@@ -106,6 +106,7 @@ const vector<Blob<Dtype>*>& Net<Dtype>::Forward(
     blobs_[net_input_blob_indices_[i]]->CopyFrom(*bottom[i]);
   }
   for (int i = 0; i < layers_.size(); ++i) {
+    //LOG(ERROR) << "Forwarding " << layer_names_[i];
     layers_[i]->Forward(bottom_vecs_[i], &top_vecs_[i]);
   }
   return net_output_blobs_;
index 3ac5fb81382832cb39ad24e00f112bfc1b97cccd..4b24c23a80ee4de82d271fce2c275a7ecd233e52 100644 (file)
@@ -30,7 +30,7 @@ class Net {
   // been provided during the forward pass.
   Dtype Backward();
 
-  Dtype ForwardBackWard(const vector<Blob<Dtype>* > & bottom) {
+  Dtype ForwardBackward(const vector<Blob<Dtype>* > & bottom) {
     Forward(bottom);
     return Backward();
   }
index 2df872ed841d48d9310820cdce458dd0c5b5ee19..a48408c6b7bf16955ca9d8ab1d3b8f4c5827c4db 100644 (file)
@@ -25,7 +25,7 @@ void Solver<Dtype>::Solve(Net<Dtype>* net) {
   // should be given, and we will just provide dummy vecs.
   vector<Blob<Dtype>*> bottom_vec;
   while (iter_++ < param_.max_iter()) {
-    Dtype loss = net_->ForwardBackWard(bottom_vec);
+    Dtype loss = net_->ForwardBackward(bottom_vec);
     ComputeUpdateValue();
     net_->Update();
 
index 23678bf47e434ecd929147c2c0c7b801cd6836c0..6f943a8e92caf15775e0848fc0a2e0ed8a6feed9 100644 (file)
@@ -4,6 +4,7 @@
 #define CAFFE_VISION_LAYERS_HPP_
 
 #include <leveldb/db.h>
+#include <pthread.h>
 
 #include <vector>
 
@@ -232,8 +233,14 @@ class ConvolutionLayer : public Layer<Dtype> {
   int N_;
 };
 
+template <typename Dtype>
+void* DataLayerPrefetch(void* layer_pointer);
+
 template <typename Dtype>
 class DataLayer : public Layer<Dtype> {
+  // The function used to perform prefetching.
+  friend void* DataLayerPrefetch<Dtype>(void*);
+
  public:
   explicit DataLayer(const LayerParameter& param)
       : Layer<Dtype>(param) {}
@@ -256,6 +263,9 @@ class DataLayer : public Layer<Dtype> {
   int datum_height_;
   int datum_width_;
   int datum_size_;
+  pthread_t thread_;
+  shared_ptr<Blob<Dtype> > prefetch_data_;
+  shared_ptr<Blob<Dtype> > prefetch_label_;
 };
 
 
index cb48cb344b30153900bba614c1f1cf09271f16b2..53a1e29aa1a4e944aa5d9675797586fb88842ddc 100644 (file)
@@ -11,6 +11,7 @@
 
 #include <glog/logging.h>
 #include <leveldb/db.h>
+#include <leveldb/write_batch.h>
 
 #include <string>
 #include <iostream>
@@ -40,6 +41,8 @@ int main(int argc, char** argv) {
   leveldb::Options options;
   options.error_if_exists = true;
   options.create_if_missing = true;
+  options.create_if_missing = true;
+  options.write_buffer_size = 268435456;
   LOG(INFO) << "Opening leveldb " << argv[3];
   leveldb::Status status = leveldb::DB::Open(
       options, argv[3], &db);
@@ -51,6 +54,7 @@ int main(int argc, char** argv) {
   Datum datum;
   int count = 0;
   char key_cstr[100];
+  leveldb::WriteBatch* batch = new leveldb::WriteBatch();
   while (infile >> filename >> label) {
     ReadImageToDatum(root_folder + filename, label, &datum);
     sprintf(key_cstr, "%08d_%s", count, filename.c_str());
@@ -58,9 +62,12 @@ int main(int argc, char** argv) {
     string value;
     // get the value
     datum.SerializeToString(&value);
-    db->Put(leveldb::WriteOptions(), key, value);
+    batch->Put(key, value);
     if (++count % 1000 == 0) {
+      db->Write(leveldb::WriteOptions(), batch);
       LOG(ERROR) << "Processed " << count << " files.";
+      delete batch;
+      batch = new leveldb::WriteBatch();
     }
   }