summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
Diffstat (limited to 'libappfuse')
-rw-r--r--libappfuse/FuseBridgeLoop.cc402
-rw-r--r--libappfuse/include/libappfuse/EpollController.h4
-rw-r--r--libappfuse/include/libappfuse/FuseBridgeLoop.h44
-rw-r--r--libappfuse/tests/FuseBridgeLoopTest.cc16
4 files changed, 389 insertions, 77 deletions
diff --git a/libappfuse/FuseBridgeLoop.cc b/libappfuse/FuseBridgeLoop.cc
index 2386bf84b..3f47066a8 100644
--- a/libappfuse/FuseBridgeLoop.cc
+++ b/libappfuse/FuseBridgeLoop.cc
@@ -16,85 +16,355 @@
16 16
17#include "libappfuse/FuseBridgeLoop.h" 17#include "libappfuse/FuseBridgeLoop.h"
18 18
19#include <sys/epoll.h>
20#include <sys/socket.h>
21
22#include <unordered_map>
23
19#include <android-base/logging.h> 24#include <android-base/logging.h>
20#include <android-base/unique_fd.h> 25#include <android-base/unique_fd.h>
21 26
27#include "libappfuse/EpollController.h"
28
22namespace android { 29namespace android {
23namespace fuse { 30namespace fuse {
31namespace {
32
33enum class FuseBridgeState { kWaitToReadEither, kWaitToReadProxy, kWaitToWriteProxy, kClosing };
34
35struct FuseBridgeEntryEvent {
36 FuseBridgeEntry* entry;
37 int events;
38};
39
40void GetObservedEvents(FuseBridgeState state, int* device_events, int* proxy_events) {
41 switch (state) {
42 case FuseBridgeState::kWaitToReadEither:
43 *device_events = EPOLLIN;
44 *proxy_events = EPOLLIN;
45 return;
46 case FuseBridgeState::kWaitToReadProxy:
47 *device_events = 0;
48 *proxy_events = EPOLLIN;
49 return;
50 case FuseBridgeState::kWaitToWriteProxy:
51 *device_events = 0;
52 *proxy_events = EPOLLOUT;
53 return;
54 case FuseBridgeState::kClosing:
55 *device_events = 0;
56 *proxy_events = 0;
57 return;
58 }
59}
60}
61
62class FuseBridgeEntry {
63 public:
64 FuseBridgeEntry(int mount_id, base::unique_fd&& dev_fd, base::unique_fd&& proxy_fd)
65 : mount_id_(mount_id),
66 device_fd_(std::move(dev_fd)),
67 proxy_fd_(std::move(proxy_fd)),
68 state_(FuseBridgeState::kWaitToReadEither),
69 last_state_(FuseBridgeState::kWaitToReadEither),
70 last_device_events_({this, 0}),
71 last_proxy_events_({this, 0}),
72 open_count_(0) {}
73
74 // Transfer bytes depends on availability of FDs and the internal |state_|.
75 void Transfer(FuseBridgeLoopCallback* callback) {
76 constexpr int kUnexpectedEventMask = ~(EPOLLIN | EPOLLOUT);
77 const bool unexpected_event = (last_device_events_.events & kUnexpectedEventMask) ||
78 (last_proxy_events_.events & kUnexpectedEventMask);
79 const bool device_read_ready = last_device_events_.events & EPOLLIN;
80 const bool proxy_read_ready = last_proxy_events_.events & EPOLLIN;
81 const bool proxy_write_ready = last_proxy_events_.events & EPOLLOUT;
82
83 last_device_events_.events = 0;
84 last_proxy_events_.events = 0;
85
86 LOG(VERBOSE) << "Transfer device_read_ready=" << device_read_ready
87 << " proxy_read_ready=" << proxy_read_ready
88 << " proxy_write_ready=" << proxy_write_ready;
89
90 if (unexpected_event) {
91 LOG(ERROR) << "Invalid epoll event is observed";
92 state_ = FuseBridgeState::kClosing;
93 return;
94 }
95
96 switch (state_) {
97 case FuseBridgeState::kWaitToReadEither:
98 if (proxy_read_ready) {
99 state_ = ReadFromProxy();
100 } else if (device_read_ready) {
101 state_ = ReadFromDevice(callback);
102 }
103 return;
104
105 case FuseBridgeState::kWaitToReadProxy:
106 CHECK(proxy_read_ready);
107 state_ = ReadFromProxy();
108 return;
109
110 case FuseBridgeState::kWaitToWriteProxy:
111 CHECK(proxy_write_ready);
112 state_ = WriteToProxy();
113 return;
114
115 case FuseBridgeState::kClosing:
116 return;
117 }
118 }
119
120 bool IsClosing() const { return state_ == FuseBridgeState::kClosing; }
121
122 int mount_id() const { return mount_id_; }
123
124 private:
125 friend class BridgeEpollController;
126
127 FuseBridgeState ReadFromProxy() {
128 switch (buffer_.response.ReadOrAgain(proxy_fd_)) {
129 case ResultOrAgain::kSuccess:
130 break;
131 case ResultOrAgain::kFailure:
132 return FuseBridgeState::kClosing;
133 case ResultOrAgain::kAgain:
134 return FuseBridgeState::kWaitToReadProxy;
135 }
136
137 if (!buffer_.response.Write(device_fd_)) {
138 return FuseBridgeState::kClosing;
139 }
140
141 auto it = opcode_map_.find(buffer_.response.header.unique);
142 if (it != opcode_map_.end()) {
143 switch (it->second) {
144 case FUSE_OPEN:
145 if (buffer_.response.header.error == fuse::kFuseSuccess) {
146 open_count_++;
147 }
148 break;
149
150 case FUSE_RELEASE:
151 if (open_count_ > 0) {
152 open_count_--;
153 } else {
154 LOG(WARNING) << "Unexpected FUSE_RELEASE before opening a file.";
155 break;
156 }
157 if (open_count_ == 0) {
158 return FuseBridgeState::kClosing;
159 }
160 break;
161 }
162 opcode_map_.erase(it);
163 }
164
165 return FuseBridgeState::kWaitToReadEither;
166 }
167
168 FuseBridgeState ReadFromDevice(FuseBridgeLoopCallback* callback) {
169 LOG(VERBOSE) << "ReadFromDevice";
170 if (!buffer_.request.Read(device_fd_)) {
171 return FuseBridgeState::kClosing;
172 }
173
174 const uint32_t opcode = buffer_.request.header.opcode;
175 LOG(VERBOSE) << "Read a fuse packet, opcode=" << opcode;
176 switch (opcode) {
177 case FUSE_FORGET:
178 // Do not reply to FUSE_FORGET.
179 return FuseBridgeState::kWaitToReadEither;
180
181 case FUSE_LOOKUP:
182 case FUSE_GETATTR:
183 case FUSE_OPEN:
184 case FUSE_READ:
185 case FUSE_WRITE:
186 case FUSE_RELEASE:
187 case FUSE_FSYNC:
188 if (opcode == FUSE_OPEN || opcode == FUSE_RELEASE) {
189 opcode_map_.emplace(buffer_.request.header.unique, opcode);
190 }
191 return WriteToProxy();
192
193 case FUSE_INIT:
194 buffer_.HandleInit();
195 break;
196
197 default:
198 buffer_.HandleNotImpl();
199 break;
200 }
201
202 if (!buffer_.response.Write(device_fd_)) {
203 return FuseBridgeState::kClosing;
204 }
205
206 if (opcode == FUSE_INIT) {
207 callback->OnMount(mount_id_);
208 }
209
210 return FuseBridgeState::kWaitToReadEither;
211 }
212
213 FuseBridgeState WriteToProxy() {
214 switch (buffer_.request.WriteOrAgain(proxy_fd_)) {
215 case ResultOrAgain::kSuccess:
216 return FuseBridgeState::kWaitToReadEither;
217 case ResultOrAgain::kFailure:
218 return FuseBridgeState::kClosing;
219 case ResultOrAgain::kAgain:
220 return FuseBridgeState::kWaitToWriteProxy;
221 }
222 }
24 223
25bool StartFuseBridgeLoop( 224 const int mount_id_;
26 int raw_dev_fd, int raw_proxy_fd, FuseBridgeLoopCallback* callback) { 225 base::unique_fd device_fd_;
27 base::unique_fd dev_fd(raw_dev_fd); 226 base::unique_fd proxy_fd_;
28 base::unique_fd proxy_fd(raw_proxy_fd); 227 FuseBuffer buffer_;
29 FuseBuffer buffer; 228 FuseBridgeState state_;
30 size_t open_count = 0; 229 FuseBridgeState last_state_;
31 230 FuseBridgeEntryEvent last_device_events_;
32 LOG(DEBUG) << "Start fuse loop."; 231 FuseBridgeEntryEvent last_proxy_events_;
33 while (true) { 232
34 if (!buffer.request.Read(dev_fd)) { 233 // Remember map between unique and opcode in fuse_in_header so that we can
35 return false; 234 // refer the opcode later.
235 std::unordered_map<uint64_t, uint32_t> opcode_map_;
236
237 int open_count_;
238
239 DISALLOW_COPY_AND_ASSIGN(FuseBridgeEntry);
240};
241
242class BridgeEpollController : private EpollController {
243 public:
244 BridgeEpollController(base::unique_fd&& poll_fd) : EpollController(std::move(poll_fd)) {}
245
246 bool AddBridgePoll(FuseBridgeEntry* bridge) const {
247 return InvokeControl(EPOLL_CTL_ADD, bridge);
36 } 248 }
37 249
38 const uint32_t opcode = buffer.request.header.opcode; 250 bool UpdateOrDeleteBridgePoll(FuseBridgeEntry* bridge) const {
39 LOG(VERBOSE) << "Read a fuse packet, opcode=" << opcode; 251 return InvokeControl(
40 switch (opcode) { 252 bridge->state_ != FuseBridgeState::kClosing ? EPOLL_CTL_MOD : EPOLL_CTL_DEL, bridge);
41 case FUSE_FORGET:
42 // Do not reply to FUSE_FORGET.
43 continue;
44
45 case FUSE_LOOKUP:
46 case FUSE_GETATTR:
47 case FUSE_OPEN:
48 case FUSE_READ:
49 case FUSE_WRITE:
50 case FUSE_RELEASE:
51 case FUSE_FSYNC:
52 if (!buffer.request.Write(proxy_fd)) {
53 LOG(ERROR) << "Failed to write a request to the proxy.";
54 return false;
55 }
56 if (!buffer.response.Read(proxy_fd)) {
57 LOG(ERROR) << "Failed to read a response from the proxy.";
58 return false;
59 }
60 break;
61
62 case FUSE_INIT:
63 buffer.HandleInit();
64 break;
65
66 default:
67 buffer.HandleNotImpl();
68 break;
69 } 253 }
70 254
71 if (!buffer.response.Write(dev_fd)) { 255 bool Wait(size_t bridge_count, std::unordered_set<FuseBridgeEntry*>* entries_out) {
72 LOG(ERROR) << "Failed to write a response to the device."; 256 CHECK(entries_out);
73 return false; 257 const size_t event_count = std::max<size_t>(bridge_count * 2, 1);
258 if (!EpollController::Wait(event_count)) {
259 return false;
260 }
261 entries_out->clear();
262 for (const auto& event : events()) {
263 FuseBridgeEntryEvent* const entry_event =
264 reinterpret_cast<FuseBridgeEntryEvent*>(event.data.ptr);
265 entry_event->events = event.events;
266 entries_out->insert(entry_event->entry);
267 }
268 return true;
74 } 269 }
75 270
76 switch (opcode) { 271 private:
77 case FUSE_INIT: 272 bool InvokeControl(int op, FuseBridgeEntry* bridge) const {
78 callback->OnMount(); 273 LOG(VERBOSE) << "InvokeControl op=" << op << " bridge=" << bridge->mount_id_
79 break; 274 << " state=" << static_cast<int>(bridge->state_)
80 case FUSE_OPEN: 275 << " last_state=" << static_cast<int>(bridge->last_state_);
81 if (buffer.response.header.error == fuse::kFuseSuccess) { 276
82 open_count++; 277 int last_device_events;
83 } 278 int last_proxy_events;
84 break; 279 int device_events;
85 case FUSE_RELEASE: 280 int proxy_events;
86 if (open_count != 0) { 281 GetObservedEvents(bridge->last_state_, &last_device_events, &last_proxy_events);
87 open_count--; 282 GetObservedEvents(bridge->state_, &device_events, &proxy_events);
88 } else { 283 bool result = true;
89 LOG(WARNING) << "Unexpected FUSE_RELEASE before opening a file."; 284 if (op != EPOLL_CTL_MOD || last_device_events != device_events) {
90 break; 285 result &= EpollController::InvokeControl(op, bridge->device_fd_, device_events,
91 } 286 &bridge->last_device_events_);
92 if (open_count == 0) { 287 }
93 return true; 288 if (op != EPOLL_CTL_MOD || last_proxy_events != proxy_events) {
94 } 289 result &= EpollController::InvokeControl(op, bridge->proxy_fd_, proxy_events,
95 break; 290 &bridge->last_proxy_events_);
291 }
292 return result;
293 }
294};
295
296FuseBridgeLoop::FuseBridgeLoop() : opened_(true) {
297 base::unique_fd epoll_fd(epoll_create1(/* no flag */ 0));
298 if (epoll_fd.get() == -1) {
299 PLOG(ERROR) << "Failed to open FD for epoll";
300 opened_ = false;
301 return;
302 }
303 epoll_controller_.reset(new BridgeEpollController(std::move(epoll_fd)));
304}
305
306FuseBridgeLoop::~FuseBridgeLoop() { CHECK(bridges_.empty()); }
307
308bool FuseBridgeLoop::AddBridge(int mount_id, base::unique_fd dev_fd, base::unique_fd proxy_fd) {
309 LOG(VERBOSE) << "Adding bridge " << mount_id;
310
311 std::unique_ptr<FuseBridgeEntry> bridge(
312 new FuseBridgeEntry(mount_id, std::move(dev_fd), std::move(proxy_fd)));
313 std::lock_guard<std::mutex> lock(mutex_);
314 if (!opened_) {
315 LOG(ERROR) << "Tried to add a mount to a closed bridge";
316 return false;
317 }
318 if (bridges_.count(mount_id)) {
319 LOG(ERROR) << "Tried to add a mount point that has already been added";
320 return false;
321 }
322 if (!epoll_controller_->AddBridgePoll(bridge.get())) {
323 return false;
324 }
325
326 bridges_.emplace(mount_id, std::move(bridge));
327 return true;
328}
329
330bool FuseBridgeLoop::ProcessEventLocked(const std::unordered_set<FuseBridgeEntry*>& entries,
331 FuseBridgeLoopCallback* callback) {
332 for (auto entry : entries) {
333 entry->Transfer(callback);
334 if (!epoll_controller_->UpdateOrDeleteBridgePoll(entry)) {
335 return false;
336 }
337 if (entry->IsClosing()) {
338 const int mount_id = entry->mount_id();
339 callback->OnClosed(mount_id);
340 bridges_.erase(mount_id);
341 if (bridges_.size() == 0) {
342 // All bridges are now closed.
343 return false;
344 }
345 }
346 }
347 return true;
348}
349
350void FuseBridgeLoop::Start(FuseBridgeLoopCallback* callback) {
351 LOG(DEBUG) << "Start fuse bridge loop";
352 std::unordered_set<FuseBridgeEntry*> entries;
353 while (true) {
354 const bool wait_result = epoll_controller_->Wait(bridges_.size(), &entries);
355 LOG(VERBOSE) << "Receive epoll events";
356 {
357 std::lock_guard<std::mutex> lock(mutex_);
358 if (!(wait_result && ProcessEventLocked(entries, callback))) {
359 for (auto it = bridges_.begin(); it != bridges_.end();) {
360 callback->OnClosed(it->second->mount_id());
361 it = bridges_.erase(it);
362 }
363 opened_ = false;
364 return;
365 }
366 }
96 } 367 }
97 }
98} 368}
99 369
100} // namespace fuse 370} // namespace fuse
diff --git a/libappfuse/include/libappfuse/EpollController.h b/libappfuse/include/libappfuse/EpollController.h
index 3863abae7..622bd2cc9 100644
--- a/libappfuse/include/libappfuse/EpollController.h
+++ b/libappfuse/include/libappfuse/EpollController.h
@@ -37,8 +37,10 @@ class EpollController {
37 37
38 const std::vector<epoll_event>& events() const; 38 const std::vector<epoll_event>& events() const;
39 39
40 private: 40 protected:
41 bool InvokeControl(int op, int fd, int events, void* data) const; 41 bool InvokeControl(int op, int fd, int events, void* data) const;
42
43 private:
42 base::unique_fd poll_fd_; 44 base::unique_fd poll_fd_;
43 std::vector<epoll_event> events_; 45 std::vector<epoll_event> events_;
44 46
diff --git a/libappfuse/include/libappfuse/FuseBridgeLoop.h b/libappfuse/include/libappfuse/FuseBridgeLoop.h
index 1f71cf272..6bfda9819 100644
--- a/libappfuse/include/libappfuse/FuseBridgeLoop.h
+++ b/libappfuse/include/libappfuse/FuseBridgeLoop.h
@@ -17,6 +17,13 @@
17#ifndef ANDROID_LIBAPPFUSE_FUSEBRIDGELOOP_H_ 17#ifndef ANDROID_LIBAPPFUSE_FUSEBRIDGELOOP_H_
18#define ANDROID_LIBAPPFUSE_FUSEBRIDGELOOP_H_ 18#define ANDROID_LIBAPPFUSE_FUSEBRIDGELOOP_H_
19 19
20#include <map>
21#include <mutex>
22#include <queue>
23#include <unordered_set>
24
25#include <android-base/macros.h>
26
20#include "libappfuse/FuseBuffer.h" 27#include "libappfuse/FuseBuffer.h"
21 28
22namespace android { 29namespace android {
@@ -24,12 +31,41 @@ namespace fuse {
24 31
25class FuseBridgeLoopCallback { 32class FuseBridgeLoopCallback {
26 public: 33 public:
27 virtual void OnMount() = 0; 34 virtual void OnMount(int mount_id) = 0;
28 virtual ~FuseBridgeLoopCallback() = default; 35 virtual void OnClosed(int mount_id) = 0;
36 virtual ~FuseBridgeLoopCallback() = default;
29}; 37};
30 38
31bool StartFuseBridgeLoop( 39class FuseBridgeEntry;
32 int dev_fd, int proxy_fd, FuseBridgeLoopCallback* callback); 40class BridgeEpollController;
41
42class FuseBridgeLoop final {
43 public:
44 FuseBridgeLoop();
45 ~FuseBridgeLoop();
46
47 void Start(FuseBridgeLoopCallback* callback);
48
49 // Add bridge to the loop. It's OK to invoke the method from a different
50 // thread from one which invokes |Start|.
51 bool AddBridge(int mount_id, base::unique_fd dev_fd, base::unique_fd proxy_fd);
52
53 private:
54 bool ProcessEventLocked(const std::unordered_set<FuseBridgeEntry*>& entries,
55 FuseBridgeLoopCallback* callback);
56
57 std::unique_ptr<BridgeEpollController> epoll_controller_;
58
59 // Map between |mount_id| and bridge entry.
60 std::map<int, std::unique_ptr<FuseBridgeEntry>> bridges_;
61
62 // Lock for multi-threading.
63 std::mutex mutex_;
64
65 bool opened_;
66
67 DISALLOW_COPY_AND_ASSIGN(FuseBridgeLoop);
68};
33 69
34} // namespace fuse 70} // namespace fuse
35} // namespace android 71} // namespace android
diff --git a/libappfuse/tests/FuseBridgeLoopTest.cc b/libappfuse/tests/FuseBridgeLoopTest.cc
index b4c1efb01..51d605136 100644
--- a/libappfuse/tests/FuseBridgeLoopTest.cc
+++ b/libappfuse/tests/FuseBridgeLoopTest.cc
@@ -32,10 +32,12 @@ namespace {
32class Callback : public FuseBridgeLoopCallback { 32class Callback : public FuseBridgeLoopCallback {
33 public: 33 public:
34 bool mounted; 34 bool mounted;
35 Callback() : mounted(false) {} 35 bool closed;
36 void OnMount() override { 36 Callback() : mounted(false), closed(false) {}
37 mounted = true; 37
38 } 38 void OnMount(int /*mount_id*/) override { mounted = true; }
39
40 void OnClosed(int /* mount_id */) override { closed = true; }
39}; 41};
40 42
41class FuseBridgeLoopTest : public ::testing::Test { 43class FuseBridgeLoopTest : public ::testing::Test {
@@ -53,8 +55,9 @@ class FuseBridgeLoopTest : public ::testing::Test {
53 ASSERT_TRUE(SetupMessageSockets(&dev_sockets_)); 55 ASSERT_TRUE(SetupMessageSockets(&dev_sockets_));
54 ASSERT_TRUE(SetupMessageSockets(&proxy_sockets_)); 56 ASSERT_TRUE(SetupMessageSockets(&proxy_sockets_));
55 thread_ = std::thread([this] { 57 thread_ = std::thread([this] {
56 StartFuseBridgeLoop( 58 FuseBridgeLoop loop;
57 dev_sockets_[1].release(), proxy_sockets_[0].release(), &callback_); 59 loop.AddBridge(1, std::move(dev_sockets_[1]), std::move(proxy_sockets_[0]));
60 loop.Start(&callback_);
58 }); 61 });
59 } 62 }
60 63
@@ -115,6 +118,7 @@ class FuseBridgeLoopTest : public ::testing::Test {
115 if (thread_.joinable()) { 118 if (thread_.joinable()) {
116 thread_.join(); 119 thread_.join();
117 } 120 }
121 ASSERT_TRUE(callback_.closed);
118 } 122 }
119 123
120 void TearDown() override { 124 void TearDown() override {