]> Gitweb @ Texas Instruments - Open Source Git Repositories - git.TI.com/gitweb - processor-sdk/performance-audio-sr.git/blob - procsdk_audio_x_xx_xx_xx/tools/pyalpha/pyalpha/serial/urlhandler/protocol_spy.py
34790100d03eca01aa1f4e39be723e217c240d2b
[processor-sdk/performance-audio-sr.git] / procsdk_audio_x_xx_xx_xx / tools / pyalpha / pyalpha / serial / urlhandler / protocol_spy.py
1 #! python
2 #
3 # This module implements a special URL handler that wraps an other port,
4 # print the traffic for debugging purposes. With this, it is possible
5 # to debug the serial port traffic on every application that uses
6 # serial_for_url.
7 #
8 # This file is part of pySerial. https://github.com/pyserial/pyserial
9 # (C) 2015 Chris Liechti <cliechti@gmx.net>
10 #
11 # SPDX-License-Identifier:    BSD-3-Clause
12 #
13 # URL format:    spy://port[?option[=value][&option[=value]]]
14 # options:
15 # - dev=X   a file or device to write to
16 # - color   use escape code to colorize output
17 # - raw     forward raw bytes instead of hexdump
18 #
19 # example:
20 #   redirect output to an other terminal window on Posix (Linux):
21 #   python -m serial.tools.miniterm spy:///dev/ttyUSB0?dev=/dev/pts/14\&color
23 import sys
24 import time
26 import serial
28 try:
29     import urlparse
30 except ImportError:
31     import urllib.parse as urlparse
34 def sixteen(data):
35     """\
36     yield tuples of hex and ASCII display in multiples of 16. Includes a
37     space after 8 bytes and (None, None) after 16 bytes and at the end.
38     """
39     n = 0
40     for b in serial.iterbytes(data):
41         yield ('{:02X} '.format(ord(b)), b.decode('ascii') if b' ' <= b < b'\x7f' else '.')
42         n += 1
43         if n == 8:
44             yield (' ', '')
45         elif n >= 16:
46             yield (None, None)
47             n = 0
48     if n > 0:
49         while n < 16:
50             n += 1
51             if n == 8:
52                 yield (' ', '')
53             yield ('   ', ' ')
54         yield (None, None)
57 def hexdump(data):
58     """yield lines with hexdump of data"""
59     values = []
60     ascii = []
61     offset = 0
62     for h, a in sixteen(data):
63         if h is None:
64             yield (offset, ' '.join([''.join(values), ''.join(ascii)]))
65             del values[:]
66             del ascii[:]
67             offset += 0x10
68         else:
69             values.append(h)
70             ascii.append(a)
73 class FormatRaw(object):
74     """Forward only RX and TX data to output."""
76     def __init__(self, output, color):
77         self.output = output
78         self.color = color
79         self.rx_color = '\x1b[32m'
80         self.tx_color = '\x1b[31m'
82     def rx(self, data):
83         """show received data"""
84         if self.color:
85             self.output.write(self.rx_color)
86         self.output.write(data)
87         self.output.flush()
89     def tx(self, data):
90         """show transmitted data"""
91         if self.color:
92             self.output.write(self.tx_color)
93         self.output.write(data)
94         self.output.flush()
96     def control(self, name, value):
97         """(do not) show control calls"""
98         pass
101 class FormatHexdump(object):
102     """\
103     Create a hex dump of RX ad TX data, show when control lines are read or
104     written.
106     output example::
108         000000.000 Q-RX flushInput
109         000002.469 RTS  inactive
110         000002.773 RTS  active
111         000003.001 TX   48 45 4C 4C 4F                                    HELLO
112         000003.102 RX   48 45 4C 4C 4F                                    HELLO
114     """
116     def __init__(self, output, color):
117         self.start_time = time.time()
118         self.output = output
119         self.color = color
120         self.rx_color = '\x1b[32m'
121         self.tx_color = '\x1b[31m'
122         self.control_color = '\x1b[37m'
124     def write_line(self, timestamp, label, value, value2=''):
125         self.output.write('{:010.3f} {:4} {}{}\n'.format(timestamp, label, value, value2))
126         self.output.flush()
128     def rx(self, data):
129         """show received data as hex dump"""
130         if self.color:
131             self.output.write(self.rx_color)
132         if data:
133             for offset, row in hexdump(data):
134                 self.write_line(time.time() - self.start_time, 'RX', '{:04X}  '.format(offset), row)
135         else:
136             self.write_line(time.time() - self.start_time, 'RX', '<empty>')
138     def tx(self, data):
139         """show transmitted data as hex dump"""
140         if self.color:
141             self.output.write(self.tx_color)
142         for offset, row in hexdump(data):
143             self.write_line(time.time() - self.start_time, 'TX', '{:04X}  '.format(offset), row)
145     def control(self, name, value):
146         """show control calls"""
147         if self.color:
148             self.output.write(self.control_color)
149         self.write_line(time.time() - self.start_time, name, value)
152 class Serial(serial.Serial):
153     """\
154     Inherit the native Serial port implementation and wrap all the methods and
155     attributes.
156     """
157     # pylint: disable=no-member
159     def __init__(self, *args, **kwargs):
160         super(Serial, self).__init__(*args, **kwargs)
161         self.formatter = None
162         self.show_all = False
164     @serial.Serial.port.setter
165     def port(self, value):
166         if value is not None:
167             serial.Serial.port.__set__(self, self.from_url(value))
169     def from_url(self, url):
170         """extract host and port from an URL string"""
171         parts = urlparse.urlsplit(url)
172         if parts.scheme != 'spy':
173             raise serial.SerialException(
174                 'expected a string in the form '
175                 '"spy://port[?option[=value][&option[=value]]]": '
176                 'not starting with spy:// ({!r})'.format(parts.scheme))
177         # process options now, directly altering self
178         formatter = FormatHexdump
179         color = False
180         output = sys.stderr
181         try:
182             for option, values in urlparse.parse_qs(parts.query, True).items():
183                 if option == 'file':
184                     output = open(values[0], 'w')
185                 elif option == 'color':
186                     color = True
187                 elif option == 'raw':
188                     formatter = FormatRaw
189                 elif option == 'all':
190                     self.show_all = True
191                 else:
192                     raise ValueError('unknown option: {!r}'.format(option))
193         except ValueError as e:
194             raise serial.SerialException(
195                 'expected a string in the form '
196                 '"spy://port[?option[=value][&option[=value]]]": {}'.format(e))
197         self.formatter = formatter(output, color)
198         return ''.join([parts.netloc, parts.path])
200     def write(self, tx):
201         self.formatter.tx(tx)
202         return super(Serial, self).write(tx)
204     def read(self, size=1):
205         rx = super(Serial, self).read(size)
206         if rx or self.show_all:
207             self.formatter.rx(rx)
208         return rx
210     if hasattr(serial.Serial, 'cancel_read'):
211         def cancel_read(self):
212             self.formatter.control('Q-RX', 'cancel_read')
213             super(Serial, self).cancel_read()
215     if hasattr(serial.Serial, 'cancel_write'):
216         def cancel_write(self):
217             self.formatter.control('Q-TX', 'cancel_write')
218             super(Serial, self).cancel_write()
220     @property
221     def in_waiting(self):
222         n = super(Serial, self).in_waiting
223         if self.show_all:
224             self.formatter.control('Q-RX', 'in_waiting -> {}'.format(n))
225         return n
227     def flush(self):
228         self.formatter.control('Q-TX', 'flush')
229         super(Serial, self).flush()
231     def reset_input_buffer(self):
232         self.formatter.control('Q-RX', 'reset_input_buffer')
233         super(Serial, self).reset_input_buffer()
235     def reset_output_buffer(self):
236         self.formatter.control('Q-TX', 'reset_output_buffer')
237         super(Serial, self).reset_output_buffer()
239     def send_break(self, duration=0.25):
240         self.formatter.control('BRK', 'send_break {}s'.format(duration))
241         super(Serial, self).send_break(duration)
243     @serial.Serial.break_condition.setter
244     def break_condition(self, level):
245         self.formatter.control('BRK', 'active' if level else 'inactive')
246         serial.Serial.break_condition.__set__(self, level)
248     @serial.Serial.rts.setter
249     def rts(self, level):
250         self.formatter.control('RTS', 'active' if level else 'inactive')
251         serial.Serial.rts.__set__(self, level)
253     @serial.Serial.dtr.setter
254     def dtr(self, level):
255         self.formatter.control('DTR', 'active' if level else 'inactive')
256         serial.Serial.dtr.__set__(self, level)
258     @serial.Serial.cts.getter
259     def cts(self):
260         level = super(Serial, self).cts
261         self.formatter.control('CTS', 'active' if level else 'inactive')
262         return level
264     @serial.Serial.dsr.getter
265     def dsr(self):
266         level = super(Serial, self).dsr
267         self.formatter.control('DSR', 'active' if level else 'inactive')
268         return level
270     @serial.Serial.ri.getter
271     def ri(self):
272         level = super(Serial, self).ri
273         self.formatter.control('RI', 'active' if level else 'inactive')
274         return level
276     @serial.Serial.cd.getter
277     def cd(self):
278         level = super(Serial, self).cd
279         self.formatter.control('CD', 'active' if level else 'inactive')
280         return level
282 # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
283 if __name__ == '__main__':
284     ser = Serial(None)
285     ser.port = 'spy:///dev/ttyS0'
286     print(ser)