author    Yangqing Jia <jiayq84@gmail.com>
          Fri, 4 Oct 2013 04:01:52 +0000 (21:01 -0700)
committer Yangqing Jia <jiayq84@gmail.com>
          Fri, 4 Oct 2013 04:01:52 +0000 (21:01 -0700)
diff --git a/src/Makefile b/src/Makefile
index 31d225e946c8e18572b4c9070b0f3d623a4bcfb3..1b04a459a88058d005b33927ba08bfb12ec5301d 100644
--- a/src/Makefile
+++ b/src/Makefile
INCLUDE_DIRS := . /usr/local/include $(CUDA_INCLUDE_DIR) $(MKL_INCLUDE_DIR)
LIBRARY_DIRS := . /usr/local/lib $(CUDA_LIB_DIR) $(MKL_LIB_DIR)
LIBRARIES := cuda cudart cublas protobuf glog mkl_rt mkl_intel_thread curand \
- leveldb snappy opencv_core opencv_highgui
+ leveldb snappy opencv_core opencv_highgui pthread
WARNINGS := -Wall
CXXFLAGS += -fPIC $(foreach includedir,$(INCLUDE_DIRS),-I$(includedir))
for testbin in $(TEST_BINS); do $$testbin; done
$(TEST_BINS): %.testbin : %.o
- $(CXX) $< $(OBJS) $(GTEST_OBJ) -o $@ $(LDFLAGS) $(WARNINGS)
+ $(CXX) -pthread $< $(OBJS) $(GTEST_OBJ) -o $@ $(LDFLAGS) $(WARNINGS)
$(PROGRAM_BINS): %.bin : %.o
- $(CXX) $< $(OBJS) -o $@ $(LDFLAGS) $(WARNINGS)
+ $(CXX) -pthread $< $(OBJS) -o $@ $(LDFLAGS) $(WARNINGS)
$(NAME): $(PROTO_GEN_CC) $(OBJS)
$(LINK) -shared $(OBJS) -o $(NAME)
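The hunk above links pthread in twice: once through LIBRARIES and once via an explicit -pthread on the link lines. A minimal sketch, not part of this commit, of a program that needs the flag (build with, e.g., g++ -pthread demo.cpp -o demo; the file name is illustrative):

#include <pthread.h>
#include <cstdio>

// Worker body for the example thread.
void* Work(void*) {
  std::printf("hello from the worker thread\n");
  return NULL;
}

int main() {
  pthread_t thread;
  // Without -pthread (or -lpthread at link time), this typically fails
  // with an undefined reference to pthread_create.
  pthread_create(&thread, NULL, Work, NULL);
  pthread_join(thread, NULL);
  return 0;
}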
index 40e91de84c0bc7abd4efdb01f14a72225fdef836..5b957701ce7d9904e1ec42df0d705a19d10498d2 100644
#include <stdint.h>
#include <leveldb/db.h>
+#include <pthread.h>
#include <string>
#include <vector>
namespace caffe {
+template <typename Dtype>
+void* DataLayerPrefetch(void* layer_pointer) {
+ DataLayer<Dtype>* layer = reinterpret_cast<DataLayer<Dtype>*>(layer_pointer);
+ Datum datum;
+ Dtype* top_data = layer->prefetch_data_->mutable_cpu_data();
+ Dtype* top_label = layer->prefetch_label_->mutable_cpu_data();
+ const Dtype scale = layer->layer_param_.scale();
+ const Dtype subtraction = layer->layer_param_.subtraction();
+ const int batchsize = layer->layer_param_.batchsize();
+ const int cropsize = layer->layer_param_.cropsize();
+ for (int itemid = 0; itemid < batchsize; ++itemid) {
+ // get a blob
+ datum.ParseFromString(layer->iter_->value().ToString());
+ const string& data = datum.data();
+ if (cropsize) {
+ CHECK(data.size()) << "Image cropping only supports uint8 data";
+ int h_offset = rand() % (layer->datum_height_ - cropsize);
+ int w_offset = rand() % (layer->datum_width_ - cropsize);
+ for (int c = 0; c < layer->datum_channels_; ++c) {
+ for (int h = 0; h < cropsize; ++h) {
+ for (int w = 0; w < cropsize; ++w) {
+ top_data[((itemid * layer->datum_channels_ + c) * cropsize + h) * cropsize + w] =
+ static_cast<Dtype>((uint8_t)data[
+ (c * layer->datum_height_ + h + h_offset) * layer->datum_width_
+ + w + w_offset]
+ ) * scale - subtraction;
+ }
+ }
+ }
+ } else {
+ // we will prefer to use data() first, and then try float_data()
+ if (data.size()) {
+ for (int j = 0; j < layer->datum_size_; ++j) {
+ top_data[itemid * layer->datum_size_ + j] =
+ (static_cast<Dtype>((uint8_t)data[j]) * scale) - subtraction;
+ }
+ } else {
+ for (int j = 0; j < layer->datum_size_; ++j) {
+ top_data[itemid * layer->datum_size_ + j] =
+ (datum.float_data(j) * scale) - subtraction;
+ }
+ }
+ }
+ top_label[itemid] = datum.label();
+ // go to the next iter
+ layer->iter_->Next();
+ if (!layer->iter_->Valid()) {
+ // We have reached the end. Restart from the first.
+ LOG(INFO) << "Restarting data read from start.";
+ layer->iter_->SeekToFirst();
+ }
+ }
+ return NULL; // a pthread start routine must return a value
+}
+
+
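DataLayerPrefetch is the producer half of a simple double-buffering scheme: it fills prefetch_data_ and prefetch_label_ on a side thread while the network consumes the previous batch, and Forward joins it before reading. A minimal sketch of the same join-consume-respawn pattern; PrefetchState and Prefetch are illustrative names, not from the commit:

#include <pthread.h>
#include <cstdio>
#include <vector>

// Illustrative stand-in for the layer's side buffer.
struct PrefetchState {
  std::vector<float> buffer;
};

// Producer: fills the side buffer, standing in for the leveldb reads above.
void* Prefetch(void* arg) {
  PrefetchState* state = reinterpret_cast<PrefetchState*>(arg);
  for (size_t i = 0; i < state->buffer.size(); ++i) {
    state->buffer[i] = static_cast<float>(i);
  }
  return NULL;
}

int main() {
  PrefetchState state;
  state.buffer.resize(16);
  pthread_t thread;
  // SetUp: kick off the first prefetch before any forward pass.
  pthread_create(&thread, NULL, Prefetch, &state);
  for (int step = 0; step < 3; ++step) {
    // Forward: block on the pending batch, consume it, respawn the producer.
    pthread_join(thread, NULL);
    std::printf("step %d consumed %zu values\n", step, state.buffer.size());
    pthread_create(&thread, NULL, Prefetch, &state);
  }
  pthread_join(thread, NULL);  // drain the last outstanding prefetch
  return 0;
}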
template <typename Dtype>
void DataLayer<Dtype>::SetUp(const vector<Blob<Dtype>*>& bottom,
vector<Blob<Dtype>*>* top) {
if (cropsize > 0) {
(*top)[0]->Reshape(
this->layer_param_.batchsize(), datum.channels(), cropsize, cropsize);
+ prefetch_data_.reset(new Blob<Dtype>(
+ this->layer_param_.batchsize(), datum.channels(), cropsize, cropsize));
} else {
(*top)[0]->Reshape(
this->layer_param_.batchsize(), datum.channels(), datum.height(),
datum.width());
+ prefetch_data_.reset(new Blob<Dtype>(
+ this->layer_param_.batchsize(), datum.channels(), datum.height(),
+ datum.width()));
}
LOG(INFO) << "output data size: " << (*top)[0]->num() << ","
<< (*top)[0]->channels() << "," << (*top)[0]->height() << ","
<< (*top)[0]->width();
// label
(*top)[1]->Reshape(this->layer_param_.batchsize(), 1, 1, 1);
+ prefetch_label_.reset(
+ new Blob<Dtype>(this->layer_param_.batchsize(), 1, 1, 1));
// datum size
datum_channels_ = datum.channels();
datum_height_ = datum.height();
datum_size_ = datum.channels() * datum.height() * datum.width();
CHECK_GT(datum_height_, cropsize);
CHECK_GT(datum_width_, cropsize);
+ // Now, start the prefetch thread.
+ //LOG(INFO) << "Initializing prefetch";
+ CHECK(!pthread_create(&thread_, NULL, DataLayerPrefetch<Dtype>, (void*)this))
+ << "Pthread execution failed.";
+ //LOG(INFO) << "Prefetch initialized.";
}
template <typename Dtype>
void DataLayer<Dtype>::Forward_cpu(const vector<Blob<Dtype>*>& bottom,
vector<Blob<Dtype>*>* top) {
- Datum datum;
- Dtype* top_data = (*top)[0]->mutable_cpu_data();
- Dtype* top_label = (*top)[1]->mutable_cpu_data();
- const Dtype scale = this->layer_param_.scale();
- const Dtype subtraction = this->layer_param_.subtraction();
- int cropsize = this->layer_param_.cropsize();
- for (int itemid = 0; itemid < (*top)[0]->num(); ++itemid) {
- // get a blob
- datum.ParseFromString(iter_->value().ToString());
- const string& data = datum.data();
- if (cropsize) {
- CHECK(data.size()) << "Image cropping only support uint8 data";
- int h_offset = rand() % (datum_height_ - cropsize);
- int w_offset = rand() % (datum_width_ - cropsize);
- for (int c = 0; c < datum_channels_; ++c) {
- for (int h = 0; h < cropsize; ++h) {
- for (int w = 0; w < cropsize; ++w) {
- top_data[((itemid * datum_channels_ + c) * cropsize + h) * cropsize + w] =
- static_cast<Dtype>((uint8_t)data[
- (c * datum_height_ + h + h_offset) * datum_width_
- + w + w_offset]
- ) * scale - subtraction;
- }
- }
- }
- } else {
- // we will prefer to use data() first, and then try float_data()
- if (data.size()) {
- for (int j = 0; j < datum_size_; ++j) {
- top_data[itemid * datum_size_ + j] =
- (static_cast<Dtype>((uint8_t)data[j]) * scale) - subtraction;
- }
- } else {
- for (int j = 0; j < datum_size_; ++j) {
- top_data[itemid * datum_size_ + j] =
- (datum.float_data(j) * scale) - subtraction;
- }
- }
- }
- top_label[itemid] = datum.label();
- // go to the next iter
- iter_->Next();
- if (!iter_->Valid()) {
- // We have reached the end. Restart from the first.
- LOG(INFO) << "Restarting data read from start.";
- iter_->SeekToFirst();
- }
- }
+ // First, join the thread
+ CHECK(!pthread_join(thread_, NULL)) << "Pthread joining failed.";
+ // Copy the data
+ memcpy((*top)[0]->mutable_cpu_data(), prefetch_data_->cpu_data(),
+ sizeof(Dtype) * prefetch_data_->count());
+ memcpy((*top)[1]->mutable_cpu_data(), prefetch_label_->cpu_data(),
+ sizeof(Dtype) * prefetch_label_->count());
+ // Start a new prefetch thread
+ CHECK(!pthread_create(&thread_, NULL, DataLayerPrefetch<Dtype>, (void*)this))
+ << "Pthread execution failed.";
}
template <typename Dtype>
void DataLayer<Dtype>::Forward_gpu(const vector<Blob<Dtype>*>& bottom,
vector<Blob<Dtype>*>* top) {
- Forward_cpu(bottom, top);
- // explicitly copy data to gpu - this is achieved by simply calling gpu_data
- // functions.
- // TODO(Yangqing): maybe we don't need this since data synchronization is
- // simply done under the hood?
- (*top)[0]->gpu_data();
- (*top)[1]->gpu_data();
+ // First, join the thread
+ CHECK(!pthread_join(thread_, NULL)) << "Pthread joining failed.";
+ // Copy the data
+ CUDA_CHECK(cudaMemcpy((*top)[0]->mutable_gpu_data(), prefetch_data_->cpu_data(),
+ sizeof(Dtype) * prefetch_data_->count(), cudaMemcpyHostToDevice));
+ CUDA_CHECK(cudaMemcpy((*top)[1]->mutable_gpu_data(), prefetch_label_->cpu_data(),
+ sizeof(Dtype) * prefetch_label_->count(), cudaMemcpyHostToDevice));
+ // Start a new prefetch thread
+ CHECK(!pthread_create(&thread_, NULL, DataLayerPrefetch<Dtype>, (void*)this))
+ << "Pthread execution failed.";
}
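Forward_gpu now replaces the old implicit synchronization through gpu_data() with an explicit host-to-device copy of the prefetch buffers. A minimal sketch of that cudaMemcpy pattern, assuming only the CUDA runtime (the buffer size is arbitrary):

#include <cuda_runtime.h>
#include <cstdio>

int main() {
  const int n = 256;  // arbitrary size for the sketch
  float host[256];
  for (int i = 0; i < n; ++i) host[i] = static_cast<float>(i);
  float* device = NULL;
  // Allocate device memory and ship the host buffer over in a single copy,
  // mirroring the prefetch_data_->cpu_data() -> mutable_gpu_data() path above.
  cudaMalloc(reinterpret_cast<void**>(&device), n * sizeof(float));
  cudaMemcpy(device, host, n * sizeof(float), cudaMemcpyHostToDevice);
  std::printf("copied %d floats to the device\n", n);
  cudaFree(device);
  return 0;
}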
// The backward operations are dummy - they do not carry any computation.
diff --git a/src/caffe/net.cpp b/src/caffe/net.cpp
index 2dd9b56422a9e8daadaa17a0507b1f3d74000778..c0ccbb1a92bf642ef0774144490b267d8e568d4f 100644
--- a/src/caffe/net.cpp
+++ b/src/caffe/net.cpp
blobs_[net_input_blob_indices_[i]]->CopyFrom(*bottom[i]);
}
for (int i = 0; i < layers_.size(); ++i) {
+ //LOG(ERROR) << "Forwarding " << layer_names_[i];
layers_[i]->Forward(bottom_vecs_[i], &top_vecs_[i]);
}
return net_output_blobs_;
diff --git a/src/caffe/net.hpp b/src/caffe/net.hpp
index 3ac5fb81382832cb39ad24e00f112bfc1b97cccd..4b24c23a80ee4de82d271fce2c275a7ecd233e52 100644
--- a/src/caffe/net.hpp
+++ b/src/caffe/net.hpp
// been provided during the forward pass.
Dtype Backward();
- Dtype ForwardBackWard(const vector<Blob<Dtype>* > & bottom) {
+ Dtype ForwardBackward(const vector<Blob<Dtype>* > & bottom) {
Forward(bottom);
return Backward();
}
index 2df872ed841d48d9310820cdce458dd0c5b5ee19..a48408c6b7bf16955ca9d8ab1d3b8f4c5827c4db 100644
// should be given, and we will just provide dummy vecs.
vector<Blob<Dtype>*> bottom_vec;
while (iter_++ < param_.max_iter()) {
- Dtype loss = net_->ForwardBackWard(bottom_vec);
+ Dtype loss = net_->ForwardBackward(bottom_vec);
ComputeUpdateValue();
net_->Update();
index 23678bf47e434ecd929147c2c0c7b801cd6836c0..6f943a8e92caf15775e0848fc0a2e0ed8a6feed9 100644
#define CAFFE_VISION_LAYERS_HPP_
#include <leveldb/db.h>
+#include <pthread.h>
#include <vector>
int N_;
};
+template <typename Dtype>
+void* DataLayerPrefetch(void* layer_pointer);
+
template <typename Dtype>
class DataLayer : public Layer<Dtype> {
+ // The function used to perform prefetching.
+ friend void* DataLayerPrefetch<Dtype>(void*);
+
public:
explicit DataLayer(const LayerParameter& param)
: Layer<Dtype>(param) {}
int datum_height_;
int datum_width_;
int datum_size_;
+ pthread_t thread_;
+ shared_ptr<Blob<Dtype> > prefetch_data_;
+ shared_ptr<Blob<Dtype> > prefetch_label_;
};
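The header declares the prefetch routine before the class so that each instantiation DataLayerPrefetch<Dtype> can be named as a friend and reach the layer's private members. A minimal sketch of that template-friend pattern, with illustrative names (Holder, Worker) not taken from the commit:

#include <cstdio>

template <typename T> class Holder;  // forward declaration for Worker's body

// Free function template, declared before the class so the friend
// declaration below can name the specialization Worker<T>.
template <typename T>
void* Worker(void* holder_pointer) {
  Holder<T>* holder = reinterpret_cast<Holder<T>*>(holder_pointer);
  holder->secret_ += T(1);  // allowed: Worker<T> is a friend of Holder<T>
  return NULL;
}

template <typename T>
class Holder {
  friend void* Worker<T>(void*);
 public:
  explicit Holder(T secret) : secret_(secret) {}
  T secret() const { return secret_; }
 private:
  T secret_;
};

int main() {
  Holder<double> holder(41.0);
  Worker<double>(&holder);
  std::printf("%.1f\n", holder.secret());  // prints 42.0
  return 0;
}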
index cb48cb344b30153900bba614c1f1cf09271f16b2..53a1e29aa1a4e944aa5d9675797586fb88842ddc 100644
#include <glog/logging.h>
#include <leveldb/db.h>
+#include <leveldb/write_batch.h>
#include <string>
#include <iostream>
leveldb::Options options;
options.error_if_exists = true;
options.create_if_missing = true;
+ options.write_buffer_size = 268435456;
LOG(INFO) << "Opening leveldb " << argv[3];
leveldb::Status status = leveldb::DB::Open(
options, argv[3], &db);
Datum datum;
int count = 0;
char key_cstr[100];
+ leveldb::WriteBatch* batch = new leveldb::WriteBatch();
while (infile >> filename >> label) {
ReadImageToDatum(root_folder + filename, label, &datum);
sprintf(key_cstr, "%08d_%s", count, filename.c_str());
string value;
// get the value
datum.SerializeToString(&value);
- db->Put(leveldb::WriteOptions(), key, value);
+ batch->Put(key, value);
if (++count % 1000 == 0) {
+ db->Write(leveldb::WriteOptions(), batch);
LOG(ERROR) << "Processed " << count << " files.";
+ delete batch;
+ batch = new leveldb::WriteBatch();
}
}
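The last hunk turns one synchronous db->Put() per image into batched writes flushed every 1000 records, together with a larger write buffer. Note that as shown, records after the last multiple of 1000 are only written if the total count happens to align; a final Write() after the loop covers the remainder, as in the sketch below (the database path and counts are illustrative, and WriteBatch::Clear() is used instead of the commit's delete/new):

#include <leveldb/db.h>
#include <leveldb/write_batch.h>
#include <cassert>
#include <cstdio>

int main() {
  leveldb::DB* db = NULL;
  leveldb::Options options;
  options.create_if_missing = true;
  options.write_buffer_size = 268435456;  // 256MB, matching the commit
  leveldb::Status status =
      leveldb::DB::Open(options, "/tmp/batch_demo_db", &db);
  assert(status.ok());
  leveldb::WriteBatch batch;
  char key_cstr[16];
  for (int i = 0; i < 5000; ++i) {
    snprintf(key_cstr, sizeof(key_cstr), "%08d", i);
    batch.Put(key_cstr, "value");
    if ((i + 1) % 1000 == 0) {
      db->Write(leveldb::WriteOptions(), &batch);  // flush every 1000 puts
      batch.Clear();
    }
  }
  db->Write(leveldb::WriteOptions(), &batch);  // flush any remainder
  delete db;
  return 0;
}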