[processor-sdk/performance-audio-sr.git] / tools / pyalpha / pyalpha / serial / threaded / __init__.py
1 #!/usr/bin/env python3
2 #
3 # Working with threading and pySerial
4 #
5 # This file is part of pySerial. https://github.com/pyserial/pyserial
6 # (C) 2015 Chris Liechti <cliechti@gmx.net>
7 #
8 # SPDX-License-Identifier: BSD-3-Clause
9 """\
10 Support threading with serial ports.
11 """
12 import serial
13 import threading
16 class Protocol(object):
17 """\
18 Protocol as used by the ReaderThread. This base class provides empty
19 implementations of all methods.
20 """
22 def connection_made(self, transport):
23 """Called when reader thread is started"""
25 def data_received(self, data):
26 """Called with snippets received from the serial port"""
28 def connection_lost(self, exc):
29 """\
30 Called when the serial port is closed or the reader loop terminated
31 otherwise.
32 """
33 if isinstance(exc, Exception):
34 raise exc
37 class Packetizer(Protocol):
38 """
39 Read binary packets from serial port. Packets are expected to be terminated
40 with a TERMINATOR byte (null byte by default).
42 The class also keeps track of the transport.
43 """
45 TERMINATOR = b'\0'
47 def __init__(self):
48 self.buffer = bytearray()
49 self.transport = None
51 def connection_made(self, transport):
52 """Store transport"""
53 self.transport = transport
55 def connection_lost(self, exc):
56 """Forget transport"""
57 self.transport = None
58 super(Packetizer, self).connection_lost(exc)
60 def data_received(self, data):
61 """Buffer received data, find TERMINATOR, call handle_packet"""
62 self.buffer.extend(data)
63 while self.TERMINATOR in self.buffer:
64 packet, self.buffer = self.buffer.split(self.TERMINATOR, 1)
65 self.handle_packet(packet)
67 def handle_packet(self, packet):
68 """Process packets - to be overridden by subclassing"""
69 raise NotImplementedError('please implement functionality in handle_packet')
72 class FramedPacket(Protocol):
73 """
74 Read binary packets. Packets are expected to have a start and stop marker.
76 The class also keeps track of the transport.
77 """
79 START = b'('
80 STOP = b')'
82 def __init__(self):
83 self.packet = bytearray()
84 self.in_packet = False
85 self.transport = None
87 def connection_made(self, transport):
88 """Store transport"""
89 self.transport = transport
91 def connection_lost(self, exc):
92 """Forget transport"""
93 self.transport = None
94 self.in_packet = False
95 del self.packet[:]
96 super(FramedPacket, self).connection_lost(exc)
98 def data_received(self, data):
99 """Find data enclosed in START/STOP, call handle_packet"""
100 for byte in serial.iterbytes(data):
101 if byte == self.START:
102 self.in_packet = True
103 elif byte == self.STOP:
104 self.in_packet = False
105 self.handle_packet(self.packet)
106 del self.packet[:]
107 elif self.in_packet:
108 self.packet.append(byte)
109 else:
110 self.handle_out_of_packet_data(byte)
112 def handle_packet(self, packet):
113 """Process packets - to be overridden by subclassing"""
114 raise NotImplementedError('please implement functionality in handle_packet')
116 def handle_out_of_packet_data(self, data):
117 """Process data that is received outside of packets"""
118 pass
121 class LineReader(Packetizer):
122 """
123 Read and write (Unicode) lines from/to serial port.
124 The encoding is applied.
125 """
127 TERMINATOR = b'\r\n'
128 ENCODING = 'utf-8'
129 UNICODE_HANDLING = 'replace'
131 def handle_packet(self, packet):
132 self.handle_line(packet.decode(self.ENCODING, self.UNICODE_HANDLING))
134 def handle_line(self, line):
135 """Process one line - to be overridden by subclassing"""
136 raise NotImplementedError('please implement functionality in handle_line')
138 def write_line(self, text):
139 """
140 Write text to the transport. ``text`` is a Unicode string and the encoding
141 is applied before sending ans also the newline is append.
142 """
143 # + is not the best choice but bytes does not support % or .format in py3 and we want a single write call
144 self.transport.write(text.encode(self.ENCODING, self.UNICODE_HANDLING) + self.TERMINATOR)
147 class ReaderThread(threading.Thread):
148 """\
149 Implement a serial port read loop and dispatch to a Protocol instance (like
150 the asyncio.Protocol) but do it with threads.
152 Calls to close() will close the serial port but it is also possible to just
153 stop() this thread and continue the serial port instance otherwise.
154 """
156 def __init__(self, serial_instance, protocol_factory):
157 """\
158 Initialize thread.
160 Note that the serial_instance' timeout is set to one second!
161 Other settings are not changed.
162 """
163 super(ReaderThread, self).__init__()
164 self.daemon = True
165 self.serial = serial_instance
166 self.protocol_factory = protocol_factory
167 self.alive = True
168 self._lock = threading.Lock()
169 self._connection_made = threading.Event()
170 self.protocol = None
172 def stop(self):
173 """Stop the reader thread"""
174 self.alive = False
175 if hasattr(self.serial, 'cancel_read'):
176 self.serial.cancel_read()
177 self.join(2)
179 def run(self):
180 """Reader loop"""
181 if not hasattr(self.serial, 'cancel_read'):
182 self.serial.timeout = 1
183 self.protocol = self.protocol_factory()
184 try:
185 self.protocol.connection_made(self)
186 except Exception as e:
187 self.alive = False
188 self.protocol.connection_lost(e)
189 self._connection_made.set()
190 return
191 error = None
192 self._connection_made.set()
193 while self.alive and self.serial.is_open:
194 try:
195 # read all that is there or wait for one byte (blocking)
196 data = self.serial.read(self.serial.in_waiting or 1)
197 except serial.SerialException as e:
198 # probably some I/O problem such as disconnected USB serial
199 # adapters -> exit
200 error = e
201 break
202 else:
203 if data:
204 # make a separated try-except for called used code
205 try:
206 self.protocol.data_received(data)
207 except Exception as e:
208 error = e
209 break
210 self.alive = False
211 self.protocol.connection_lost(error)
212 self.protocol = None
214 def write(self, data):
215 """Thread safe writing (uses lock)"""
216 with self._lock:
217 self.serial.write(data)
219 def close(self):
220 """Close the serial port and exit reader thread (uses lock)"""
221 # use the lock to let other threads finish writing
222 with self._lock:
223 # first stop reading, so that closing can be done on idle port
224 self.stop()
225 self.serial.close()
227 def connect(self):
228 """
229 Wait until connection is set up and return the transport and protocol
230 instances.
231 """
232 if self.alive:
233 self._connection_made.wait()
234 if not self.alive:
235 raise RuntimeError('connection_lost already called')
236 return (self, self.protocol)
237 else:
238 raise RuntimeError('already stopped')
240 # - - context manager, returns protocol
242 def __enter__(self):
243 """\
244 Enter context handler. May raise RuntimeError in case the connection
245 could not be created.
246 """
247 self.start()
248 self._connection_made.wait()
249 if not self.alive:
250 raise RuntimeError('connection_lost already called')
251 return self.protocol
253 def __exit__(self, exc_type, exc_val, exc_tb):
254 """Leave context: close port"""
255 self.close()
258 # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
259 # test
260 if __name__ == '__main__':
261 # pylint: disable=wrong-import-position
262 import sys
263 import time
264 import traceback
266 #~ PORT = 'spy:///dev/ttyUSB0'
267 PORT = 'loop://'
269 class PrintLines(LineReader):
270 def connection_made(self, transport):
271 super(PrintLines, self).connection_made(transport)
272 sys.stdout.write('port opened\n')
273 self.write_line('hello world')
275 def handle_line(self, data):
276 sys.stdout.write('line received: {}\n'.format(repr(data)))
278 def connection_lost(self, exc):
279 if exc:
280 traceback.print_exc(exc)
281 sys.stdout.write('port closed\n')
283 ser = serial.serial_for_url(PORT, baudrate=115200, timeout=1)
284 with ReaderThread(ser, PrintLines) as protocol:
285 protocol.write_line('hello')
286 time.sleep(2)
288 # alternative usage
289 ser = serial.serial_for_url(PORT, baudrate=115200, timeout=1)
290 t = ReaderThread(ser, PrintLines)
291 t.start()
292 transport, protocol = t.connect()
293 protocol.write_line('hello')
294 time.sleep(2)
295 t.close()