]> Gitweb @ Texas Instruments - Open Source Git Repositories - git.TI.com/gitweb - processor-sdk/performance-audio-sr.git/blob - tools/pyalpha/serial/threaded/__init__.py
PASDK-53: Hard-coding 192 KHz as the IB SampleRate, to aid with SIDK progress.
[processor-sdk/performance-audio-sr.git] / tools / pyalpha / serial / threaded / __init__.py
1 #!/usr/bin/env python3
2 #
3 # Working with threading and pySerial
4 #
5 # This file is part of pySerial. https://github.com/pyserial/pyserial
6 # (C) 2015 Chris Liechti <cliechti@gmx.net>
7 #
8 # SPDX-License-Identifier:    BSD-3-Clause
9 """\
10 Support threading with serial ports.
11 """
12 import serial
13 import threading
16 class Protocol(object):
17     """\
18     Protocol as used by the ReaderThread. This base class provides empty
19     implementations of all methods.
20     """
22     def connection_made(self, transport):
23         """Called when reader thread is started"""
25     def data_received(self, data):
26         """Called with snippets received from the serial port"""
28     def connection_lost(self, exc):
29         """\
30         Called when the serial port is closed or the reader loop terminated
31         otherwise.
32         """
33         if isinstance(exc, Exception):
34             raise exc
37 class Packetizer(Protocol):
38     """
39     Read binary packets from serial port. Packets are expected to be terminated
40     with a TERMINATOR byte (null byte by default).
42     The class also keeps track of the transport.
43     """
45     TERMINATOR = b'\0'
47     def __init__(self):
48         self.buffer = bytearray()
49         self.transport = None
51     def connection_made(self, transport):
52         """Store transport"""
53         self.transport = transport
55     def connection_lost(self, exc):
56         """Forget transport"""
57         self.transport = None
58         super(Packetizer, self).connection_lost(exc)
60     def data_received(self, data):
61         """Buffer received data, find TERMINATOR, call handle_packet"""
62         self.buffer.extend(data)
63         while self.TERMINATOR in self.buffer:
64             packet, self.buffer = self.buffer.split(self.TERMINATOR, 1)
65             self.handle_packet(packet)
67     def handle_packet(self, packet):
68         """Process packets - to be overridden by subclassing"""
69         raise NotImplementedError('please implement functionality in handle_packet')
72 class FramedPacket(Protocol):
73     """
74     Read binary packets. Packets are expected to have a start and stop marker.
76     The class also keeps track of the transport.
77     """
79     START = b'('
80     STOP = b')'
82     def __init__(self):
83         self.packet = bytearray()
84         self.in_packet = False
85         self.transport = None
87     def connection_made(self, transport):
88         """Store transport"""
89         self.transport = transport
91     def connection_lost(self, exc):
92         """Forget transport"""
93         self.transport = None
94         self.in_packet = False
95         del self.packet[:]
96         super(FramedPacket, self).connection_lost(exc)
98     def data_received(self, data):
99         """Find data enclosed in START/STOP, call handle_packet"""
100         for byte in serial.iterbytes(data):
101             if byte == self.START:
102                 self.in_packet = True
103             elif byte == self.STOP:
104                 self.in_packet = False
105                 self.handle_packet(self.packet)
106                 del self.packet[:]
107             elif self.in_packet:
108                 self.packet.append(byte)
109             else:
110                 self.handle_out_of_packet_data(byte)
112     def handle_packet(self, packet):
113         """Process packets - to be overridden by subclassing"""
114         raise NotImplementedError('please implement functionality in handle_packet')
116     def handle_out_of_packet_data(self, data):
117         """Process data that is received outside of packets"""
118         pass
121 class LineReader(Packetizer):
122     """
123     Read and write (Unicode) lines from/to serial port.
124     The encoding is applied.
125     """
127     TERMINATOR = b'\r\n'
128     ENCODING = 'utf-8'
129     UNICODE_HANDLING = 'replace'
131     def handle_packet(self, packet):
132         self.handle_line(packet.decode(self.ENCODING, self.UNICODE_HANDLING))
134     def handle_line(self, line):
135         """Process one line - to be overridden by subclassing"""
136         raise NotImplementedError('please implement functionality in handle_line')
138     def write_line(self, text):
139         """
140         Write text to the transport. ``text`` is a Unicode string and the encoding
141         is applied before sending ans also the newline is append.
142         """
143         # + is not the best choice but bytes does not support % or .format in py3 and we want a single write call
144         self.transport.write(text.encode(self.ENCODING, self.UNICODE_HANDLING) + self.TERMINATOR)
147 class ReaderThread(threading.Thread):
148     """\
149     Implement a serial port read loop and dispatch to a Protocol instance (like
150     the asyncio.Protocol) but do it with threads.
152     Calls to close() will close the serial port but it is also possible to just
153     stop() this thread and continue the serial port instance otherwise.
154     """
156     def __init__(self, serial_instance, protocol_factory):
157         """\
158         Initialize thread.
160         Note that the serial_instance' timeout is set to one second!
161         Other settings are not changed.
162         """
163         super(ReaderThread, self).__init__()
164         self.daemon = True
165         self.serial = serial_instance
166         self.protocol_factory = protocol_factory
167         self.alive = True
168         self._lock = threading.Lock()
169         self._connection_made = threading.Event()
170         self.protocol = None
172     def stop(self):
173         """Stop the reader thread"""
174         self.alive = False
175         if hasattr(self.serial, 'cancel_read'):
176             self.serial.cancel_read()
177         self.join(2)
179     def run(self):
180         """Reader loop"""
181         if not hasattr(self.serial, 'cancel_read'):
182             self.serial.timeout = 1
183         self.protocol = self.protocol_factory()
184         try:
185             self.protocol.connection_made(self)
186         except Exception as e:
187             self.alive = False
188             self.protocol.connection_lost(e)
189             self._connection_made.set()
190             return
191         error = None
192         self._connection_made.set()
193         while self.alive and self.serial.is_open:
194             try:
195                 # read all that is there or wait for one byte (blocking)
196                 data = self.serial.read(self.serial.in_waiting or 1)
197             except serial.SerialException as e:
198                 # probably some I/O problem such as disconnected USB serial
199                 # adapters -> exit
200                 error = e
201                 break
202             else:
203                 if data:
204                     # make a separated try-except for called used code
205                     try:
206                         self.protocol.data_received(data)
207                     except Exception as e:
208                         error = e
209                         break
210         self.alive = False
211         self.protocol.connection_lost(error)
212         self.protocol = None
214     def write(self, data):
215         """Thread safe writing (uses lock)"""
216         with self._lock:
217             self.serial.write(data)
219     def close(self):
220         """Close the serial port and exit reader thread (uses lock)"""
221         # use the lock to let other threads finish writing
222         with self._lock:
223             # first stop reading, so that closing can be done on idle port
224             self.stop()
225             self.serial.close()
227     def connect(self):
228         """
229         Wait until connection is set up and return the transport and protocol
230         instances.
231         """
232         if self.alive:
233             self._connection_made.wait()
234             if not self.alive:
235                 raise RuntimeError('connection_lost already called')
236             return (self, self.protocol)
237         else:
238             raise RuntimeError('already stopped')
240     # - -  context manager, returns protocol
242     def __enter__(self):
243         """\
244         Enter context handler. May raise RuntimeError in case the connection
245         could not be created.
246         """
247         self.start()
248         self._connection_made.wait()
249         if not self.alive:
250             raise RuntimeError('connection_lost already called')
251         return self.protocol
253     def __exit__(self, exc_type, exc_val, exc_tb):
254         """Leave context: close port"""
255         self.close()
258 # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
259 # test
260 if __name__ == '__main__':
261     # pylint: disable=wrong-import-position
262     import sys
263     import time
264     import traceback
266     #~ PORT = 'spy:///dev/ttyUSB0'
267     PORT = 'loop://'
269     class PrintLines(LineReader):
270         def connection_made(self, transport):
271             super(PrintLines, self).connection_made(transport)
272             sys.stdout.write('port opened\n')
273             self.write_line('hello world')
275         def handle_line(self, data):
276             sys.stdout.write('line received: {}\n'.format(repr(data)))
278         def connection_lost(self, exc):
279             if exc:
280                 traceback.print_exc(exc)
281             sys.stdout.write('port closed\n')
283     ser = serial.serial_for_url(PORT, baudrate=115200, timeout=1)
284     with ReaderThread(ser, PrintLines) as protocol:
285         protocol.write_line('hello')
286         time.sleep(2)
288     # alternative usage
289     ser = serial.serial_for_url(PORT, baudrate=115200, timeout=1)
290     t = ReaderThread(ser, PrintLines)
291     t.start()
292     transport, protocol = t.connect()
293     protocol.write_line('hello')
294     time.sleep(2)
295     t.close()