diff options
Diffstat (limited to 'Adafruit_Python_GPIO/Adafruit_GPIO')
-rw-r--r-- | Adafruit_Python_GPIO/Adafruit_GPIO/FT232H.py | 816 | ||||
-rw-r--r-- | Adafruit_Python_GPIO/Adafruit_GPIO/GPIO.py | 426 | ||||
-rw-r--r-- | Adafruit_Python_GPIO/Adafruit_GPIO/I2C.py | 202 | ||||
-rw-r--r-- | Adafruit_Python_GPIO/Adafruit_GPIO/MCP230xx.py | 165 | ||||
-rw-r--r-- | Adafruit_Python_GPIO/Adafruit_GPIO/PCA95xx.py | 121 | ||||
-rw-r--r-- | Adafruit_Python_GPIO/Adafruit_GPIO/PCF8574.py | 94 | ||||
-rw-r--r-- | Adafruit_Python_GPIO/Adafruit_GPIO/PWM.py | 128 | ||||
-rw-r--r-- | Adafruit_Python_GPIO/Adafruit_GPIO/Platform.py | 110 | ||||
-rw-r--r-- | Adafruit_Python_GPIO/Adafruit_GPIO/SPI.py | 328 | ||||
-rw-r--r-- | Adafruit_Python_GPIO/Adafruit_GPIO/__init__.py | 3 |
10 files changed, 2393 insertions, 0 deletions
diff --git a/Adafruit_Python_GPIO/Adafruit_GPIO/FT232H.py b/Adafruit_Python_GPIO/Adafruit_GPIO/FT232H.py new file mode 100644 index 0000000..1874272 --- /dev/null +++ b/Adafruit_Python_GPIO/Adafruit_GPIO/FT232H.py @@ -0,0 +1,816 @@ +# Copyright (c) 2014 Adafruit Industries +# Author: Tony DiCola +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. + +import atexit +import logging +import math +import os +import subprocess +import sys +import time + +import ftdi1 as ftdi + +import Adafruit_GPIO.GPIO as GPIO + + +logger = logging.getLogger(__name__) + +FT232H_VID = 0x0403 # Default FTDI FT232H vendor ID +FT232H_PID = 0x6014 # Default FTDI FT232H product ID + +MSBFIRST = 0 +LSBFIRST = 1 + +_REPEAT_DELAY = 4 + + +def _check_running_as_root(): + # NOTE: Checking for root with user ID 0 isn't very portable, perhaps + # there's a better alternative? + if os.geteuid() != 0: + raise RuntimeError('Expected to be run by root user! Try running with sudo.') + +def disable_FTDI_driver(): + """Disable the FTDI drivers for the current platform. This is necessary + because they will conflict with libftdi and accessing the FT232H. Note you + can enable the FTDI drivers again by calling enable_FTDI_driver. + """ + logger.debug('Disabling FTDI driver.') + if sys.platform == 'darwin': + logger.debug('Detected Mac OSX') + # Mac OS commands to disable FTDI driver. + _check_running_as_root() + subprocess.call('kextunload -b com.apple.driver.AppleUSBFTDI', shell=True) + subprocess.call('kextunload /System/Library/Extensions/FTDIUSBSerialDriver.kext', shell=True) + elif sys.platform.startswith('linux'): + logger.debug('Detected Linux') + # Linux commands to disable FTDI driver. + _check_running_as_root() + subprocess.call('modprobe -r -q ftdi_sio', shell=True) + subprocess.call('modprobe -r -q usbserial', shell=True) + # Note there is no need to disable FTDI drivers on Windows! + +def enable_FTDI_driver(): + """Re-enable the FTDI drivers for the current platform.""" + logger.debug('Enabling FTDI driver.') + if sys.platform == 'darwin': + logger.debug('Detected Mac OSX') + # Mac OS commands to enable FTDI driver. + _check_running_as_root() + subprocess.check_call('kextload -b com.apple.driver.AppleUSBFTDI', shell=True) + subprocess.check_call('kextload /System/Library/Extensions/FTDIUSBSerialDriver.kext', shell=True) + elif sys.platform.startswith('linux'): + logger.debug('Detected Linux') + # Linux commands to enable FTDI driver. + _check_running_as_root() + subprocess.check_call('modprobe -q ftdi_sio', shell=True) + subprocess.check_call('modprobe -q usbserial', shell=True) + +def use_FT232H(): + """Disable any built in FTDI drivers which will conflict and cause problems + with libftdi (which is used to communicate with the FT232H). Will register + an exit function so the drivers are re-enabled on program exit. + """ + disable_FTDI_driver() + atexit.register(enable_FTDI_driver) + +def enumerate_device_serials(vid=FT232H_VID, pid=FT232H_PID): + """Return a list of all FT232H device serial numbers connected to the + machine. You can use these serial numbers to open a specific FT232H device + by passing it to the FT232H initializer's serial parameter. + """ + try: + # Create a libftdi context. + ctx = None + ctx = ftdi.new() + # Enumerate FTDI devices. + device_list = None + count, device_list = ftdi.usb_find_all(ctx, vid, pid) + if count < 0: + raise RuntimeError('ftdi_usb_find_all returned error {0}: {1}'.format(count, ftdi.get_error_string(self._ctx))) + # Walk through list of devices and assemble list of serial numbers. + devices = [] + while device_list is not None: + # Get USB device strings and add serial to list of devices. + ret, manufacturer, description, serial = ftdi.usb_get_strings(ctx, device_list.dev, 256, 256, 256) + if serial is not None: + devices.append(serial) + device_list = device_list.next + return devices + finally: + # Make sure to clean up list and context when done. + if device_list is not None: + ftdi.list_free(device_list) + if ctx is not None: + ftdi.free(ctx) + + +class FT232H(GPIO.BaseGPIO): + # Make GPIO constants that match main GPIO class for compatibility. + HIGH = GPIO.HIGH + LOW = GPIO.LOW + IN = GPIO.IN + OUT = GPIO.OUT + + def __init__(self, vid=FT232H_VID, pid=FT232H_PID, serial=None): + """Create a FT232H object. Will search for the first available FT232H + device with the specified USB vendor ID and product ID (defaults to + FT232H default VID & PID). Can also specify an optional serial number + string to open an explicit FT232H device given its serial number. See + the FT232H.enumerate_device_serials() function to see how to list all + connected device serial numbers. + """ + # Initialize FTDI device connection. + self._ctx = ftdi.new() + if self._ctx == 0: + raise RuntimeError('ftdi_new failed! Is libftdi1 installed?') + # Register handler to close and cleanup FTDI context on program exit. + atexit.register(self.close) + if serial is None: + # Open USB connection for specified VID and PID if no serial is specified. + self._check(ftdi.usb_open, vid, pid) + else: + # Open USB connection for VID, PID, serial. + self._check(ftdi.usb_open_string, 's:{0}:{1}:{2}'.format(vid, pid, serial)) + # Reset device. + self._check(ftdi.usb_reset) + # Disable flow control. Commented out because it is unclear if this is necessary. + #self._check(ftdi.setflowctrl, ftdi.SIO_DISABLE_FLOW_CTRL) + # Change read & write buffers to maximum size, 65535 bytes. + self._check(ftdi.read_data_set_chunksize, 65535) + self._check(ftdi.write_data_set_chunksize, 65535) + # Clear pending read data & write buffers. + self._check(ftdi.usb_purge_buffers) + # Enable MPSSE and syncronize communication with device. + self._mpsse_enable() + self._mpsse_sync() + # Initialize all GPIO as inputs. + self._write('\x80\x00\x00\x82\x00\x00') + self._direction = 0x0000 + self._level = 0x0000 + + def close(self): + """Close the FTDI device. Will be automatically called when the program ends.""" + if self._ctx is not None: + ftdi.free(self._ctx) + self._ctx = None + + def _write(self, string): + """Helper function to call write_data on the provided FTDI device and + verify it succeeds. + """ + # Get modem status. Useful to enable for debugging. + #ret, status = ftdi.poll_modem_status(self._ctx) + #if ret == 0: + # logger.debug('Modem status {0:02X}'.format(status)) + #else: + # logger.debug('Modem status error {0}'.format(ret)) + length = len(string) + ret = ftdi.write_data(self._ctx, string, length) + # Log the string that was written in a python hex string format using a very + # ugly one-liner list comprehension for brevity. + #logger.debug('Wrote {0}'.format(''.join(['\\x{0:02X}'.format(ord(x)) for x in string]))) + if ret < 0: + raise RuntimeError('ftdi_write_data failed with error {0}: {1}'.format(ret, ftdi.get_error_string(self._ctx))) + if ret != length: + raise RuntimeError('ftdi_write_data expected to write {0} bytes but actually wrote {1}!'.format(length, ret)) + + def _check(self, command, *args): + """Helper function to call the provided command on the FTDI device and + verify the response matches the expected value. + """ + ret = command(self._ctx, *args) + logger.debug('Called ftdi_{0} and got response {1}.'.format(command.__name__, ret)) + if ret != 0: + raise RuntimeError('ftdi_{0} failed with error {1}: {2}'.format(command.__name__, ret, ftdi.get_error_string(self._ctx))) + + def _poll_read(self, expected, timeout_s=5.0): + """Helper function to continuously poll reads on the FTDI device until an + expected number of bytes are returned. Will throw a timeout error if no + data is received within the specified number of timeout seconds. Returns + the read data as a string if successful, otherwise raises an execption. + """ + start = time.time() + # Start with an empty response buffer. + response = bytearray(expected) + index = 0 + # Loop calling read until the response buffer is full or a timeout occurs. + while time.time() - start <= timeout_s: + ret, data = ftdi.read_data(self._ctx, expected - index) + # Fail if there was an error reading data. + if ret < 0: + raise RuntimeError('ftdi_read_data failed with error code {0}.'.format(ret)) + # Add returned data to the buffer. + response[index:index+ret] = data[:ret] + index += ret + # Buffer is full, return the result data. + if index >= expected: + return str(response) + time.sleep(0.01) + raise RuntimeError('Timeout while polling ftdi_read_data for {0} bytes!'.format(expected)) + + def _mpsse_enable(self): + """Enable MPSSE mode on the FTDI device.""" + # Reset MPSSE by sending mask = 0 and mode = 0 + self._check(ftdi.set_bitmode, 0, 0) + # Enable MPSSE by sending mask = 0 and mode = 2 + self._check(ftdi.set_bitmode, 0, 2) + + def _mpsse_sync(self, max_retries=10): + """Synchronize buffers with MPSSE by sending bad opcode and reading expected + error response. Should be called once after enabling MPSSE.""" + # Send a bad/unknown command (0xAB), then read buffer until bad command + # response is found. + self._write('\xAB') + # Keep reading until bad command response (0xFA 0xAB) is returned. + # Fail if too many read attempts are made to prevent sticking in a loop. + tries = 0 + sync = False + while not sync: + data = self._poll_read(2) + if data == '\xFA\xAB': + sync = True + tries += 1 + if tries >= max_retries: + raise RuntimeError('Could not synchronize with FT232H!') + + def mpsse_set_clock(self, clock_hz, adaptive=False, three_phase=False): + """Set the clock speed of the MPSSE engine. Can be any value from 450hz + to 30mhz and will pick that speed or the closest speed below it. + """ + # Disable clock divisor by 5 to enable faster speeds on FT232H. + self._write('\x8A') + # Turn on/off adaptive clocking. + if adaptive: + self._write('\x96') + else: + self._write('\x97') + # Turn on/off three phase clock (needed for I2C). + # Also adjust the frequency for three-phase clocking as specified in section 2.2.4 + # of this document: + # http://www.ftdichip.com/Support/Documents/AppNotes/AN_255_USB%20to%20I2C%20Example%20using%20the%20FT232H%20and%20FT201X%20devices.pdf + if three_phase: + self._write('\x8C') + else: + self._write('\x8D') + # Compute divisor for requested clock. + # Use equation from section 3.8.1 of: + # http://www.ftdichip.com/Support/Documents/AppNotes/AN_108_Command_Processor_for_MPSSE_and_MCU_Host_Bus_Emulation_Modes.pdf + # Note equation is using 60mhz master clock instead of 12mhz. + divisor = int(math.ceil((30000000.0-float(clock_hz))/float(clock_hz))) & 0xFFFF + if three_phase: + divisor = int(divisor*(2.0/3.0)) + logger.debug('Setting clockspeed with divisor value {0}'.format(divisor)) + # Send command to set divisor from low and high byte values. + self._write(str(bytearray((0x86, divisor & 0xFF, (divisor >> 8) & 0xFF)))) + + def mpsse_read_gpio(self): + """Read both GPIO bus states and return a 16 bit value with their state. + D0-D7 are the lower 8 bits and C0-C7 are the upper 8 bits. + """ + # Send command to read low byte and high byte. + self._write('\x81\x83') + # Wait for 2 byte response. + data = self._poll_read(2) + # Assemble response into 16 bit value. + low_byte = ord(data[0]) + high_byte = ord(data[1]) + logger.debug('Read MPSSE GPIO low byte = {0:02X} and high byte = {1:02X}'.format(low_byte, high_byte)) + return (high_byte << 8) | low_byte + + def mpsse_gpio(self): + """Return command to update the MPSSE GPIO state to the current direction + and level. + """ + level_low = chr(self._level & 0xFF) + level_high = chr((self._level >> 8) & 0xFF) + dir_low = chr(self._direction & 0xFF) + dir_high = chr((self._direction >> 8) & 0xFF) + return str(bytearray((0x80, level_low, dir_low, 0x82, level_high, dir_high))) + + def mpsse_write_gpio(self): + """Write the current MPSSE GPIO state to the FT232H chip.""" + self._write(self.mpsse_gpio()) + + def get_i2c_device(self, address, **kwargs): + """Return an I2CDevice instance using this FT232H object and the provided + I2C address. Meant to be passed as the i2c_provider parameter to objects + which use the Adafruit_Python_GPIO library for I2C. + """ + return I2CDevice(self, address, **kwargs) + + # GPIO functions below: + + def _setup_pin(self, pin, mode): + if pin < 0 or pin > 15: + raise ValueError('Pin must be between 0 and 15 (inclusive).') + if mode not in (GPIO.IN, GPIO.OUT): + raise ValueError('Mode must be GPIO.IN or GPIO.OUT.') + if mode == GPIO.IN: + # Set the direction and level of the pin to 0. + self._direction &= ~(1 << pin) & 0xFFFF + self._level &= ~(1 << pin) & 0xFFFF + else: + # Set the direction of the pin to 1. + self._direction |= (1 << pin) & 0xFFFF + + def setup(self, pin, mode): + """Set the input or output mode for a specified pin. Mode should be + either OUT or IN.""" + self._setup_pin(pin, mode) + self.mpsse_write_gpio() + + def setup_pins(self, pins, values={}, write=True): + """Setup multiple pins as inputs or outputs at once. Pins should be a + dict of pin name to pin mode (IN or OUT). Optional starting values of + pins can be provided in the values dict (with pin name to pin value). + """ + # General implementation that can be improved by subclasses. + for pin, mode in iter(pins.items()): + self._setup_pin(pin, mode) + for pin, value in iter(values.items()): + self._output_pin(pin, value) + if write: + self.mpsse_write_gpio() + + def _output_pin(self, pin, value): + if value: + self._level |= (1 << pin) & 0xFFFF + else: + self._level &= ~(1 << pin) & 0xFFFF + + def output(self, pin, value): + """Set the specified pin the provided high/low value. Value should be + either HIGH/LOW or a boolean (true = high).""" + if pin < 0 or pin > 15: + raise ValueError('Pin must be between 0 and 15 (inclusive).') + self._output_pin(pin, value) + self.mpsse_write_gpio() + + def output_pins(self, pins, write=True): + """Set multiple pins high or low at once. Pins should be a dict of pin + name to pin value (HIGH/True for 1, LOW/False for 0). All provided pins + will be set to the given values. + """ + for pin, value in iter(pins.items()): + self._output_pin(pin, value) + if write: + self.mpsse_write_gpio() + + def input(self, pin): + """Read the specified pin and return HIGH/true if the pin is pulled high, + or LOW/false if pulled low.""" + return self.input_pins([pin])[0] + + def input_pins(self, pins): + """Read multiple pins specified in the given list and return list of pin values + GPIO.HIGH/True if the pin is pulled high, or GPIO.LOW/False if pulled low.""" + if [pin for pin in pins if pin < 0 or pin > 15]: + raise ValueError('Pin must be between 0 and 15 (inclusive).') + _pins = self.mpsse_read_gpio() + return [((_pins >> pin) & 0x0001) == 1 for pin in pins] + + +class SPI(object): + def __init__(self, ft232h, cs=None, max_speed_hz=1000000, mode=0, bitorder=MSBFIRST): + self._ft232h = ft232h + # Initialize chip select pin if provided to output high. + if cs is not None: + ft232h.setup(cs, GPIO.OUT) + ft232h.set_high(cs) + self._cs = cs + # Initialize clock, mode, and bit order. + self.set_clock_hz(max_speed_hz) + self.set_mode(mode) + self.set_bit_order(bitorder) + + def _assert_cs(self): + if self._cs is not None: + self._ft232h.set_low(self._cs) + + def _deassert_cs(self): + if self._cs is not None: + self._ft232h.set_high(self._cs) + + def set_clock_hz(self, hz): + """Set the speed of the SPI clock in hertz. Note that not all speeds + are supported and a lower speed might be chosen by the hardware. + """ + self._ft232h.mpsse_set_clock(hz) + + def set_mode(self, mode): + """Set SPI mode which controls clock polarity and phase. Should be a + numeric value 0, 1, 2, or 3. See wikipedia page for details on meaning: + http://en.wikipedia.org/wiki/Serial_Peripheral_Interface_Bus + """ + if mode < 0 or mode > 3: + raise ValueError('Mode must be a value 0, 1, 2, or 3.') + if mode == 0: + # Mode 0 captures on rising clock, propagates on falling clock + self.write_clock_ve = 1 + self.read_clock_ve = 0 + # Clock base is low. + clock_base = GPIO.LOW + elif mode == 1: + # Mode 1 capture of falling edge, propagate on rising clock + self.write_clock_ve = 0 + self.read_clock_ve = 1 + # Clock base is low. + clock_base = GPIO.LOW + elif mode == 2: + # Mode 2 capture on rising clock, propagate on falling clock + self.write_clock_ve = 1 + self.read_clock_ve = 0 + # Clock base is high. + clock_base = GPIO.HIGH + elif mode == 3: + # Mode 3 capture on falling edge, propagage on rising clock + self.write_clock_ve = 0 + self.read_clock_ve = 1 + # Clock base is high. + clock_base = GPIO.HIGH + # Set clock and DO as output, DI as input. Also start clock at its base value. + self._ft232h.setup_pins({0: GPIO.OUT, 1: GPIO.OUT, 2: GPIO.IN}, {0: clock_base}) + + def set_bit_order(self, order): + """Set order of bits to be read/written over serial lines. Should be + either MSBFIRST for most-significant first, or LSBFIRST for + least-signifcant first. + """ + if order == MSBFIRST: + self.lsbfirst = 0 + elif order == LSBFIRST: + self.lsbfirst = 1 + else: + raise ValueError('Order must be MSBFIRST or LSBFIRST.') + + def write(self, data): + """Half-duplex SPI write. The specified array of bytes will be clocked + out the MOSI line. + """ + # Build command to write SPI data. + command = 0x10 | (self.lsbfirst << 3) | self.write_clock_ve + logger.debug('SPI write with command {0:2X}.'.format(command)) + # Compute length low and high bytes. + # NOTE: Must actually send length minus one because the MPSSE engine + # considers 0 a length of 1 and FFFF a length of 65536 + length = len(data)-1 + len_low = length & 0xFF + len_high = (length >> 8) & 0xFF + self._assert_cs() + # Send command and length. + self._ft232h._write(str(bytearray((command, len_low, len_high)))) + # Send data. + self._ft232h._write(str(bytearray(data))) + self._deassert_cs() + + def read(self, length): + """Half-duplex SPI read. The specified length of bytes will be clocked + in the MISO line and returned as a bytearray object. + """ + # Build command to read SPI data. + command = 0x20 | (self.lsbfirst << 3) | (self.read_clock_ve << 2) + logger.debug('SPI read with command {0:2X}.'.format(command)) + # Compute length low and high bytes. + # NOTE: Must actually send length minus one because the MPSSE engine + # considers 0 a length of 1 and FFFF a length of 65536 + len_low = (length-1) & 0xFF + len_high = ((length-1) >> 8) & 0xFF + self._assert_cs() + # Send command and length. + self._ft232h._write(str(bytearray((command, len_low, len_high, 0x87)))) + self._deassert_cs() + # Read response bytes. + return bytearray(self._ft232h._poll_read(length)) + + def transfer(self, data): + """Full-duplex SPI read and write. The specified array of bytes will be + clocked out the MOSI line, while simultaneously bytes will be read from + the MISO line. Read bytes will be returned as a bytearray object. + """ + # Build command to read and write SPI data. + command = 0x30 | (self.lsbfirst << 3) | (self.read_clock_ve << 2) | self.write_clock_ve + logger.debug('SPI transfer with command {0:2X}.'.format(command)) + # Compute length low and high bytes. + # NOTE: Must actually send length minus one because the MPSSE engine + # considers 0 a length of 1 and FFFF a length of 65536 + length = len(data) + len_low = (length-1) & 0xFF + len_high = ((length-1) >> 8) & 0xFF + # Send command and length. + self._assert_cs() + self._ft232h._write(str(bytearray((command, len_low, len_high)))) + self._ft232h._write(str(bytearray(data))) + self._ft232h._write('\x87') + self._deassert_cs() + # Read response bytes. + return bytearray(self._ft232h._poll_read(length)) + + +class I2CDevice(object): + """Class for communicating with an I2C device using the smbus library. + Allows reading and writing 8-bit, 16-bit, and byte array values to registers + on the device.""" + # Note that most of the functions in this code are adapted from this app note: + # http://www.ftdichip.com/Support/Documents/AppNotes/AN_255_USB%20to%20I2C%20Example%20using%20the%20FT232H%20and%20FT201X%20devices.pdf + def __init__(self, ft232h, address, clock_hz=100000): + """Create an instance of the I2C device at the specified address on the + specified I2C bus number.""" + self._address = address + self._ft232h = ft232h + # Enable clock with three phases for I2C. + self._ft232h.mpsse_set_clock(clock_hz, three_phase=True) + # Enable drive-zero mode to drive outputs low on 0 and tri-state on 1. + # This matches the protocol for I2C communication so multiple devices can + # share the I2C bus. + self._ft232h._write('\x9E\x07\x00') + self._idle() + + def _idle(self): + """Put I2C lines into idle state.""" + # Put the I2C lines into an idle state with SCL and SDA high. + self._ft232h.setup_pins({0: GPIO.OUT, 1: GPIO.OUT, 2: GPIO.IN}, + {0: GPIO.HIGH, 1: GPIO.HIGH}) + + def _transaction_start(self): + """Start I2C transaction.""" + # Clear command buffer and expected response bytes. + self._command = [] + self._expected = 0 + + def _transaction_end(self): + """End I2C transaction and get response bytes, including ACKs.""" + # Ask to return response bytes immediately. + self._command.append('\x87') + # Send the entire command to the MPSSE. + self._ft232h._write(''.join(self._command)) + # Read response bytes and return them. + return bytearray(self._ft232h._poll_read(self._expected)) + + def _i2c_start(self): + """Send I2C start signal. Must be called within a transaction start/end. + """ + # Set SCL high and SDA low, repeat 4 times to stay in this state for a + # short period of time. + self._ft232h.output_pins({0: GPIO.HIGH, 1: GPIO.LOW}, write=False) + self._command.append(self._ft232h.mpsse_gpio() * _REPEAT_DELAY) + # Now drop SCL to low (again repeat 4 times for short delay). + self._ft232h.output_pins({0: GPIO.LOW, 1: GPIO.LOW}, write=False) + self._command.append(self._ft232h.mpsse_gpio() * _REPEAT_DELAY) + + def _i2c_idle(self): + """Set I2C signals to idle state with SCL and SDA at a high value. Must + be called within a transaction start/end. + """ + self._ft232h.output_pins({0: GPIO.HIGH, 1: GPIO.HIGH}, write=False) + self._command.append(self._ft232h.mpsse_gpio() * _REPEAT_DELAY) + + def _i2c_stop(self): + """Send I2C stop signal. Must be called within a transaction start/end. + """ + # Set SCL low and SDA low for a short period. + self._ft232h.output_pins({0: GPIO.LOW, 1: GPIO.LOW}, write=False) + self._command.append(self._ft232h.mpsse_gpio() * _REPEAT_DELAY) + # Set SCL high and SDA low for a short period. + self._ft232h.output_pins({0: GPIO.HIGH, 1: GPIO.LOW}, write=False) + self._command.append(self._ft232h.mpsse_gpio() * _REPEAT_DELAY) + # Finally set SCL high and SDA high for a short period. + self._ft232h.output_pins({0: GPIO.HIGH, 1: GPIO.HIGH}, write=False) + self._command.append(self._ft232h.mpsse_gpio() * _REPEAT_DELAY) + + def _i2c_read_bytes(self, length=1): + """Read the specified number of bytes from the I2C bus. Length is the + number of bytes to read (must be 1 or more). + """ + for i in range(length-1): + # Read a byte and send ACK. + self._command.append('\x20\x00\x00\x13\x00\x00') + # Make sure pins are back in idle state with clock low and data high. + self._ft232h.output_pins({0: GPIO.LOW, 1: GPIO.HIGH}, write=False) + self._command.append(self._ft232h.mpsse_gpio()) + # Read last byte and send NAK. + self._command.append('\x20\x00\x00\x13\x00\xFF') + # Make sure pins are back in idle state with clock low and data high. + self._ft232h.output_pins({0: GPIO.LOW, 1: GPIO.HIGH}, write=False) + self._command.append(self._ft232h.mpsse_gpio()) + # Increase expected number of bytes. + self._expected += length + + def _i2c_write_bytes(self, data): + """Write the specified number of bytes to the chip.""" + for byte in data: + # Write byte. + self._command.append(str(bytearray((0x11, 0x00, 0x00, byte)))) + # Make sure pins are back in idle state with clock low and data high. + self._ft232h.output_pins({0: GPIO.LOW, 1: GPIO.HIGH}, write=False) + self._command.append(self._ft232h.mpsse_gpio() * _REPEAT_DELAY) + # Read bit for ACK/NAK. + self._command.append('\x22\x00') + # Increase expected response bytes. + self._expected += len(data) + + def _address_byte(self, read=True): + """Return the address byte with the specified R/W bit set. If read is + True the R/W bit will be 1, otherwise the R/W bit will be 0. + """ + if read: + return (self._address << 1) | 0x01 + else: + return self._address << 1 + + def _verify_acks(self, response): + """Check all the specified bytes have the ACK bit set. Throws a + RuntimeError exception if not all the ACKs are set. + """ + for byte in response: + if byte & 0x01 != 0x00: + raise RuntimeError('Failed to find expected I2C ACK!') + + def ping(self): + """Attempt to detect if a device at this address is present on the I2C + bus. Will send out the device's address for writing and verify an ACK + is received. Returns true if the ACK is received, and false if not. + """ + self._idle() + self._transaction_start() + self._i2c_start() + self._i2c_write_bytes([self._address_byte(False)]) + self._i2c_stop() + response = self._transaction_end() + if len(response) != 1: + raise RuntimeError('Expected 1 response byte but received {0} byte(s).'.format(len(response))) + return ((response[0] & 0x01) == 0x00) + + def writeRaw8(self, value): + """Write an 8-bit value on the bus (without register).""" + value = value & 0xFF + self._idle() + self._transaction_start() + self._i2c_start() + self._i2c_write_bytes([self._address_byte(False), value]) + self._i2c_stop() + response = self._transaction_end() + self._verify_acks(response) + + def write8(self, register, value): + """Write an 8-bit value to the specified register.""" + value = value & 0xFF + self._idle() + self._transaction_start() + self._i2c_start() + self._i2c_write_bytes([self._address_byte(False), register, value]) + self._i2c_stop() + response = self._transaction_end() + self._verify_acks(response) + + def write16(self, register, value, little_endian=True): + """Write a 16-bit value to the specified register.""" + value = value & 0xFFFF + value_low = value & 0xFF + value_high = (value >> 8) & 0xFF + if not little_endian: + value_low, value_high = value_high, value_low + self._idle() + self._transaction_start() + self._i2c_start() + self._i2c_write_bytes([self._address_byte(False), register, value_low, + value_high]) + self._i2c_stop() + response = self._transaction_end() + self._verify_acks(response) + + def writeList(self, register, data): + """Write bytes to the specified register.""" + self._idle() + self._transaction_start() + self._i2c_start() + self._i2c_write_bytes([self._address_byte(False), register] + data) + self._i2c_stop() + response = self._transaction_end() + self._verify_acks(response) + + def readList(self, register, length): + """Read a length number of bytes from the specified register. Results + will be returned as a bytearray.""" + if length <= 0: + raise ValueError("Length must be at least 1 byte.") + self._idle() + self._transaction_start() + self._i2c_start() + self._i2c_write_bytes([self._address_byte(True), register]) + self._i2c_stop() + self._i2c_idle() + self._i2c_start() + self._i2c_read_bytes(length) + self._i2c_stop() + response = self._transaction_end() + self._verify_acks(response[:-length]) + return response[-length:] + + def readRaw8(self): + """Read an 8-bit value on the bus (without register).""" + self._idle() + self._transaction_start() + self._i2c_start() + self._i2c_write_bytes([self._address_byte(False)]) + self._i2c_stop() + self._i2c_idle() + self._i2c_start() + self._i2c_write_bytes([self._address_byte(True)]) + self._i2c_read_bytes(1) + self._i2c_stop() + response = self._transaction_end() + self._verify_acks(response[:-1]) + return response[-1] + + def readU8(self, register): + """Read an unsigned byte from the specified register.""" + self._idle() + self._transaction_start() + self._i2c_start() + self._i2c_write_bytes([self._address_byte(False), register]) + self._i2c_stop() + self._i2c_idle() + self._i2c_start() + self._i2c_write_bytes([self._address_byte(True)]) + self._i2c_read_bytes(1) + self._i2c_stop() + response = self._transaction_end() + self._verify_acks(response[:-1]) + return response[-1] + + def readS8(self, register): + """Read a signed byte from the specified register.""" + result = self.readU8(register) + if result > 127: + result -= 256 + return result + + def readU16(self, register, little_endian=True): + """Read an unsigned 16-bit value from the specified register, with the + specified endianness (default little endian, or least significant byte + first).""" + self._idle() + self._transaction_start() + self._i2c_start() + self._i2c_write_bytes([self._address_byte(False), register]) + self._i2c_stop() + self._i2c_idle() + self._i2c_start() + self._i2c_write_bytes([self._address_byte(True)]) + self._i2c_read_bytes(2) + self._i2c_stop() + response = self._transaction_end() + self._verify_acks(response[:-2]) + if little_endian: + return (response[-1] << 8) | response[-2] + else: + return (response[-2] << 8) | response[-1] + + def readS16(self, register, little_endian=True): + """Read a signed 16-bit value from the specified register, with the + specified endianness (default little endian, or least significant byte + first).""" + result = self.readU16(register, little_endian) + if result > 32767: + result -= 65536 + return result + + def readU16LE(self, register): + """Read an unsigned 16-bit value from the specified register, in little + endian byte order.""" + return self.readU16(register, little_endian=True) + + def readU16BE(self, register): + """Read an unsigned 16-bit value from the specified register, in big + endian byte order.""" + return self.readU16(register, little_endian=False) + + def readS16LE(self, register): + """Read a signed 16-bit value from the specified register, in little + endian byte order.""" + return self.readS16(register, little_endian=True) + + def readS16BE(self, register): + """Read a signed 16-bit value from the specified register, in big + endian byte order.""" + return self.readS16(register, little_endian=False) diff --git a/Adafruit_Python_GPIO/Adafruit_GPIO/GPIO.py b/Adafruit_Python_GPIO/Adafruit_GPIO/GPIO.py new file mode 100644 index 0000000..08e99c6 --- /dev/null +++ b/Adafruit_Python_GPIO/Adafruit_GPIO/GPIO.py @@ -0,0 +1,426 @@ +# Copyright (c) 2014 Adafruit Industries +# Author: Tony DiCola +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. + +import Adafruit_GPIO.Platform as Platform + + +OUT = 0 +IN = 1 +HIGH = True +LOW = False + +RISING = 1 +FALLING = 2 +BOTH = 3 + +PUD_OFF = 0 +PUD_DOWN = 1 +PUD_UP = 2 + +class BaseGPIO(object): + """Base class for implementing simple digital IO for a platform. + Implementors are expected to subclass from this and provide an implementation + of the setup, output, and input functions.""" + + def setup(self, pin, mode, pull_up_down=PUD_OFF): + """Set the input or output mode for a specified pin. Mode should be + either OUT or IN.""" + raise NotImplementedError + + def output(self, pin, value): + """Set the specified pin the provided high/low value. Value should be + either HIGH/LOW or a boolean (true = high).""" + raise NotImplementedError + + def input(self, pin): + """Read the specified pin and return HIGH/true if the pin is pulled high, + or LOW/false if pulled low.""" + raise NotImplementedError + + def set_high(self, pin): + """Set the specified pin HIGH.""" + self.output(pin, HIGH) + + def set_low(self, pin): + """Set the specified pin LOW.""" + self.output(pin, LOW) + + def is_high(self, pin): + """Return true if the specified pin is pulled high.""" + return self.input(pin) == HIGH + + def is_low(self, pin): + """Return true if the specified pin is pulled low.""" + return self.input(pin) == LOW + + +# Basic implementation of multiple pin methods just loops through pins and +# processes each one individually. This is not optimal, but derived classes can +# provide a more optimal implementation that deals with groups of pins +# simultaneously. +# See MCP230xx or PCF8574 classes for examples of optimized implementations. + + def output_pins(self, pins): + """Set multiple pins high or low at once. Pins should be a dict of pin + name to pin value (HIGH/True for 1, LOW/False for 0). All provided pins + will be set to the given values. + """ + # General implementation just loops through pins and writes them out + # manually. This is not optimized, but subclasses can choose to implement + # a more optimal batch output implementation. See the MCP230xx class for + # example of optimized implementation. + for pin, value in iter(pins.items()): + self.output(pin, value) + + def setup_pins(self, pins): + """Setup multiple pins as inputs or outputs at once. Pins should be a + dict of pin name to pin type (IN or OUT). + """ + # General implementation that can be optimized by derived classes. + for pin, value in iter(pins.items()): + self.setup(pin, value) + + def input_pins(self, pins): + """Read multiple pins specified in the given list and return list of pin values + GPIO.HIGH/True if the pin is pulled high, or GPIO.LOW/False if pulled low. + """ + # General implementation that can be optimized by derived classes. + return [self.input(pin) for pin in pins] + + + def add_event_detect(self, pin, edge): + """Enable edge detection events for a particular GPIO channel. Pin + should be type IN. Edge must be RISING, FALLING or BOTH. + """ + raise NotImplementedError + + def remove_event_detect(self, pin): + """Remove edge detection for a particular GPIO channel. Pin should be + type IN. + """ + raise NotImplementedError + + def add_event_callback(self, pin, callback): + """Add a callback for an event already defined using add_event_detect(). + Pin should be type IN. + """ + raise NotImplementedError + + def event_detected(self, pin): + """Returns True if an edge has occured on a given GPIO. You need to + enable edge detection using add_event_detect() first. Pin should be + type IN. + """ + raise NotImplementedError + + def wait_for_edge(self, pin, edge): + """Wait for an edge. Pin should be type IN. Edge must be RISING, + FALLING or BOTH.""" + raise NotImplementedError + + def cleanup(self, pin=None): + """Clean up GPIO event detection for specific pin, or all pins if none + is specified. + """ + raise NotImplementedError + + +# helper functions useful to derived classes + + def _validate_pin(self, pin): + # Raise an exception if pin is outside the range of allowed values. + if pin < 0 or pin >= self.NUM_GPIO: + raise ValueError('Invalid GPIO value, must be between 0 and {0}.'.format(self.NUM_GPIO)) + + def _bit2(self, src, bit, val): + bit = 1 << bit + return (src | bit) if val else (src & ~bit) + + +class RPiGPIOAdapter(BaseGPIO): + """GPIO implementation for the Raspberry Pi using the RPi.GPIO library.""" + + def __init__(self, rpi_gpio, mode=None): + self.rpi_gpio = rpi_gpio + # Suppress warnings about GPIO in use. + rpi_gpio.setwarnings(False) + # Setup board pin mode. + if mode == rpi_gpio.BOARD or mode == rpi_gpio.BCM: + rpi_gpio.setmode(mode) + elif mode is not None: + raise ValueError('Unexpected value for mode. Must be BOARD or BCM.') + else: + # Default to BCM numbering if not told otherwise. + rpi_gpio.setmode(rpi_gpio.BCM) + # Define mapping of Adafruit GPIO library constants to RPi.GPIO constants. + self._dir_mapping = { OUT: rpi_gpio.OUT, + IN: rpi_gpio.IN } + self._pud_mapping = { PUD_OFF: rpi_gpio.PUD_OFF, + PUD_DOWN: rpi_gpio.PUD_DOWN, + PUD_UP: rpi_gpio.PUD_UP } + self._edge_mapping = { RISING: rpi_gpio.RISING, + FALLING: rpi_gpio.FALLING, + BOTH: rpi_gpio.BOTH } + + def setup(self, pin, mode, pull_up_down=PUD_OFF): + """Set the input or output mode for a specified pin. Mode should be + either OUTPUT or INPUT. + """ + self.rpi_gpio.setup(pin, self._dir_mapping[mode], + pull_up_down=self._pud_mapping[pull_up_down]) + + def output(self, pin, value): + """Set the specified pin the provided high/low value. Value should be + either HIGH/LOW or a boolean (true = high). + """ + self.rpi_gpio.output(pin, value) + + def input(self, pin): + """Read the specified pin and return HIGH/true if the pin is pulled high, + or LOW/false if pulled low. + """ + return self.rpi_gpio.input(pin) + + def input_pins(self, pins): + """Read multiple pins specified in the given list and return list of pin values + GPIO.HIGH/True if the pin is pulled high, or GPIO.LOW/False if pulled low. + """ + # maybe rpi has a mass read... it would be more efficient to use it if it exists + return [self.rpi_gpio.input(pin) for pin in pins] + + def add_event_detect(self, pin, edge, callback=None, bouncetime=-1): + """Enable edge detection events for a particular GPIO channel. Pin + should be type IN. Edge must be RISING, FALLING or BOTH. Callback is a + function for the event. Bouncetime is switch bounce timeout in ms for + callback + """ + kwargs = {} + if callback: + kwargs['callback']=callback + if bouncetime > 0: + kwargs['bouncetime']=bouncetime + self.rpi_gpio.add_event_detect(pin, self._edge_mapping[edge], **kwargs) + + def remove_event_detect(self, pin): + """Remove edge detection for a particular GPIO channel. Pin should be + type IN. + """ + self.rpi_gpio.remove_event_detect(pin) + + def add_event_callback(self, pin, callback): + """Add a callback for an event already defined using add_event_detect(). + Pin should be type IN. + """ + self.rpi_gpio.add_event_callback(pin, callback) + + def event_detected(self, pin): + """Returns True if an edge has occured on a given GPIO. You need to + enable edge detection using add_event_detect() first. Pin should be + type IN. + """ + return self.rpi_gpio.event_detected(pin) + + def wait_for_edge(self, pin, edge): + """Wait for an edge. Pin should be type IN. Edge must be RISING, + FALLING or BOTH. + """ + self.rpi_gpio.wait_for_edge(pin, self._edge_mapping[edge]) + + def cleanup(self, pin=None): + """Clean up GPIO event detection for specific pin, or all pins if none + is specified. + """ + if pin is None: + self.rpi_gpio.cleanup() + else: + self.rpi_gpio.cleanup(pin) + +class AdafruitBBIOAdapter(BaseGPIO): + """GPIO implementation for the Beaglebone Black using the Adafruit_BBIO + library. + """ + + def __init__(self, bbio_gpio): + self.bbio_gpio = bbio_gpio + # Define mapping of Adafruit GPIO library constants to RPi.GPIO constants. + self._dir_mapping = { OUT: bbio_gpio.OUT, + IN: bbio_gpio.IN } + self._pud_mapping = { PUD_OFF: bbio_gpio.PUD_OFF, + PUD_DOWN: bbio_gpio.PUD_DOWN, + PUD_UP: bbio_gpio.PUD_UP } + self._edge_mapping = { RISING: bbio_gpio.RISING, + FALLING: bbio_gpio.FALLING, + BOTH: bbio_gpio.BOTH } + + def setup(self, pin, mode, pull_up_down=PUD_OFF): + """Set the input or output mode for a specified pin. Mode should be + either OUTPUT or INPUT. + """ + self.bbio_gpio.setup(pin, self._dir_mapping[mode], + pull_up_down=self._pud_mapping[pull_up_down]) + + def output(self, pin, value): + """Set the specified pin the provided high/low value. Value should be + either HIGH/LOW or a boolean (true = high). + """ + self.bbio_gpio.output(pin, value) + + def input(self, pin): + """Read the specified pin and return HIGH/true if the pin is pulled high, + or LOW/false if pulled low. + """ + return self.bbio_gpio.input(pin) + + def input_pins(self, pins): + """Read multiple pins specified in the given list and return list of pin values + GPIO.HIGH/True if the pin is pulled high, or GPIO.LOW/False if pulled low. + """ + # maybe bbb has a mass read... it would be more efficient to use it if it exists + return [self.bbio_gpio.input(pin) for pin in pins] + + def add_event_detect(self, pin, edge, callback=None, bouncetime=-1): + """Enable edge detection events for a particular GPIO channel. Pin + should be type IN. Edge must be RISING, FALLING or BOTH. Callback is a + function for the event. Bouncetime is switch bounce timeout in ms for + callback + """ + kwargs = {} + if callback: + kwargs['callback']=callback + if bouncetime > 0: + kwargs['bouncetime']=bouncetime + self.bbio_gpio.add_event_detect(pin, self._edge_mapping[edge], **kwargs) + + def remove_event_detect(self, pin): + """Remove edge detection for a particular GPIO channel. Pin should be + type IN. + """ + self.bbio_gpio.remove_event_detect(pin) + + def add_event_callback(self, pin, callback, bouncetime=-1): + """Add a callback for an event already defined using add_event_detect(). + Pin should be type IN. Bouncetime is switch bounce timeout in ms for + callback + """ + kwargs = {} + if bouncetime > 0: + kwargs['bouncetime']=bouncetime + self.bbio_gpio.add_event_callback(pin, callback, **kwargs) + + def event_detected(self, pin): + """Returns True if an edge has occured on a given GPIO. You need to + enable edge detection using add_event_detect() first. Pin should be + type IN. + """ + return self.bbio_gpio.event_detected(pin) + + def wait_for_edge(self, pin, edge): + """Wait for an edge. Pin should be type IN. Edge must be RISING, + FALLING or BOTH. + """ + self.bbio_gpio.wait_for_edge(pin, self._edge_mapping[edge]) + + def cleanup(self, pin=None): + """Clean up GPIO event detection for specific pin, or all pins if none + is specified. + """ + if pin is None: + self.bbio_gpio.cleanup() + else: + self.bbio_gpio.cleanup(pin) + +class AdafruitMinnowAdapter(BaseGPIO): + """GPIO implementation for the Minnowboard + MAX using the mraa library""" + + def __init__(self,mraa_gpio): + self.mraa_gpio = mraa_gpio + # Define mapping of Adafruit GPIO library constants to mraa constants + self._dir_mapping = { OUT: self.mraa_gpio.DIR_OUT, + IN: self.mraa_gpio.DIR_IN } + self._pud_mapping = { PUD_OFF: self.mraa_gpio.MODE_STRONG, + PUD_UP: self.mraa_gpio.MODE_HIZ, + PUD_DOWN: self.mraa_gpio.MODE_PULLDOWN } + self._edge_mapping = { RISING: self.mraa_gpio.EDGE_RISING, + FALLING: self.mraa_gpio.EDGE_FALLING, + BOTH: self.mraa_gpio.EDGE_BOTH } + + def setup(self,pin,mode): + """Set the input or output mode for a specified pin. Mode should be + either DIR_IN or DIR_OUT. + """ + self.mraa_gpio.Gpio.dir(self.mraa_gpio.Gpio(pin),self._dir_mapping[mode]) + + def output(self,pin,value): + """Set the specified pin the provided high/low value. Value should be + either 1 (ON or HIGH), or 0 (OFF or LOW) or a boolean. + """ + self.mraa_gpio.Gpio.write(self.mraa_gpio.Gpio(pin), value) + + def input(self,pin): + """Read the specified pin and return HIGH/true if the pin is pulled high, + or LOW/false if pulled low. + """ + return self.mraa_gpio.Gpio.read(self.mraa_gpio.Gpio(pin)) + + def add_event_detect(self, pin, edge, callback=None, bouncetime=-1): + """Enable edge detection events for a particular GPIO channel. Pin + should be type IN. Edge must be RISING, FALLING or BOTH. Callback is a + function for the event. Bouncetime is switch bounce timeout in ms for + callback + """ + kwargs = {} + if callback: + kwargs['callback']=callback + if bouncetime > 0: + kwargs['bouncetime']=bouncetime + self.mraa_gpio.Gpio.isr(self.mraa_gpio.Gpio(pin), self._edge_mapping[edge], **kwargs) + + def remove_event_detect(self, pin): + """Remove edge detection for a particular GPIO channel. Pin should be + type IN. + """ + self.mraa_gpio.Gpio.isrExit(self.mraa_gpio.Gpio(pin)) + + def wait_for_edge(self, pin, edge): + """Wait for an edge. Pin should be type IN. Edge must be RISING, + FALLING or BOTH. + """ + self.bbio_gpio.wait_for_edge(self.mraa_gpio.Gpio(pin), self._edge_mapping[edge]) + +def get_platform_gpio(**keywords): + """Attempt to return a GPIO instance for the platform which the code is being + executed on. Currently supports only the Raspberry Pi using the RPi.GPIO + library and Beaglebone Black using the Adafruit_BBIO library. Will throw an + exception if a GPIO instance can't be created for the current platform. The + returned GPIO object is an instance of BaseGPIO. + """ + plat = Platform.platform_detect() + if plat == Platform.RASPBERRY_PI: + import RPi.GPIO + return RPiGPIOAdapter(RPi.GPIO, **keywords) + elif plat == Platform.BEAGLEBONE_BLACK: + import Adafruit_BBIO.GPIO + return AdafruitBBIOAdapter(Adafruit_BBIO.GPIO, **keywords) + elif plat == Platform.MINNOWBOARD: + import mraa + return AdafruitMinnowAdapter(mraa, **keywords) + elif plat == Platform.UNKNOWN: + raise RuntimeError('Could not determine platform.') diff --git a/Adafruit_Python_GPIO/Adafruit_GPIO/I2C.py b/Adafruit_Python_GPIO/Adafruit_GPIO/I2C.py new file mode 100644 index 0000000..765ed82 --- /dev/null +++ b/Adafruit_Python_GPIO/Adafruit_GPIO/I2C.py @@ -0,0 +1,202 @@ +# Copyright (c) 2014 Adafruit Industries +# Author: Tony DiCola +# Based on Adafruit_I2C.py created by Kevin Townsend. +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. +import logging +import os +import subprocess + +import Adafruit_GPIO.Platform as Platform + + +def reverseByteOrder(data): + """DEPRECATED: See https://github.com/adafruit/Adafruit_Python_GPIO/issues/48""" + # # Courtesy Vishal Sapre + # byteCount = len(hex(data)[2:].replace('L','')[::2]) + # val = 0 + # for i in range(byteCount): + # val = (val << 8) | (data & 0xff) + # data >>= 8 + # return val + raise RuntimeError('reverseByteOrder is deprecated! See: https://github.com/adafruit/Adafruit_Python_GPIO/issues/48') + +def get_default_bus(): + """Return the default bus number based on the device platform. For a + Raspberry Pi either bus 0 or 1 (based on the Pi revision) will be returned. + For a Beaglebone Black the first user accessible bus, 1, will be returned. + """ + plat = Platform.platform_detect() + if plat == Platform.RASPBERRY_PI: + if Platform.pi_revision() == 1: + # Revision 1 Pi uses I2C bus 0. + return 0 + else: + # Revision 2 Pi uses I2C bus 1. + return 1 + elif plat == Platform.BEAGLEBONE_BLACK: + # Beaglebone Black has multiple I2C buses, default to 1 (P9_19 and P9_20). + return 1 + else: + raise RuntimeError('Could not determine default I2C bus for platform.') + +def get_i2c_device(address, busnum=None, i2c_interface=None, **kwargs): + """Return an I2C device for the specified address and on the specified bus. + If busnum isn't specified, the default I2C bus for the platform will attempt + to be detected. + """ + if busnum is None: + busnum = get_default_bus() + return Device(address, busnum, i2c_interface, **kwargs) + +def require_repeated_start(): + """Enable repeated start conditions for I2C register reads. This is the + normal behavior for I2C, however on some platforms like the Raspberry Pi + there are bugs which disable repeated starts unless explicitly enabled with + this function. See this thread for more details: + http://www.raspberrypi.org/forums/viewtopic.php?f=44&t=15840 + """ + plat = Platform.platform_detect() + if plat == Platform.RASPBERRY_PI and os.path.exists('/sys/module/i2c_bcm2708/parameters/combined'): + # On the Raspberry Pi there is a bug where register reads don't send a + # repeated start condition like the kernel smbus I2C driver functions + # define. As a workaround this bit in the BCM2708 driver sysfs tree can + # be changed to enable I2C repeated starts. + subprocess.check_call('chmod 666 /sys/module/i2c_bcm2708/parameters/combined', shell=True) + subprocess.check_call('echo -n 1 > /sys/module/i2c_bcm2708/parameters/combined', shell=True) + # Other platforms are a no-op because they (presumably) have the correct + # behavior and send repeated starts. + + +class Device(object): + """Class for communicating with an I2C device using the adafruit-pureio pure + python smbus library, or other smbus compatible I2C interface. Allows reading + and writing 8-bit, 16-bit, and byte array values to registers + on the device.""" + def __init__(self, address, busnum, i2c_interface=None): + """Create an instance of the I2C device at the specified address on the + specified I2C bus number.""" + self._address = address + if i2c_interface is None: + # Use pure python I2C interface if none is specified. + import Adafruit_PureIO.smbus + self._bus = Adafruit_PureIO.smbus.SMBus(busnum) + else: + # Otherwise use the provided class to create an smbus interface. + self._bus = i2c_interface(busnum) + self._logger = logging.getLogger('Adafruit_I2C.Device.Bus.{0}.Address.{1:#0X}' \ + .format(busnum, address)) + + def writeRaw8(self, value): + """Write an 8-bit value on the bus (without register).""" + value = value & 0xFF + self._bus.write_byte(self._address, value) + self._logger.debug("Wrote 0x%02X", + value) + + def write8(self, register, value): + """Write an 8-bit value to the specified register.""" + value = value & 0xFF + self._bus.write_byte_data(self._address, register, value) + self._logger.debug("Wrote 0x%02X to register 0x%02X", + value, register) + + def write16(self, register, value): + """Write a 16-bit value to the specified register.""" + value = value & 0xFFFF + self._bus.write_word_data(self._address, register, value) + self._logger.debug("Wrote 0x%04X to register pair 0x%02X, 0x%02X", + value, register, register+1) + + def writeList(self, register, data): + """Write bytes to the specified register.""" + self._bus.write_i2c_block_data(self._address, register, data) + self._logger.debug("Wrote to register 0x%02X: %s", + register, data) + + def readList(self, register, length): + """Read a length number of bytes from the specified register. Results + will be returned as a bytearray.""" + results = self._bus.read_i2c_block_data(self._address, register, length) + self._logger.debug("Read the following from register 0x%02X: %s", + register, results) + return results + + def readRaw8(self): + """Read an 8-bit value on the bus (without register).""" + result = self._bus.read_byte(self._address) & 0xFF + self._logger.debug("Read 0x%02X", + result) + return result + + def readU8(self, register): + """Read an unsigned byte from the specified register.""" + result = self._bus.read_byte_data(self._address, register) & 0xFF + self._logger.debug("Read 0x%02X from register 0x%02X", + result, register) + return result + + def readS8(self, register): + """Read a signed byte from the specified register.""" + result = self.readU8(register) + if result > 127: + result -= 256 + return result + + def readU16(self, register, little_endian=True): + """Read an unsigned 16-bit value from the specified register, with the + specified endianness (default little endian, or least significant byte + first).""" + result = self._bus.read_word_data(self._address,register) & 0xFFFF + self._logger.debug("Read 0x%04X from register pair 0x%02X, 0x%02X", + result, register, register+1) + # Swap bytes if using big endian because read_word_data assumes little + # endian on ARM (little endian) systems. + if not little_endian: + result = ((result << 8) & 0xFF00) + (result >> 8) + return result + + def readS16(self, register, little_endian=True): + """Read a signed 16-bit value from the specified register, with the + specified endianness (default little endian, or least significant byte + first).""" + result = self.readU16(register, little_endian) + if result > 32767: + result -= 65536 + return result + + def readU16LE(self, register): + """Read an unsigned 16-bit value from the specified register, in little + endian byte order.""" + return self.readU16(register, little_endian=True) + + def readU16BE(self, register): + """Read an unsigned 16-bit value from the specified register, in big + endian byte order.""" + return self.readU16(register, little_endian=False) + + def readS16LE(self, register): + """Read a signed 16-bit value from the specified register, in little + endian byte order.""" + return self.readS16(register, little_endian=True) + + def readS16BE(self, register): + """Read a signed 16-bit value from the specified register, in big + endian byte order.""" + return self.readS16(register, little_endian=False) diff --git a/Adafruit_Python_GPIO/Adafruit_GPIO/MCP230xx.py b/Adafruit_Python_GPIO/Adafruit_GPIO/MCP230xx.py new file mode 100644 index 0000000..3ba8b5f --- /dev/null +++ b/Adafruit_Python_GPIO/Adafruit_GPIO/MCP230xx.py @@ -0,0 +1,165 @@ +# Copyright (c) 2014 Adafruit Industries +# Author: Tony DiCola +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. +import math + +import Adafruit_GPIO as GPIO +import Adafruit_GPIO.I2C as I2C + + +class MCP230xxBase(GPIO.BaseGPIO): + """Base class to represent an MCP230xx series GPIO extender. Is compatible + with the Adafruit_GPIO BaseGPIO class so it can be used as a custom GPIO + class for interacting with device. + """ + + def __init__(self, address, i2c=None, **kwargs): + """Initialize MCP230xx at specified I2C address and bus number. If bus + is not specified it will default to the appropriate platform detected bus. + """ + # Create I2C device. + if i2c is None: + import Adafruit_GPIO.I2C as I2C + i2c = I2C + self._device = i2c.get_i2c_device(address, **kwargs) + # Assume starting in ICON.BANK = 0 mode (sequential access). + # Compute how many bytes are needed to store count of GPIO. + self.gpio_bytes = int(math.ceil(self.NUM_GPIO/8.0)) + # Buffer register values so they can be changed without reading. + self.iodir = [0xFF]*self.gpio_bytes # Default direction to all inputs. + self.gppu = [0x00]*self.gpio_bytes # Default to pullups disabled. + self.gpio = [0x00]*self.gpio_bytes + # Write current direction and pullup buffer state. + self.write_iodir() + self.write_gppu() + + + def setup(self, pin, value): + """Set the input or output mode for a specified pin. Mode should be + either GPIO.OUT or GPIO.IN. + """ + self._validate_pin(pin) + # Set bit to 1 for input or 0 for output. + if value == GPIO.IN: + self.iodir[int(pin/8)] |= 1 << (int(pin%8)) + elif value == GPIO.OUT: + self.iodir[int(pin/8)] &= ~(1 << (int(pin%8))) + else: + raise ValueError('Unexpected value. Must be GPIO.IN or GPIO.OUT.') + self.write_iodir() + + + def output(self, pin, value): + """Set the specified pin the provided high/low value. Value should be + either GPIO.HIGH/GPIO.LOW or a boolean (True = HIGH). + """ + self.output_pins({pin: value}) + + def output_pins(self, pins): + """Set multiple pins high or low at once. Pins should be a dict of pin + name to pin value (HIGH/True for 1, LOW/False for 0). All provided pins + will be set to the given values. + """ + [self._validate_pin(pin) for pin in pins.keys()] + # Set each changed pin's bit. + for pin, value in iter(pins.items()): + if value: + self.gpio[int(pin/8)] |= 1 << (int(pin%8)) + else: + self.gpio[int(pin/8)] &= ~(1 << (int(pin%8))) + # Write GPIO state. + self.write_gpio() + + + def input(self, pin): + """Read the specified pin and return GPIO.HIGH/True if the pin is pulled + high, or GPIO.LOW/False if pulled low. + """ + return self.input_pins([pin])[0] + + def input_pins(self, pins): + """Read multiple pins specified in the given list and return list of pin values + GPIO.HIGH/True if the pin is pulled high, or GPIO.LOW/False if pulled low. + """ + [self._validate_pin(pin) for pin in pins] + # Get GPIO state. + self.gpio = self._device.readList(self.GPIO, self.gpio_bytes) + # Return True if pin's bit is set. + return [(self.gpio[int(pin/8)] & 1 << (int(pin%8))) > 0 for pin in pins] + + + def pullup(self, pin, enabled): + """Turn on the pull-up resistor for the specified pin if enabled is True, + otherwise turn off the pull-up resistor. + """ + self._validate_pin(pin) + if enabled: + self.gppu[int(pin/8)] |= 1 << (int(pin%8)) + else: + self.gppu[int(pin/8)] &= ~(1 << (int(pin%8))) + self.write_gppu() + + def write_gpio(self, gpio=None): + """Write the specified byte value to the GPIO registor. If no value + specified the current buffered value will be written. + """ + if gpio is not None: + self.gpio = gpio + self._device.writeList(self.GPIO, self.gpio) + + def write_iodir(self, iodir=None): + """Write the specified byte value to the IODIR registor. If no value + specified the current buffered value will be written. + """ + if iodir is not None: + self.iodir = iodir + self._device.writeList(self.IODIR, self.iodir) + + def write_gppu(self, gppu=None): + """Write the specified byte value to the GPPU registor. If no value + specified the current buffered value will be written. + """ + if gppu is not None: + self.gppu = gppu + self._device.writeList(self.GPPU, self.gppu) + + +class MCP23017(MCP230xxBase): + """MCP23017-based GPIO class with 16 GPIO pins.""" + # Define number of pins and registor addresses. + NUM_GPIO = 16 + IODIR = 0x00 + GPIO = 0x12 + GPPU = 0x0C + + def __init__(self, address=0x20, **kwargs): + super(MCP23017, self).__init__(address, **kwargs) + + +class MCP23008(MCP230xxBase): + """MCP23008-based GPIO class with 8 GPIO pins.""" + # Define number of pins and registor addresses. + NUM_GPIO = 8 + IODIR = 0x00 + GPIO = 0x09 + GPPU = 0x06 + + def __init__(self, address=0x20, **kwargs): + super(MCP23008, self).__init__(address, **kwargs) diff --git a/Adafruit_Python_GPIO/Adafruit_GPIO/PCA95xx.py b/Adafruit_Python_GPIO/Adafruit_GPIO/PCA95xx.py new file mode 100644 index 0000000..2a3b406 --- /dev/null +++ b/Adafruit_Python_GPIO/Adafruit_GPIO/PCA95xx.py @@ -0,0 +1,121 @@ +''' +Adafruit compatible using BaseGPIO class to represent a PCA9555 IO expander +Copyright (C) 2016 Matias Vidal +Ported from: https://github.com/dberlin/PCA95XX + +# Copyright 2012 Daniel Berlin + +Permission is hereby granted, free of charge, to any person obtaining a copy of +this software and associated documentation files (the "Software"), to deal in +the Software without restriction, including without limitation the rights to +use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies +of the Software, and to permit persons to whom the Software is furnished to do +so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE.''' + +import Adafruit_GPIO as GPIO +import Adafruit_GPIO.I2C as I2C + +# For the PCA 953X and 955X series, the chips with 8 GPIO's have these port numbers +# The chips with 16 GPIO's have the first port for each type at double these numbers +# IE The first config port is 6 + +INPUT_PORT = 0 +OUTPUT_PORT = 1 +POLARITY_PORT = 2 +CONFIG_PORT = 3 + +IN = GPIO.IN +OUT = GPIO.OUT +HIGH = GPIO.HIGH +LOW = GPIO.LOW + + +class PCA9555(GPIO.BaseGPIO): + """Class to represent a PCA9555 GPIO extender. Compatible + with the Adafruit_GPIO BaseGPIO class so it can be used as a custom GPIO + class for interacting with device. + """ + NUM_GPIO = 16 + + def __init__(self, address=0x20, busnum=None, i2c=None, num_gpios=16, **kwargs): + address = int(address) + self.__name__ = "PCA955" + # Create I2C device. + i2c = i2c or I2C + busnum = busnum or i2c.get_default_bus() + self._device = i2c.get_i2c_device(address, busnum, **kwargs) + self.num_gpios = num_gpios + + if self.num_gpios <= 8: + self.iodir = self._device.readU8(CONFIG_PORT) + self.outputvalue = self._device.readU8(OUTPUT_PORT) + + elif self.num_gpios > 8 and self.num_gpios <= 16: + self.iodir = self._device.readU16(CONFIG_PORT<< 1) + self.outputvalue = self._device.readU16(OUTPUT_PORT << 1) + + def _changebit(self, bitmap, bit, value): + assert value == 1 or value == 0, "Value is %s must be 1 or 0" % value + if value == 0: + return bitmap & ~(1 << bit) + elif value == 1: + return bitmap | (1 << bit) + + # Change the value of bit PIN on port PORT to VALUE. If the + # current pin state for the port is passed in as PORTSTATE, we + # will avoid doing a read to get it. The port pin state must be + # complete if passed in (IE it should not just be the value of the + # single pin we are trying to change) + def _readandchangepin(self, port, pin, value, portstate = None): + assert pin >= 0 and pin < self.num_gpios, "Pin number %s is invalid, only 0-%s are valid" % (pin, self.num_gpios) + if not portstate: + if self.num_gpios <= 8: + portstate = self._device.readU8(port) + elif self.num_gpios > 8 and self.num_gpios <= 16: + portstate = self._device.readU16(port << 1) + newstate = self._changebit(portstate, pin, value) + if self.num_gpios <= 8: + self._device.write8(port, newstate) + else: + self._device.write16(port << 1, newstate) + return newstate + + # Polarity inversion + def polarity(self, pin, value): + return self._readandchangepin(POLARITY_PORT, pin, value) + + # Pin direction + def config(self, pin, mode): + self.iodir = self._readandchangepin(CONFIG_PORT, pin, mode, self.iodir) + return self.iodir + + def output(self, pin, value): + assert self.iodir & (1 << pin) == 0, "Pin %s not set to output" % pin + self.outputvalue = self._readandchangepin(OUTPUT_PORT, pin, value, self.outputvalue) + return self.outputvalue + + def input(self, pin): + assert self.iodir & (1 << pin) != 0, "Pin %s not set to input" % pin + if self.num_gpios <= 8: + value = self._device.readU8(INPUT_PORT) + elif self.num_gpios > 8 and self.num_gpios <= 16: + value = self._device.readU16(INPUT_PORT << 1) + return value & (1 << pin) + + def setup(self, pin, mode): + self.config(pin, mode) + + def cleanup(self, pin=None): + # nothing to cleanup + pass diff --git a/Adafruit_Python_GPIO/Adafruit_GPIO/PCF8574.py b/Adafruit_Python_GPIO/Adafruit_GPIO/PCF8574.py new file mode 100644 index 0000000..02919d1 --- /dev/null +++ b/Adafruit_Python_GPIO/Adafruit_GPIO/PCF8574.py @@ -0,0 +1,94 @@ +''' +Adafruit compatible using BaseGPIO class to represent a PCF8574/A IO expander +Copyright (C) 2015 Sylvan Butler + +Permission is hereby granted, free of charge, to any person obtaining a copy of +this software and associated documentation files (the "Software"), to deal in +the Software without restriction, including without limitation the rights to +use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies +of the Software, and to permit persons to whom the Software is furnished to do +so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE.''' + +import Adafruit_GPIO as GPIO +import Adafruit_GPIO.I2C as I2C + + + +IN = GPIO.IN +OUT = GPIO.OUT +HIGH = GPIO.HIGH +LOW = GPIO.LOW + + +class PCF8574(GPIO.BaseGPIO): + """Class to represent a PCF8574 or PCF8574A GPIO extender. Compatible + with the Adafruit_GPIO BaseGPIO class so it can be used as a custom GPIO + class for interacting with device. + """ + + NUM_GPIO = 8 + + def __init__(self, address=0x27, busnum=None, i2c=None, **kwargs): + address = int(address) + self.__name__ = \ + "PCF8574" if address in range(0x20, 0x28) else \ + "PCF8574A" if address in range(0x38, 0x40) else \ + "Bad address for PCF8574(A): 0x%02X not in range [0x20..0x27, 0x38..0x3F]" % address + if self.__name__[0] != 'P': + raise ValueError(self.__name__) + # Create I2C device. + i2c = i2c or I2C + busnum = busnum or i2c.get_default_bus() + self._device = i2c.get_i2c_device(address, busnum, **kwargs) + # Buffer register values so they can be changed without reading. + self.iodir = 0xFF # Default direction to all inputs is in + self.gpio = 0x00 + self._write_pins() + + + def _write_pins(self): + self._device.writeRaw8(self.gpio | self.iodir) + + def _read_pins(self): + return self._device.readRaw8() & self.iodir + + + def setup(self, pin, mode): + self.setup_pins({pin: mode}) + + def setup_pins(self, pins): + if False in [y for x,y in [(self._validate_pin(pin),mode in (IN,OUT)) for pin,mode in pins.items()]]: + raise ValueError('Invalid MODE, IN or OUT') + for pin,mode in pins.items(): + self.iodir = self._bit2(self.iodir, pin, mode) + self._write_pins() + + + def output(self, pin, value): + self.output_pins({pin: value}) + + def output_pins(self, pins): + [self._validate_pin(pin) for pin in pins.keys()] + for pin,value in pins.items(): + self.gpio = self._bit2(self.gpio, pin, bool(value)) + self._write_pins() + + + def input(self, pin): + return self.input_pins([pin])[0] + + def input_pins(self, pins): + [self._validate_pin(pin) for pin in pins] + inp = self._read_pins() + return [bool(inp & (1<<pin)) for pin in pins] diff --git a/Adafruit_Python_GPIO/Adafruit_GPIO/PWM.py b/Adafruit_Python_GPIO/Adafruit_GPIO/PWM.py new file mode 100644 index 0000000..1e1717a --- /dev/null +++ b/Adafruit_Python_GPIO/Adafruit_GPIO/PWM.py @@ -0,0 +1,128 @@ +# Copyright (c) 2014 Adafruit Industries +# Author: Tony DiCola +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. +import Adafruit_GPIO.Platform as Platform + + +class RPi_PWM_Adapter(object): + """PWM implementation for the Raspberry Pi using the RPi.GPIO PWM library.""" + + def __init__(self, rpi_gpio, mode=None): + self.rpi_gpio = rpi_gpio + # Suppress warnings about GPIO in use. + rpi_gpio.setwarnings(False) + # Set board or BCM pin numbering. + if mode == rpi_gpio.BOARD or mode == rpi_gpio.BCM: + rpi_gpio.setmode(mode) + elif mode is not None: + raise ValueError('Unexpected value for mode. Must be BOARD or BCM.') + else: + # Default to BCM numbering if not told otherwise. + rpi_gpio.setmode(rpi_gpio.BCM) + # Store reference to each created PWM instance. + self.pwm = {} + + def start(self, pin, dutycycle, frequency_hz=2000): + """Enable PWM output on specified pin. Set to intiial percent duty cycle + value (0.0 to 100.0) and frequency (in Hz). + """ + if dutycycle < 0.0 or dutycycle > 100.0: + raise ValueError('Invalid duty cycle value, must be between 0.0 to 100.0 (inclusive).') + # Make pin an output. + self.rpi_gpio.setup(pin, self.rpi_gpio.OUT) + # Create PWM instance and save a reference for later access. + self.pwm[pin] = self.rpi_gpio.PWM(pin, frequency_hz) + # Start the PWM at the specified duty cycle. + self.pwm[pin].start(dutycycle) + + def set_duty_cycle(self, pin, dutycycle): + """Set percent duty cycle of PWM output on specified pin. Duty cycle must + be a value 0.0 to 100.0 (inclusive). + """ + if dutycycle < 0.0 or dutycycle > 100.0: + raise ValueError('Invalid duty cycle value, must be between 0.0 to 100.0 (inclusive).') + if pin not in self.pwm: + raise ValueError('Pin {0} is not configured as a PWM. Make sure to first call start for the pin.'.format(pin)) + self.pwm[pin].ChangeDutyCycle(dutycycle) + + def set_frequency(self, pin, frequency_hz): + """Set frequency (in Hz) of PWM output on specified pin.""" + if pin not in self.pwm: + raise ValueError('Pin {0} is not configured as a PWM. Make sure to first call start for the pin.'.format(pin)) + self.pwm[pin].ChangeFrequency(frequency_hz) + + def stop(self, pin): + """Stop PWM output on specified pin.""" + if pin not in self.pwm: + raise ValueError('Pin {0} is not configured as a PWM. Make sure to first call start for the pin.'.format(pin)) + self.pwm[pin].stop() + del self.pwm[pin] + + +class BBIO_PWM_Adapter(object): + """PWM implementation for the BeagleBone Black using the Adafruit_BBIO.PWM + library. + """ + + def __init__(self, bbio_pwm): + self.bbio_pwm = bbio_pwm + + def start(self, pin, dutycycle, frequency_hz=2000): + """Enable PWM output on specified pin. Set to intiial percent duty cycle + value (0.0 to 100.0) and frequency (in Hz). + """ + if dutycycle < 0.0 or dutycycle > 100.0: + raise ValueError('Invalid duty cycle value, must be between 0.0 to 100.0 (inclusive).') + self.bbio_pwm.start(pin, dutycycle, frequency_hz) + + def set_duty_cycle(self, pin, dutycycle): + """Set percent duty cycle of PWM output on specified pin. Duty cycle must + be a value 0.0 to 100.0 (inclusive). + """ + if dutycycle < 0.0 or dutycycle > 100.0: + raise ValueError('Invalid duty cycle value, must be between 0.0 to 100.0 (inclusive).') + self.bbio_pwm.set_duty_cycle(pin, dutycycle) + + def set_frequency(self, pin, frequency_hz): + """Set frequency (in Hz) of PWM output on specified pin.""" + self.bbio_pwm.set_frequency(pin, frequency_hz) + + def stop(self, pin): + """Stop PWM output on specified pin.""" + self.bbio_pwm.stop(pin) + + +def get_platform_pwm(**keywords): + """Attempt to return a PWM instance for the platform which the code is being + executed on. Currently supports only the Raspberry Pi using the RPi.GPIO + library and Beaglebone Black using the Adafruit_BBIO library. Will throw an + exception if a PWM instance can't be created for the current platform. The + returned PWM object has the same interface as the RPi_PWM_Adapter and + BBIO_PWM_Adapter classes. + """ + plat = Platform.platform_detect() + if plat == Platform.RASPBERRY_PI: + import RPi.GPIO + return RPi_PWM_Adapter(RPi.GPIO, **keywords) + elif plat == Platform.BEAGLEBONE_BLACK: + import Adafruit_BBIO.PWM + return BBIO_PWM_Adapter(Adafruit_BBIO.PWM, **keywords) + elif plat == Platform.UNKNOWN: + raise RuntimeError('Could not determine platform.') diff --git a/Adafruit_Python_GPIO/Adafruit_GPIO/Platform.py b/Adafruit_Python_GPIO/Adafruit_GPIO/Platform.py new file mode 100644 index 0000000..2c041a8 --- /dev/null +++ b/Adafruit_Python_GPIO/Adafruit_GPIO/Platform.py @@ -0,0 +1,110 @@ +# Copyright (c) 2014 Adafruit Industries +# Author: Tony DiCola + +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: + +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. + +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. +import platform +import re + +# Platform identification constants. +UNKNOWN = 0 +RASPBERRY_PI = 1 +BEAGLEBONE_BLACK = 2 +MINNOWBOARD = 3 + +def platform_detect(): + """Detect if running on the Raspberry Pi or Beaglebone Black and return the + platform type. Will return RASPBERRY_PI, BEAGLEBONE_BLACK, or UNKNOWN.""" + # Handle Raspberry Pi + pi = pi_version() + if pi is not None: + return RASPBERRY_PI + + # Handle Beaglebone Black + # TODO: Check the Beaglebone Black /proc/cpuinfo value instead of reading + # the platform. + plat = platform.platform() + if plat.lower().find('armv7l-with-debian') > -1: + return BEAGLEBONE_BLACK + elif plat.lower().find('armv7l-with-ubuntu') > -1: + return BEAGLEBONE_BLACK + elif plat.lower().find('armv7l-with-glibc2.4') > -1: + return BEAGLEBONE_BLACK + + # Handle Minnowboard + # Assumption is that mraa is installed + try: + import mraa + if mraa.getPlatformName()=='MinnowBoard MAX': + return MINNOWBOARD + except ImportError: + pass + + # Couldn't figure out the platform, just return unknown. + return UNKNOWN + + +def pi_revision(): + """Detect the revision number of a Raspberry Pi, useful for changing + functionality like default I2C bus based on revision.""" + # Revision list available at: http://elinux.org/RPi_HardwareHistory#Board_Revision_History + with open('/proc/cpuinfo', 'r') as infile: + for line in infile: + # Match a line of the form "Revision : 0002" while ignoring extra + # info in front of the revsion (like 1000 when the Pi was over-volted). + match = re.match('Revision\s+:\s+.*(\w{4})$', line, flags=re.IGNORECASE) + if match and match.group(1) in ['0000', '0002', '0003']: + # Return revision 1 if revision ends with 0000, 0002 or 0003. + return 1 + elif match: + # Assume revision 2 if revision ends with any other 4 chars. + return 2 + # Couldn't find the revision, throw an exception. + raise RuntimeError('Could not determine Raspberry Pi revision.') + + +def pi_version(): + """Detect the version of the Raspberry Pi. Returns either 1, 2 or + None depending on if it's a Raspberry Pi 1 (model A, B, A+, B+), + Raspberry Pi 2 (model B+), or not a Raspberry Pi. + """ + # Check /proc/cpuinfo for the Hardware field value. + # 2708 is pi 1 + # 2709 is pi 2 + # 2835 is pi 3 on 4.9.x kernel + # Anything else is not a pi. + with open('/proc/cpuinfo', 'r') as infile: + cpuinfo = infile.read() + # Match a line like 'Hardware : BCM2709' + match = re.search('^Hardware\s+:\s+(\w+)$', cpuinfo, + flags=re.MULTILINE | re.IGNORECASE) + if not match: + # Couldn't find the hardware, assume it isn't a pi. + return None + if match.group(1) == 'BCM2708': + # Pi 1 + return 1 + elif match.group(1) == 'BCM2709': + # Pi 2 + return 2 + elif match.group(1) == 'BCM2835': + # Pi 3 / Pi on 4.9.x kernel + return 3 + else: + # Something else, not a pi. + return None diff --git a/Adafruit_Python_GPIO/Adafruit_GPIO/SPI.py b/Adafruit_Python_GPIO/Adafruit_GPIO/SPI.py new file mode 100644 index 0000000..c20a32c --- /dev/null +++ b/Adafruit_Python_GPIO/Adafruit_GPIO/SPI.py @@ -0,0 +1,328 @@ +# Copyright (c) 2014 Adafruit Industries +# Author: Tony DiCola +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. + +import operator +import time + +import Adafruit_GPIO as GPIO + + +MSBFIRST = 0 +LSBFIRST = 1 + + +class SpiDev(object): + """Hardware-based SPI implementation using the spidev interface.""" + + def __init__(self, port, device, max_speed_hz=500000): + """Initialize an SPI device using the SPIdev interface. Port and device + identify the device, for example the device /dev/spidev1.0 would be port + 1 and device 0. + """ + import spidev + self._device = spidev.SpiDev() + self._device.open(port, device) + self._device.max_speed_hz=max_speed_hz + # Default to mode 0, and make sure CS is active low. + self._device.mode = 0 + self._device.cshigh = False + + def set_clock_hz(self, hz): + """Set the speed of the SPI clock in hertz. Note that not all speeds + are supported and a lower speed might be chosen by the hardware. + """ + self._device.max_speed_hz=hz + + def set_mode(self, mode): + """Set SPI mode which controls clock polarity and phase. Should be a + numeric value 0, 1, 2, or 3. See wikipedia page for details on meaning: + http://en.wikipedia.org/wiki/Serial_Peripheral_Interface_Bus + """ + if mode < 0 or mode > 3: + raise ValueError('Mode must be a value 0, 1, 2, or 3.') + self._device.mode = mode + + def set_bit_order(self, order): + """Set order of bits to be read/written over serial lines. Should be + either MSBFIRST for most-significant first, or LSBFIRST for + least-signifcant first. + """ + if order == MSBFIRST: + self._device.lsbfirst = False + elif order == LSBFIRST: + self._device.lsbfirst = True + else: + raise ValueError('Order must be MSBFIRST or LSBFIRST.') + + def close(self): + """Close communication with the SPI device.""" + self._device.close() + + def write(self, data): + """Half-duplex SPI write. The specified array of bytes will be clocked + out the MOSI line. + """ + self._device.writebytes(data) + + def read(self, length): + """Half-duplex SPI read. The specified length of bytes will be clocked + in the MISO line and returned as a bytearray object. + """ + return bytearray(self._device.readbytes(length)) + + def transfer(self, data): + """Full-duplex SPI read and write. The specified array of bytes will be + clocked out the MOSI line, while simultaneously bytes will be read from + the MISO line. Read bytes will be returned as a bytearray object. + """ + return bytearray(self._device.xfer2(data)) + +class SpiDevMraa(object): + """Hardware SPI implementation with the mraa library on Minnowboard""" + def __init__(self, port, device, max_speed_hz=500000): + import mraa + self._device = mraa.Spi(0) + self._device.mode(0) + + def set_clock_hz(self, hz): + """Set the speed of the SPI clock in hertz. Note that not all speeds + are supported and a lower speed might be chosen by the hardware. + """ + self._device.frequency(hz) + + def set_mode(self,mode): + """Set SPI mode which controls clock polarity and phase. Should be a + numeric value 0, 1, 2, or 3. See wikipedia page for details on meaning: + http://en.wikipedia.org/wiki/Serial_Peripheral_Interface_Bus + """ + if mode < 0 or mode > 3: + raise ValueError('Mode must be a value 0, 1, 2, or 3.') + self._device.mode(mode) + + def set_bit_order(self, order): + """Set order of bits to be read/written over serial lines. Should be + either MSBFIRST for most-significant first, or LSBFIRST for + least-signifcant first. + """ + if order == MSBFIRST: + self._device.lsbmode(False) + elif order == LSBFIRST: + self._device.lsbmode(True) + else: + raise ValueError('Order must be MSBFIRST or LSBFIRST.') + + def close(self): + """Close communication with the SPI device.""" + self._device.Spi() + + def write(self, data): + """Half-duplex SPI write. The specified array of bytes will be clocked + out the MOSI line. + """ + self._device.write(bytearray(data)) + +class BitBang(object): + """Software-based implementation of the SPI protocol over GPIO pins.""" + + def __init__(self, gpio, sclk, mosi=None, miso=None, ss=None): + """Initialize bit bang (or software) based SPI. Must provide a BaseGPIO + class, the SPI clock, and optionally MOSI, MISO, and SS (slave select) + pin numbers. If MOSI is set to None then writes will be disabled and fail + with an error, likewise for MISO reads will be disabled. If SS is set to + None then SS will not be asserted high/low by the library when + transfering data. + """ + self._gpio = gpio + self._sclk = sclk + self._mosi = mosi + self._miso = miso + self._ss = ss + # Set pins as outputs/inputs. + gpio.setup(sclk, GPIO.OUT) + if mosi is not None: + gpio.setup(mosi, GPIO.OUT) + if miso is not None: + gpio.setup(miso, GPIO.IN) + if ss is not None: + gpio.setup(ss, GPIO.OUT) + # Assert SS high to start with device communication off. + gpio.set_high(ss) + # Assume mode 0. + self.set_mode(0) + # Assume most significant bit first order. + self.set_bit_order(MSBFIRST) + + def set_clock_hz(self, hz): + """Set the speed of the SPI clock. This is unsupported with the bit + bang SPI class and will be ignored. + """ + pass + + def set_mode(self, mode): + """Set SPI mode which controls clock polarity and phase. Should be a + numeric value 0, 1, 2, or 3. See wikipedia page for details on meaning: + http://en.wikipedia.org/wiki/Serial_Peripheral_Interface_Bus + """ + if mode < 0 or mode > 3: + raise ValueError('Mode must be a value 0, 1, 2, or 3.') + if mode & 0x02: + # Clock is normally high in mode 2 and 3. + self._clock_base = GPIO.HIGH + else: + # Clock is normally low in mode 0 and 1. + self._clock_base = GPIO.LOW + if mode & 0x01: + # Read on trailing edge in mode 1 and 3. + self._read_leading = False + else: + # Read on leading edge in mode 0 and 2. + self._read_leading = True + # Put clock into its base state. + self._gpio.output(self._sclk, self._clock_base) + + def set_bit_order(self, order): + """Set order of bits to be read/written over serial lines. Should be + either MSBFIRST for most-significant first, or LSBFIRST for + least-signifcant first. + """ + # Set self._mask to the bitmask which points at the appropriate bit to + # read or write, and appropriate left/right shift operator function for + # reading/writing. + if order == MSBFIRST: + self._mask = 0x80 + self._write_shift = operator.lshift + self._read_shift = operator.rshift + elif order == LSBFIRST: + self._mask = 0x01 + self._write_shift = operator.rshift + self._read_shift = operator.lshift + else: + raise ValueError('Order must be MSBFIRST or LSBFIRST.') + + def close(self): + """Close the SPI connection. Unused in the bit bang implementation.""" + pass + + def write(self, data, assert_ss=True, deassert_ss=True): + """Half-duplex SPI write. If assert_ss is True, the SS line will be + asserted low, the specified bytes will be clocked out the MOSI line, and + if deassert_ss is True the SS line be put back high. + """ + # Fail MOSI is not specified. + if self._mosi is None: + raise RuntimeError('Write attempted with no MOSI pin specified.') + if assert_ss and self._ss is not None: + self._gpio.set_low(self._ss) + for byte in data: + for i in range(8): + # Write bit to MOSI. + if self._write_shift(byte, i) & self._mask: + self._gpio.set_high(self._mosi) + else: + self._gpio.set_low(self._mosi) + # Flip clock off base. + self._gpio.output(self._sclk, not self._clock_base) + # Return clock to base. + self._gpio.output(self._sclk, self._clock_base) + if deassert_ss and self._ss is not None: + self._gpio.set_high(self._ss) + + def read(self, length, assert_ss=True, deassert_ss=True): + """Half-duplex SPI read. If assert_ss is true, the SS line will be + asserted low, the specified length of bytes will be clocked in the MISO + line, and if deassert_ss is true the SS line will be put back high. + Bytes which are read will be returned as a bytearray object. + """ + if self._miso is None: + raise RuntimeError('Read attempted with no MISO pin specified.') + if assert_ss and self._ss is not None: + self._gpio.set_low(self._ss) + result = bytearray(length) + for i in range(length): + for j in range(8): + # Flip clock off base. + self._gpio.output(self._sclk, not self._clock_base) + # Handle read on leading edge of clock. + if self._read_leading: + if self._gpio.is_high(self._miso): + # Set bit to 1 at appropriate location. + result[i] |= self._read_shift(self._mask, j) + else: + # Set bit to 0 at appropriate location. + result[i] &= ~self._read_shift(self._mask, j) + # Return clock to base. + self._gpio.output(self._sclk, self._clock_base) + # Handle read on trailing edge of clock. + if not self._read_leading: + if self._gpio.is_high(self._miso): + # Set bit to 1 at appropriate location. + result[i] |= self._read_shift(self._mask, j) + else: + # Set bit to 0 at appropriate location. + result[i] &= ~self._read_shift(self._mask, j) + if deassert_ss and self._ss is not None: + self._gpio.set_high(self._ss) + return result + + def transfer(self, data, assert_ss=True, deassert_ss=True): + """Full-duplex SPI read and write. If assert_ss is true, the SS line + will be asserted low, the specified bytes will be clocked out the MOSI + line while bytes will also be read from the MISO line, and if + deassert_ss is true the SS line will be put back high. Bytes which are + read will be returned as a bytearray object. + """ + if self._mosi is None: + raise RuntimeError('Write attempted with no MOSI pin specified.') + if self._mosi is None: + raise RuntimeError('Read attempted with no MISO pin specified.') + if assert_ss and self._ss is not None: + self._gpio.set_low(self._ss) + result = bytearray(len(data)) + for i in range(len(data)): + for j in range(8): + # Write bit to MOSI. + if self._write_shift(data[i], j) & self._mask: + self._gpio.set_high(self._mosi) + else: + self._gpio.set_low(self._mosi) + # Flip clock off base. + self._gpio.output(self._sclk, not self._clock_base) + # Handle read on leading edge of clock. + if self._read_leading: + if self._gpio.is_high(self._miso): + # Set bit to 1 at appropriate location. + result[i] |= self._read_shift(self._mask, j) + else: + # Set bit to 0 at appropriate location. + result[i] &= ~self._read_shift(self._mask, j) + # Return clock to base. + self._gpio.output(self._sclk, self._clock_base) + # Handle read on trailing edge of clock. + if not self._read_leading: + if self._gpio.is_high(self._miso): + # Set bit to 1 at appropriate location. + result[i] |= self._read_shift(self._mask, j) + else: + # Set bit to 0 at appropriate location. + result[i] &= ~self._read_shift(self._mask, j) + if deassert_ss and self._ss is not None: + self._gpio.set_high(self._ss) + return result diff --git a/Adafruit_Python_GPIO/Adafruit_GPIO/__init__.py b/Adafruit_Python_GPIO/Adafruit_GPIO/__init__.py new file mode 100644 index 0000000..7e99604 --- /dev/null +++ b/Adafruit_Python_GPIO/Adafruit_GPIO/__init__.py @@ -0,0 +1,3 @@ +from __future__ import absolute_import + +from Adafruit_GPIO.GPIO import * |