]> Gitweb @ Texas Instruments - Open Source Git Repositories - git.TI.com/gitweb - processor-sdk/performance-audio-sr.git/blob - tools/pyalpha/pyalpha/serial/serialwin32.py
Add Debug, Release, and SDF build profiles. This allows normal build all and SDF...
[processor-sdk/performance-audio-sr.git] / tools / pyalpha / pyalpha / serial / serialwin32.py
1 #! python
2 #
3 # backend for Windows ("win32" incl. 32/64 bit support)
4 #
5 # (C) 2001-2015 Chris Liechti <cliechti@gmx.net>
6 #
7 # This file is part of pySerial. https://github.com/pyserial/pyserial
8 # SPDX-License-Identifier:    BSD-3-Clause
9 #
10 # Initial patch to use ctypes by Giovanni Bajo <rasky@develer.com>
12 # pylint: disable=invalid-name,too-few-public-methods
13 import ctypes
14 import time
15 from serial import win32
17 import serial
18 from serial.serialutil import SerialBase, SerialException, to_bytes, portNotOpenError, writeTimeoutError
21 class Serial(SerialBase):
22     """Serial port implementation for Win32 based on ctypes."""
24     BAUDRATES = (50, 75, 110, 134, 150, 200, 300, 600, 1200, 1800, 2400, 4800,
25                  9600, 19200, 38400, 57600, 115200)
27     def __init__(self, *args, **kwargs):
28         self._port_handle = None
29         self._overlapped_read = None
30         self._overlapped_write = None
31         super(Serial, self).__init__(*args, **kwargs)
33     def open(self):
34         """\
35         Open port with current settings. This may throw a SerialException
36         if the port cannot be opened.
37         """
38         if self._port is None:
39             raise SerialException("Port must be configured before it can be used.")
40         if self.is_open:
41             raise SerialException("Port is already open.")
42         # the "\\.\COMx" format is required for devices other than COM1-COM8
43         # not all versions of windows seem to support this properly
44         # so that the first few ports are used with the DOS device name
45         port = self.name
46         try:
47             if port.upper().startswith('COM') and int(port[3:]) > 8:
48                 port = '\\\\.\\' + port
49         except ValueError:
50             # for like COMnotanumber
51             pass
52         self._port_handle = win32.CreateFile(
53             port,
54             win32.GENERIC_READ | win32.GENERIC_WRITE,
55             0,  # exclusive access
56             None,  # no security
57             win32.OPEN_EXISTING,
58             win32.FILE_ATTRIBUTE_NORMAL | win32.FILE_FLAG_OVERLAPPED,
59             0)
60         if self._port_handle == win32.INVALID_HANDLE_VALUE:
61             self._port_handle = None    # 'cause __del__ is called anyway
62             raise SerialException("could not open port {!r}: {!r}".format(self.portstr, ctypes.WinError()))
64         try:
65             self._overlapped_read = win32.OVERLAPPED()
66             self._overlapped_read.hEvent = win32.CreateEvent(None, 1, 0, None)
67             self._overlapped_write = win32.OVERLAPPED()
68             #~ self._overlapped_write.hEvent = win32.CreateEvent(None, 1, 0, None)
69             self._overlapped_write.hEvent = win32.CreateEvent(None, 0, 0, None)
71             # Setup a 4k buffer
72             win32.SetupComm(self._port_handle, 4096, 4096)
74             # Save original timeout values:
75             self._orgTimeouts = win32.COMMTIMEOUTS()
76             win32.GetCommTimeouts(self._port_handle, ctypes.byref(self._orgTimeouts))
78             self._reconfigure_port()
80             # Clear buffers:
81             # Remove anything that was there
82             win32.PurgeComm(
83                 self._port_handle,
84                 win32.PURGE_TXCLEAR | win32.PURGE_TXABORT |
85                 win32.PURGE_RXCLEAR | win32.PURGE_RXABORT)
86         except:
87             try:
88                 self._close()
89             except:
90                 # ignore any exception when closing the port
91                 # also to keep original exception that happened when setting up
92                 pass
93             self._port_handle = None
94             raise
95         else:
96             self.is_open = True
98     def _reconfigure_port(self):
99         """Set communication parameters on opened port."""
100         if not self._port_handle:
101             raise SerialException("Can only operate on a valid port handle")
103         # Set Windows timeout values
104         # timeouts is a tuple with the following items:
105         # (ReadIntervalTimeout,ReadTotalTimeoutMultiplier,
106         #  ReadTotalTimeoutConstant,WriteTotalTimeoutMultiplier,
107         #  WriteTotalTimeoutConstant)
108         timeouts = win32.COMMTIMEOUTS()
109         if self._timeout is None:
110             pass  # default of all zeros is OK
111         elif self._timeout == 0:
112             timeouts.ReadIntervalTimeout = win32.MAXDWORD
113         else:
114             timeouts.ReadTotalTimeoutConstant = max(int(self._timeout * 1000), 1)
115         if self._timeout != 0 and self._inter_byte_timeout is not None:
116             timeouts.ReadIntervalTimeout = max(int(self._inter_byte_timeout * 1000), 1)
118         if self._write_timeout is None:
119             pass
120         elif self._write_timeout == 0:
121             timeouts.WriteTotalTimeoutConstant = win32.MAXDWORD
122         else:
123             timeouts.WriteTotalTimeoutConstant = max(int(self._write_timeout * 1000), 1)
124         win32.SetCommTimeouts(self._port_handle, ctypes.byref(timeouts))
126         win32.SetCommMask(self._port_handle, win32.EV_ERR)
128         # Setup the connection info.
129         # Get state and modify it:
130         comDCB = win32.DCB()
131         win32.GetCommState(self._port_handle, ctypes.byref(comDCB))
132         comDCB.BaudRate = self._baudrate
134         if self._bytesize == serial.FIVEBITS:
135             comDCB.ByteSize = 5
136         elif self._bytesize == serial.SIXBITS:
137             comDCB.ByteSize = 6
138         elif self._bytesize == serial.SEVENBITS:
139             comDCB.ByteSize = 7
140         elif self._bytesize == serial.EIGHTBITS:
141             comDCB.ByteSize = 8
142         else:
143             raise ValueError("Unsupported number of data bits: {!r}".format(self._bytesize))
145         if self._parity == serial.PARITY_NONE:
146             comDCB.Parity = win32.NOPARITY
147             comDCB.fParity = 0  # Disable Parity Check
148         elif self._parity == serial.PARITY_EVEN:
149             comDCB.Parity = win32.EVENPARITY
150             comDCB.fParity = 1  # Enable Parity Check
151         elif self._parity == serial.PARITY_ODD:
152             comDCB.Parity = win32.ODDPARITY
153             comDCB.fParity = 1  # Enable Parity Check
154         elif self._parity == serial.PARITY_MARK:
155             comDCB.Parity = win32.MARKPARITY
156             comDCB.fParity = 1  # Enable Parity Check
157         elif self._parity == serial.PARITY_SPACE:
158             comDCB.Parity = win32.SPACEPARITY
159             comDCB.fParity = 1  # Enable Parity Check
160         else:
161             raise ValueError("Unsupported parity mode: {!r}".format(self._parity))
163         if self._stopbits == serial.STOPBITS_ONE:
164             comDCB.StopBits = win32.ONESTOPBIT
165         elif self._stopbits == serial.STOPBITS_ONE_POINT_FIVE:
166             comDCB.StopBits = win32.ONE5STOPBITS
167         elif self._stopbits == serial.STOPBITS_TWO:
168             comDCB.StopBits = win32.TWOSTOPBITS
169         else:
170             raise ValueError("Unsupported number of stop bits: {!r}".format(self._stopbits))
172         comDCB.fBinary = 1  # Enable Binary Transmission
173         # Char. w/ Parity-Err are replaced with 0xff (if fErrorChar is set to TRUE)
174         if self._rs485_mode is None:
175             if self._rtscts:
176                 comDCB.fRtsControl = win32.RTS_CONTROL_HANDSHAKE
177             else:
178                 comDCB.fRtsControl = win32.RTS_CONTROL_ENABLE if self._rts_state else win32.RTS_CONTROL_DISABLE
179             comDCB.fOutxCtsFlow = self._rtscts
180         else:
181             # checks for unsupported settings
182             # XXX verify if platform really does not have a setting for those
183             if not self._rs485_mode.rts_level_for_tx:
184                 raise ValueError(
185                     'Unsupported value for RS485Settings.rts_level_for_tx: {!r}'.format(
186                         self._rs485_mode.rts_level_for_tx,))
187             if self._rs485_mode.rts_level_for_rx:
188                 raise ValueError(
189                     'Unsupported value for RS485Settings.rts_level_for_rx: {!r}'.format(
190                         self._rs485_mode.rts_level_for_rx,))
191             if self._rs485_mode.delay_before_tx is not None:
192                 raise ValueError(
193                     'Unsupported value for RS485Settings.delay_before_tx: {!r}'.format(
194                         self._rs485_mode.delay_before_tx,))
195             if self._rs485_mode.delay_before_rx is not None:
196                 raise ValueError(
197                     'Unsupported value for RS485Settings.delay_before_rx: {!r}'.format(
198                         self._rs485_mode.delay_before_rx,))
199             if self._rs485_mode.loopback:
200                 raise ValueError(
201                     'Unsupported value for RS485Settings.loopback: {!r}'.format(
202                         self._rs485_mode.loopback,))
203             comDCB.fRtsControl = win32.RTS_CONTROL_TOGGLE
204             comDCB.fOutxCtsFlow = 0
206         if self._dsrdtr:
207             comDCB.fDtrControl = win32.DTR_CONTROL_HANDSHAKE
208         else:
209             comDCB.fDtrControl = win32.DTR_CONTROL_ENABLE if self._dtr_state else win32.DTR_CONTROL_DISABLE
210         comDCB.fOutxDsrFlow = self._dsrdtr
211         comDCB.fOutX = self._xonxoff
212         comDCB.fInX = self._xonxoff
213         comDCB.fNull = 0
214         comDCB.fErrorChar = 0
215         comDCB.fAbortOnError = 0
216         comDCB.XonChar = serial.XON
217         comDCB.XoffChar = serial.XOFF
219         if not win32.SetCommState(self._port_handle, ctypes.byref(comDCB)):
220             raise SerialException(
221                 'Cannot configure port, something went wrong. '
222                 'Original message: {!r}'.format(ctypes.WinError()))
224     #~ def __del__(self):
225         #~ self.close()
227     def _close(self):
228         """internal close port helper"""
229         if self._port_handle is not None:
230             # Restore original timeout values:
231             win32.SetCommTimeouts(self._port_handle, self._orgTimeouts)
232             if self._overlapped_read is not None:
233                 self.cancel_read()
234                 win32.CloseHandle(self._overlapped_read.hEvent)
235                 self._overlapped_read = None
236             if self._overlapped_write is not None:
237                 self.cancel_write()
238                 win32.CloseHandle(self._overlapped_write.hEvent)
239                 self._overlapped_write = None
240             win32.CloseHandle(self._port_handle)
241             self._port_handle = None
243     def close(self):
244         """Close port"""
245         if self.is_open:
246             self._close()
247             self.is_open = False
249     #  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -
251     @property
252     def in_waiting(self):
253         """Return the number of bytes currently in the input buffer."""
254         flags = win32.DWORD()
255         comstat = win32.COMSTAT()
256         if not win32.ClearCommError(self._port_handle, ctypes.byref(flags), ctypes.byref(comstat)):
257             raise SerialException('call to ClearCommError failed')
258         return comstat.cbInQue
260     def read(self, size=1):
261         """\
262         Read size bytes from the serial port. If a timeout is set it may
263         return less characters as requested. With no timeout it will block
264         until the requested number of bytes is read.
265         """
266         if not self.is_open:
267             raise portNotOpenError
268         if size > 0:
269             win32.ResetEvent(self._overlapped_read.hEvent)
270             flags = win32.DWORD()
271             comstat = win32.COMSTAT()
272             if not win32.ClearCommError(self._port_handle, ctypes.byref(flags), ctypes.byref(comstat)):
273                 raise SerialException("ClearCommError failed ({!r})".format(ctypes.WinError()))
274             n = min(comstat.cbInQue, size) if self.timeout == 0 else size
275             if n > 0:
276                 buf = ctypes.create_string_buffer(n)
277                 rc = win32.DWORD()
278                 read_ok = win32.ReadFile(
279                     self._port_handle,
280                     buf,
281                     n,
282                     ctypes.byref(rc),
283                     ctypes.byref(self._overlapped_read))
284                 if not read_ok and win32.GetLastError() not in (win32.ERROR_SUCCESS, win32.ERROR_IO_PENDING):
285                     raise SerialException("ReadFile failed ({!r})".format(ctypes.WinError()))
286                 result_ok = win32.GetOverlappedResult(
287                     self._port_handle,
288                     ctypes.byref(self._overlapped_read),
289                     ctypes.byref(rc),
290                     True)
291                 if not result_ok:
292                     if win32.GetLastError() != win32.ERROR_OPERATION_ABORTED:
293                         raise SerialException("GetOverlappedResult failed ({!r})".format(ctypes.WinError()))
294                 read = buf.raw[:rc.value]
295             else:
296                 read = bytes()
297         else:
298             read = bytes()
299         return bytes(read)
301     def write(self, data):
302         """Output the given byte string over the serial port."""
303         if not self.is_open:
304             raise portNotOpenError
305         #~ if not isinstance(data, (bytes, bytearray)):
306             #~ raise TypeError('expected %s or bytearray, got %s' % (bytes, type(data)))
307         # convert data (needed in case of memoryview instance: Py 3.1 io lib), ctypes doesn't like memoryview
308         data = to_bytes(data)
309         if data:
310             #~ win32event.ResetEvent(self._overlapped_write.hEvent)
311             n = win32.DWORD()
312             err = win32.WriteFile(self._port_handle, data, len(data), ctypes.byref(n), self._overlapped_write)
313             if not err and win32.GetLastError() != win32.ERROR_IO_PENDING:
314                 raise SerialException("WriteFile failed ({!r})".format(ctypes.WinError()))
315             if self._write_timeout != 0:  # if blocking (None) or w/ write timeout (>0)
316                 # Wait for the write to complete.
317                 #~ win32.WaitForSingleObject(self._overlapped_write.hEvent, win32.INFINITE)
318                 err = win32.GetOverlappedResult(self._port_handle, self._overlapped_write, ctypes.byref(n), True)
319                 if win32.GetLastError() == win32.ERROR_OPERATION_ABORTED:
320                     return n.value  # canceled IO is no error
321                 if n.value != len(data):
322                     raise writeTimeoutError
323             return n.value
324         else:
325             return 0
327     def flush(self):
328         """\
329         Flush of file like objects. In this case, wait until all data
330         is written.
331         """
332         while self.out_waiting:
333             time.sleep(0.05)
334         # XXX could also use WaitCommEvent with mask EV_TXEMPTY, but it would
335         # require overlapped IO and its also only possible to set a single mask
336         # on the port---
338     def reset_input_buffer(self):
339         """Clear input buffer, discarding all that is in the buffer."""
340         if not self.is_open:
341             raise portNotOpenError
342         win32.PurgeComm(self._port_handle, win32.PURGE_RXCLEAR | win32.PURGE_RXABORT)
344     def reset_output_buffer(self):
345         """\
346         Clear output buffer, aborting the current output and discarding all
347         that is in the buffer.
348         """
349         if not self.is_open:
350             raise portNotOpenError
351         win32.PurgeComm(self._port_handle, win32.PURGE_TXCLEAR | win32.PURGE_TXABORT)
353     def _update_break_state(self):
354         """Set break: Controls TXD. When active, to transmitting is possible."""
355         if not self.is_open:
356             raise portNotOpenError
357         if self._break_state:
358             win32.SetCommBreak(self._port_handle)
359         else:
360             win32.ClearCommBreak(self._port_handle)
362     def _update_rts_state(self):
363         """Set terminal status line: Request To Send"""
364         if self._rts_state:
365             win32.EscapeCommFunction(self._port_handle, win32.SETRTS)
366         else:
367             win32.EscapeCommFunction(self._port_handle, win32.CLRRTS)
369     def _update_dtr_state(self):
370         """Set terminal status line: Data Terminal Ready"""
371         if self._dtr_state:
372             win32.EscapeCommFunction(self._port_handle, win32.SETDTR)
373         else:
374             win32.EscapeCommFunction(self._port_handle, win32.CLRDTR)
376     def _GetCommModemStatus(self):
377         if not self.is_open:
378             raise portNotOpenError
379         stat = win32.DWORD()
380         win32.GetCommModemStatus(self._port_handle, ctypes.byref(stat))
381         return stat.value
383     @property
384     def cts(self):
385         """Read terminal status line: Clear To Send"""
386         return win32.MS_CTS_ON & self._GetCommModemStatus() != 0
388     @property
389     def dsr(self):
390         """Read terminal status line: Data Set Ready"""
391         return win32.MS_DSR_ON & self._GetCommModemStatus() != 0
393     @property
394     def ri(self):
395         """Read terminal status line: Ring Indicator"""
396         return win32.MS_RING_ON & self._GetCommModemStatus() != 0
398     @property
399     def cd(self):
400         """Read terminal status line: Carrier Detect"""
401         return win32.MS_RLSD_ON & self._GetCommModemStatus() != 0
403     # - - platform specific - - - -
405     def set_buffer_size(self, rx_size=4096, tx_size=None):
406         """\
407         Recommend a buffer size to the driver (device driver can ignore this
408         value). Must be called before the port is opended.
409         """
410         if tx_size is None:
411             tx_size = rx_size
412         win32.SetupComm(self._port_handle, rx_size, tx_size)
414     def set_output_flow_control(self, enable=True):
415         """\
416         Manually control flow - when software flow control is enabled.
417         This will do the same as if XON (true) or XOFF (false) are received
418         from the other device and control the transmission accordingly.
419         WARNING: this function is not portable to different platforms!
420         """
421         if not self.is_open:
422             raise portNotOpenError
423         if enable:
424             win32.EscapeCommFunction(self._port_handle, win32.SETXON)
425         else:
426             win32.EscapeCommFunction(self._port_handle, win32.SETXOFF)
428     @property
429     def out_waiting(self):
430         """Return how many bytes the in the outgoing buffer"""
431         flags = win32.DWORD()
432         comstat = win32.COMSTAT()
433         if not win32.ClearCommError(self._port_handle, ctypes.byref(flags), ctypes.byref(comstat)):
434             raise SerialException('call to ClearCommError failed')
435         return comstat.cbOutQue
437     def _cancel_overlapped_io(self, overlapped):
438         """Cancel a blocking read operation, may be called from other thread"""
439         # check if read operation is pending
440         rc = win32.DWORD()
441         err = win32.GetOverlappedResult(
442             self._port_handle,
443             ctypes.byref(overlapped),
444             ctypes.byref(rc),
445             False)
446         if not err and win32.GetLastError() in (win32.ERROR_IO_PENDING, win32.ERROR_IO_INCOMPLETE):
447             # cancel, ignoring any errors (e.g. it may just have finished on its own)
448             win32.CancelIoEx(self._port_handle, overlapped)
450     def cancel_read(self):
451         """Cancel a blocking read operation, may be called from other thread"""
452         self._cancel_overlapped_io(self._overlapped_read)
454     def cancel_write(self):
455         """Cancel a blocking write operation, may be called from other thread"""
456         self._cancel_overlapped_io(self._overlapped_write)