]> Gitweb @ Texas Instruments - Open Source Git Repositories - git.TI.com/gitweb - processor-sdk/performance-audio-sr.git/blobdiff - processor_audio_sdk_1_00_00_00/tools/pyalpha/pyalpha/serial/threaded/__init__.py
Remove processor_audio_sdk_1_00_00_00 subfolder
[processor-sdk/performance-audio-sr.git] / processor_audio_sdk_1_00_00_00 / tools / pyalpha / pyalpha / serial / threaded / __init__.py
diff --git a/processor_audio_sdk_1_00_00_00/tools/pyalpha/pyalpha/serial/threaded/__init__.py b/processor_audio_sdk_1_00_00_00/tools/pyalpha/pyalpha/serial/threaded/__init__.py
deleted file mode 100644 (file)
index 325d4a3..0000000
+++ /dev/null
@@ -1,295 +0,0 @@
-#!/usr/bin/env python3
-#
-# Working with threading and pySerial
-#
-# This file is part of pySerial. https://github.com/pyserial/pyserial
-# (C) 2015 Chris Liechti <cliechti@gmx.net>
-#
-# SPDX-License-Identifier:    BSD-3-Clause
-"""\
-Support threading with serial ports.
-"""
-import serial
-import threading
-
-
-class Protocol(object):
-    """\
-    Protocol as used by the ReaderThread. This base class provides empty
-    implementations of all methods.
-    """
-
-    def connection_made(self, transport):
-        """Called when reader thread is started"""
-
-    def data_received(self, data):
-        """Called with snippets received from the serial port"""
-
-    def connection_lost(self, exc):
-        """\
-        Called when the serial port is closed or the reader loop terminated
-        otherwise.
-        """
-        if isinstance(exc, Exception):
-            raise exc
-
-
-class Packetizer(Protocol):
-    """
-    Read binary packets from serial port. Packets are expected to be terminated
-    with a TERMINATOR byte (null byte by default).
-
-    The class also keeps track of the transport.
-    """
-
-    TERMINATOR = b'\0'
-
-    def __init__(self):
-        self.buffer = bytearray()
-        self.transport = None
-
-    def connection_made(self, transport):
-        """Store transport"""
-        self.transport = transport
-
-    def connection_lost(self, exc):
-        """Forget transport"""
-        self.transport = None
-        super(Packetizer, self).connection_lost(exc)
-
-    def data_received(self, data):
-        """Buffer received data, find TERMINATOR, call handle_packet"""
-        self.buffer.extend(data)
-        while self.TERMINATOR in self.buffer:
-            packet, self.buffer = self.buffer.split(self.TERMINATOR, 1)
-            self.handle_packet(packet)
-
-    def handle_packet(self, packet):
-        """Process packets - to be overridden by subclassing"""
-        raise NotImplementedError('please implement functionality in handle_packet')
-
-
-class FramedPacket(Protocol):
-    """
-    Read binary packets. Packets are expected to have a start and stop marker.
-
-    The class also keeps track of the transport.
-    """
-
-    START = b'('
-    STOP = b')'
-
-    def __init__(self):
-        self.packet = bytearray()
-        self.in_packet = False
-        self.transport = None
-
-    def connection_made(self, transport):
-        """Store transport"""
-        self.transport = transport
-
-    def connection_lost(self, exc):
-        """Forget transport"""
-        self.transport = None
-        self.in_packet = False
-        del self.packet[:]
-        super(FramedPacket, self).connection_lost(exc)
-
-    def data_received(self, data):
-        """Find data enclosed in START/STOP, call handle_packet"""
-        for byte in serial.iterbytes(data):
-            if byte == self.START:
-                self.in_packet = True
-            elif byte == self.STOP:
-                self.in_packet = False
-                self.handle_packet(self.packet)
-                del self.packet[:]
-            elif self.in_packet:
-                self.packet.append(byte)
-            else:
-                self.handle_out_of_packet_data(byte)
-
-    def handle_packet(self, packet):
-        """Process packets - to be overridden by subclassing"""
-        raise NotImplementedError('please implement functionality in handle_packet')
-
-    def handle_out_of_packet_data(self, data):
-        """Process data that is received outside of packets"""
-        pass
-
-
-class LineReader(Packetizer):
-    """
-    Read and write (Unicode) lines from/to serial port.
-    The encoding is applied.
-    """
-
-    TERMINATOR = b'\r\n'
-    ENCODING = 'utf-8'
-    UNICODE_HANDLING = 'replace'
-
-    def handle_packet(self, packet):
-        self.handle_line(packet.decode(self.ENCODING, self.UNICODE_HANDLING))
-
-    def handle_line(self, line):
-        """Process one line - to be overridden by subclassing"""
-        raise NotImplementedError('please implement functionality in handle_line')
-
-    def write_line(self, text):
-        """
-        Write text to the transport. ``text`` is a Unicode string and the encoding
-        is applied before sending ans also the newline is append.
-        """
-        # + is not the best choice but bytes does not support % or .format in py3 and we want a single write call
-        self.transport.write(text.encode(self.ENCODING, self.UNICODE_HANDLING) + self.TERMINATOR)
-
-
-class ReaderThread(threading.Thread):
-    """\
-    Implement a serial port read loop and dispatch to a Protocol instance (like
-    the asyncio.Protocol) but do it with threads.
-
-    Calls to close() will close the serial port but it is also possible to just
-    stop() this thread and continue the serial port instance otherwise.
-    """
-
-    def __init__(self, serial_instance, protocol_factory):
-        """\
-        Initialize thread.
-
-        Note that the serial_instance' timeout is set to one second!
-        Other settings are not changed.
-        """
-        super(ReaderThread, self).__init__()
-        self.daemon = True
-        self.serial = serial_instance
-        self.protocol_factory = protocol_factory
-        self.alive = True
-        self._lock = threading.Lock()
-        self._connection_made = threading.Event()
-        self.protocol = None
-
-    def stop(self):
-        """Stop the reader thread"""
-        self.alive = False
-        if hasattr(self.serial, 'cancel_read'):
-            self.serial.cancel_read()
-        self.join(2)
-
-    def run(self):
-        """Reader loop"""
-        if not hasattr(self.serial, 'cancel_read'):
-            self.serial.timeout = 1
-        self.protocol = self.protocol_factory()
-        try:
-            self.protocol.connection_made(self)
-        except Exception as e:
-            self.alive = False
-            self.protocol.connection_lost(e)
-            self._connection_made.set()
-            return
-        error = None
-        self._connection_made.set()
-        while self.alive and self.serial.is_open:
-            try:
-                # read all that is there or wait for one byte (blocking)
-                data = self.serial.read(self.serial.in_waiting or 1)
-            except serial.SerialException as e:
-                # probably some I/O problem such as disconnected USB serial
-                # adapters -> exit
-                error = e
-                break
-            else:
-                if data:
-                    # make a separated try-except for called used code
-                    try:
-                        self.protocol.data_received(data)
-                    except Exception as e:
-                        error = e
-                        break
-        self.alive = False
-        self.protocol.connection_lost(error)
-        self.protocol = None
-
-    def write(self, data):
-        """Thread safe writing (uses lock)"""
-        with self._lock:
-            self.serial.write(data)
-
-    def close(self):
-        """Close the serial port and exit reader thread (uses lock)"""
-        # use the lock to let other threads finish writing
-        with self._lock:
-            # first stop reading, so that closing can be done on idle port
-            self.stop()
-            self.serial.close()
-
-    def connect(self):
-        """
-        Wait until connection is set up and return the transport and protocol
-        instances.
-        """
-        if self.alive:
-            self._connection_made.wait()
-            if not self.alive:
-                raise RuntimeError('connection_lost already called')
-            return (self, self.protocol)
-        else:
-            raise RuntimeError('already stopped')
-
-    # - -  context manager, returns protocol
-
-    def __enter__(self):
-        """\
-        Enter context handler. May raise RuntimeError in case the connection
-        could not be created.
-        """
-        self.start()
-        self._connection_made.wait()
-        if not self.alive:
-            raise RuntimeError('connection_lost already called')
-        return self.protocol
-
-    def __exit__(self, exc_type, exc_val, exc_tb):
-        """Leave context: close port"""
-        self.close()
-
-
-# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
-# test
-if __name__ == '__main__':
-    # pylint: disable=wrong-import-position
-    import sys
-    import time
-    import traceback
-
-    #~ PORT = 'spy:///dev/ttyUSB0'
-    PORT = 'loop://'
-
-    class PrintLines(LineReader):
-        def connection_made(self, transport):
-            super(PrintLines, self).connection_made(transport)
-            sys.stdout.write('port opened\n')
-            self.write_line('hello world')
-
-        def handle_line(self, data):
-            sys.stdout.write('line received: {}\n'.format(repr(data)))
-
-        def connection_lost(self, exc):
-            if exc:
-                traceback.print_exc(exc)
-            sys.stdout.write('port closed\n')
-
-    ser = serial.serial_for_url(PORT, baudrate=115200, timeout=1)
-    with ReaderThread(ser, PrintLines) as protocol:
-        protocol.write_line('hello')
-        time.sleep(2)
-
-    # alternative usage
-    ser = serial.serial_for_url(PORT, baudrate=115200, timeout=1)
-    t = ReaderThread(ser, PrintLines)
-    t.start()
-    transport, protocol = t.connect()
-    protocol.write_line('hello')
-    time.sleep(2)
-    t.close()