summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorTreehugger Robot2018-05-21 17:37:21 -0500
committerGerrit Code Review2018-05-21 17:37:21 -0500
commit2fa9770c3ad8041953cbc07a5892e87ef506a309 (patch)
treeeb5a8223edf1d4374416e5a6605cd0e291987bb6
parente0a52774b4236f8dab6e1f0f246c6021ec0df698 (diff)
parent32559741699d46820ff9409445a21fdce012e03a (diff)
downloadplatform-system-core-2fa9770c3ad8041953cbc07a5892e87ef506a309.tar.gz
platform-system-core-2fa9770c3ad8041953cbc07a5892e87ef506a309.tar.xz
platform-system-core-2fa9770c3ad8041953cbc07a5892e87ef506a309.zip
Merge changes I4d6da40d,I91c7ced5,I7b9f6d18
* changes: adb: Add a test for emulator connection adb: Improve test_adb a bit more adb: Add a way to reconnect TCP transports
-rw-r--r--adb/client/main.cpp1
-rw-r--r--adb/test_adb.py281
-rw-r--r--adb/transport.cpp174
-rw-r--r--adb/transport.h24
-rw-r--r--adb/transport_local.cpp58
5 files changed, 430 insertions, 108 deletions
diff --git a/adb/client/main.cpp b/adb/client/main.cpp
index 31cb8536a..44ed3a253 100644
--- a/adb/client/main.cpp
+++ b/adb/client/main.cpp
@@ -117,6 +117,7 @@ int adb_server_main(int is_daemon, const std::string& socket_spec, int ack_reply
117 atexit(adb_server_cleanup); 117 atexit(adb_server_cleanup);
118 118
119 init_transport_registration(); 119 init_transport_registration();
120 init_reconnect_handler();
120 init_mdns_transport_discovery(); 121 init_mdns_transport_discovery();
121 122
122 usb_init(); 123 usb_init();
diff --git a/adb/test_adb.py b/adb/test_adb.py
index 32bf0297c..ddd3ff041 100644
--- a/adb/test_adb.py
+++ b/adb/test_adb.py
@@ -36,10 +36,11 @@ import adb
36 36
37 37
38@contextlib.contextmanager 38@contextlib.contextmanager
39def fake_adb_server(protocol=socket.AF_INET, port=0): 39def fake_adbd(protocol=socket.AF_INET, port=0):
40 """Creates a fake ADB server that just replies with a CNXN packet.""" 40 """Creates a fake ADB daemon that just replies with a CNXN packet."""
41 41
42 serversock = socket.socket(protocol, socket.SOCK_STREAM) 42 serversock = socket.socket(protocol, socket.SOCK_STREAM)
43 serversock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
43 if protocol == socket.AF_INET: 44 if protocol == socket.AF_INET:
44 serversock.bind(('127.0.0.1', port)) 45 serversock.bind(('127.0.0.1', port))
45 else: 46 else:
@@ -60,31 +61,33 @@ def fake_adb_server(protocol=socket.AF_INET, port=0):
60 rlist = [readpipe, serversock] 61 rlist = [readpipe, serversock]
61 cnxn_sent = {} 62 cnxn_sent = {}
62 while True: 63 while True:
63 ready, _, _ = select.select(rlist, [], []) 64 read_ready, _, _ = select.select(rlist, [], [])
64 for r in ready: 65 for ready in read_ready:
65 if r == readpipe: 66 if ready == readpipe:
66 # Closure pipe 67 # Closure pipe
67 os.close(r) 68 os.close(ready)
68 serversock.shutdown(socket.SHUT_RDWR) 69 serversock.shutdown(socket.SHUT_RDWR)
69 serversock.close() 70 serversock.close()
70 return 71 return
71 elif r == serversock: 72 elif ready == serversock:
72 # Server socket 73 # Server socket
73 conn, _ = r.accept() 74 conn, _ = ready.accept()
74 rlist.append(conn) 75 rlist.append(conn)
75 else: 76 else:
76 # Client socket 77 # Client socket
77 data = r.recv(1024) 78 data = ready.recv(1024)
78 if not data: 79 if not data or data.startswith('OPEN'):
79 if r in cnxn_sent: 80 if ready in cnxn_sent:
80 del cnxn_sent[r] 81 del cnxn_sent[ready]
81 rlist.remove(r) 82 ready.shutdown(socket.SHUT_RDWR)
83 ready.close()
84 rlist.remove(ready)
82 continue 85 continue
83 if r in cnxn_sent: 86 if ready in cnxn_sent:
84 continue 87 continue
85 cnxn_sent[r] = True 88 cnxn_sent[ready] = True
86 r.sendall(_adb_packet('CNXN', 0x01000001, 1024 * 1024, 89 ready.sendall(_adb_packet('CNXN', 0x01000001, 1024 * 1024,
87 'device::ro.product.name=fakeadb')) 90 'device::ro.product.name=fakeadb'))
88 91
89 port = serversock.getsockname()[1] 92 port = serversock.getsockname()[1]
90 server_thread = threading.Thread(target=_handle) 93 server_thread = threading.Thread(target=_handle)
@@ -97,8 +100,52 @@ def fake_adb_server(protocol=socket.AF_INET, port=0):
97 server_thread.join() 100 server_thread.join()
98 101
99 102
100class NonApiTest(unittest.TestCase): 103@contextlib.contextmanager
101 """Tests for ADB that aren't a part of the AndroidDevice API.""" 104def adb_connect(unittest, serial):
105 """Context manager for an ADB connection.
106
107 This automatically disconnects when done with the connection.
108 """
109
110 output = subprocess.check_output(['adb', 'connect', serial])
111 unittest.assertEqual(output.strip(), 'connected to {}'.format(serial))
112
113 try:
114 yield
115 finally:
116 # Perform best-effort disconnection. Discard the output.
117 subprocess.Popen(['adb', 'disconnect', serial],
118 stdout=subprocess.PIPE,
119 stderr=subprocess.PIPE).communicate()
120
121
122@contextlib.contextmanager
123def adb_server():
124 """Context manager for an ADB server.
125
126 This creates an ADB server and returns the port it's listening on.
127 """
128
129 port = 5038
130 # Kill any existing server on this non-default port.
131 subprocess.check_output(['adb', '-P', str(port), 'kill-server'],
132 stderr=subprocess.STDOUT)
133 read_pipe, write_pipe = os.pipe()
134 proc = subprocess.Popen(['adb', '-L', 'tcp:localhost:{}'.format(port),
135 'fork-server', 'server',
136 '--reply-fd', str(write_pipe)])
137 try:
138 os.close(write_pipe)
139 greeting = os.read(read_pipe, 1024)
140 assert greeting == 'OK\n', repr(greeting)
141 yield port
142 finally:
143 proc.terminate()
144 proc.wait()
145
146
147class CommandlineTest(unittest.TestCase):
148 """Tests for the ADB commandline."""
102 149
103 def test_help(self): 150 def test_help(self):
104 """Make sure we get _something_ out of help.""" 151 """Make sure we get _something_ out of help."""
@@ -120,28 +167,37 @@ class NonApiTest(unittest.TestCase):
120 revision_line, r'^Revision [0-9a-f]{12}-android$') 167 revision_line, r'^Revision [0-9a-f]{12}-android$')
121 168
122 def test_tcpip_error_messages(self): 169 def test_tcpip_error_messages(self):
123 p = subprocess.Popen(['adb', 'tcpip'], stdout=subprocess.PIPE, 170 """Make sure 'adb tcpip' parsing is sane."""
124 stderr=subprocess.STDOUT) 171 proc = subprocess.Popen(['adb', 'tcpip'], stdout=subprocess.PIPE,
125 out, _ = p.communicate() 172 stderr=subprocess.STDOUT)
126 self.assertEqual(1, p.returncode) 173 out, _ = proc.communicate()
174 self.assertEqual(1, proc.returncode)
127 self.assertIn('requires an argument', out) 175 self.assertIn('requires an argument', out)
128 176
129 p = subprocess.Popen(['adb', 'tcpip', 'foo'], stdout=subprocess.PIPE, 177 proc = subprocess.Popen(['adb', 'tcpip', 'foo'], stdout=subprocess.PIPE,
130 stderr=subprocess.STDOUT) 178 stderr=subprocess.STDOUT)
131 out, _ = p.communicate() 179 out, _ = proc.communicate()
132 self.assertEqual(1, p.returncode) 180 self.assertEqual(1, proc.returncode)
133 self.assertIn('invalid port', out) 181 self.assertIn('invalid port', out)
134 182
135 # Helper method that reads a pipe until it is closed, then sets the event. 183
136 def _read_pipe_and_set_event(self, pipe, event): 184class ServerTest(unittest.TestCase):
137 x = pipe.read() 185 """Tests for the ADB server."""
186
187 @staticmethod
188 def _read_pipe_and_set_event(pipe, event):
189 """Reads a pipe until it is closed, then sets the event."""
190 pipe.read()
138 event.set() 191 event.set()
139 192
140 # Test that launch_server() does not let the adb server inherit
141 # stdin/stdout/stderr handles which can cause callers of adb.exe to hang.
142 # This test also runs fine on unix even though the impetus is an issue
143 # unique to Windows.
144 def test_handle_inheritance(self): 193 def test_handle_inheritance(self):
194 """Test that launch_server() does not inherit handles.
195
196 launch_server() should not let the adb server inherit
197 stdin/stdout/stderr handles, which can cause callers of adb.exe to hang.
198 This test also runs fine on unix even though the impetus is an issue
199 unique to Windows.
200 """
145 # This test takes 5 seconds to run on Windows: if there is no adb server 201 # This test takes 5 seconds to run on Windows: if there is no adb server
146 # running on the the port used below, adb kill-server tries to make a 202 # running on the the port used below, adb kill-server tries to make a
147 # TCP connection to a closed port and that takes 1 second on Windows; 203 # TCP connection to a closed port and that takes 1 second on Windows;
@@ -163,29 +219,30 @@ class NonApiTest(unittest.TestCase):
163 219
164 try: 220 try:
165 # Run the adb client and have it start the adb server. 221 # Run the adb client and have it start the adb server.
166 p = subprocess.Popen(['adb', '-P', str(port), 'start-server'], 222 proc = subprocess.Popen(['adb', '-P', str(port), 'start-server'],
167 stdin=subprocess.PIPE, stdout=subprocess.PIPE, 223 stdin=subprocess.PIPE,
168 stderr=subprocess.PIPE) 224 stdout=subprocess.PIPE,
225 stderr=subprocess.PIPE)
169 226
170 # Start threads that set events when stdout/stderr are closed. 227 # Start threads that set events when stdout/stderr are closed.
171 stdout_event = threading.Event() 228 stdout_event = threading.Event()
172 stdout_thread = threading.Thread( 229 stdout_thread = threading.Thread(
173 target=self._read_pipe_and_set_event, 230 target=ServerTest._read_pipe_and_set_event,
174 args=(p.stdout, stdout_event)) 231 args=(proc.stdout, stdout_event))
175 stdout_thread.daemon = True 232 stdout_thread.daemon = True
176 stdout_thread.start() 233 stdout_thread.start()
177 234
178 stderr_event = threading.Event() 235 stderr_event = threading.Event()
179 stderr_thread = threading.Thread( 236 stderr_thread = threading.Thread(
180 target=self._read_pipe_and_set_event, 237 target=ServerTest._read_pipe_and_set_event,
181 args=(p.stderr, stderr_event)) 238 args=(proc.stderr, stderr_event))
182 stderr_thread.daemon = True 239 stderr_thread.daemon = True
183 stderr_thread.start() 240 stderr_thread.start()
184 241
185 # Wait for the adb client to finish. Once that has occurred, if 242 # Wait for the adb client to finish. Once that has occurred, if
186 # stdin/stderr/stdout are still open, it must be open in the adb 243 # stdin/stderr/stdout are still open, it must be open in the adb
187 # server. 244 # server.
188 p.wait() 245 proc.wait()
189 246
190 # Try to write to stdin which we expect is closed. If it isn't 247 # Try to write to stdin which we expect is closed. If it isn't
191 # closed, we should get an IOError. If we don't get an IOError, 248 # closed, we should get an IOError. If we don't get an IOError,
@@ -193,7 +250,7 @@ class NonApiTest(unittest.TestCase):
193 # probably letting the adb server inherit stdin which would be 250 # probably letting the adb server inherit stdin which would be
194 # wrong. 251 # wrong.
195 with self.assertRaises(IOError): 252 with self.assertRaises(IOError):
196 p.stdin.write('x') 253 proc.stdin.write('x')
197 254
198 # Wait a few seconds for stdout/stderr to be closed (in the success 255 # Wait a few seconds for stdout/stderr to be closed (in the success
199 # case, this won't wait at all). If there is a timeout, that means 256 # case, this won't wait at all). If there is a timeout, that means
@@ -207,8 +264,12 @@ class NonApiTest(unittest.TestCase):
207 subprocess.check_output(['adb', '-P', str(port), 'kill-server'], 264 subprocess.check_output(['adb', '-P', str(port), 'kill-server'],
208 stderr=subprocess.STDOUT) 265 stderr=subprocess.STDOUT)
209 266
210 # Use SO_LINGER to cause TCP RST segment to be sent on socket close. 267
268class EmulatorTest(unittest.TestCase):
269 """Tests for the emulator connection."""
270
211 def _reset_socket_on_close(self, sock): 271 def _reset_socket_on_close(self, sock):
272 """Use SO_LINGER to cause TCP RST segment to be sent on socket close."""
212 # The linger structure is two shorts on Windows, but two ints on Unix. 273 # The linger structure is two shorts on Windows, but two ints on Unix.
213 linger_format = 'hh' if os.name == 'nt' else 'ii' 274 linger_format = 'hh' if os.name == 'nt' else 'ii'
214 l_onoff = 1 275 l_onoff = 1
@@ -227,7 +288,7 @@ class NonApiTest(unittest.TestCase):
227 Bug: https://code.google.com/p/android/issues/detail?id=21021 288 Bug: https://code.google.com/p/android/issues/detail?id=21021
228 """ 289 """
229 with contextlib.closing( 290 with contextlib.closing(
230 socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as listener: 291 socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as listener:
231 # Use SO_REUSEADDR so subsequent runs of the test can grab the port 292 # Use SO_REUSEADDR so subsequent runs of the test can grab the port
232 # even if it is in TIME_WAIT. 293 # even if it is in TIME_WAIT.
233 listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 294 listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
@@ -237,7 +298,7 @@ class NonApiTest(unittest.TestCase):
237 298
238 # Now that listening has started, start adb emu kill, telling it to 299 # Now that listening has started, start adb emu kill, telling it to
239 # connect to our mock emulator. 300 # connect to our mock emulator.
240 p = subprocess.Popen( 301 proc = subprocess.Popen(
241 ['adb', '-s', 'emulator-' + str(port), 'emu', 'kill'], 302 ['adb', '-s', 'emulator-' + str(port), 'emu', 'kill'],
242 stderr=subprocess.STDOUT) 303 stderr=subprocess.STDOUT)
243 304
@@ -246,12 +307,16 @@ class NonApiTest(unittest.TestCase):
246 # If WSAECONNABORTED (10053) is raised by any socket calls, 307 # If WSAECONNABORTED (10053) is raised by any socket calls,
247 # then adb probably isn't reading the data that we sent it. 308 # then adb probably isn't reading the data that we sent it.
248 conn.sendall('Android Console: type \'help\' for a list ' + 309 conn.sendall('Android Console: type \'help\' for a list ' +
249 'of commands\r\n') 310 'of commands\r\n')
250 conn.sendall('OK\r\n') 311 conn.sendall('OK\r\n')
251 312
252 with contextlib.closing(conn.makefile()) as f: 313 with contextlib.closing(conn.makefile()) as connf:
253 self.assertEqual('kill\n', f.readline()) 314 line = connf.readline()
254 self.assertEqual('quit\n', f.readline()) 315 if line.startswith('auth'):
316 # Ignore the first auth line.
317 line = connf.readline()
318 self.assertEqual('kill\n', line)
319 self.assertEqual('quit\n', connf.readline())
255 320
256 conn.sendall('OK: killing emulator, bye bye\r\n') 321 conn.sendall('OK: killing emulator, bye bye\r\n')
257 322
@@ -264,11 +329,48 @@ class NonApiTest(unittest.TestCase):
264 self._reset_socket_on_close(conn) 329 self._reset_socket_on_close(conn)
265 330
266 # Wait for adb to finish, so we can check return code. 331 # Wait for adb to finish, so we can check return code.
267 p.communicate() 332 proc.communicate()
268 333
269 # If this fails, adb probably isn't ignoring WSAECONNRESET when 334 # If this fails, adb probably isn't ignoring WSAECONNRESET when
270 # reading the response from the adb emu kill command (on Windows). 335 # reading the response from the adb emu kill command (on Windows).
271 self.assertEqual(0, p.returncode) 336 self.assertEqual(0, proc.returncode)
337
338 def test_emulator_connect(self):
339 """Ensure that the emulator can connect.
340
341 Bug: http://b/78991667
342 """
343 with adb_server() as server_port:
344 with fake_adbd() as port:
345 serial = 'emulator-{}'.format(port - 1)
346 # Ensure that the emulator is not there.
347 try:
348 subprocess.check_output(['adb', '-P', str(server_port),
349 '-s', serial, 'get-state'],
350 stderr=subprocess.STDOUT)
351 self.fail('Device should not be available')
352 except subprocess.CalledProcessError as err:
353 self.assertEqual(
354 err.output.strip(),
355 'error: device \'{}\' not found'.format(serial))
356
357 # Let the ADB server know that the emulator has started.
358 with contextlib.closing(
359 socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock:
360 sock.connect(('localhost', server_port))
361 command = 'host:emulator:{}'.format(port)
362 sock.sendall('%04x%s' % (len(command), command))
363
364 # Ensure the emulator is there.
365 subprocess.check_call(['adb', '-P', str(server_port),
366 '-s', serial, 'wait-for-device'])
367 output = subprocess.check_output(['adb', '-P', str(server_port),
368 '-s', serial, 'get-state'])
369 self.assertEqual(output.strip(), 'device')
370
371
372class ConnectionTest(unittest.TestCase):
373 """Tests for adb connect."""
272 374
273 def test_connect_ipv4_ipv6(self): 375 def test_connect_ipv4_ipv6(self):
274 """Ensure that `adb connect localhost:1234` will try both IPv4 and IPv6. 376 """Ensure that `adb connect localhost:1234` will try both IPv4 and IPv6.
@@ -277,38 +379,67 @@ class NonApiTest(unittest.TestCase):
277 """ 379 """
278 for protocol in (socket.AF_INET, socket.AF_INET6): 380 for protocol in (socket.AF_INET, socket.AF_INET6):
279 try: 381 try:
280 with fake_adb_server(protocol=protocol) as port: 382 with fake_adbd(protocol=protocol) as port:
281 output = subprocess.check_output( 383 serial = 'localhost:{}'.format(port)
282 ['adb', 'connect', 'localhost:{}'.format(port)]) 384 with adb_connect(self, serial):
283 385 pass
284 self.assertEqual(
285 output.strip(), 'connected to localhost:{}'.format(port))
286 except socket.error: 386 except socket.error:
287 print("IPv6 not available, skipping") 387 print("IPv6 not available, skipping")
288 continue 388 continue
289 389
290 def test_already_connected(self): 390 def test_already_connected(self):
291 with fake_adb_server() as port: 391 """Ensure that an already-connected device stays connected."""
292 output = subprocess.check_output( 392
293 ['adb', 'connect', 'localhost:{}'.format(port)]) 393 with fake_adbd() as port:
294 394 serial = 'localhost:{}'.format(port)
295 self.assertEqual( 395 with adb_connect(self, serial):
296 output.strip(), 'connected to localhost:{}'.format(port)) 396 # b/31250450: this always returns 0 but probably shouldn't.
297 397 output = subprocess.check_output(['adb', 'connect', serial])
298 # b/31250450: this always returns 0 but probably shouldn't. 398 self.assertEqual(
299 output = subprocess.check_output( 399 output.strip(), 'already connected to {}'.format(serial))
300 ['adb', 'connect', 'localhost:{}'.format(port)]) 400
401 def test_reconnect(self):
402 """Ensure that a disconnected device reconnects."""
403
404 with fake_adbd() as port:
405 serial = 'localhost:{}'.format(port)
406 with adb_connect(self, serial):
407 output = subprocess.check_output(['adb', '-s', serial,
408 'get-state'])
409 self.assertEqual(output.strip(), 'device')
410
411 # This will fail.
412 proc = subprocess.Popen(['adb', '-s', serial, 'shell', 'true'],
413 stdout=subprocess.PIPE,
414 stderr=subprocess.STDOUT)
415 output, _ = proc.communicate()
416 self.assertEqual(output.strip(), 'error: closed')
417
418 subprocess.check_call(['adb', '-s', serial, 'wait-for-device'])
419
420 output = subprocess.check_output(['adb', '-s', serial,
421 'get-state'])
422 self.assertEqual(output.strip(), 'device')
423
424 # Once we explicitly kick a device, it won't attempt to
425 # reconnect.
426 output = subprocess.check_output(['adb', 'disconnect', serial])
427 self.assertEqual(
428 output.strip(), 'disconnected {}'.format(serial))
429 try:
430 subprocess.check_output(['adb', '-s', serial, 'get-state'],
431 stderr=subprocess.STDOUT)
432 self.fail('Device should not be available')
433 except subprocess.CalledProcessError as err:
434 self.assertEqual(
435 err.output.strip(),
436 'error: device \'{}\' not found'.format(serial))
301 437
302 self.assertEqual(
303 output.strip(), 'already connected to localhost:{}'.format(port))
304 438
305def main(): 439def main():
440 """Main entrypoint."""
306 random.seed(0) 441 random.seed(0)
307 if len(adb.get_devices()) > 0: 442 unittest.main(verbosity=3)
308 suite = unittest.TestLoader().loadTestsFromName(__name__)
309 unittest.TextTestRunner(verbosity=3).run(suite)
310 else:
311 print('Test suite must be run with attached devices')
312 443
313 444
314if __name__ == '__main__': 445if __name__ == '__main__':
diff --git a/adb/transport.cpp b/adb/transport.cpp
index be7f8fe7f..beec13a69 100644
--- a/adb/transport.cpp
+++ b/adb/transport.cpp
@@ -33,6 +33,7 @@
33#include <deque> 33#include <deque>
34#include <list> 34#include <list>
35#include <mutex> 35#include <mutex>
36#include <queue>
36#include <thread> 37#include <thread>
37 38
38#include <android-base/logging.h> 39#include <android-base/logging.h>
@@ -50,7 +51,9 @@
50#include "adb_utils.h" 51#include "adb_utils.h"
51#include "fdevent.h" 52#include "fdevent.h"
52 53
53static void transport_unref(atransport *t); 54static void register_transport(atransport* transport);
55static void remove_transport(atransport* transport);
56static void transport_unref(atransport* transport);
54 57
55// TODO: unordered_map<TransportId, atransport*> 58// TODO: unordered_map<TransportId, atransport*>
56static auto& transport_list = *new std::list<atransport*>(); 59static auto& transport_list = *new std::list<atransport*>();
@@ -77,6 +80,130 @@ class SCOPED_CAPABILITY ScopedAssumeLocked {
77 ~ScopedAssumeLocked() RELEASE() {} 80 ~ScopedAssumeLocked() RELEASE() {}
78}; 81};
79 82
83// Tracks and handles atransport*s that are attempting reconnection.
84class ReconnectHandler {
85 public:
86 ReconnectHandler() = default;
87 ~ReconnectHandler() = default;
88
89 // Starts the ReconnectHandler thread.
90 void Start();
91
92 // Requests the ReconnectHandler thread to stop.
93 void Stop();
94
95 // Adds the atransport* to the queue of reconnect attempts.
96 void TrackTransport(atransport* transport);
97
98 private:
99 // The main thread loop.
100 void Run();
101
102 // Tracks a reconnection attempt.
103 struct ReconnectAttempt {
104 atransport* transport;
105 std::chrono::system_clock::time_point deadline;
106 size_t attempts_left;
107 };
108
109 // Only retry for up to one minute.
110 static constexpr const std::chrono::seconds kDefaultTimeout = std::chrono::seconds(10);
111 static constexpr const size_t kMaxAttempts = 6;
112
113 // Protects all members.
114 std::mutex reconnect_mutex_;
115 bool running_ GUARDED_BY(reconnect_mutex_) = true;
116 std::thread handler_thread_;
117 std::condition_variable reconnect_cv_;
118 std::queue<ReconnectAttempt> reconnect_queue_ GUARDED_BY(reconnect_mutex_);
119
120 DISALLOW_COPY_AND_ASSIGN(ReconnectHandler);
121};
122
123void ReconnectHandler::Start() {
124 check_main_thread();
125 handler_thread_ = std::thread(&ReconnectHandler::Run, this);
126}
127
128void ReconnectHandler::Stop() {
129 check_main_thread();
130 {
131 std::lock_guard<std::mutex> lock(reconnect_mutex_);
132 running_ = false;
133 }
134 reconnect_cv_.notify_one();
135 handler_thread_.join();
136
137 // Drain the queue to free all resources.
138 std::lock_guard<std::mutex> lock(reconnect_mutex_);
139 while (!reconnect_queue_.empty()) {
140 ReconnectAttempt attempt = reconnect_queue_.front();
141 reconnect_queue_.pop();
142 remove_transport(attempt.transport);
143 }
144}
145
146void ReconnectHandler::TrackTransport(atransport* transport) {
147 check_main_thread();
148 {
149 std::lock_guard<std::mutex> lock(reconnect_mutex_);
150 if (!running_) return;
151 reconnect_queue_.emplace(ReconnectAttempt{
152 transport, std::chrono::system_clock::now() + ReconnectHandler::kDefaultTimeout,
153 ReconnectHandler::kMaxAttempts});
154 }
155 reconnect_cv_.notify_one();
156}
157
158void ReconnectHandler::Run() {
159 while (true) {
160 ReconnectAttempt attempt;
161 {
162 std::unique_lock<std::mutex> lock(reconnect_mutex_);
163 ScopedAssumeLocked assume_lock(reconnect_mutex_);
164
165 auto deadline = std::chrono::time_point<std::chrono::system_clock>::max();
166 if (!reconnect_queue_.empty()) deadline = reconnect_queue_.front().deadline;
167 reconnect_cv_.wait_until(lock, deadline, [&]() REQUIRES(reconnect_mutex_) {
168 return !running_ ||
169 (!reconnect_queue_.empty() && reconnect_queue_.front().deadline < deadline);
170 });
171
172 if (!running_) return;
173 attempt = reconnect_queue_.front();
174 reconnect_queue_.pop();
175 if (attempt.transport->kicked()) {
176 D("transport %s was kicked. giving up on it.", attempt.transport->serial);
177 remove_transport(attempt.transport);
178 continue;
179 }
180 }
181 D("attempting to reconnect %s", attempt.transport->serial);
182
183 if (!attempt.transport->Reconnect()) {
184 D("attempting to reconnect %s failed.", attempt.transport->serial);
185 if (attempt.attempts_left == 0) {
186 D("transport %s exceeded the number of retry attempts. giving up on it.",
187 attempt.transport->serial);
188 remove_transport(attempt.transport);
189 continue;
190 }
191
192 std::lock_guard<std::mutex> lock(reconnect_mutex_);
193 reconnect_queue_.emplace(ReconnectAttempt{
194 attempt.transport,
195 std::chrono::system_clock::now() + ReconnectHandler::kDefaultTimeout,
196 attempt.attempts_left - 1});
197 continue;
198 }
199
200 D("reconnection to %s succeeded.", attempt.transport->serial);
201 register_transport(attempt.transport);
202 }
203}
204
205static auto& reconnect_handler = *new ReconnectHandler();
206
80} // namespace 207} // namespace
81 208
82TransportId NextTransportId() { 209TransportId NextTransportId() {
@@ -477,8 +604,6 @@ static int transport_write_action(int fd, struct tmsg* m) {
477 return 0; 604 return 0;
478} 605}
479 606
480static void remove_transport(atransport*);
481
482static void transport_registration_func(int _fd, unsigned ev, void*) { 607static void transport_registration_func(int _fd, unsigned ev, void*) {
483 tmsg m; 608 tmsg m;
484 atransport* t; 609 atransport* t;
@@ -515,8 +640,9 @@ static void transport_registration_func(int _fd, unsigned ev, void*) {
515 640
516 /* don't create transport threads for inaccessible devices */ 641 /* don't create transport threads for inaccessible devices */
517 if (t->GetConnectionState() != kCsNoPerm) { 642 if (t->GetConnectionState() != kCsNoPerm) {
518 /* initial references are the two threads */ 643 // The connection gets a reference to the atransport. It will release it
519 t->ref_count = 1; 644 // upon a read/write error.
645 t->ref_count++;
520 t->connection()->SetTransportName(t->serial_name()); 646 t->connection()->SetTransportName(t->serial_name());
521 t->connection()->SetReadCallback([t](Connection*, std::unique_ptr<apacket> p) { 647 t->connection()->SetReadCallback([t](Connection*, std::unique_ptr<apacket> p) {
522 if (!check_header(p.get(), t)) { 648 if (!check_header(p.get(), t)) {
@@ -547,13 +673,20 @@ static void transport_registration_func(int _fd, unsigned ev, void*) {
547 673
548 { 674 {
549 std::lock_guard<std::recursive_mutex> lock(transport_lock); 675 std::lock_guard<std::recursive_mutex> lock(transport_lock);
550 pending_list.remove(t); 676 auto it = std::find(pending_list.begin(), pending_list.end(), t);
551 transport_list.push_front(t); 677 if (it != pending_list.end()) {
678 pending_list.remove(t);
679 transport_list.push_front(t);
680 }
552 } 681 }
553 682
554 update_transports(); 683 update_transports();
555} 684}
556 685
686void init_reconnect_handler(void) {
687 reconnect_handler.Start();
688}
689
557void init_transport_registration(void) { 690void init_transport_registration(void) {
558 int s[2]; 691 int s[2];
559 692
@@ -572,6 +705,7 @@ void init_transport_registration(void) {
572} 705}
573 706
574void kick_all_transports() { 707void kick_all_transports() {
708 reconnect_handler.Stop();
575 // To avoid only writing part of a packet to a transport after exit, kick all transports. 709 // To avoid only writing part of a packet to a transport after exit, kick all transports.
576 std::lock_guard<std::recursive_mutex> lock(transport_lock); 710 std::lock_guard<std::recursive_mutex> lock(transport_lock);
577 for (auto t : transport_list) { 711 for (auto t : transport_list) {
@@ -601,15 +735,21 @@ static void remove_transport(atransport* transport) {
601} 735}
602 736
603static void transport_unref(atransport* t) { 737static void transport_unref(atransport* t) {
738 check_main_thread();
604 CHECK(t != nullptr); 739 CHECK(t != nullptr);
605 740
606 std::lock_guard<std::recursive_mutex> lock(transport_lock); 741 std::lock_guard<std::recursive_mutex> lock(transport_lock);
607 CHECK_GT(t->ref_count, 0u); 742 CHECK_GT(t->ref_count, 0u);
608 t->ref_count--; 743 t->ref_count--;
609 if (t->ref_count == 0) { 744 if (t->ref_count == 0) {
610 D("transport: %s unref (kicking and closing)", t->serial);
611 t->connection()->Stop(); 745 t->connection()->Stop();
612 remove_transport(t); 746 if (t->IsTcpDevice() && !t->kicked()) {
747 D("transport: %s unref (attempting reconnection) %d", t->serial, t->kicked());
748 reconnect_handler.TrackTransport(t);
749 } else {
750 D("transport: %s unref (kicking and closing)", t->serial);
751 remove_transport(t);
752 }
613 } else { 753 } else {
614 D("transport: %s unref (count=%zu)", t->serial, t->ref_count); 754 D("transport: %s unref (count=%zu)", t->serial, t->ref_count);
615 } 755 }
@@ -781,9 +921,8 @@ int atransport::Write(apacket* p) {
781} 921}
782 922
783void atransport::Kick() { 923void atransport::Kick() {
784 if (!kicked_) { 924 if (!kicked_.exchange(true)) {
785 D("kicking transport %s", this->serial); 925 D("kicking transport %p %s", this, this->serial);
786 kicked_ = true;
787 this->connection()->Stop(); 926 this->connection()->Stop();
788 } 927 }
789} 928}
@@ -941,6 +1080,10 @@ void atransport::SetConnectionEstablished(bool success) {
941 connection_waitable_->SetConnectionEstablished(success); 1080 connection_waitable_->SetConnectionEstablished(success);
942} 1081}
943 1082
1083bool atransport::Reconnect() {
1084 return reconnect_(this);
1085}
1086
944#if ADB_HOST 1087#if ADB_HOST
945 1088
946// We use newline as our delimiter, make sure to never output it. 1089// We use newline as our delimiter, make sure to never output it.
@@ -1021,8 +1164,9 @@ void close_usb_devices() {
1021} 1164}
1022#endif // ADB_HOST 1165#endif // ADB_HOST
1023 1166
1024int register_socket_transport(int s, const char* serial, int port, int local) { 1167int register_socket_transport(int s, const char* serial, int port, int local,
1025 atransport* t = new atransport(); 1168 atransport::ReconnectCallback reconnect) {
1169 atransport* t = new atransport(std::move(reconnect), kCsOffline);
1026 1170
1027 if (!serial) { 1171 if (!serial) {
1028 char buf[32]; 1172 char buf[32];
@@ -1103,7 +1247,7 @@ void kick_all_tcp_devices() {
1103 1247
1104void register_usb_transport(usb_handle* usb, const char* serial, const char* devpath, 1248void register_usb_transport(usb_handle* usb, const char* serial, const char* devpath,
1105 unsigned writeable) { 1249 unsigned writeable) {
1106 atransport* t = new atransport((writeable ? kCsConnecting : kCsNoPerm)); 1250 atransport* t = new atransport(writeable ? kCsOffline : kCsNoPerm);
1107 1251
1108 D("transport: %p init'ing for usb_handle %p (sn='%s')", t, usb, serial ? serial : ""); 1252 D("transport: %p init'ing for usb_handle %p (sn='%s')", t, usb, serial ? serial : "");
1109 init_usb_transport(t, usb); 1253 init_usb_transport(t, usb);
diff --git a/adb/transport.h b/adb/transport.h
index e1cbc092d..ae9cc023c 100644
--- a/adb/transport.h
+++ b/adb/transport.h
@@ -198,20 +198,27 @@ class atransport {
198 // class in one go is a very large change. Given how bad our testing is, 198 // class in one go is a very large change. Given how bad our testing is,
199 // it's better to do this piece by piece. 199 // it's better to do this piece by piece.
200 200
201 atransport(ConnectionState state = kCsConnecting) 201 using ReconnectCallback = std::function<bool(atransport*)>;
202
203 atransport(ReconnectCallback reconnect, ConnectionState state)
202 : id(NextTransportId()), 204 : id(NextTransportId()),
205 kicked_(false),
203 connection_state_(state), 206 connection_state_(state),
204 connection_waitable_(std::make_shared<ConnectionWaitable>()), 207 connection_waitable_(std::make_shared<ConnectionWaitable>()),
205 connection_(nullptr) { 208 connection_(nullptr),
209 reconnect_(std::move(reconnect)) {
206 // Initialize protocol to min version for compatibility with older versions. 210 // Initialize protocol to min version for compatibility with older versions.
207 // Version will be updated post-connect. 211 // Version will be updated post-connect.
208 protocol_version = A_VERSION_MIN; 212 protocol_version = A_VERSION_MIN;
209 max_payload = MAX_PAYLOAD; 213 max_payload = MAX_PAYLOAD;
210 } 214 }
215 atransport(ConnectionState state = kCsOffline)
216 : atransport([](atransport*) { return false; }, state) {}
211 virtual ~atransport(); 217 virtual ~atransport();
212 218
213 int Write(apacket* p); 219 int Write(apacket* p);
214 void Kick(); 220 void Kick();
221 bool kicked() const { return kicked_; }
215 222
216 // ConnectionState can be read by all threads, but can only be written in the main thread. 223 // ConnectionState can be read by all threads, but can only be written in the main thread.
217 ConnectionState GetConnectionState() const; 224 ConnectionState GetConnectionState() const;
@@ -286,8 +293,12 @@ class atransport {
286 // Gets a shared reference to the ConnectionWaitable. 293 // Gets a shared reference to the ConnectionWaitable.
287 std::shared_ptr<ConnectionWaitable> connection_waitable() { return connection_waitable_; } 294 std::shared_ptr<ConnectionWaitable> connection_waitable() { return connection_waitable_; }
288 295
296 // Attempts to reconnect with the underlying Connection. Returns true if the
297 // reconnection attempt succeeded.
298 bool Reconnect();
299
289 private: 300 private:
290 bool kicked_ = false; 301 std::atomic<bool> kicked_;
291 302
292 // A set of features transmitted in the banner with the initial connection. 303 // A set of features transmitted in the banner with the initial connection.
293 // This is stored in the banner as 'features=feature0,feature1,etc'. 304 // This is stored in the banner as 'features=feature0,feature1,etc'.
@@ -310,6 +321,9 @@ class atransport {
310 // The underlying connection object. 321 // The underlying connection object.
311 std::shared_ptr<Connection> connection_ GUARDED_BY(mutex_); 322 std::shared_ptr<Connection> connection_ GUARDED_BY(mutex_);
312 323
324 // A callback that will be invoked when the atransport needs to reconnect.
325 ReconnectCallback reconnect_;
326
313 std::mutex mutex_; 327 std::mutex mutex_;
314 328
315 DISALLOW_COPY_AND_ASSIGN(atransport); 329 DISALLOW_COPY_AND_ASSIGN(atransport);
@@ -333,6 +347,7 @@ void update_transports(void);
333// Stops iteration and returns false if fn returns false, otherwise returns true. 347// Stops iteration and returns false if fn returns false, otherwise returns true.
334bool iterate_transports(std::function<bool(const atransport*)> fn); 348bool iterate_transports(std::function<bool(const atransport*)> fn);
335 349
350void init_reconnect_handler(void);
336void init_transport_registration(void); 351void init_transport_registration(void);
337void init_mdns_transport_discovery(void); 352void init_mdns_transport_discovery(void);
338std::string list_transports(bool long_listing); 353std::string list_transports(bool long_listing);
@@ -347,7 +362,8 @@ void register_usb_transport(usb_handle* h, const char* serial,
347void connect_device(const std::string& address, std::string* response); 362void connect_device(const std::string& address, std::string* response);
348 363
349/* cause new transports to be init'd and added to the list */ 364/* cause new transports to be init'd and added to the list */
350int register_socket_transport(int s, const char* serial, int port, int local); 365int register_socket_transport(int s, const char* serial, int port, int local,
366 atransport::ReconnectCallback reconnect);
351 367
352// This should only be used for transports with connection_state == kCsNoPerm. 368// This should only be used for transports with connection_state == kCsNoPerm.
353void unregister_usb_transport(usb_handle* usb); 369void unregister_usb_transport(usb_handle* usb);
diff --git a/adb/transport_local.cpp b/adb/transport_local.cpp
index e81f27c95..181d6665d 100644
--- a/adb/transport_local.cpp
+++ b/adb/transport_local.cpp
@@ -68,28 +68,24 @@ bool local_connect(int port) {
68 return local_connect_arbitrary_ports(port - 1, port, &dummy) == 0; 68 return local_connect_arbitrary_ports(port - 1, port, &dummy) == 0;
69} 69}
70 70
71void connect_device(const std::string& address, std::string* response) { 71std::tuple<unique_fd, int, std::string> tcp_connect(const std::string& address,
72 if (address.empty()) { 72 std::string* response) {
73 *response = "empty address";
74 return;
75 }
76
77 std::string serial; 73 std::string serial;
78 std::string host; 74 std::string host;
79 int port = DEFAULT_ADB_LOCAL_TRANSPORT_PORT; 75 int port = DEFAULT_ADB_LOCAL_TRANSPORT_PORT;
80 if (!android::base::ParseNetAddress(address, &host, &port, &serial, response)) { 76 if (!android::base::ParseNetAddress(address, &host, &port, &serial, response)) {
81 return; 77 return std::make_tuple(unique_fd(), port, serial);
82 } 78 }
83 79
84 std::string error; 80 std::string error;
85 int fd = network_connect(host.c_str(), port, SOCK_STREAM, 10, &error); 81 unique_fd fd(network_connect(host.c_str(), port, SOCK_STREAM, 10, &error));
86 if (fd == -1) { 82 if (fd == -1) {
87 *response = android::base::StringPrintf("unable to connect to %s: %s", 83 *response = android::base::StringPrintf("unable to connect to %s: %s",
88 serial.c_str(), error.c_str()); 84 serial.c_str(), error.c_str());
89 return; 85 return std::make_tuple(std::move(fd), port, serial);
90 } 86 }
91 87
92 D("client: connected %s remote on fd %d", serial.c_str(), fd); 88 D("client: connected %s remote on fd %d", serial.c_str(), fd.get());
93 close_on_exec(fd); 89 close_on_exec(fd);
94 disable_tcp_nagle(fd); 90 disable_tcp_nagle(fd);
95 91
@@ -98,7 +94,38 @@ void connect_device(const std::string& address, std::string* response) {
98 D("warning: failed to configure TCP keepalives (%s)", strerror(errno)); 94 D("warning: failed to configure TCP keepalives (%s)", strerror(errno));
99 } 95 }
100 96
101 int ret = register_socket_transport(fd, serial.c_str(), port, 0); 97 return std::make_tuple(std::move(fd), port, serial);
98}
99
100void connect_device(const std::string& address, std::string* response) {
101 if (address.empty()) {
102 *response = "empty address";
103 return;
104 }
105
106 unique_fd fd;
107 int port;
108 std::string serial;
109 std::tie(fd, port, serial) = tcp_connect(address, response);
110 auto reconnect = [address](atransport* t) {
111 std::string response;
112 unique_fd fd;
113 int port;
114 std::string serial;
115 std::tie(fd, port, serial) = tcp_connect(address, &response);
116 if (fd == -1) {
117 D("reconnect failed: %s", response.c_str());
118 return false;
119 }
120
121 // This invokes the part of register_socket_transport() that needs to be
122 // invoked if the atransport* has already been setup. This eventually
123 // calls atransport->SetConnection() with a newly created Connection*
124 // that will in turn send the CNXN packet.
125 return init_socket_transport(t, fd.release(), port, 0) >= 0;
126 };
127
128 int ret = register_socket_transport(fd.release(), serial.c_str(), port, 0, std::move(reconnect));
102 if (ret < 0) { 129 if (ret < 0) {
103 adb_close(fd); 130 adb_close(fd);
104 if (ret == -EALREADY) { 131 if (ret == -EALREADY) {
@@ -135,7 +162,8 @@ int local_connect_arbitrary_ports(int console_port, int adb_port, std::string* e
135 close_on_exec(fd); 162 close_on_exec(fd);
136 disable_tcp_nagle(fd); 163 disable_tcp_nagle(fd);
137 std::string serial = getEmulatorSerialString(console_port); 164 std::string serial = getEmulatorSerialString(console_port);
138 if (register_socket_transport(fd, serial.c_str(), adb_port, 1) == 0) { 165 if (register_socket_transport(fd, serial.c_str(), adb_port, 1,
166 [](atransport*) { return false; }) == 0) {
139 return 0; 167 return 0;
140 } 168 }
141 adb_close(fd); 169 adb_close(fd);
@@ -239,7 +267,8 @@ static void server_socket_thread(int port) {
239 close_on_exec(fd); 267 close_on_exec(fd);
240 disable_tcp_nagle(fd); 268 disable_tcp_nagle(fd);
241 std::string serial = android::base::StringPrintf("host-%d", fd); 269 std::string serial = android::base::StringPrintf("host-%d", fd);
242 if (register_socket_transport(fd, serial.c_str(), port, 1) != 0) { 270 if (register_socket_transport(fd, serial.c_str(), port, 1,
271 [](atransport*) { return false; }) != 0) {
243 adb_close(fd); 272 adb_close(fd);
244 } 273 }
245 } 274 }
@@ -338,7 +367,8 @@ static void qemu_socket_thread(int port) {
338 /* Host is connected. Register the transport, and start the 367 /* Host is connected. Register the transport, and start the
339 * exchange. */ 368 * exchange. */
340 std::string serial = android::base::StringPrintf("host-%d", fd); 369 std::string serial = android::base::StringPrintf("host-%d", fd);
341 if (register_socket_transport(fd, serial.c_str(), port, 1) != 0 || 370 if (register_socket_transport(fd, serial.c_str(), port, 1,
371 [](atransport*) { return false; }) != 0 ||
342 !WriteFdExactly(fd, _start_req, strlen(_start_req))) { 372 !WriteFdExactly(fd, _start_req, strlen(_start_req))) {
343 adb_close(fd); 373 adb_close(fd);
344 } 374 }