diff options
Diffstat (limited to 'lib/Adafruit_Python_GPIO')
24 files changed, 3707 insertions, 0 deletions
diff --git a/lib/Adafruit_Python_GPIO/.github/ISSUE_TEMPLATE.md b/lib/Adafruit_Python_GPIO/.github/ISSUE_TEMPLATE.md new file mode 100644 index 0000000..6344b2f --- /dev/null +++ b/lib/Adafruit_Python_GPIO/.github/ISSUE_TEMPLATE.md @@ -0,0 +1,47 @@ +Thank you for opening an issue on an Adafruit Python library repository. To +improve the speed of resolution please review the following guidelines and +common troubleshooting steps below before creating the issue: + +- **Do not use GitHub issues for troubleshooting projects and issues.** Instead use + the forums at http://forums.adafruit.com to ask questions and troubleshoot why + something isn't working as expected. In many cases the problem is a common issue + that you will more quickly receive help from the forum community. GitHub issues + are meant for known defects in the code. If you don't know if there is a defect + in the code then start with troubleshooting on the forum first. + +- **If following a tutorial or guide be sure you didn't miss a step.** Carefully + check all of the steps and commands to run have been followed. Consult the + forum if you're unsure or have questions about steps in a guide/tutorial. + +- **For Python/Raspberry Pi projects check these very common issues to ensure they don't apply**: + + - If you are receiving an **ImportError: No module named...** error then a + library the code depends on is not installed. Check the tutorial/guide or + README to ensure you have installed the necessary libraries. Usually the + missing library can be installed with the `pip` tool, but check the tutorial/guide + for the exact command. + + - **Be sure you are supplying adequate power to the board.** Check the specs of + your board and power in an external power supply. In many cases just + plugging a board into your computer is not enough to power it and other + peripherals. + + - **Double check all soldering joints and connections.** Flakey connections + cause many mysterious problems. See the [guide to excellent soldering](https://learn.adafruit.com/adafruit-guide-excellent-soldering/tools) for examples of good solder joints. + +If you're sure this issue is a defect in the code and checked the steps above +please fill in the following fields to provide enough troubleshooting information. +You may delete the guideline and text above to just leave the following details: + +- Platform/operating system (i.e. Raspberry Pi with Raspbian operating system, + Windows 32-bit, Windows 64-bit, Mac OSX 64-bit, etc.): **INSERT PLATFORM/OPERATING + SYSTEM HERE** + +- Python version (run `python -version` or `python3 -version`): **INSERT PYTHON + VERSION HERE** + +- Error message you are receiving, including any Python exception traces: **INSERT + ERROR MESAGE/EXCEPTION TRACES HERE*** + +- List the steps to reproduce the problem below (if possible attach code or commands + to run): **LIST REPRO STEPS BELOW** diff --git a/lib/Adafruit_Python_GPIO/.github/PULL_REQUEST_TEMPLATE.md b/lib/Adafruit_Python_GPIO/.github/PULL_REQUEST_TEMPLATE.md new file mode 100644 index 0000000..7b641eb --- /dev/null +++ b/lib/Adafruit_Python_GPIO/.github/PULL_REQUEST_TEMPLATE.md @@ -0,0 +1,26 @@ +Thank you for creating a pull request to contribute to Adafruit's GitHub code! +Before you open the request please review the following guidelines and tips to +help it be more easily integrated: + +- **Describe the scope of your change--i.e. what the change does and what parts + of the code were modified.** This will help us understand any risks of integrating + the code. + +- **Describe any known limitations with your change.** For example if the change + doesn't apply to a supported platform of the library please mention it. + +- **Please run any tests or examples that can exercise your modified code.** We + strive to not break users of the code and running tests/examples helps with this + process. + +Thank you again for contributing! We will try to test and integrate the change +as soon as we can, but be aware we have many GitHub repositories to manage and +can't immediately respond to every request. There is no need to bump or check in +on a pull request (it will clutter the discussion of the request). + +Also don't be worried if the request is closed or not integrated--sometimes the +priorities of Adafruit's GitHub code (education, ease of use) might not match the +priorities of the pull request. Don't fret, the open source community thrives on +forks and GitHub makes it easy to keep your changes in a forked repo. + +After reviewing the guidelines above you can delete this text from the pull request. diff --git a/lib/Adafruit_Python_GPIO/.gitignore b/lib/Adafruit_Python_GPIO/.gitignore new file mode 100644 index 0000000..dd4b68c --- /dev/null +++ b/lib/Adafruit_Python_GPIO/.gitignore @@ -0,0 +1,7 @@ +build/ +dist/ +*.egg-info +*.pyc +setuptools* + +.idea
\ No newline at end of file diff --git a/lib/Adafruit_Python_GPIO/Adafruit_GPIO/FT232H.py b/lib/Adafruit_Python_GPIO/Adafruit_GPIO/FT232H.py new file mode 100644 index 0000000..1874272 --- /dev/null +++ b/lib/Adafruit_Python_GPIO/Adafruit_GPIO/FT232H.py @@ -0,0 +1,816 @@ +# Copyright (c) 2014 Adafruit Industries +# Author: Tony DiCola +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. + +import atexit +import logging +import math +import os +import subprocess +import sys +import time + +import ftdi1 as ftdi + +import Adafruit_GPIO.GPIO as GPIO + + +logger = logging.getLogger(__name__) + +FT232H_VID = 0x0403 # Default FTDI FT232H vendor ID +FT232H_PID = 0x6014 # Default FTDI FT232H product ID + +MSBFIRST = 0 +LSBFIRST = 1 + +_REPEAT_DELAY = 4 + + +def _check_running_as_root(): + # NOTE: Checking for root with user ID 0 isn't very portable, perhaps + # there's a better alternative? + if os.geteuid() != 0: + raise RuntimeError('Expected to be run by root user! Try running with sudo.') + +def disable_FTDI_driver(): + """Disable the FTDI drivers for the current platform. This is necessary + because they will conflict with libftdi and accessing the FT232H. Note you + can enable the FTDI drivers again by calling enable_FTDI_driver. + """ + logger.debug('Disabling FTDI driver.') + if sys.platform == 'darwin': + logger.debug('Detected Mac OSX') + # Mac OS commands to disable FTDI driver. + _check_running_as_root() + subprocess.call('kextunload -b com.apple.driver.AppleUSBFTDI', shell=True) + subprocess.call('kextunload /System/Library/Extensions/FTDIUSBSerialDriver.kext', shell=True) + elif sys.platform.startswith('linux'): + logger.debug('Detected Linux') + # Linux commands to disable FTDI driver. + _check_running_as_root() + subprocess.call('modprobe -r -q ftdi_sio', shell=True) + subprocess.call('modprobe -r -q usbserial', shell=True) + # Note there is no need to disable FTDI drivers on Windows! + +def enable_FTDI_driver(): + """Re-enable the FTDI drivers for the current platform.""" + logger.debug('Enabling FTDI driver.') + if sys.platform == 'darwin': + logger.debug('Detected Mac OSX') + # Mac OS commands to enable FTDI driver. + _check_running_as_root() + subprocess.check_call('kextload -b com.apple.driver.AppleUSBFTDI', shell=True) + subprocess.check_call('kextload /System/Library/Extensions/FTDIUSBSerialDriver.kext', shell=True) + elif sys.platform.startswith('linux'): + logger.debug('Detected Linux') + # Linux commands to enable FTDI driver. + _check_running_as_root() + subprocess.check_call('modprobe -q ftdi_sio', shell=True) + subprocess.check_call('modprobe -q usbserial', shell=True) + +def use_FT232H(): + """Disable any built in FTDI drivers which will conflict and cause problems + with libftdi (which is used to communicate with the FT232H). Will register + an exit function so the drivers are re-enabled on program exit. + """ + disable_FTDI_driver() + atexit.register(enable_FTDI_driver) + +def enumerate_device_serials(vid=FT232H_VID, pid=FT232H_PID): + """Return a list of all FT232H device serial numbers connected to the + machine. You can use these serial numbers to open a specific FT232H device + by passing it to the FT232H initializer's serial parameter. + """ + try: + # Create a libftdi context. + ctx = None + ctx = ftdi.new() + # Enumerate FTDI devices. + device_list = None + count, device_list = ftdi.usb_find_all(ctx, vid, pid) + if count < 0: + raise RuntimeError('ftdi_usb_find_all returned error {0}: {1}'.format(count, ftdi.get_error_string(self._ctx))) + # Walk through list of devices and assemble list of serial numbers. + devices = [] + while device_list is not None: + # Get USB device strings and add serial to list of devices. + ret, manufacturer, description, serial = ftdi.usb_get_strings(ctx, device_list.dev, 256, 256, 256) + if serial is not None: + devices.append(serial) + device_list = device_list.next + return devices + finally: + # Make sure to clean up list and context when done. + if device_list is not None: + ftdi.list_free(device_list) + if ctx is not None: + ftdi.free(ctx) + + +class FT232H(GPIO.BaseGPIO): + # Make GPIO constants that match main GPIO class for compatibility. + HIGH = GPIO.HIGH + LOW = GPIO.LOW + IN = GPIO.IN + OUT = GPIO.OUT + + def __init__(self, vid=FT232H_VID, pid=FT232H_PID, serial=None): + """Create a FT232H object. Will search for the first available FT232H + device with the specified USB vendor ID and product ID (defaults to + FT232H default VID & PID). Can also specify an optional serial number + string to open an explicit FT232H device given its serial number. See + the FT232H.enumerate_device_serials() function to see how to list all + connected device serial numbers. + """ + # Initialize FTDI device connection. + self._ctx = ftdi.new() + if self._ctx == 0: + raise RuntimeError('ftdi_new failed! Is libftdi1 installed?') + # Register handler to close and cleanup FTDI context on program exit. + atexit.register(self.close) + if serial is None: + # Open USB connection for specified VID and PID if no serial is specified. + self._check(ftdi.usb_open, vid, pid) + else: + # Open USB connection for VID, PID, serial. + self._check(ftdi.usb_open_string, 's:{0}:{1}:{2}'.format(vid, pid, serial)) + # Reset device. + self._check(ftdi.usb_reset) + # Disable flow control. Commented out because it is unclear if this is necessary. + #self._check(ftdi.setflowctrl, ftdi.SIO_DISABLE_FLOW_CTRL) + # Change read & write buffers to maximum size, 65535 bytes. + self._check(ftdi.read_data_set_chunksize, 65535) + self._check(ftdi.write_data_set_chunksize, 65535) + # Clear pending read data & write buffers. + self._check(ftdi.usb_purge_buffers) + # Enable MPSSE and syncronize communication with device. + self._mpsse_enable() + self._mpsse_sync() + # Initialize all GPIO as inputs. + self._write('\x80\x00\x00\x82\x00\x00') + self._direction = 0x0000 + self._level = 0x0000 + + def close(self): + """Close the FTDI device. Will be automatically called when the program ends.""" + if self._ctx is not None: + ftdi.free(self._ctx) + self._ctx = None + + def _write(self, string): + """Helper function to call write_data on the provided FTDI device and + verify it succeeds. + """ + # Get modem status. Useful to enable for debugging. + #ret, status = ftdi.poll_modem_status(self._ctx) + #if ret == 0: + # logger.debug('Modem status {0:02X}'.format(status)) + #else: + # logger.debug('Modem status error {0}'.format(ret)) + length = len(string) + ret = ftdi.write_data(self._ctx, string, length) + # Log the string that was written in a python hex string format using a very + # ugly one-liner list comprehension for brevity. + #logger.debug('Wrote {0}'.format(''.join(['\\x{0:02X}'.format(ord(x)) for x in string]))) + if ret < 0: + raise RuntimeError('ftdi_write_data failed with error {0}: {1}'.format(ret, ftdi.get_error_string(self._ctx))) + if ret != length: + raise RuntimeError('ftdi_write_data expected to write {0} bytes but actually wrote {1}!'.format(length, ret)) + + def _check(self, command, *args): + """Helper function to call the provided command on the FTDI device and + verify the response matches the expected value. + """ + ret = command(self._ctx, *args) + logger.debug('Called ftdi_{0} and got response {1}.'.format(command.__name__, ret)) + if ret != 0: + raise RuntimeError('ftdi_{0} failed with error {1}: {2}'.format(command.__name__, ret, ftdi.get_error_string(self._ctx))) + + def _poll_read(self, expected, timeout_s=5.0): + """Helper function to continuously poll reads on the FTDI device until an + expected number of bytes are returned. Will throw a timeout error if no + data is received within the specified number of timeout seconds. Returns + the read data as a string if successful, otherwise raises an execption. + """ + start = time.time() + # Start with an empty response buffer. + response = bytearray(expected) + index = 0 + # Loop calling read until the response buffer is full or a timeout occurs. + while time.time() - start <= timeout_s: + ret, data = ftdi.read_data(self._ctx, expected - index) + # Fail if there was an error reading data. + if ret < 0: + raise RuntimeError('ftdi_read_data failed with error code {0}.'.format(ret)) + # Add returned data to the buffer. + response[index:index+ret] = data[:ret] + index += ret + # Buffer is full, return the result data. + if index >= expected: + return str(response) + time.sleep(0.01) + raise RuntimeError('Timeout while polling ftdi_read_data for {0} bytes!'.format(expected)) + + def _mpsse_enable(self): + """Enable MPSSE mode on the FTDI device.""" + # Reset MPSSE by sending mask = 0 and mode = 0 + self._check(ftdi.set_bitmode, 0, 0) + # Enable MPSSE by sending mask = 0 and mode = 2 + self._check(ftdi.set_bitmode, 0, 2) + + def _mpsse_sync(self, max_retries=10): + """Synchronize buffers with MPSSE by sending bad opcode and reading expected + error response. Should be called once after enabling MPSSE.""" + # Send a bad/unknown command (0xAB), then read buffer until bad command + # response is found. + self._write('\xAB') + # Keep reading until bad command response (0xFA 0xAB) is returned. + # Fail if too many read attempts are made to prevent sticking in a loop. + tries = 0 + sync = False + while not sync: + data = self._poll_read(2) + if data == '\xFA\xAB': + sync = True + tries += 1 + if tries >= max_retries: + raise RuntimeError('Could not synchronize with FT232H!') + + def mpsse_set_clock(self, clock_hz, adaptive=False, three_phase=False): + """Set the clock speed of the MPSSE engine. Can be any value from 450hz + to 30mhz and will pick that speed or the closest speed below it. + """ + # Disable clock divisor by 5 to enable faster speeds on FT232H. + self._write('\x8A') + # Turn on/off adaptive clocking. + if adaptive: + self._write('\x96') + else: + self._write('\x97') + # Turn on/off three phase clock (needed for I2C). + # Also adjust the frequency for three-phase clocking as specified in section 2.2.4 + # of this document: + # http://www.ftdichip.com/Support/Documents/AppNotes/AN_255_USB%20to%20I2C%20Example%20using%20the%20FT232H%20and%20FT201X%20devices.pdf + if three_phase: + self._write('\x8C') + else: + self._write('\x8D') + # Compute divisor for requested clock. + # Use equation from section 3.8.1 of: + # http://www.ftdichip.com/Support/Documents/AppNotes/AN_108_Command_Processor_for_MPSSE_and_MCU_Host_Bus_Emulation_Modes.pdf + # Note equation is using 60mhz master clock instead of 12mhz. + divisor = int(math.ceil((30000000.0-float(clock_hz))/float(clock_hz))) & 0xFFFF + if three_phase: + divisor = int(divisor*(2.0/3.0)) + logger.debug('Setting clockspeed with divisor value {0}'.format(divisor)) + # Send command to set divisor from low and high byte values. + self._write(str(bytearray((0x86, divisor & 0xFF, (divisor >> 8) & 0xFF)))) + + def mpsse_read_gpio(self): + """Read both GPIO bus states and return a 16 bit value with their state. + D0-D7 are the lower 8 bits and C0-C7 are the upper 8 bits. + """ + # Send command to read low byte and high byte. + self._write('\x81\x83') + # Wait for 2 byte response. + data = self._poll_read(2) + # Assemble response into 16 bit value. + low_byte = ord(data[0]) + high_byte = ord(data[1]) + logger.debug('Read MPSSE GPIO low byte = {0:02X} and high byte = {1:02X}'.format(low_byte, high_byte)) + return (high_byte << 8) | low_byte + + def mpsse_gpio(self): + """Return command to update the MPSSE GPIO state to the current direction + and level. + """ + level_low = chr(self._level & 0xFF) + level_high = chr((self._level >> 8) & 0xFF) + dir_low = chr(self._direction & 0xFF) + dir_high = chr((self._direction >> 8) & 0xFF) + return str(bytearray((0x80, level_low, dir_low, 0x82, level_high, dir_high))) + + def mpsse_write_gpio(self): + """Write the current MPSSE GPIO state to the FT232H chip.""" + self._write(self.mpsse_gpio()) + + def get_i2c_device(self, address, **kwargs): + """Return an I2CDevice instance using this FT232H object and the provided + I2C address. Meant to be passed as the i2c_provider parameter to objects + which use the Adafruit_Python_GPIO library for I2C. + """ + return I2CDevice(self, address, **kwargs) + + # GPIO functions below: + + def _setup_pin(self, pin, mode): + if pin < 0 or pin > 15: + raise ValueError('Pin must be between 0 and 15 (inclusive).') + if mode not in (GPIO.IN, GPIO.OUT): + raise ValueError('Mode must be GPIO.IN or GPIO.OUT.') + if mode == GPIO.IN: + # Set the direction and level of the pin to 0. + self._direction &= ~(1 << pin) & 0xFFFF + self._level &= ~(1 << pin) & 0xFFFF + else: + # Set the direction of the pin to 1. + self._direction |= (1 << pin) & 0xFFFF + + def setup(self, pin, mode): + """Set the input or output mode for a specified pin. Mode should be + either OUT or IN.""" + self._setup_pin(pin, mode) + self.mpsse_write_gpio() + + def setup_pins(self, pins, values={}, write=True): + """Setup multiple pins as inputs or outputs at once. Pins should be a + dict of pin name to pin mode (IN or OUT). Optional starting values of + pins can be provided in the values dict (with pin name to pin value). + """ + # General implementation that can be improved by subclasses. + for pin, mode in iter(pins.items()): + self._setup_pin(pin, mode) + for pin, value in iter(values.items()): + self._output_pin(pin, value) + if write: + self.mpsse_write_gpio() + + def _output_pin(self, pin, value): + if value: + self._level |= (1 << pin) & 0xFFFF + else: + self._level &= ~(1 << pin) & 0xFFFF + + def output(self, pin, value): + """Set the specified pin the provided high/low value. Value should be + either HIGH/LOW or a boolean (true = high).""" + if pin < 0 or pin > 15: + raise ValueError('Pin must be between 0 and 15 (inclusive).') + self._output_pin(pin, value) + self.mpsse_write_gpio() + + def output_pins(self, pins, write=True): + """Set multiple pins high or low at once. Pins should be a dict of pin + name to pin value (HIGH/True for 1, LOW/False for 0). All provided pins + will be set to the given values. + """ + for pin, value in iter(pins.items()): + self._output_pin(pin, value) + if write: + self.mpsse_write_gpio() + + def input(self, pin): + """Read the specified pin and return HIGH/true if the pin is pulled high, + or LOW/false if pulled low.""" + return self.input_pins([pin])[0] + + def input_pins(self, pins): + """Read multiple pins specified in the given list and return list of pin values + GPIO.HIGH/True if the pin is pulled high, or GPIO.LOW/False if pulled low.""" + if [pin for pin in pins if pin < 0 or pin > 15]: + raise ValueError('Pin must be between 0 and 15 (inclusive).') + _pins = self.mpsse_read_gpio() + return [((_pins >> pin) & 0x0001) == 1 for pin in pins] + + +class SPI(object): + def __init__(self, ft232h, cs=None, max_speed_hz=1000000, mode=0, bitorder=MSBFIRST): + self._ft232h = ft232h + # Initialize chip select pin if provided to output high. + if cs is not None: + ft232h.setup(cs, GPIO.OUT) + ft232h.set_high(cs) + self._cs = cs + # Initialize clock, mode, and bit order. + self.set_clock_hz(max_speed_hz) + self.set_mode(mode) + self.set_bit_order(bitorder) + + def _assert_cs(self): + if self._cs is not None: + self._ft232h.set_low(self._cs) + + def _deassert_cs(self): + if self._cs is not None: + self._ft232h.set_high(self._cs) + + def set_clock_hz(self, hz): + """Set the speed of the SPI clock in hertz. Note that not all speeds + are supported and a lower speed might be chosen by the hardware. + """ + self._ft232h.mpsse_set_clock(hz) + + def set_mode(self, mode): + """Set SPI mode which controls clock polarity and phase. Should be a + numeric value 0, 1, 2, or 3. See wikipedia page for details on meaning: + http://en.wikipedia.org/wiki/Serial_Peripheral_Interface_Bus + """ + if mode < 0 or mode > 3: + raise ValueError('Mode must be a value 0, 1, 2, or 3.') + if mode == 0: + # Mode 0 captures on rising clock, propagates on falling clock + self.write_clock_ve = 1 + self.read_clock_ve = 0 + # Clock base is low. + clock_base = GPIO.LOW + elif mode == 1: + # Mode 1 capture of falling edge, propagate on rising clock + self.write_clock_ve = 0 + self.read_clock_ve = 1 + # Clock base is low. + clock_base = GPIO.LOW + elif mode == 2: + # Mode 2 capture on rising clock, propagate on falling clock + self.write_clock_ve = 1 + self.read_clock_ve = 0 + # Clock base is high. + clock_base = GPIO.HIGH + elif mode == 3: + # Mode 3 capture on falling edge, propagage on rising clock + self.write_clock_ve = 0 + self.read_clock_ve = 1 + # Clock base is high. + clock_base = GPIO.HIGH + # Set clock and DO as output, DI as input. Also start clock at its base value. + self._ft232h.setup_pins({0: GPIO.OUT, 1: GPIO.OUT, 2: GPIO.IN}, {0: clock_base}) + + def set_bit_order(self, order): + """Set order of bits to be read/written over serial lines. Should be + either MSBFIRST for most-significant first, or LSBFIRST for + least-signifcant first. + """ + if order == MSBFIRST: + self.lsbfirst = 0 + elif order == LSBFIRST: + self.lsbfirst = 1 + else: + raise ValueError('Order must be MSBFIRST or LSBFIRST.') + + def write(self, data): + """Half-duplex SPI write. The specified array of bytes will be clocked + out the MOSI line. + """ + # Build command to write SPI data. + command = 0x10 | (self.lsbfirst << 3) | self.write_clock_ve + logger.debug('SPI write with command {0:2X}.'.format(command)) + # Compute length low and high bytes. + # NOTE: Must actually send length minus one because the MPSSE engine + # considers 0 a length of 1 and FFFF a length of 65536 + length = len(data)-1 + len_low = length & 0xFF + len_high = (length >> 8) & 0xFF + self._assert_cs() + # Send command and length. + self._ft232h._write(str(bytearray((command, len_low, len_high)))) + # Send data. + self._ft232h._write(str(bytearray(data))) + self._deassert_cs() + + def read(self, length): + """Half-duplex SPI read. The specified length of bytes will be clocked + in the MISO line and returned as a bytearray object. + """ + # Build command to read SPI data. + command = 0x20 | (self.lsbfirst << 3) | (self.read_clock_ve << 2) + logger.debug('SPI read with command {0:2X}.'.format(command)) + # Compute length low and high bytes. + # NOTE: Must actually send length minus one because the MPSSE engine + # considers 0 a length of 1 and FFFF a length of 65536 + len_low = (length-1) & 0xFF + len_high = ((length-1) >> 8) & 0xFF + self._assert_cs() + # Send command and length. + self._ft232h._write(str(bytearray((command, len_low, len_high, 0x87)))) + self._deassert_cs() + # Read response bytes. + return bytearray(self._ft232h._poll_read(length)) + + def transfer(self, data): + """Full-duplex SPI read and write. The specified array of bytes will be + clocked out the MOSI line, while simultaneously bytes will be read from + the MISO line. Read bytes will be returned as a bytearray object. + """ + # Build command to read and write SPI data. + command = 0x30 | (self.lsbfirst << 3) | (self.read_clock_ve << 2) | self.write_clock_ve + logger.debug('SPI transfer with command {0:2X}.'.format(command)) + # Compute length low and high bytes. + # NOTE: Must actually send length minus one because the MPSSE engine + # considers 0 a length of 1 and FFFF a length of 65536 + length = len(data) + len_low = (length-1) & 0xFF + len_high = ((length-1) >> 8) & 0xFF + # Send command and length. + self._assert_cs() + self._ft232h._write(str(bytearray((command, len_low, len_high)))) + self._ft232h._write(str(bytearray(data))) + self._ft232h._write('\x87') + self._deassert_cs() + # Read response bytes. + return bytearray(self._ft232h._poll_read(length)) + + +class I2CDevice(object): + """Class for communicating with an I2C device using the smbus library. + Allows reading and writing 8-bit, 16-bit, and byte array values to registers + on the device.""" + # Note that most of the functions in this code are adapted from this app note: + # http://www.ftdichip.com/Support/Documents/AppNotes/AN_255_USB%20to%20I2C%20Example%20using%20the%20FT232H%20and%20FT201X%20devices.pdf + def __init__(self, ft232h, address, clock_hz=100000): + """Create an instance of the I2C device at the specified address on the + specified I2C bus number.""" + self._address = address + self._ft232h = ft232h + # Enable clock with three phases for I2C. + self._ft232h.mpsse_set_clock(clock_hz, three_phase=True) + # Enable drive-zero mode to drive outputs low on 0 and tri-state on 1. + # This matches the protocol for I2C communication so multiple devices can + # share the I2C bus. + self._ft232h._write('\x9E\x07\x00') + self._idle() + + def _idle(self): + """Put I2C lines into idle state.""" + # Put the I2C lines into an idle state with SCL and SDA high. + self._ft232h.setup_pins({0: GPIO.OUT, 1: GPIO.OUT, 2: GPIO.IN}, + {0: GPIO.HIGH, 1: GPIO.HIGH}) + + def _transaction_start(self): + """Start I2C transaction.""" + # Clear command buffer and expected response bytes. + self._command = [] + self._expected = 0 + + def _transaction_end(self): + """End I2C transaction and get response bytes, including ACKs.""" + # Ask to return response bytes immediately. + self._command.append('\x87') + # Send the entire command to the MPSSE. + self._ft232h._write(''.join(self._command)) + # Read response bytes and return them. + return bytearray(self._ft232h._poll_read(self._expected)) + + def _i2c_start(self): + """Send I2C start signal. Must be called within a transaction start/end. + """ + # Set SCL high and SDA low, repeat 4 times to stay in this state for a + # short period of time. + self._ft232h.output_pins({0: GPIO.HIGH, 1: GPIO.LOW}, write=False) + self._command.append(self._ft232h.mpsse_gpio() * _REPEAT_DELAY) + # Now drop SCL to low (again repeat 4 times for short delay). + self._ft232h.output_pins({0: GPIO.LOW, 1: GPIO.LOW}, write=False) + self._command.append(self._ft232h.mpsse_gpio() * _REPEAT_DELAY) + + def _i2c_idle(self): + """Set I2C signals to idle state with SCL and SDA at a high value. Must + be called within a transaction start/end. + """ + self._ft232h.output_pins({0: GPIO.HIGH, 1: GPIO.HIGH}, write=False) + self._command.append(self._ft232h.mpsse_gpio() * _REPEAT_DELAY) + + def _i2c_stop(self): + """Send I2C stop signal. Must be called within a transaction start/end. + """ + # Set SCL low and SDA low for a short period. + self._ft232h.output_pins({0: GPIO.LOW, 1: GPIO.LOW}, write=False) + self._command.append(self._ft232h.mpsse_gpio() * _REPEAT_DELAY) + # Set SCL high and SDA low for a short period. + self._ft232h.output_pins({0: GPIO.HIGH, 1: GPIO.LOW}, write=False) + self._command.append(self._ft232h.mpsse_gpio() * _REPEAT_DELAY) + # Finally set SCL high and SDA high for a short period. + self._ft232h.output_pins({0: GPIO.HIGH, 1: GPIO.HIGH}, write=False) + self._command.append(self._ft232h.mpsse_gpio() * _REPEAT_DELAY) + + def _i2c_read_bytes(self, length=1): + """Read the specified number of bytes from the I2C bus. Length is the + number of bytes to read (must be 1 or more). + """ + for i in range(length-1): + # Read a byte and send ACK. + self._command.append('\x20\x00\x00\x13\x00\x00') + # Make sure pins are back in idle state with clock low and data high. + self._ft232h.output_pins({0: GPIO.LOW, 1: GPIO.HIGH}, write=False) + self._command.append(self._ft232h.mpsse_gpio()) + # Read last byte and send NAK. + self._command.append('\x20\x00\x00\x13\x00\xFF') + # Make sure pins are back in idle state with clock low and data high. + self._ft232h.output_pins({0: GPIO.LOW, 1: GPIO.HIGH}, write=False) + self._command.append(self._ft232h.mpsse_gpio()) + # Increase expected number of bytes. + self._expected += length + + def _i2c_write_bytes(self, data): + """Write the specified number of bytes to the chip.""" + for byte in data: + # Write byte. + self._command.append(str(bytearray((0x11, 0x00, 0x00, byte)))) + # Make sure pins are back in idle state with clock low and data high. + self._ft232h.output_pins({0: GPIO.LOW, 1: GPIO.HIGH}, write=False) + self._command.append(self._ft232h.mpsse_gpio() * _REPEAT_DELAY) + # Read bit for ACK/NAK. + self._command.append('\x22\x00') + # Increase expected response bytes. + self._expected += len(data) + + def _address_byte(self, read=True): + """Return the address byte with the specified R/W bit set. If read is + True the R/W bit will be 1, otherwise the R/W bit will be 0. + """ + if read: + return (self._address << 1) | 0x01 + else: + return self._address << 1 + + def _verify_acks(self, response): + """Check all the specified bytes have the ACK bit set. Throws a + RuntimeError exception if not all the ACKs are set. + """ + for byte in response: + if byte & 0x01 != 0x00: + raise RuntimeError('Failed to find expected I2C ACK!') + + def ping(self): + """Attempt to detect if a device at this address is present on the I2C + bus. Will send out the device's address for writing and verify an ACK + is received. Returns true if the ACK is received, and false if not. + """ + self._idle() + self._transaction_start() + self._i2c_start() + self._i2c_write_bytes([self._address_byte(False)]) + self._i2c_stop() + response = self._transaction_end() + if len(response) != 1: + raise RuntimeError('Expected 1 response byte but received {0} byte(s).'.format(len(response))) + return ((response[0] & 0x01) == 0x00) + + def writeRaw8(self, value): + """Write an 8-bit value on the bus (without register).""" + value = value & 0xFF + self._idle() + self._transaction_start() + self._i2c_start() + self._i2c_write_bytes([self._address_byte(False), value]) + self._i2c_stop() + response = self._transaction_end() + self._verify_acks(response) + + def write8(self, register, value): + """Write an 8-bit value to the specified register.""" + value = value & 0xFF + self._idle() + self._transaction_start() + self._i2c_start() + self._i2c_write_bytes([self._address_byte(False), register, value]) + self._i2c_stop() + response = self._transaction_end() + self._verify_acks(response) + + def write16(self, register, value, little_endian=True): + """Write a 16-bit value to the specified register.""" + value = value & 0xFFFF + value_low = value & 0xFF + value_high = (value >> 8) & 0xFF + if not little_endian: + value_low, value_high = value_high, value_low + self._idle() + self._transaction_start() + self._i2c_start() + self._i2c_write_bytes([self._address_byte(False), register, value_low, + value_high]) + self._i2c_stop() + response = self._transaction_end() + self._verify_acks(response) + + def writeList(self, register, data): + """Write bytes to the specified register.""" + self._idle() + self._transaction_start() + self._i2c_start() + self._i2c_write_bytes([self._address_byte(False), register] + data) + self._i2c_stop() + response = self._transaction_end() + self._verify_acks(response) + + def readList(self, register, length): + """Read a length number of bytes from the specified register. Results + will be returned as a bytearray.""" + if length <= 0: + raise ValueError("Length must be at least 1 byte.") + self._idle() + self._transaction_start() + self._i2c_start() + self._i2c_write_bytes([self._address_byte(True), register]) + self._i2c_stop() + self._i2c_idle() + self._i2c_start() + self._i2c_read_bytes(length) + self._i2c_stop() + response = self._transaction_end() + self._verify_acks(response[:-length]) + return response[-length:] + + def readRaw8(self): + """Read an 8-bit value on the bus (without register).""" + self._idle() + self._transaction_start() + self._i2c_start() + self._i2c_write_bytes([self._address_byte(False)]) + self._i2c_stop() + self._i2c_idle() + self._i2c_start() + self._i2c_write_bytes([self._address_byte(True)]) + self._i2c_read_bytes(1) + self._i2c_stop() + response = self._transaction_end() + self._verify_acks(response[:-1]) + return response[-1] + + def readU8(self, register): + """Read an unsigned byte from the specified register.""" + self._idle() + self._transaction_start() + self._i2c_start() + self._i2c_write_bytes([self._address_byte(False), register]) + self._i2c_stop() + self._i2c_idle() + self._i2c_start() + self._i2c_write_bytes([self._address_byte(True)]) + self._i2c_read_bytes(1) + self._i2c_stop() + response = self._transaction_end() + self._verify_acks(response[:-1]) + return response[-1] + + def readS8(self, register): + """Read a signed byte from the specified register.""" + result = self.readU8(register) + if result > 127: + result -= 256 + return result + + def readU16(self, register, little_endian=True): + """Read an unsigned 16-bit value from the specified register, with the + specified endianness (default little endian, or least significant byte + first).""" + self._idle() + self._transaction_start() + self._i2c_start() + self._i2c_write_bytes([self._address_byte(False), register]) + self._i2c_stop() + self._i2c_idle() + self._i2c_start() + self._i2c_write_bytes([self._address_byte(True)]) + self._i2c_read_bytes(2) + self._i2c_stop() + response = self._transaction_end() + self._verify_acks(response[:-2]) + if little_endian: + return (response[-1] << 8) | response[-2] + else: + return (response[-2] << 8) | response[-1] + + def readS16(self, register, little_endian=True): + """Read a signed 16-bit value from the specified register, with the + specified endianness (default little endian, or least significant byte + first).""" + result = self.readU16(register, little_endian) + if result > 32767: + result -= 65536 + return result + + def readU16LE(self, register): + """Read an unsigned 16-bit value from the specified register, in little + endian byte order.""" + return self.readU16(register, little_endian=True) + + def readU16BE(self, register): + """Read an unsigned 16-bit value from the specified register, in big + endian byte order.""" + return self.readU16(register, little_endian=False) + + def readS16LE(self, register): + """Read a signed 16-bit value from the specified register, in little + endian byte order.""" + return self.readS16(register, little_endian=True) + + def readS16BE(self, register): + """Read a signed 16-bit value from the specified register, in big + endian byte order.""" + return self.readS16(register, little_endian=False) diff --git a/lib/Adafruit_Python_GPIO/Adafruit_GPIO/GPIO.py b/lib/Adafruit_Python_GPIO/Adafruit_GPIO/GPIO.py new file mode 100644 index 0000000..08e99c6 --- /dev/null +++ b/lib/Adafruit_Python_GPIO/Adafruit_GPIO/GPIO.py @@ -0,0 +1,426 @@ +# Copyright (c) 2014 Adafruit Industries +# Author: Tony DiCola +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. + +import Adafruit_GPIO.Platform as Platform + + +OUT = 0 +IN = 1 +HIGH = True +LOW = False + +RISING = 1 +FALLING = 2 +BOTH = 3 + +PUD_OFF = 0 +PUD_DOWN = 1 +PUD_UP = 2 + +class BaseGPIO(object): + """Base class for implementing simple digital IO for a platform. + Implementors are expected to subclass from this and provide an implementation + of the setup, output, and input functions.""" + + def setup(self, pin, mode, pull_up_down=PUD_OFF): + """Set the input or output mode for a specified pin. Mode should be + either OUT or IN.""" + raise NotImplementedError + + def output(self, pin, value): + """Set the specified pin the provided high/low value. Value should be + either HIGH/LOW or a boolean (true = high).""" + raise NotImplementedError + + def input(self, pin): + """Read the specified pin and return HIGH/true if the pin is pulled high, + or LOW/false if pulled low.""" + raise NotImplementedError + + def set_high(self, pin): + """Set the specified pin HIGH.""" + self.output(pin, HIGH) + + def set_low(self, pin): + """Set the specified pin LOW.""" + self.output(pin, LOW) + + def is_high(self, pin): + """Return true if the specified pin is pulled high.""" + return self.input(pin) == HIGH + + def is_low(self, pin): + """Return true if the specified pin is pulled low.""" + return self.input(pin) == LOW + + +# Basic implementation of multiple pin methods just loops through pins and +# processes each one individually. This is not optimal, but derived classes can +# provide a more optimal implementation that deals with groups of pins +# simultaneously. +# See MCP230xx or PCF8574 classes for examples of optimized implementations. + + def output_pins(self, pins): + """Set multiple pins high or low at once. Pins should be a dict of pin + name to pin value (HIGH/True for 1, LOW/False for 0). All provided pins + will be set to the given values. + """ + # General implementation just loops through pins and writes them out + # manually. This is not optimized, but subclasses can choose to implement + # a more optimal batch output implementation. See the MCP230xx class for + # example of optimized implementation. + for pin, value in iter(pins.items()): + self.output(pin, value) + + def setup_pins(self, pins): + """Setup multiple pins as inputs or outputs at once. Pins should be a + dict of pin name to pin type (IN or OUT). + """ + # General implementation that can be optimized by derived classes. + for pin, value in iter(pins.items()): + self.setup(pin, value) + + def input_pins(self, pins): + """Read multiple pins specified in the given list and return list of pin values + GPIO.HIGH/True if the pin is pulled high, or GPIO.LOW/False if pulled low. + """ + # General implementation that can be optimized by derived classes. + return [self.input(pin) for pin in pins] + + + def add_event_detect(self, pin, edge): + """Enable edge detection events for a particular GPIO channel. Pin + should be type IN. Edge must be RISING, FALLING or BOTH. + """ + raise NotImplementedError + + def remove_event_detect(self, pin): + """Remove edge detection for a particular GPIO channel. Pin should be + type IN. + """ + raise NotImplementedError + + def add_event_callback(self, pin, callback): + """Add a callback for an event already defined using add_event_detect(). + Pin should be type IN. + """ + raise NotImplementedError + + def event_detected(self, pin): + """Returns True if an edge has occured on a given GPIO. You need to + enable edge detection using add_event_detect() first. Pin should be + type IN. + """ + raise NotImplementedError + + def wait_for_edge(self, pin, edge): + """Wait for an edge. Pin should be type IN. Edge must be RISING, + FALLING or BOTH.""" + raise NotImplementedError + + def cleanup(self, pin=None): + """Clean up GPIO event detection for specific pin, or all pins if none + is specified. + """ + raise NotImplementedError + + +# helper functions useful to derived classes + + def _validate_pin(self, pin): + # Raise an exception if pin is outside the range of allowed values. + if pin < 0 or pin >= self.NUM_GPIO: + raise ValueError('Invalid GPIO value, must be between 0 and {0}.'.format(self.NUM_GPIO)) + + def _bit2(self, src, bit, val): + bit = 1 << bit + return (src | bit) if val else (src & ~bit) + + +class RPiGPIOAdapter(BaseGPIO): + """GPIO implementation for the Raspberry Pi using the RPi.GPIO library.""" + + def __init__(self, rpi_gpio, mode=None): + self.rpi_gpio = rpi_gpio + # Suppress warnings about GPIO in use. + rpi_gpio.setwarnings(False) + # Setup board pin mode. + if mode == rpi_gpio.BOARD or mode == rpi_gpio.BCM: + rpi_gpio.setmode(mode) + elif mode is not None: + raise ValueError('Unexpected value for mode. Must be BOARD or BCM.') + else: + # Default to BCM numbering if not told otherwise. + rpi_gpio.setmode(rpi_gpio.BCM) + # Define mapping of Adafruit GPIO library constants to RPi.GPIO constants. + self._dir_mapping = { OUT: rpi_gpio.OUT, + IN: rpi_gpio.IN } + self._pud_mapping = { PUD_OFF: rpi_gpio.PUD_OFF, + PUD_DOWN: rpi_gpio.PUD_DOWN, + PUD_UP: rpi_gpio.PUD_UP } + self._edge_mapping = { RISING: rpi_gpio.RISING, + FALLING: rpi_gpio.FALLING, + BOTH: rpi_gpio.BOTH } + + def setup(self, pin, mode, pull_up_down=PUD_OFF): + """Set the input or output mode for a specified pin. Mode should be + either OUTPUT or INPUT. + """ + self.rpi_gpio.setup(pin, self._dir_mapping[mode], + pull_up_down=self._pud_mapping[pull_up_down]) + + def output(self, pin, value): + """Set the specified pin the provided high/low value. Value should be + either HIGH/LOW or a boolean (true = high). + """ + self.rpi_gpio.output(pin, value) + + def input(self, pin): + """Read the specified pin and return HIGH/true if the pin is pulled high, + or LOW/false if pulled low. + """ + return self.rpi_gpio.input(pin) + + def input_pins(self, pins): + """Read multiple pins specified in the given list and return list of pin values + GPIO.HIGH/True if the pin is pulled high, or GPIO.LOW/False if pulled low. + """ + # maybe rpi has a mass read... it would be more efficient to use it if it exists + return [self.rpi_gpio.input(pin) for pin in pins] + + def add_event_detect(self, pin, edge, callback=None, bouncetime=-1): + """Enable edge detection events for a particular GPIO channel. Pin + should be type IN. Edge must be RISING, FALLING or BOTH. Callback is a + function for the event. Bouncetime is switch bounce timeout in ms for + callback + """ + kwargs = {} + if callback: + kwargs['callback']=callback + if bouncetime > 0: + kwargs['bouncetime']=bouncetime + self.rpi_gpio.add_event_detect(pin, self._edge_mapping[edge], **kwargs) + + def remove_event_detect(self, pin): + """Remove edge detection for a particular GPIO channel. Pin should be + type IN. + """ + self.rpi_gpio.remove_event_detect(pin) + + def add_event_callback(self, pin, callback): + """Add a callback for an event already defined using add_event_detect(). + Pin should be type IN. + """ + self.rpi_gpio.add_event_callback(pin, callback) + + def event_detected(self, pin): + """Returns True if an edge has occured on a given GPIO. You need to + enable edge detection using add_event_detect() first. Pin should be + type IN. + """ + return self.rpi_gpio.event_detected(pin) + + def wait_for_edge(self, pin, edge): + """Wait for an edge. Pin should be type IN. Edge must be RISING, + FALLING or BOTH. + """ + self.rpi_gpio.wait_for_edge(pin, self._edge_mapping[edge]) + + def cleanup(self, pin=None): + """Clean up GPIO event detection for specific pin, or all pins if none + is specified. + """ + if pin is None: + self.rpi_gpio.cleanup() + else: + self.rpi_gpio.cleanup(pin) + +class AdafruitBBIOAdapter(BaseGPIO): + """GPIO implementation for the Beaglebone Black using the Adafruit_BBIO + library. + """ + + def __init__(self, bbio_gpio): + self.bbio_gpio = bbio_gpio + # Define mapping of Adafruit GPIO library constants to RPi.GPIO constants. + self._dir_mapping = { OUT: bbio_gpio.OUT, + IN: bbio_gpio.IN } + self._pud_mapping = { PUD_OFF: bbio_gpio.PUD_OFF, + PUD_DOWN: bbio_gpio.PUD_DOWN, + PUD_UP: bbio_gpio.PUD_UP } + self._edge_mapping = { RISING: bbio_gpio.RISING, + FALLING: bbio_gpio.FALLING, + BOTH: bbio_gpio.BOTH } + + def setup(self, pin, mode, pull_up_down=PUD_OFF): + """Set the input or output mode for a specified pin. Mode should be + either OUTPUT or INPUT. + """ + self.bbio_gpio.setup(pin, self._dir_mapping[mode], + pull_up_down=self._pud_mapping[pull_up_down]) + + def output(self, pin, value): + """Set the specified pin the provided high/low value. Value should be + either HIGH/LOW or a boolean (true = high). + """ + self.bbio_gpio.output(pin, value) + + def input(self, pin): + """Read the specified pin and return HIGH/true if the pin is pulled high, + or LOW/false if pulled low. + """ + return self.bbio_gpio.input(pin) + + def input_pins(self, pins): + """Read multiple pins specified in the given list and return list of pin values + GPIO.HIGH/True if the pin is pulled high, or GPIO.LOW/False if pulled low. + """ + # maybe bbb has a mass read... it would be more efficient to use it if it exists + return [self.bbio_gpio.input(pin) for pin in pins] + + def add_event_detect(self, pin, edge, callback=None, bouncetime=-1): + """Enable edge detection events for a particular GPIO channel. Pin + should be type IN. Edge must be RISING, FALLING or BOTH. Callback is a + function for the event. Bouncetime is switch bounce timeout in ms for + callback + """ + kwargs = {} + if callback: + kwargs['callback']=callback + if bouncetime > 0: + kwargs['bouncetime']=bouncetime + self.bbio_gpio.add_event_detect(pin, self._edge_mapping[edge], **kwargs) + + def remove_event_detect(self, pin): + """Remove edge detection for a particular GPIO channel. Pin should be + type IN. + """ + self.bbio_gpio.remove_event_detect(pin) + + def add_event_callback(self, pin, callback, bouncetime=-1): + """Add a callback for an event already defined using add_event_detect(). + Pin should be type IN. Bouncetime is switch bounce timeout in ms for + callback + """ + kwargs = {} + if bouncetime > 0: + kwargs['bouncetime']=bouncetime + self.bbio_gpio.add_event_callback(pin, callback, **kwargs) + + def event_detected(self, pin): + """Returns True if an edge has occured on a given GPIO. You need to + enable edge detection using add_event_detect() first. Pin should be + type IN. + """ + return self.bbio_gpio.event_detected(pin) + + def wait_for_edge(self, pin, edge): + """Wait for an edge. Pin should be type IN. Edge must be RISING, + FALLING or BOTH. + """ + self.bbio_gpio.wait_for_edge(pin, self._edge_mapping[edge]) + + def cleanup(self, pin=None): + """Clean up GPIO event detection for specific pin, or all pins if none + is specified. + """ + if pin is None: + self.bbio_gpio.cleanup() + else: + self.bbio_gpio.cleanup(pin) + +class AdafruitMinnowAdapter(BaseGPIO): + """GPIO implementation for the Minnowboard + MAX using the mraa library""" + + def __init__(self,mraa_gpio): + self.mraa_gpio = mraa_gpio + # Define mapping of Adafruit GPIO library constants to mraa constants + self._dir_mapping = { OUT: self.mraa_gpio.DIR_OUT, + IN: self.mraa_gpio.DIR_IN } + self._pud_mapping = { PUD_OFF: self.mraa_gpio.MODE_STRONG, + PUD_UP: self.mraa_gpio.MODE_HIZ, + PUD_DOWN: self.mraa_gpio.MODE_PULLDOWN } + self._edge_mapping = { RISING: self.mraa_gpio.EDGE_RISING, + FALLING: self.mraa_gpio.EDGE_FALLING, + BOTH: self.mraa_gpio.EDGE_BOTH } + + def setup(self,pin,mode): + """Set the input or output mode for a specified pin. Mode should be + either DIR_IN or DIR_OUT. + """ + self.mraa_gpio.Gpio.dir(self.mraa_gpio.Gpio(pin),self._dir_mapping[mode]) + + def output(self,pin,value): + """Set the specified pin the provided high/low value. Value should be + either 1 (ON or HIGH), or 0 (OFF or LOW) or a boolean. + """ + self.mraa_gpio.Gpio.write(self.mraa_gpio.Gpio(pin), value) + + def input(self,pin): + """Read the specified pin and return HIGH/true if the pin is pulled high, + or LOW/false if pulled low. + """ + return self.mraa_gpio.Gpio.read(self.mraa_gpio.Gpio(pin)) + + def add_event_detect(self, pin, edge, callback=None, bouncetime=-1): + """Enable edge detection events for a particular GPIO channel. Pin + should be type IN. Edge must be RISING, FALLING or BOTH. Callback is a + function for the event. Bouncetime is switch bounce timeout in ms for + callback + """ + kwargs = {} + if callback: + kwargs['callback']=callback + if bouncetime > 0: + kwargs['bouncetime']=bouncetime + self.mraa_gpio.Gpio.isr(self.mraa_gpio.Gpio(pin), self._edge_mapping[edge], **kwargs) + + def remove_event_detect(self, pin): + """Remove edge detection for a particular GPIO channel. Pin should be + type IN. + """ + self.mraa_gpio.Gpio.isrExit(self.mraa_gpio.Gpio(pin)) + + def wait_for_edge(self, pin, edge): + """Wait for an edge. Pin should be type IN. Edge must be RISING, + FALLING or BOTH. + """ + self.bbio_gpio.wait_for_edge(self.mraa_gpio.Gpio(pin), self._edge_mapping[edge]) + +def get_platform_gpio(**keywords): + """Attempt to return a GPIO instance for the platform which the code is being + executed on. Currently supports only the Raspberry Pi using the RPi.GPIO + library and Beaglebone Black using the Adafruit_BBIO library. Will throw an + exception if a GPIO instance can't be created for the current platform. The + returned GPIO object is an instance of BaseGPIO. + """ + plat = Platform.platform_detect() + if plat == Platform.RASPBERRY_PI: + import RPi.GPIO + return RPiGPIOAdapter(RPi.GPIO, **keywords) + elif plat == Platform.BEAGLEBONE_BLACK: + import Adafruit_BBIO.GPIO + return AdafruitBBIOAdapter(Adafruit_BBIO.GPIO, **keywords) + elif plat == Platform.MINNOWBOARD: + import mraa + return AdafruitMinnowAdapter(mraa, **keywords) + elif plat == Platform.UNKNOWN: + raise RuntimeError('Could not determine platform.') diff --git a/lib/Adafruit_Python_GPIO/Adafruit_GPIO/I2C.py b/lib/Adafruit_Python_GPIO/Adafruit_GPIO/I2C.py new file mode 100644 index 0000000..765ed82 --- /dev/null +++ b/lib/Adafruit_Python_GPIO/Adafruit_GPIO/I2C.py @@ -0,0 +1,202 @@ +# Copyright (c) 2014 Adafruit Industries +# Author: Tony DiCola +# Based on Adafruit_I2C.py created by Kevin Townsend. +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. +import logging +import os +import subprocess + +import Adafruit_GPIO.Platform as Platform + + +def reverseByteOrder(data): + """DEPRECATED: See https://github.com/adafruit/Adafruit_Python_GPIO/issues/48""" + # # Courtesy Vishal Sapre + # byteCount = len(hex(data)[2:].replace('L','')[::2]) + # val = 0 + # for i in range(byteCount): + # val = (val << 8) | (data & 0xff) + # data >>= 8 + # return val + raise RuntimeError('reverseByteOrder is deprecated! See: https://github.com/adafruit/Adafruit_Python_GPIO/issues/48') + +def get_default_bus(): + """Return the default bus number based on the device platform. For a + Raspberry Pi either bus 0 or 1 (based on the Pi revision) will be returned. + For a Beaglebone Black the first user accessible bus, 1, will be returned. + """ + plat = Platform.platform_detect() + if plat == Platform.RASPBERRY_PI: + if Platform.pi_revision() == 1: + # Revision 1 Pi uses I2C bus 0. + return 0 + else: + # Revision 2 Pi uses I2C bus 1. + return 1 + elif plat == Platform.BEAGLEBONE_BLACK: + # Beaglebone Black has multiple I2C buses, default to 1 (P9_19 and P9_20). + return 1 + else: + raise RuntimeError('Could not determine default I2C bus for platform.') + +def get_i2c_device(address, busnum=None, i2c_interface=None, **kwargs): + """Return an I2C device for the specified address and on the specified bus. + If busnum isn't specified, the default I2C bus for the platform will attempt + to be detected. + """ + if busnum is None: + busnum = get_default_bus() + return Device(address, busnum, i2c_interface, **kwargs) + +def require_repeated_start(): + """Enable repeated start conditions for I2C register reads. This is the + normal behavior for I2C, however on some platforms like the Raspberry Pi + there are bugs which disable repeated starts unless explicitly enabled with + this function. See this thread for more details: + http://www.raspberrypi.org/forums/viewtopic.php?f=44&t=15840 + """ + plat = Platform.platform_detect() + if plat == Platform.RASPBERRY_PI and os.path.exists('/sys/module/i2c_bcm2708/parameters/combined'): + # On the Raspberry Pi there is a bug where register reads don't send a + # repeated start condition like the kernel smbus I2C driver functions + # define. As a workaround this bit in the BCM2708 driver sysfs tree can + # be changed to enable I2C repeated starts. + subprocess.check_call('chmod 666 /sys/module/i2c_bcm2708/parameters/combined', shell=True) + subprocess.check_call('echo -n 1 > /sys/module/i2c_bcm2708/parameters/combined', shell=True) + # Other platforms are a no-op because they (presumably) have the correct + # behavior and send repeated starts. + + +class Device(object): + """Class for communicating with an I2C device using the adafruit-pureio pure + python smbus library, or other smbus compatible I2C interface. Allows reading + and writing 8-bit, 16-bit, and byte array values to registers + on the device.""" + def __init__(self, address, busnum, i2c_interface=None): + """Create an instance of the I2C device at the specified address on the + specified I2C bus number.""" + self._address = address + if i2c_interface is None: + # Use pure python I2C interface if none is specified. + import Adafruit_PureIO.smbus + self._bus = Adafruit_PureIO.smbus.SMBus(busnum) + else: + # Otherwise use the provided class to create an smbus interface. + self._bus = i2c_interface(busnum) + self._logger = logging.getLogger('Adafruit_I2C.Device.Bus.{0}.Address.{1:#0X}' \ + .format(busnum, address)) + + def writeRaw8(self, value): + """Write an 8-bit value on the bus (without register).""" + value = value & 0xFF + self._bus.write_byte(self._address, value) + self._logger.debug("Wrote 0x%02X", + value) + + def write8(self, register, value): + """Write an 8-bit value to the specified register.""" + value = value & 0xFF + self._bus.write_byte_data(self._address, register, value) + self._logger.debug("Wrote 0x%02X to register 0x%02X", + value, register) + + def write16(self, register, value): + """Write a 16-bit value to the specified register.""" + value = value & 0xFFFF + self._bus.write_word_data(self._address, register, value) + self._logger.debug("Wrote 0x%04X to register pair 0x%02X, 0x%02X", + value, register, register+1) + + def writeList(self, register, data): + """Write bytes to the specified register.""" + self._bus.write_i2c_block_data(self._address, register, data) + self._logger.debug("Wrote to register 0x%02X: %s", + register, data) + + def readList(self, register, length): + """Read a length number of bytes from the specified register. Results + will be returned as a bytearray.""" + results = self._bus.read_i2c_block_data(self._address, register, length) + self._logger.debug("Read the following from register 0x%02X: %s", + register, results) + return results + + def readRaw8(self): + """Read an 8-bit value on the bus (without register).""" + result = self._bus.read_byte(self._address) & 0xFF + self._logger.debug("Read 0x%02X", + result) + return result + + def readU8(self, register): + """Read an unsigned byte from the specified register.""" + result = self._bus.read_byte_data(self._address, register) & 0xFF + self._logger.debug("Read 0x%02X from register 0x%02X", + result, register) + return result + + def readS8(self, register): + """Read a signed byte from the specified register.""" + result = self.readU8(register) + if result > 127: + result -= 256 + return result + + def readU16(self, register, little_endian=True): + """Read an unsigned 16-bit value from the specified register, with the + specified endianness (default little endian, or least significant byte + first).""" + result = self._bus.read_word_data(self._address,register) & 0xFFFF + self._logger.debug("Read 0x%04X from register pair 0x%02X, 0x%02X", + result, register, register+1) + # Swap bytes if using big endian because read_word_data assumes little + # endian on ARM (little endian) systems. + if not little_endian: + result = ((result << 8) & 0xFF00) + (result >> 8) + return result + + def readS16(self, register, little_endian=True): + """Read a signed 16-bit value from the specified register, with the + specified endianness (default little endian, or least significant byte + first).""" + result = self.readU16(register, little_endian) + if result > 32767: + result -= 65536 + return result + + def readU16LE(self, register): + """Read an unsigned 16-bit value from the specified register, in little + endian byte order.""" + return self.readU16(register, little_endian=True) + + def readU16BE(self, register): + """Read an unsigned 16-bit value from the specified register, in big + endian byte order.""" + return self.readU16(register, little_endian=False) + + def readS16LE(self, register): + """Read a signed 16-bit value from the specified register, in little + endian byte order.""" + return self.readS16(register, little_endian=True) + + def readS16BE(self, register): + """Read a signed 16-bit value from the specified register, in big + endian byte order.""" + return self.readS16(register, little_endian=False) diff --git a/lib/Adafruit_Python_GPIO/Adafruit_GPIO/MCP230xx.py b/lib/Adafruit_Python_GPIO/Adafruit_GPIO/MCP230xx.py new file mode 100644 index 0000000..3ba8b5f --- /dev/null +++ b/lib/Adafruit_Python_GPIO/Adafruit_GPIO/MCP230xx.py @@ -0,0 +1,165 @@ +# Copyright (c) 2014 Adafruit Industries +# Author: Tony DiCola +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. +import math + +import Adafruit_GPIO as GPIO +import Adafruit_GPIO.I2C as I2C + + +class MCP230xxBase(GPIO.BaseGPIO): + """Base class to represent an MCP230xx series GPIO extender. Is compatible + with the Adafruit_GPIO BaseGPIO class so it can be used as a custom GPIO + class for interacting with device. + """ + + def __init__(self, address, i2c=None, **kwargs): + """Initialize MCP230xx at specified I2C address and bus number. If bus + is not specified it will default to the appropriate platform detected bus. + """ + # Create I2C device. + if i2c is None: + import Adafruit_GPIO.I2C as I2C + i2c = I2C + self._device = i2c.get_i2c_device(address, **kwargs) + # Assume starting in ICON.BANK = 0 mode (sequential access). + # Compute how many bytes are needed to store count of GPIO. + self.gpio_bytes = int(math.ceil(self.NUM_GPIO/8.0)) + # Buffer register values so they can be changed without reading. + self.iodir = [0xFF]*self.gpio_bytes # Default direction to all inputs. + self.gppu = [0x00]*self.gpio_bytes # Default to pullups disabled. + self.gpio = [0x00]*self.gpio_bytes + # Write current direction and pullup buffer state. + self.write_iodir() + self.write_gppu() + + + def setup(self, pin, value): + """Set the input or output mode for a specified pin. Mode should be + either GPIO.OUT or GPIO.IN. + """ + self._validate_pin(pin) + # Set bit to 1 for input or 0 for output. + if value == GPIO.IN: + self.iodir[int(pin/8)] |= 1 << (int(pin%8)) + elif value == GPIO.OUT: + self.iodir[int(pin/8)] &= ~(1 << (int(pin%8))) + else: + raise ValueError('Unexpected value. Must be GPIO.IN or GPIO.OUT.') + self.write_iodir() + + + def output(self, pin, value): + """Set the specified pin the provided high/low value. Value should be + either GPIO.HIGH/GPIO.LOW or a boolean (True = HIGH). + """ + self.output_pins({pin: value}) + + def output_pins(self, pins): + """Set multiple pins high or low at once. Pins should be a dict of pin + name to pin value (HIGH/True for 1, LOW/False for 0). All provided pins + will be set to the given values. + """ + [self._validate_pin(pin) for pin in pins.keys()] + # Set each changed pin's bit. + for pin, value in iter(pins.items()): + if value: + self.gpio[int(pin/8)] |= 1 << (int(pin%8)) + else: + self.gpio[int(pin/8)] &= ~(1 << (int(pin%8))) + # Write GPIO state. + self.write_gpio() + + + def input(self, pin): + """Read the specified pin and return GPIO.HIGH/True if the pin is pulled + high, or GPIO.LOW/False if pulled low. + """ + return self.input_pins([pin])[0] + + def input_pins(self, pins): + """Read multiple pins specified in the given list and return list of pin values + GPIO.HIGH/True if the pin is pulled high, or GPIO.LOW/False if pulled low. + """ + [self._validate_pin(pin) for pin in pins] + # Get GPIO state. + self.gpio = self._device.readList(self.GPIO, self.gpio_bytes) + # Return True if pin's bit is set. + return [(self.gpio[int(pin/8)] & 1 << (int(pin%8))) > 0 for pin in pins] + + + def pullup(self, pin, enabled): + """Turn on the pull-up resistor for the specified pin if enabled is True, + otherwise turn off the pull-up resistor. + """ + self._validate_pin(pin) + if enabled: + self.gppu[int(pin/8)] |= 1 << (int(pin%8)) + else: + self.gppu[int(pin/8)] &= ~(1 << (int(pin%8))) + self.write_gppu() + + def write_gpio(self, gpio=None): + """Write the specified byte value to the GPIO registor. If no value + specified the current buffered value will be written. + """ + if gpio is not None: + self.gpio = gpio + self._device.writeList(self.GPIO, self.gpio) + + def write_iodir(self, iodir=None): + """Write the specified byte value to the IODIR registor. If no value + specified the current buffered value will be written. + """ + if iodir is not None: + self.iodir = iodir + self._device.writeList(self.IODIR, self.iodir) + + def write_gppu(self, gppu=None): + """Write the specified byte value to the GPPU registor. If no value + specified the current buffered value will be written. + """ + if gppu is not None: + self.gppu = gppu + self._device.writeList(self.GPPU, self.gppu) + + +class MCP23017(MCP230xxBase): + """MCP23017-based GPIO class with 16 GPIO pins.""" + # Define number of pins and registor addresses. + NUM_GPIO = 16 + IODIR = 0x00 + GPIO = 0x12 + GPPU = 0x0C + + def __init__(self, address=0x20, **kwargs): + super(MCP23017, self).__init__(address, **kwargs) + + +class MCP23008(MCP230xxBase): + """MCP23008-based GPIO class with 8 GPIO pins.""" + # Define number of pins and registor addresses. + NUM_GPIO = 8 + IODIR = 0x00 + GPIO = 0x09 + GPPU = 0x06 + + def __init__(self, address=0x20, **kwargs): + super(MCP23008, self).__init__(address, **kwargs) diff --git a/lib/Adafruit_Python_GPIO/Adafruit_GPIO/PCA95xx.py b/lib/Adafruit_Python_GPIO/Adafruit_GPIO/PCA95xx.py new file mode 100644 index 0000000..2a3b406 --- /dev/null +++ b/lib/Adafruit_Python_GPIO/Adafruit_GPIO/PCA95xx.py @@ -0,0 +1,121 @@ +''' +Adafruit compatible using BaseGPIO class to represent a PCA9555 IO expander +Copyright (C) 2016 Matias Vidal +Ported from: https://github.com/dberlin/PCA95XX + +# Copyright 2012 Daniel Berlin + +Permission is hereby granted, free of charge, to any person obtaining a copy of +this software and associated documentation files (the "Software"), to deal in +the Software without restriction, including without limitation the rights to +use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies +of the Software, and to permit persons to whom the Software is furnished to do +so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE.''' + +import Adafruit_GPIO as GPIO +import Adafruit_GPIO.I2C as I2C + +# For the PCA 953X and 955X series, the chips with 8 GPIO's have these port numbers +# The chips with 16 GPIO's have the first port for each type at double these numbers +# IE The first config port is 6 + +INPUT_PORT = 0 +OUTPUT_PORT = 1 +POLARITY_PORT = 2 +CONFIG_PORT = 3 + +IN = GPIO.IN +OUT = GPIO.OUT +HIGH = GPIO.HIGH +LOW = GPIO.LOW + + +class PCA9555(GPIO.BaseGPIO): + """Class to represent a PCA9555 GPIO extender. Compatible + with the Adafruit_GPIO BaseGPIO class so it can be used as a custom GPIO + class for interacting with device. + """ + NUM_GPIO = 16 + + def __init__(self, address=0x20, busnum=None, i2c=None, num_gpios=16, **kwargs): + address = int(address) + self.__name__ = "PCA955" + # Create I2C device. + i2c = i2c or I2C + busnum = busnum or i2c.get_default_bus() + self._device = i2c.get_i2c_device(address, busnum, **kwargs) + self.num_gpios = num_gpios + + if self.num_gpios <= 8: + self.iodir = self._device.readU8(CONFIG_PORT) + self.outputvalue = self._device.readU8(OUTPUT_PORT) + + elif self.num_gpios > 8 and self.num_gpios <= 16: + self.iodir = self._device.readU16(CONFIG_PORT<< 1) + self.outputvalue = self._device.readU16(OUTPUT_PORT << 1) + + def _changebit(self, bitmap, bit, value): + assert value == 1 or value == 0, "Value is %s must be 1 or 0" % value + if value == 0: + return bitmap & ~(1 << bit) + elif value == 1: + return bitmap | (1 << bit) + + # Change the value of bit PIN on port PORT to VALUE. If the + # current pin state for the port is passed in as PORTSTATE, we + # will avoid doing a read to get it. The port pin state must be + # complete if passed in (IE it should not just be the value of the + # single pin we are trying to change) + def _readandchangepin(self, port, pin, value, portstate = None): + assert pin >= 0 and pin < self.num_gpios, "Pin number %s is invalid, only 0-%s are valid" % (pin, self.num_gpios) + if not portstate: + if self.num_gpios <= 8: + portstate = self._device.readU8(port) + elif self.num_gpios > 8 and self.num_gpios <= 16: + portstate = self._device.readU16(port << 1) + newstate = self._changebit(portstate, pin, value) + if self.num_gpios <= 8: + self._device.write8(port, newstate) + else: + self._device.write16(port << 1, newstate) + return newstate + + # Polarity inversion + def polarity(self, pin, value): + return self._readandchangepin(POLARITY_PORT, pin, value) + + # Pin direction + def config(self, pin, mode): + self.iodir = self._readandchangepin(CONFIG_PORT, pin, mode, self.iodir) + return self.iodir + + def output(self, pin, value): + assert self.iodir & (1 << pin) == 0, "Pin %s not set to output" % pin + self.outputvalue = self._readandchangepin(OUTPUT_PORT, pin, value, self.outputvalue) + return self.outputvalue + + def input(self, pin): + assert self.iodir & (1 << pin) != 0, "Pin %s not set to input" % pin + if self.num_gpios <= 8: + value = self._device.readU8(INPUT_PORT) + elif self.num_gpios > 8 and self.num_gpios <= 16: + value = self._device.readU16(INPUT_PORT << 1) + return value & (1 << pin) + + def setup(self, pin, mode): + self.config(pin, mode) + + def cleanup(self, pin=None): + # nothing to cleanup + pass diff --git a/lib/Adafruit_Python_GPIO/Adafruit_GPIO/PCF8574.py b/lib/Adafruit_Python_GPIO/Adafruit_GPIO/PCF8574.py new file mode 100644 index 0000000..02919d1 --- /dev/null +++ b/lib/Adafruit_Python_GPIO/Adafruit_GPIO/PCF8574.py @@ -0,0 +1,94 @@ +''' +Adafruit compatible using BaseGPIO class to represent a PCF8574/A IO expander +Copyright (C) 2015 Sylvan Butler + +Permission is hereby granted, free of charge, to any person obtaining a copy of +this software and associated documentation files (the "Software"), to deal in +the Software without restriction, including without limitation the rights to +use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies +of the Software, and to permit persons to whom the Software is furnished to do +so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE.''' + +import Adafruit_GPIO as GPIO +import Adafruit_GPIO.I2C as I2C + + + +IN = GPIO.IN +OUT = GPIO.OUT +HIGH = GPIO.HIGH +LOW = GPIO.LOW + + +class PCF8574(GPIO.BaseGPIO): + """Class to represent a PCF8574 or PCF8574A GPIO extender. Compatible + with the Adafruit_GPIO BaseGPIO class so it can be used as a custom GPIO + class for interacting with device. + """ + + NUM_GPIO = 8 + + def __init__(self, address=0x27, busnum=None, i2c=None, **kwargs): + address = int(address) + self.__name__ = \ + "PCF8574" if address in range(0x20, 0x28) else \ + "PCF8574A" if address in range(0x38, 0x40) else \ + "Bad address for PCF8574(A): 0x%02X not in range [0x20..0x27, 0x38..0x3F]" % address + if self.__name__[0] != 'P': + raise ValueError(self.__name__) + # Create I2C device. + i2c = i2c or I2C + busnum = busnum or i2c.get_default_bus() + self._device = i2c.get_i2c_device(address, busnum, **kwargs) + # Buffer register values so they can be changed without reading. + self.iodir = 0xFF # Default direction to all inputs is in + self.gpio = 0x00 + self._write_pins() + + + def _write_pins(self): + self._device.writeRaw8(self.gpio | self.iodir) + + def _read_pins(self): + return self._device.readRaw8() & self.iodir + + + def setup(self, pin, mode): + self.setup_pins({pin: mode}) + + def setup_pins(self, pins): + if False in [y for x,y in [(self._validate_pin(pin),mode in (IN,OUT)) for pin,mode in pins.items()]]: + raise ValueError('Invalid MODE, IN or OUT') + for pin,mode in pins.items(): + self.iodir = self._bit2(self.iodir, pin, mode) + self._write_pins() + + + def output(self, pin, value): + self.output_pins({pin: value}) + + def output_pins(self, pins): + [self._validate_pin(pin) for pin in pins.keys()] + for pin,value in pins.items(): + self.gpio = self._bit2(self.gpio, pin, bool(value)) + self._write_pins() + + + def input(self, pin): + return self.input_pins([pin])[0] + + def input_pins(self, pins): + [self._validate_pin(pin) for pin in pins] + inp = self._read_pins() + return [bool(inp & (1<<pin)) for pin in pins] diff --git a/lib/Adafruit_Python_GPIO/Adafruit_GPIO/PWM.py b/lib/Adafruit_Python_GPIO/Adafruit_GPIO/PWM.py new file mode 100644 index 0000000..1e1717a --- /dev/null +++ b/lib/Adafruit_Python_GPIO/Adafruit_GPIO/PWM.py @@ -0,0 +1,128 @@ +# Copyright (c) 2014 Adafruit Industries +# Author: Tony DiCola +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. +import Adafruit_GPIO.Platform as Platform + + +class RPi_PWM_Adapter(object): + """PWM implementation for the Raspberry Pi using the RPi.GPIO PWM library.""" + + def __init__(self, rpi_gpio, mode=None): + self.rpi_gpio = rpi_gpio + # Suppress warnings about GPIO in use. + rpi_gpio.setwarnings(False) + # Set board or BCM pin numbering. + if mode == rpi_gpio.BOARD or mode == rpi_gpio.BCM: + rpi_gpio.setmode(mode) + elif mode is not None: + raise ValueError('Unexpected value for mode. Must be BOARD or BCM.') + else: + # Default to BCM numbering if not told otherwise. + rpi_gpio.setmode(rpi_gpio.BCM) + # Store reference to each created PWM instance. + self.pwm = {} + + def start(self, pin, dutycycle, frequency_hz=2000): + """Enable PWM output on specified pin. Set to intiial percent duty cycle + value (0.0 to 100.0) and frequency (in Hz). + """ + if dutycycle < 0.0 or dutycycle > 100.0: + raise ValueError('Invalid duty cycle value, must be between 0.0 to 100.0 (inclusive).') + # Make pin an output. + self.rpi_gpio.setup(pin, self.rpi_gpio.OUT) + # Create PWM instance and save a reference for later access. + self.pwm[pin] = self.rpi_gpio.PWM(pin, frequency_hz) + # Start the PWM at the specified duty cycle. + self.pwm[pin].start(dutycycle) + + def set_duty_cycle(self, pin, dutycycle): + """Set percent duty cycle of PWM output on specified pin. Duty cycle must + be a value 0.0 to 100.0 (inclusive). + """ + if dutycycle < 0.0 or dutycycle > 100.0: + raise ValueError('Invalid duty cycle value, must be between 0.0 to 100.0 (inclusive).') + if pin not in self.pwm: + raise ValueError('Pin {0} is not configured as a PWM. Make sure to first call start for the pin.'.format(pin)) + self.pwm[pin].ChangeDutyCycle(dutycycle) + + def set_frequency(self, pin, frequency_hz): + """Set frequency (in Hz) of PWM output on specified pin.""" + if pin not in self.pwm: + raise ValueError('Pin {0} is not configured as a PWM. Make sure to first call start for the pin.'.format(pin)) + self.pwm[pin].ChangeFrequency(frequency_hz) + + def stop(self, pin): + """Stop PWM output on specified pin.""" + if pin not in self.pwm: + raise ValueError('Pin {0} is not configured as a PWM. Make sure to first call start for the pin.'.format(pin)) + self.pwm[pin].stop() + del self.pwm[pin] + + +class BBIO_PWM_Adapter(object): + """PWM implementation for the BeagleBone Black using the Adafruit_BBIO.PWM + library. + """ + + def __init__(self, bbio_pwm): + self.bbio_pwm = bbio_pwm + + def start(self, pin, dutycycle, frequency_hz=2000): + """Enable PWM output on specified pin. Set to intiial percent duty cycle + value (0.0 to 100.0) and frequency (in Hz). + """ + if dutycycle < 0.0 or dutycycle > 100.0: + raise ValueError('Invalid duty cycle value, must be between 0.0 to 100.0 (inclusive).') + self.bbio_pwm.start(pin, dutycycle, frequency_hz) + + def set_duty_cycle(self, pin, dutycycle): + """Set percent duty cycle of PWM output on specified pin. Duty cycle must + be a value 0.0 to 100.0 (inclusive). + """ + if dutycycle < 0.0 or dutycycle > 100.0: + raise ValueError('Invalid duty cycle value, must be between 0.0 to 100.0 (inclusive).') + self.bbio_pwm.set_duty_cycle(pin, dutycycle) + + def set_frequency(self, pin, frequency_hz): + """Set frequency (in Hz) of PWM output on specified pin.""" + self.bbio_pwm.set_frequency(pin, frequency_hz) + + def stop(self, pin): + """Stop PWM output on specified pin.""" + self.bbio_pwm.stop(pin) + + +def get_platform_pwm(**keywords): + """Attempt to return a PWM instance for the platform which the code is being + executed on. Currently supports only the Raspberry Pi using the RPi.GPIO + library and Beaglebone Black using the Adafruit_BBIO library. Will throw an + exception if a PWM instance can't be created for the current platform. The + returned PWM object has the same interface as the RPi_PWM_Adapter and + BBIO_PWM_Adapter classes. + """ + plat = Platform.platform_detect() + if plat == Platform.RASPBERRY_PI: + import RPi.GPIO + return RPi_PWM_Adapter(RPi.GPIO, **keywords) + elif plat == Platform.BEAGLEBONE_BLACK: + import Adafruit_BBIO.PWM + return BBIO_PWM_Adapter(Adafruit_BBIO.PWM, **keywords) + elif plat == Platform.UNKNOWN: + raise RuntimeError('Could not determine platform.') diff --git a/lib/Adafruit_Python_GPIO/Adafruit_GPIO/Platform.py b/lib/Adafruit_Python_GPIO/Adafruit_GPIO/Platform.py new file mode 100644 index 0000000..2c041a8 --- /dev/null +++ b/lib/Adafruit_Python_GPIO/Adafruit_GPIO/Platform.py @@ -0,0 +1,110 @@ +# Copyright (c) 2014 Adafruit Industries +# Author: Tony DiCola + +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: + +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. + +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. +import platform +import re + +# Platform identification constants. +UNKNOWN = 0 +RASPBERRY_PI = 1 +BEAGLEBONE_BLACK = 2 +MINNOWBOARD = 3 + +def platform_detect(): + """Detect if running on the Raspberry Pi or Beaglebone Black and return the + platform type. Will return RASPBERRY_PI, BEAGLEBONE_BLACK, or UNKNOWN.""" + # Handle Raspberry Pi + pi = pi_version() + if pi is not None: + return RASPBERRY_PI + + # Handle Beaglebone Black + # TODO: Check the Beaglebone Black /proc/cpuinfo value instead of reading + # the platform. + plat = platform.platform() + if plat.lower().find('armv7l-with-debian') > -1: + return BEAGLEBONE_BLACK + elif plat.lower().find('armv7l-with-ubuntu') > -1: + return BEAGLEBONE_BLACK + elif plat.lower().find('armv7l-with-glibc2.4') > -1: + return BEAGLEBONE_BLACK + + # Handle Minnowboard + # Assumption is that mraa is installed + try: + import mraa + if mraa.getPlatformName()=='MinnowBoard MAX': + return MINNOWBOARD + except ImportError: + pass + + # Couldn't figure out the platform, just return unknown. + return UNKNOWN + + +def pi_revision(): + """Detect the revision number of a Raspberry Pi, useful for changing + functionality like default I2C bus based on revision.""" + # Revision list available at: http://elinux.org/RPi_HardwareHistory#Board_Revision_History + with open('/proc/cpuinfo', 'r') as infile: + for line in infile: + # Match a line of the form "Revision : 0002" while ignoring extra + # info in front of the revsion (like 1000 when the Pi was over-volted). + match = re.match('Revision\s+:\s+.*(\w{4})$', line, flags=re.IGNORECASE) + if match and match.group(1) in ['0000', '0002', '0003']: + # Return revision 1 if revision ends with 0000, 0002 or 0003. + return 1 + elif match: + # Assume revision 2 if revision ends with any other 4 chars. + return 2 + # Couldn't find the revision, throw an exception. + raise RuntimeError('Could not determine Raspberry Pi revision.') + + +def pi_version(): + """Detect the version of the Raspberry Pi. Returns either 1, 2 or + None depending on if it's a Raspberry Pi 1 (model A, B, A+, B+), + Raspberry Pi 2 (model B+), or not a Raspberry Pi. + """ + # Check /proc/cpuinfo for the Hardware field value. + # 2708 is pi 1 + # 2709 is pi 2 + # 2835 is pi 3 on 4.9.x kernel + # Anything else is not a pi. + with open('/proc/cpuinfo', 'r') as infile: + cpuinfo = infile.read() + # Match a line like 'Hardware : BCM2709' + match = re.search('^Hardware\s+:\s+(\w+)$', cpuinfo, + flags=re.MULTILINE | re.IGNORECASE) + if not match: + # Couldn't find the hardware, assume it isn't a pi. + return None + if match.group(1) == 'BCM2708': + # Pi 1 + return 1 + elif match.group(1) == 'BCM2709': + # Pi 2 + return 2 + elif match.group(1) == 'BCM2835': + # Pi 3 / Pi on 4.9.x kernel + return 3 + else: + # Something else, not a pi. + return None diff --git a/lib/Adafruit_Python_GPIO/Adafruit_GPIO/SPI.py b/lib/Adafruit_Python_GPIO/Adafruit_GPIO/SPI.py new file mode 100644 index 0000000..c20a32c --- /dev/null +++ b/lib/Adafruit_Python_GPIO/Adafruit_GPIO/SPI.py @@ -0,0 +1,328 @@ +# Copyright (c) 2014 Adafruit Industries +# Author: Tony DiCola +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. + +import operator +import time + +import Adafruit_GPIO as GPIO + + +MSBFIRST = 0 +LSBFIRST = 1 + + +class SpiDev(object): + """Hardware-based SPI implementation using the spidev interface.""" + + def __init__(self, port, device, max_speed_hz=500000): + """Initialize an SPI device using the SPIdev interface. Port and device + identify the device, for example the device /dev/spidev1.0 would be port + 1 and device 0. + """ + import spidev + self._device = spidev.SpiDev() + self._device.open(port, device) + self._device.max_speed_hz=max_speed_hz + # Default to mode 0, and make sure CS is active low. + self._device.mode = 0 + self._device.cshigh = False + + def set_clock_hz(self, hz): + """Set the speed of the SPI clock in hertz. Note that not all speeds + are supported and a lower speed might be chosen by the hardware. + """ + self._device.max_speed_hz=hz + + def set_mode(self, mode): + """Set SPI mode which controls clock polarity and phase. Should be a + numeric value 0, 1, 2, or 3. See wikipedia page for details on meaning: + http://en.wikipedia.org/wiki/Serial_Peripheral_Interface_Bus + """ + if mode < 0 or mode > 3: + raise ValueError('Mode must be a value 0, 1, 2, or 3.') + self._device.mode = mode + + def set_bit_order(self, order): + """Set order of bits to be read/written over serial lines. Should be + either MSBFIRST for most-significant first, or LSBFIRST for + least-signifcant first. + """ + if order == MSBFIRST: + self._device.lsbfirst = False + elif order == LSBFIRST: + self._device.lsbfirst = True + else: + raise ValueError('Order must be MSBFIRST or LSBFIRST.') + + def close(self): + """Close communication with the SPI device.""" + self._device.close() + + def write(self, data): + """Half-duplex SPI write. The specified array of bytes will be clocked + out the MOSI line. + """ + self._device.writebytes(data) + + def read(self, length): + """Half-duplex SPI read. The specified length of bytes will be clocked + in the MISO line and returned as a bytearray object. + """ + return bytearray(self._device.readbytes(length)) + + def transfer(self, data): + """Full-duplex SPI read and write. The specified array of bytes will be + clocked out the MOSI line, while simultaneously bytes will be read from + the MISO line. Read bytes will be returned as a bytearray object. + """ + return bytearray(self._device.xfer2(data)) + +class SpiDevMraa(object): + """Hardware SPI implementation with the mraa library on Minnowboard""" + def __init__(self, port, device, max_speed_hz=500000): + import mraa + self._device = mraa.Spi(0) + self._device.mode(0) + + def set_clock_hz(self, hz): + """Set the speed of the SPI clock in hertz. Note that not all speeds + are supported and a lower speed might be chosen by the hardware. + """ + self._device.frequency(hz) + + def set_mode(self,mode): + """Set SPI mode which controls clock polarity and phase. Should be a + numeric value 0, 1, 2, or 3. See wikipedia page for details on meaning: + http://en.wikipedia.org/wiki/Serial_Peripheral_Interface_Bus + """ + if mode < 0 or mode > 3: + raise ValueError('Mode must be a value 0, 1, 2, or 3.') + self._device.mode(mode) + + def set_bit_order(self, order): + """Set order of bits to be read/written over serial lines. Should be + either MSBFIRST for most-significant first, or LSBFIRST for + least-signifcant first. + """ + if order == MSBFIRST: + self._device.lsbmode(False) + elif order == LSBFIRST: + self._device.lsbmode(True) + else: + raise ValueError('Order must be MSBFIRST or LSBFIRST.') + + def close(self): + """Close communication with the SPI device.""" + self._device.Spi() + + def write(self, data): + """Half-duplex SPI write. The specified array of bytes will be clocked + out the MOSI line. + """ + self._device.write(bytearray(data)) + +class BitBang(object): + """Software-based implementation of the SPI protocol over GPIO pins.""" + + def __init__(self, gpio, sclk, mosi=None, miso=None, ss=None): + """Initialize bit bang (or software) based SPI. Must provide a BaseGPIO + class, the SPI clock, and optionally MOSI, MISO, and SS (slave select) + pin numbers. If MOSI is set to None then writes will be disabled and fail + with an error, likewise for MISO reads will be disabled. If SS is set to + None then SS will not be asserted high/low by the library when + transfering data. + """ + self._gpio = gpio + self._sclk = sclk + self._mosi = mosi + self._miso = miso + self._ss = ss + # Set pins as outputs/inputs. + gpio.setup(sclk, GPIO.OUT) + if mosi is not None: + gpio.setup(mosi, GPIO.OUT) + if miso is not None: + gpio.setup(miso, GPIO.IN) + if ss is not None: + gpio.setup(ss, GPIO.OUT) + # Assert SS high to start with device communication off. + gpio.set_high(ss) + # Assume mode 0. + self.set_mode(0) + # Assume most significant bit first order. + self.set_bit_order(MSBFIRST) + + def set_clock_hz(self, hz): + """Set the speed of the SPI clock. This is unsupported with the bit + bang SPI class and will be ignored. + """ + pass + + def set_mode(self, mode): + """Set SPI mode which controls clock polarity and phase. Should be a + numeric value 0, 1, 2, or 3. See wikipedia page for details on meaning: + http://en.wikipedia.org/wiki/Serial_Peripheral_Interface_Bus + """ + if mode < 0 or mode > 3: + raise ValueError('Mode must be a value 0, 1, 2, or 3.') + if mode & 0x02: + # Clock is normally high in mode 2 and 3. + self._clock_base = GPIO.HIGH + else: + # Clock is normally low in mode 0 and 1. + self._clock_base = GPIO.LOW + if mode & 0x01: + # Read on trailing edge in mode 1 and 3. + self._read_leading = False + else: + # Read on leading edge in mode 0 and 2. + self._read_leading = True + # Put clock into its base state. + self._gpio.output(self._sclk, self._clock_base) + + def set_bit_order(self, order): + """Set order of bits to be read/written over serial lines. Should be + either MSBFIRST for most-significant first, or LSBFIRST for + least-signifcant first. + """ + # Set self._mask to the bitmask which points at the appropriate bit to + # read or write, and appropriate left/right shift operator function for + # reading/writing. + if order == MSBFIRST: + self._mask = 0x80 + self._write_shift = operator.lshift + self._read_shift = operator.rshift + elif order == LSBFIRST: + self._mask = 0x01 + self._write_shift = operator.rshift + self._read_shift = operator.lshift + else: + raise ValueError('Order must be MSBFIRST or LSBFIRST.') + + def close(self): + """Close the SPI connection. Unused in the bit bang implementation.""" + pass + + def write(self, data, assert_ss=True, deassert_ss=True): + """Half-duplex SPI write. If assert_ss is True, the SS line will be + asserted low, the specified bytes will be clocked out the MOSI line, and + if deassert_ss is True the SS line be put back high. + """ + # Fail MOSI is not specified. + if self._mosi is None: + raise RuntimeError('Write attempted with no MOSI pin specified.') + if assert_ss and self._ss is not None: + self._gpio.set_low(self._ss) + for byte in data: + for i in range(8): + # Write bit to MOSI. + if self._write_shift(byte, i) & self._mask: + self._gpio.set_high(self._mosi) + else: + self._gpio.set_low(self._mosi) + # Flip clock off base. + self._gpio.output(self._sclk, not self._clock_base) + # Return clock to base. + self._gpio.output(self._sclk, self._clock_base) + if deassert_ss and self._ss is not None: + self._gpio.set_high(self._ss) + + def read(self, length, assert_ss=True, deassert_ss=True): + """Half-duplex SPI read. If assert_ss is true, the SS line will be + asserted low, the specified length of bytes will be clocked in the MISO + line, and if deassert_ss is true the SS line will be put back high. + Bytes which are read will be returned as a bytearray object. + """ + if self._miso is None: + raise RuntimeError('Read attempted with no MISO pin specified.') + if assert_ss and self._ss is not None: + self._gpio.set_low(self._ss) + result = bytearray(length) + for i in range(length): + for j in range(8): + # Flip clock off base. + self._gpio.output(self._sclk, not self._clock_base) + # Handle read on leading edge of clock. + if self._read_leading: + if self._gpio.is_high(self._miso): + # Set bit to 1 at appropriate location. + result[i] |= self._read_shift(self._mask, j) + else: + # Set bit to 0 at appropriate location. + result[i] &= ~self._read_shift(self._mask, j) + # Return clock to base. + self._gpio.output(self._sclk, self._clock_base) + # Handle read on trailing edge of clock. + if not self._read_leading: + if self._gpio.is_high(self._miso): + # Set bit to 1 at appropriate location. + result[i] |= self._read_shift(self._mask, j) + else: + # Set bit to 0 at appropriate location. + result[i] &= ~self._read_shift(self._mask, j) + if deassert_ss and self._ss is not None: + self._gpio.set_high(self._ss) + return result + + def transfer(self, data, assert_ss=True, deassert_ss=True): + """Full-duplex SPI read and write. If assert_ss is true, the SS line + will be asserted low, the specified bytes will be clocked out the MOSI + line while bytes will also be read from the MISO line, and if + deassert_ss is true the SS line will be put back high. Bytes which are + read will be returned as a bytearray object. + """ + if self._mosi is None: + raise RuntimeError('Write attempted with no MOSI pin specified.') + if self._mosi is None: + raise RuntimeError('Read attempted with no MISO pin specified.') + if assert_ss and self._ss is not None: + self._gpio.set_low(self._ss) + result = bytearray(len(data)) + for i in range(len(data)): + for j in range(8): + # Write bit to MOSI. + if self._write_shift(data[i], j) & self._mask: + self._gpio.set_high(self._mosi) + else: + self._gpio.set_low(self._mosi) + # Flip clock off base. + self._gpio.output(self._sclk, not self._clock_base) + # Handle read on leading edge of clock. + if self._read_leading: + if self._gpio.is_high(self._miso): + # Set bit to 1 at appropriate location. + result[i] |= self._read_shift(self._mask, j) + else: + # Set bit to 0 at appropriate location. + result[i] &= ~self._read_shift(self._mask, j) + # Return clock to base. + self._gpio.output(self._sclk, self._clock_base) + # Handle read on trailing edge of clock. + if not self._read_leading: + if self._gpio.is_high(self._miso): + # Set bit to 1 at appropriate location. + result[i] |= self._read_shift(self._mask, j) + else: + # Set bit to 0 at appropriate location. + result[i] &= ~self._read_shift(self._mask, j) + if deassert_ss and self._ss is not None: + self._gpio.set_high(self._ss) + return result diff --git a/lib/Adafruit_Python_GPIO/Adafruit_GPIO/__init__.py b/lib/Adafruit_Python_GPIO/Adafruit_GPIO/__init__.py new file mode 100644 index 0000000..7e99604 --- /dev/null +++ b/lib/Adafruit_Python_GPIO/Adafruit_GPIO/__init__.py @@ -0,0 +1,3 @@ +from __future__ import absolute_import + +from Adafruit_GPIO.GPIO import * diff --git a/lib/Adafruit_Python_GPIO/LICENSE b/lib/Adafruit_Python_GPIO/LICENSE new file mode 100644 index 0000000..35a35d3 --- /dev/null +++ b/lib/Adafruit_Python_GPIO/LICENSE @@ -0,0 +1,21 @@ +The MIT License (MIT) + +Copyright (c) 2014 Adafruit Industries + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE.
\ No newline at end of file diff --git a/lib/Adafruit_Python_GPIO/README.md b/lib/Adafruit_Python_GPIO/README.md new file mode 100644 index 0000000..a1dbeef --- /dev/null +++ b/lib/Adafruit_Python_GPIO/README.md @@ -0,0 +1,39 @@ +Adafruit Python GPIO Library +============================ + +Library to provide a cross-platform GPIO interface on the Raspberry Pi and Beaglebone Black using the [RPi.GPIO](https://pypi.python.org/pypi/RPi.GPIO) and [Adafruit_BBIO](https://pypi.python.org/pypi/Adafruit_BBIO) libraries. + +The library is currently in an early stage, but you can see how its used in the [Adafruit Nokia LCD library](https://github.com/adafruit/Adafruit_Nokia_LCD) to write Python code that is easily portable between the Raspberry Pi and Beaglebone Black. + +Note that you typically don't need to install this library directly as other libraries will depend on it in their setup and automatically install it. However if you do need to manually install do so by running these commands: + +- On a Debian-based Linux like Raspbian, Ubuntu, etc. in a terminal execute: + + ``` + sudo apt-get update + sudo apt-get install build-essential python-pip python-dev python-smbus git + git clone https://github.com/adafruit/Adafruit_Python_GPIO.git + cd Adafruit_Python_GPIO + sudo python setup.py install + ``` + +- On Mac OSX, first install PIP by [downloading the python script here](https://bootstrap.pypa.io/get-pip.py) and execute it with `python get-pip.py` in a terminal, then install the [git source control system](http://git-scm.com/downloads). Then in a terminal execute: + + ``` + git clone https://github.com/adafruit/Adafruit_Python_GPIO.git + cd Adafruit_Python_GPIO + sudo python setup.py install + ``` + +- On Windows, first install the [latest Python 2.7 version](https://www.python.org/downloads/windows/), then install PIP by [downloading the python script here](https://bootstrap.pypa.io/get-pip.py) and execute it with `python get-pip.py` in a terminal, and finally install the [git source control system](http://git-scm.com/downloads). Then in a git bash prompt execute: + + ``` + git clone https://github.com/adafruit/Adafruit_Python_GPIO.git + cd Adafruit_Python_GPIO + python setup.py install + ``` + +Contributing +------------ + +For information on contributing, such as how to run tests, etc. please see the [project wiki](https://github.com/adafruit/Adafruit_Python_GPIO/wiki/Running-Tests) on GitHub. diff --git a/lib/Adafruit_Python_GPIO/ez_setup.py b/lib/Adafruit_Python_GPIO/ez_setup.py new file mode 100644 index 0000000..23ea9a2 --- /dev/null +++ b/lib/Adafruit_Python_GPIO/ez_setup.py @@ -0,0 +1,332 @@ +#!/usr/bin/env python +"""Bootstrap setuptools installation + +To use setuptools in your package's setup.py, include this +file in the same directory and add this to the top of your setup.py:: + + from ez_setup import use_setuptools + use_setuptools() + +To require a specific version of setuptools, set a download +mirror, or use an alternate download directory, simply supply +the appropriate options to ``use_setuptools()``. + +This file can also be run as a script to install or upgrade setuptools. +""" +import os +import shutil +import sys +import tempfile +import zipfile +import optparse +import subprocess +import platform +import textwrap +import contextlib + +from distutils import log + +try: + from site import USER_SITE +except ImportError: + USER_SITE = None + +DEFAULT_VERSION = "3.5.1" +DEFAULT_URL = "https://pypi.python.org/packages/source/s/setuptools/" + +def _python_cmd(*args): + """ + Return True if the command succeeded. + """ + args = (sys.executable,) + args + return subprocess.call(args) == 0 + + +def _install(archive_filename, install_args=()): + with archive_context(archive_filename): + # installing + log.warn('Installing Setuptools') + if not _python_cmd('setup.py', 'install', *install_args): + log.warn('Something went wrong during the installation.') + log.warn('See the error message above.') + # exitcode will be 2 + return 2 + + +def _build_egg(egg, archive_filename, to_dir): + with archive_context(archive_filename): + # building an egg + log.warn('Building a Setuptools egg in %s', to_dir) + _python_cmd('setup.py', '-q', 'bdist_egg', '--dist-dir', to_dir) + # returning the result + log.warn(egg) + if not os.path.exists(egg): + raise IOError('Could not build the egg.') + + +def get_zip_class(): + """ + Supplement ZipFile class to support context manager for Python 2.6 + """ + class ContextualZipFile(zipfile.ZipFile): + def __enter__(self): + return self + def __exit__(self, type, value, traceback): + self.close + return zipfile.ZipFile if hasattr(zipfile.ZipFile, '__exit__') else \ + ContextualZipFile + + +@contextlib.contextmanager +def archive_context(filename): + # extracting the archive + tmpdir = tempfile.mkdtemp() + log.warn('Extracting in %s', tmpdir) + old_wd = os.getcwd() + try: + os.chdir(tmpdir) + with get_zip_class()(filename) as archive: + archive.extractall() + + # going in the directory + subdir = os.path.join(tmpdir, os.listdir(tmpdir)[0]) + os.chdir(subdir) + log.warn('Now working in %s', subdir) + yield + + finally: + os.chdir(old_wd) + shutil.rmtree(tmpdir) + + +def _do_download(version, download_base, to_dir, download_delay): + egg = os.path.join(to_dir, 'setuptools-%s-py%d.%d.egg' + % (version, sys.version_info[0], sys.version_info[1])) + if not os.path.exists(egg): + archive = download_setuptools(version, download_base, + to_dir, download_delay) + _build_egg(egg, archive, to_dir) + sys.path.insert(0, egg) + + # Remove previously-imported pkg_resources if present (see + # https://bitbucket.org/pypa/setuptools/pull-request/7/ for details). + if 'pkg_resources' in sys.modules: + del sys.modules['pkg_resources'] + + import setuptools + setuptools.bootstrap_install_from = egg + + +def use_setuptools(version=DEFAULT_VERSION, download_base=DEFAULT_URL, + to_dir=os.curdir, download_delay=15): + to_dir = os.path.abspath(to_dir) + rep_modules = 'pkg_resources', 'setuptools' + imported = set(sys.modules).intersection(rep_modules) + try: + import pkg_resources + except ImportError: + return _do_download(version, download_base, to_dir, download_delay) + try: + pkg_resources.require("setuptools>=" + version) + return + except pkg_resources.DistributionNotFound: + return _do_download(version, download_base, to_dir, download_delay) + except pkg_resources.VersionConflict as VC_err: + if imported: + msg = textwrap.dedent(""" + The required version of setuptools (>={version}) is not available, + and can't be installed while this script is running. Please + install a more recent version first, using + 'easy_install -U setuptools'. + + (Currently using {VC_err.args[0]!r}) + """).format(VC_err=VC_err, version=version) + sys.stderr.write(msg) + sys.exit(2) + + # otherwise, reload ok + del pkg_resources, sys.modules['pkg_resources'] + return _do_download(version, download_base, to_dir, download_delay) + +def _clean_check(cmd, target): + """ + Run the command to download target. If the command fails, clean up before + re-raising the error. + """ + try: + subprocess.check_call(cmd) + except subprocess.CalledProcessError: + if os.access(target, os.F_OK): + os.unlink(target) + raise + +def download_file_powershell(url, target): + """ + Download the file at url to target using Powershell (which will validate + trust). Raise an exception if the command cannot complete. + """ + target = os.path.abspath(target) + cmd = [ + 'powershell', + '-Command', + "(new-object System.Net.WebClient).DownloadFile(%(url)r, %(target)r)" % vars(), + ] + _clean_check(cmd, target) + +def has_powershell(): + if platform.system() != 'Windows': + return False + cmd = ['powershell', '-Command', 'echo test'] + devnull = open(os.path.devnull, 'wb') + try: + try: + subprocess.check_call(cmd, stdout=devnull, stderr=devnull) + except Exception: + return False + finally: + devnull.close() + return True + +download_file_powershell.viable = has_powershell + +def download_file_curl(url, target): + cmd = ['curl', url, '--silent', '--output', target] + _clean_check(cmd, target) + +def has_curl(): + cmd = ['curl', '--version'] + devnull = open(os.path.devnull, 'wb') + try: + try: + subprocess.check_call(cmd, stdout=devnull, stderr=devnull) + except Exception: + return False + finally: + devnull.close() + return True + +download_file_curl.viable = has_curl + +def download_file_wget(url, target): + cmd = ['wget', url, '--quiet', '--output-document', target] + _clean_check(cmd, target) + +def has_wget(): + cmd = ['wget', '--version'] + devnull = open(os.path.devnull, 'wb') + try: + try: + subprocess.check_call(cmd, stdout=devnull, stderr=devnull) + except Exception: + return False + finally: + devnull.close() + return True + +download_file_wget.viable = has_wget + +def download_file_insecure(url, target): + """ + Use Python to download the file, even though it cannot authenticate the + connection. + """ + try: + from urllib.request import urlopen + except ImportError: + from urllib2 import urlopen + src = dst = None + try: + src = urlopen(url) + # Read/write all in one block, so we don't create a corrupt file + # if the download is interrupted. + data = src.read() + dst = open(target, "wb") + dst.write(data) + finally: + if src: + src.close() + if dst: + dst.close() + +download_file_insecure.viable = lambda: True + +def get_best_downloader(): + downloaders = [ + download_file_powershell, + download_file_curl, + download_file_wget, + download_file_insecure, + ] + + for dl in downloaders: + if dl.viable(): + return dl + +def download_setuptools(version=DEFAULT_VERSION, download_base=DEFAULT_URL, + to_dir=os.curdir, delay=15, downloader_factory=get_best_downloader): + """ + Download setuptools from a specified location and return its filename + + `version` should be a valid setuptools version number that is available + as an egg for download under the `download_base` URL (which should end + with a '/'). `to_dir` is the directory where the egg will be downloaded. + `delay` is the number of seconds to pause before an actual download + attempt. + + ``downloader_factory`` should be a function taking no arguments and + returning a function for downloading a URL to a target. + """ + # making sure we use the absolute path + to_dir = os.path.abspath(to_dir) + zip_name = "setuptools-%s.zip" % version + url = download_base + zip_name + saveto = os.path.join(to_dir, zip_name) + if not os.path.exists(saveto): # Avoid repeated downloads + log.warn("Downloading %s", url) + downloader = downloader_factory() + downloader(url, saveto) + return os.path.realpath(saveto) + +def _build_install_args(options): + """ + Build the arguments to 'python setup.py install' on the setuptools package + """ + return ['--user'] if options.user_install else [] + +def _parse_args(): + """ + Parse the command line for options + """ + parser = optparse.OptionParser() + parser.add_option( + '--user', dest='user_install', action='store_true', default=False, + help='install in user site package (requires Python 2.6 or later)') + parser.add_option( + '--download-base', dest='download_base', metavar="URL", + default=DEFAULT_URL, + help='alternative URL from where to download the setuptools package') + parser.add_option( + '--insecure', dest='downloader_factory', action='store_const', + const=lambda: download_file_insecure, default=get_best_downloader, + help='Use internal, non-validating downloader' + ) + parser.add_option( + '--version', help="Specify which version to download", + default=DEFAULT_VERSION, + ) + options, args = parser.parse_args() + # positional arguments are ignored + return options + +def main(): + """Install or upgrade setuptools and EasyInstall""" + options = _parse_args() + archive = download_setuptools( + version=options.version, + download_base=options.download_base, + downloader_factory=options.downloader_factory, + ) + return _install(archive, _build_install_args(options)) + +if __name__ == '__main__': + sys.exit(main()) diff --git a/lib/Adafruit_Python_GPIO/setup.py b/lib/Adafruit_Python_GPIO/setup.py new file mode 100644 index 0000000..592a34c --- /dev/null +++ b/lib/Adafruit_Python_GPIO/setup.py @@ -0,0 +1,28 @@ +try: + # Try using ez_setup to install setuptools if not already installed. + from ez_setup import use_setuptools + use_setuptools() +except ImportError: + # Ignore import error and assume Python 3 which already has setuptools. + pass + +from setuptools import setup, find_packages + +import sys + +# Define required packages. +requires = ['adafruit-pureio'] +# Assume spidev is required on non-windows & non-mac platforms (i.e. linux). +if sys.platform != 'win32' and sys.platform != 'darwin': + requires.append('spidev') + +setup(name = 'Adafruit_GPIO', + version = '1.0.3', + author = 'Tony DiCola', + author_email = 'tdicola@adafruit.com', + description = 'Library to provide a cross-platform GPIO interface on the Raspberry Pi and Beaglebone Black using the RPi.GPIO and Adafruit_BBIO libraries.', + license = 'MIT', + url = 'https://github.com/adafruit/Adafruit_Python_GPIO/', + install_requires = requires, + test_suite = 'tests', + packages = find_packages()) diff --git a/lib/Adafruit_Python_GPIO/tests/MockGPIO.py b/lib/Adafruit_Python_GPIO/tests/MockGPIO.py new file mode 100644 index 0000000..d3dce13 --- /dev/null +++ b/lib/Adafruit_Python_GPIO/tests/MockGPIO.py @@ -0,0 +1,40 @@ +# Copyright (c) 2014 Adafruit Industries +# Author: Tony DiCola +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. + +import Adafruit_GPIO as GPIO + + +class MockGPIO(GPIO.BaseGPIO): + def __init__(self): + self.pin_mode = {} + self.pin_written = {} + self.pin_read = {} + + def setup(self, pin, mode): + self.pin_mode[pin] = mode + + def output(self, pin, bit): + self.pin_written.setdefault(pin, []).append(1 if bit else 0) + + def input(self, pin): + if pin not in self.pin_read: + raise RuntimeError('No mock GPIO data to read for pin {0}'.format(pin)) + return self.pin_read[pin].pop(0) == 1 diff --git a/lib/Adafruit_Python_GPIO/tests/__init__.py b/lib/Adafruit_Python_GPIO/tests/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/lib/Adafruit_Python_GPIO/tests/__init__.py diff --git a/lib/Adafruit_Python_GPIO/tests/test_GPIO.py b/lib/Adafruit_Python_GPIO/tests/test_GPIO.py new file mode 100644 index 0000000..24c51a6 --- /dev/null +++ b/lib/Adafruit_Python_GPIO/tests/test_GPIO.py @@ -0,0 +1,228 @@ +# Copyright (c) 2014 Adafruit Industries +# Author: Tony DiCola +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. + +import unittest + +from mock import Mock, patch + +import Adafruit_GPIO as GPIO +import Adafruit_GPIO.SPI as SPI +import Adafruit_GPIO.Platform as Platform + +from MockGPIO import MockGPIO + + +class TestBaseGPIO(unittest.TestCase): + def test_set_high_and_set_low(self): + gpio = MockGPIO() + gpio.set_high(1) + gpio.set_low(1) + self.assertDictEqual(gpio.pin_written, {1: [1, 0]}) + + def test_is_high_and_is_low(self): + gpio = MockGPIO() + gpio.pin_read[1] = [0, 0, 1, 1] + self.assertTrue(gpio.is_low(1)) + self.assertFalse(gpio.is_high(1)) + self.assertFalse(gpio.is_low(1)) + self.assertTrue(gpio.is_high(1)) + + def test_output_pins(self): + gpio = MockGPIO() + gpio.output_pins({0: True, 1: False, 7: True}) + self.assertDictEqual(gpio.pin_written, {0: [1], 1: [0], 7: [1]}) + + +class TestRPiGPIOAdapter(unittest.TestCase): + def test_setup(self): + rpi_gpio = Mock() + adapter = GPIO.RPiGPIOAdapter(rpi_gpio) + adapter.setup(1, GPIO.OUT) + rpi_gpio.setup.assert_called_with(1, rpi_gpio.OUT, pull_up_down=rpi_gpio.PUD_OFF) + adapter.setup(1, GPIO.IN) + rpi_gpio.setup.assert_called_with(1, rpi_gpio.IN, pull_up_down=rpi_gpio.PUD_OFF) + adapter.setup(1, GPIO.IN, GPIO.PUD_DOWN) + rpi_gpio.setup.assert_called_with(1, rpi_gpio.IN, pull_up_down=rpi_gpio.PUD_DOWN) + adapter.setup(1, GPIO.IN, GPIO.PUD_UP) + rpi_gpio.setup.assert_called_with(1, rpi_gpio.IN, pull_up_down=rpi_gpio.PUD_UP) + + def test_output(self): + rpi_gpio = Mock() + adapter = GPIO.RPiGPIOAdapter(rpi_gpio) + adapter.output(1, True) + rpi_gpio.output.assert_called_with(1, True) + adapter.output(1, False) + rpi_gpio.output.assert_called_with(1, False) + + def test_input(self): + rpi_gpio = Mock() + adapter = GPIO.RPiGPIOAdapter(rpi_gpio) + rpi_gpio.input = Mock(return_value=True) + val = adapter.input(1) + self.assertTrue(val) + rpi_gpio.input.assert_called_with(1) + + def test_setmode(self): + rpi_gpio = Mock() + adapter = GPIO.RPiGPIOAdapter(rpi_gpio, mode=rpi_gpio.BCM) + rpi_gpio.setmode.assert_called_with(rpi_gpio.BCM) + adapter = GPIO.RPiGPIOAdapter(rpi_gpio, mode=rpi_gpio.BOARD) + rpi_gpio.setmode.assert_called_with(rpi_gpio.BOARD) + adapter = GPIO.RPiGPIOAdapter(rpi_gpio) + rpi_gpio.setmode.assert_called_with(rpi_gpio.BCM) + + def test_add_event_detect(self): + rpi_gpio = Mock() + adapter = GPIO.RPiGPIOAdapter(rpi_gpio) + adapter.add_event_detect(1, GPIO.RISING) + rpi_gpio.add_event_detect.assert_called_with(1, rpi_gpio.RISING) + + def test_remove_event_detect(self): + rpi_gpio = Mock() + adapter = GPIO.RPiGPIOAdapter(rpi_gpio) + adapter.remove_event_detect(1) + rpi_gpio.remove_event_detect.assert_called_with(1) + + def test_add_event_callback(self): + rpi_gpio = Mock() + adapter = GPIO.RPiGPIOAdapter(rpi_gpio) + adapter.add_event_callback(1, callback=self.test_add_event_callback) + rpi_gpio.add_event_callback.assert_called_with(1, self.test_add_event_callback) + + def test_event_detected(self): + rpi_gpio = Mock() + adapter = GPIO.RPiGPIOAdapter(rpi_gpio) + adapter.event_detected(1) + rpi_gpio.event_detected.assert_called_with(1) + + def test_wait_for_edge(self): + rpi_gpio = Mock() + adapter = GPIO.RPiGPIOAdapter(rpi_gpio) + adapter.wait_for_edge(1, GPIO.FALLING) + rpi_gpio.wait_for_edge.assert_called_with(1, rpi_gpio.FALLING) + + def test_cleanup(self): + rpi_gpio = Mock() + adapter = GPIO.AdafruitBBIOAdapter(rpi_gpio) + adapter.cleanup() + rpi_gpio.cleanup.assert_called() + + def test_cleanup_pin(self): + rpi_gpio = Mock() + adapter = GPIO.AdafruitBBIOAdapter(rpi_gpio) + adapter.cleanup(1) + rpi_gpio.cleanup.assert_called_with(1) + + +class TestAdafruitBBIOAdapter(unittest.TestCase): + def test_setup(self): + bbio_gpio = Mock() + adapter = GPIO.AdafruitBBIOAdapter(bbio_gpio) + adapter.setup(1, GPIO.OUT) + bbio_gpio.setup.assert_called_with(1, bbio_gpio.OUT, pull_up_down=bbio_gpio.PUD_OFF) + adapter.setup(1, GPIO.IN) + bbio_gpio.setup.assert_called_with(1, bbio_gpio.IN, pull_up_down=bbio_gpio.PUD_OFF) + adapter.setup(1, GPIO.IN, GPIO.PUD_DOWN) + bbio_gpio.setup.assert_called_with(1, bbio_gpio.IN, pull_up_down=bbio_gpio.PUD_DOWN) + adapter.setup(1, GPIO.IN, GPIO.PUD_UP) + bbio_gpio.setup.assert_called_with(1, bbio_gpio.IN, pull_up_down=bbio_gpio.PUD_UP) + + def test_output(self): + bbio_gpio = Mock() + adapter = GPIO.AdafruitBBIOAdapter(bbio_gpio) + adapter.output(1, True) + bbio_gpio.output.assert_called_with(1, True) + adapter.output(1, False) + bbio_gpio.output.assert_called_with(1, False) + + def test_input(self): + bbio_gpio = Mock() + adapter = GPIO.AdafruitBBIOAdapter(bbio_gpio) + bbio_gpio.input = Mock(return_value=True) + val = adapter.input(1) + self.assertTrue(val) + bbio_gpio.input.assert_called_with(1) + + def test_add_event_detect(self): + bbio_gpio = Mock() + adapter = GPIO.AdafruitBBIOAdapter(bbio_gpio) + adapter.add_event_detect(1, GPIO.RISING) + bbio_gpio.add_event_detect.assert_called_with(1, bbio_gpio.RISING) + + def test_add_event_detect(self): + bbio_gpio = Mock() + adapter = GPIO.AdafruitBBIOAdapter(bbio_gpio) + adapter.add_event_detect(1, GPIO.RISING) + bbio_gpio.add_event_detect.assert_called_with(1, bbio_gpio.RISING) + + def test_remove_event_detect(self): + bbio_gpio = Mock() + adapter = GPIO.AdafruitBBIOAdapter(bbio_gpio) + adapter.remove_event_detect(1) + bbio_gpio.remove_event_detect.assert_called_with(1) + + def test_add_event_callback(self): + bbio_gpio = Mock() + adapter = GPIO.AdafruitBBIOAdapter(bbio_gpio) + adapter.add_event_callback(1, callback=self.test_add_event_callback) + bbio_gpio.add_event_callback.assert_called_with(1, self.test_add_event_callback) + + def test_event_detected(self): + bbio_gpio = Mock() + adapter = GPIO.AdafruitBBIOAdapter(bbio_gpio) + adapter.event_detected(1) + bbio_gpio.event_detected.assert_called_with(1) + + def test_wait_for_edge(self): + bbio_gpio = Mock() + adapter = GPIO.AdafruitBBIOAdapter(bbio_gpio) + adapter.wait_for_edge(1, GPIO.FALLING) + bbio_gpio.wait_for_edge.assert_called_with(1, bbio_gpio.FALLING) + + def test_cleanup(self): + bbio_gpio = Mock() + adapter = GPIO.AdafruitBBIOAdapter(bbio_gpio) + adapter.cleanup() + bbio_gpio.cleanup.assert_called() + + def test_cleanup_pin(self): + bbio_gpio = Mock() + adapter = GPIO.AdafruitBBIOAdapter(bbio_gpio) + adapter.cleanup(1) + bbio_gpio.cleanup.assert_called_with(1) + + +class TestGetPlatformGPIO(unittest.TestCase): + @patch.dict('sys.modules', {'RPi': Mock(), 'RPi.GPIO': Mock()}) + @patch('Adafruit_GPIO.Platform.platform_detect', Mock(return_value=Platform.RASPBERRY_PI)) + def test_raspberrypi(self): + gpio = GPIO.get_platform_gpio() + self.assertIsInstance(gpio, GPIO.RPiGPIOAdapter) + + @patch.dict('sys.modules', {'Adafruit_BBIO': Mock(), 'Adafruit_BBIO.GPIO': Mock()}) + @patch('Adafruit_GPIO.Platform.platform_detect', Mock(return_value=Platform.BEAGLEBONE_BLACK)) + def test_beagleboneblack(self): + gpio = GPIO.get_platform_gpio() + self.assertIsInstance(gpio, GPIO.AdafruitBBIOAdapter) + + @patch('Adafruit_GPIO.Platform.platform_detect', Mock(return_value=Platform.UNKNOWN)) + def test_unknown(self): + self.assertRaises(RuntimeError, GPIO.get_platform_gpio) diff --git a/lib/Adafruit_Python_GPIO/tests/test_I2C.py b/lib/Adafruit_Python_GPIO/tests/test_I2C.py new file mode 100644 index 0000000..01d0393 --- /dev/null +++ b/lib/Adafruit_Python_GPIO/tests/test_I2C.py @@ -0,0 +1,181 @@ +# Copyright (c) 2014 Adafruit Industries +# Author: Tony DiCola +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. + +import logging +import unittest + +from mock import Mock, patch + +import Adafruit_GPIO.Platform as Platform + + +# Enable debug logging to stdout during tests. +logging.basicConfig() +logging.getLogger().setLevel(logging.DEBUG) + + +class MockSMBus(object): + # Mock the smbus.SMBus class to record all data written to specific + # addresses and registers in the _written member. + def __init__(self): + # _written will store a dictionary of address to register dictionary. + # Each register dictionary will store a mapping of register value to + # an array of all written values (in sequential write order). + self._written = {} + self._read = {} + + def _write_register(self, address, register, value): + self._written.setdefault(address, {}).setdefault(register, []).append(value) + + def _read_register(self, address, register): + return self._read.get(address).get(register).pop(0) + + def write_byte_data(self, address, register, value): + self._write_register(address, register, value) + + def write_word_data(self, address, register, value): + self._write_register(address, register, value >> 8 & 0xFF) + self._write_register(address, register+1, value & 0xFF) + + def write_i2c_block_data(self, address, register, values): + for i, value in enumerate(values): + self._write_register(address, register+i, value & 0xFF) + + def read_byte_data(self, address, register): + return self._read_register(address, register) + + def read_word_data(self, address, register): + high = self._read_register(address, register) + low = self._read_register(address, register+1) + return (high << 8) | low + + def read_i2c_block_data(self, address, length): + return [self._read_register(address+i) for i in range(length)] + + +def create_device(address, busnum): + # Mock the smbus module and inject it into the global namespace so the + # Adafruit_GPIO.I2C module can be imported. Also inject a mock SMBus + # instance to be returned by smbus.SMBus function calls. + smbus = Mock() + mockbus = MockSMBus() + smbus.SMBus.return_value = mockbus + with patch.dict('sys.modules', {'smbus': smbus}): + import Adafruit_GPIO.I2C as I2C + return (I2C.Device(address, busnum), smbus, mockbus) + +def safe_import_i2c(): + # Mock the smbus module and inject it into the global namespace so the + # Adafruit_GPIO.I2C module can be imported. The imported I2C module is + # returned so global functions can be called on it. + with patch.dict('sys.modules', {'smbus': Mock() }): + import Adafruit_GPIO.I2C as I2C + return I2C + + +class TestI2CDevice(unittest.TestCase): + + def test_address_and_bus_set_correctly(self): + device, smbus, mockbus = create_device(0x1F, 1) + self.assertEqual(device._bus, mockbus) + smbus.SMBus.assert_called_with(1) + self.assertEqual(device._address, 0x1F) + + def test_write8(self): + device, smbus, mockbus = create_device(0x1F, 1) + device.write8(0xFE, 0xED) + self.assertDictEqual(mockbus._written, { 0x1F: { 0xFE: [0xED] }}) + + def test_write8_truncates_to_8bits(self): + device, smbus, mockbus = create_device(0x1F, 1) + device.write8(0xFE, 0xBEEFED) + self.assertDictEqual(mockbus._written, { 0x1F: { 0xFE: [0xED] }}) + + def test_write16(self): + device, smbus, mockbus = create_device(0x1F, 1) + device.write16(0xFE, 0xBEEF) + self.assertDictEqual(mockbus._written, { 0x1F: { 0xFE: [0xBE], + 0xFF: [0xEF] }}) + + def test_write16_truncates_to_8bits(self): + device, smbus, mockbus = create_device(0x1F, 1) + device.write16(0xFE, 0xFEEDBEEF) + self.assertDictEqual(mockbus._written, { 0x1F: { 0xFE: [0xBE], + 0xFF: [0xEF] }}) + + def test_writeList(self): + device, smbus, mockbus = create_device(0x1F, 1) + device.writeList(0x00, [0xFE, 0xED, 0xBE, 0xEF]) + self.assertDictEqual(mockbus._written, { 0x1F: { 0x00: [0xFE], + 0x01: [0xED], + 0x02: [0xBE], + 0x03: [0xEF] }}) + + def test_readU8(self): + device, smbus, mockbus = create_device(0x1F, 1) + mockbus._read[0x1F] = { 0xFE: [0xED] } + value = device.readU8(0xFE) + self.assertEqual(value, 0xED) + + def test_readS8(self): + device, smbus, mockbus = create_device(0x1F, 1) + mockbus._read[0x1F] = { 0xFE: [0xED] } + value = device.readS8(0xFE) + self.assertEqual(value, -19) + + def test_readU16(self): + device, smbus, mockbus = create_device(0x1F, 1) + mockbus._read[0x1F] = { 0xFE: [0xED], 0xFF: [0x01] } + value = device.readU16(0xFE) + self.assertEqual(value, 0xED01) + + def test_readS16(self): + device, smbus, mockbus = create_device(0x1F, 1) + mockbus._read[0x1F] = { 0xFE: [0xED], 0xFF: [0x01] } + value = device.readS16(0xFE) + self.assertEqual(value, -4863) + + +class TestGetDefaultBus(unittest.TestCase): + @patch('Adafruit_GPIO.Platform.pi_revision', Mock(return_value=1)) + @patch('Adafruit_GPIO.Platform.platform_detect', Mock(return_value=Platform.RASPBERRY_PI)) + def test_raspberry_pi_rev1(self): + I2C = safe_import_i2c() + bus = I2C.get_default_bus() + self.assertEqual(bus, 0) + + @patch('Adafruit_GPIO.Platform.pi_revision', Mock(return_value=2)) + @patch('Adafruit_GPIO.Platform.platform_detect', Mock(return_value=Platform.RASPBERRY_PI)) + def test_raspberry_pi_rev2(self): + I2C = safe_import_i2c() + bus = I2C.get_default_bus() + self.assertEqual(bus, 1) + + @patch('Adafruit_GPIO.Platform.platform_detect', Mock(return_value=Platform.BEAGLEBONE_BLACK)) + def test_beaglebone_black(self): + I2C = safe_import_i2c() + bus = I2C.get_default_bus() + self.assertEqual(bus, 1) + + @patch('Adafruit_GPIO.Platform.platform_detect', Mock(return_value=Platform.UNKNOWN)) + def test_unknown(self): + I2C = safe_import_i2c() + self.assertRaises(RuntimeError, I2C.get_default_bus) diff --git a/lib/Adafruit_Python_GPIO/tests/test_PWM.py b/lib/Adafruit_Python_GPIO/tests/test_PWM.py new file mode 100644 index 0000000..d80bd1f --- /dev/null +++ b/lib/Adafruit_Python_GPIO/tests/test_PWM.py @@ -0,0 +1,103 @@ +# Copyright (c) 2014 Adafruit Industries +# Author: Tony DiCola +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. + +import unittest + +from mock import Mock, patch + +import Adafruit_GPIO.PWM as PWM +import Adafruit_GPIO.Platform as Platform + + +class TestRPi_PWM_Adapter(unittest.TestCase): + def test_setup(self): + rpi_gpio = Mock() + pwm = PWM.RPi_PWM_Adapter(rpi_gpio) + pwm.start(1, 50) + rpi_gpio.PWM.assert_called_with(1, 2000) + + def test_set_duty_cycle_valid(self): + rpi_gpio = Mock() + pwm = PWM.RPi_PWM_Adapter(rpi_gpio) + pwm.start(1, 50) + pwm.set_duty_cycle(1, 75) + # Implicit verification that no assertion or other error thrown. + + def test_set_duty_cycle_invalid(self): + rpi_gpio = Mock() + pwm = PWM.RPi_PWM_Adapter(rpi_gpio) + pwm.start(1, 50) + self.assertRaises(ValueError, pwm.set_duty_cycle, 1, 150) + self.assertRaises(ValueError, pwm.set_duty_cycle, 1, -10) + + def test_set_frequency(self): + rpi_gpio = Mock() + pwm = PWM.RPi_PWM_Adapter(rpi_gpio) + pwm.start(1, 50) + pwm.set_frequency(1, 1000) + # Implicit verification that no assertion or other error thrown. + + +class TestBBIO_PWM_Adapter(unittest.TestCase): + def test_setup(self): + bbio_pwm = Mock() + pwm = PWM.BBIO_PWM_Adapter(bbio_pwm) + pwm.start('P9_16', 50) + bbio_pwm.start.assert_called_with('P9_16', 50, 2000) + + def test_set_duty_cycle_valid(self): + bbio_pwm = Mock() + pwm = PWM.BBIO_PWM_Adapter(bbio_pwm) + pwm.start('P9_16', 50) + pwm.set_duty_cycle('P9_16', 75) + bbio_pwm.set_duty_cycle.assert_called_with('P9_16', 75) + + def test_set_duty_cycle_invalid(self): + bbio_pwm = Mock() + pwm = PWM.BBIO_PWM_Adapter(bbio_pwm) + pwm.start('P9_16', 50) + self.assertRaises(ValueError, pwm.set_duty_cycle, 'P9_16', 150) + self.assertRaises(ValueError, pwm.set_duty_cycle, 'P9_16', -10) + + def test_set_frequency(self): + bbio_pwm = Mock() + pwm = PWM.BBIO_PWM_Adapter(bbio_pwm) + pwm.start('P9_16', 50) + pwm.set_frequency('P9_16', 1000) + bbio_pwm.set_frequency.assert_called_with('P9_16', 1000) + + +class TestGetPlatformPWM(unittest.TestCase): + @patch.dict('sys.modules', {'RPi': Mock(), 'RPi.GPIO': Mock()}) + @patch('Adafruit_GPIO.Platform.platform_detect', Mock(return_value=Platform.RASPBERRY_PI)) + def test_raspberrypi(self): + pwm = PWM.get_platform_pwm() + self.assertIsInstance(pwm, PWM.RPi_PWM_Adapter) + + @patch.dict('sys.modules', {'Adafruit_BBIO': Mock(), 'Adafruit_BBIO.PWM': Mock()}) + @patch('Adafruit_GPIO.Platform.platform_detect', Mock(return_value=Platform.BEAGLEBONE_BLACK)) + def test_beagleboneblack(self): + pwm = PWM.get_platform_pwm() + self.assertIsInstance(pwm, PWM.BBIO_PWM_Adapter) + + @patch('Adafruit_GPIO.Platform.platform_detect', Mock(return_value=Platform.UNKNOWN)) + def test_otherplatform(self): + self.assertRaises(RuntimeError, PWM.get_platform_pwm) diff --git a/lib/Adafruit_Python_GPIO/tests/test_Platform.py b/lib/Adafruit_Python_GPIO/tests/test_Platform.py new file mode 100644 index 0000000..42da8e3 --- /dev/null +++ b/lib/Adafruit_Python_GPIO/tests/test_Platform.py @@ -0,0 +1,70 @@ +# Copyright (c) 2014 Adafruit Industries +# Author: Tony DiCola +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. +import unittest + +from mock import Mock, patch + +import Adafruit_GPIO.Platform as Platform + + +class TestPlatformDetect(unittest.TestCase): + @patch('platform.platform', Mock(return_value='Linux-3.8.13-bone47-armv7l-with-debian-7.4')) + def test_beaglebone_black(self): + result = Platform.platform_detect() + self.assertEquals(result, Platform.BEAGLEBONE_BLACK) + + @patch('platform.platform', Mock(return_value='Darwin-13.2.0-x86_64-i386-64bit')) + def test_unknown(self): + result = Platform.platform_detect() + self.assertEquals(result, Platform.UNKNOWN) + + +class TestPiRevision(unittest.TestCase): + def test_revision_1(self): + with patch('__builtin__.open') as mock_open: + handle = mock_open.return_value.__enter__.return_value + handle.__iter__.return_value = iter(['Revision : 0000']) + rev = Platform.pi_revision() + self.assertEquals(rev, 1) + with patch('__builtin__.open') as mock_open: + handle = mock_open.return_value.__enter__.return_value + handle.__iter__.return_value = iter(['Revision : 0002']) + rev = Platform.pi_revision() + self.assertEquals(rev, 1) + with patch('__builtin__.open') as mock_open: + handle = mock_open.return_value.__enter__.return_value + handle.__iter__.return_value = iter(['Revision : 0003']) + rev = Platform.pi_revision() + self.assertEquals(rev, 1) + + def test_revision_2(self): + with patch('__builtin__.open') as mock_open: + handle = mock_open.return_value.__enter__.return_value + handle.__iter__.return_value = iter(['Revision : 000e']) + rev = Platform.pi_revision() + self.assertEquals(rev, 2) + + def test_unknown_revision(self): + with patch('__builtin__.open') as mock_open: + handle = mock_open.return_value.__enter__.return_value + handle.__iter__.return_value = iter(['foobar']) + self.assertRaises(RuntimeError, Platform.pi_revision) + diff --git a/lib/Adafruit_Python_GPIO/tests/test_SPI.py b/lib/Adafruit_Python_GPIO/tests/test_SPI.py new file mode 100644 index 0000000..123eec1 --- /dev/null +++ b/lib/Adafruit_Python_GPIO/tests/test_SPI.py @@ -0,0 +1,192 @@ +# Copyright (c) 2014 Adafruit Industries +# Author: Tony DiCola +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. + +import unittest + +import Adafruit_GPIO as GPIO +import Adafruit_GPIO.SPI as SPI + +from MockGPIO import MockGPIO + + +class TestBitBangSPI(unittest.TestCase): + def test_pin_modes_set_correctly(self): + gpio = MockGPIO() + device = SPI.BitBang(gpio, 1, 2, 3, 4) + self.assertDictEqual(gpio.pin_mode, { 1: GPIO.OUT, + 2: GPIO.OUT, + 3: GPIO.IN, + 4: GPIO.OUT }) + + def test_ss_set_high_after_initialization(self): + gpio = MockGPIO() + device = SPI.BitBang(gpio, 1, 2, 3, 4) + self.assertListEqual(gpio.pin_written[4], [1]) + + def test_mode_0_write(self): + gpio = MockGPIO() + device = SPI.BitBang(gpio, 1, 2, 3, 4) + device.write([0x1F]) + # Verify clock + self.assertListEqual(gpio.pin_written[1], [0, 1, 0, 1, 0, 1, 0, 1, + 0, 1, 0, 1, 0, 1, 0, 1, 0]) + # Verify MOSI + self.assertListEqual(gpio.pin_written[2], [0, 0, 0, 1, 1, 1, 1, 1]) + # Verify MISO + self.assertNotIn(3, gpio.pin_written) + # Verify SS + self.assertListEqual(gpio.pin_written[4], [1, 0, 1]) + + def test_write_assert_deassert_ss_false(self): + gpio = MockGPIO() + device = SPI.BitBang(gpio, 1, 2, 3, 4) + device.write([0x1F], assert_ss=False, deassert_ss=False) + self.assertListEqual(gpio.pin_written[4], [1]) + + def test_write_lsbfirst(self): + gpio = MockGPIO() + device = SPI.BitBang(gpio, 1, 2, 3, 4) + device.set_bit_order(SPI.LSBFIRST) + device.write([0x1F]) + self.assertListEqual(gpio.pin_written[2], [1, 1, 1, 1, 1, 0, 0, 0]) + + def test_invalid_mode_fails(self): + gpio = MockGPIO() + device = SPI.BitBang(gpio, 1, 2, 3, 4) + self.assertRaises(ValueError, device.set_mode, -1) + self.assertRaises(ValueError, device.set_mode, 4) + + def test_invalid_bit_order_fails(self): + gpio = MockGPIO() + device = SPI.BitBang(gpio, 1, 2, 3, 4) + self.assertRaises(ValueError, device.set_bit_order, -1) + self.assertRaises(ValueError, device.set_bit_order, 2) + + def test_mode_0_read(self): + gpio = MockGPIO() + device = SPI.BitBang(gpio, 1, 2, 3, 4) + gpio.pin_read[3] = [0, 0, 0, 1, 1, 1, 1, 1] + result = device.read(1) + # Verify clock + self.assertListEqual(gpio.pin_written[1], [0, 1, 0, 1, 0, 1, 0, 1, + 0, 1, 0, 1, 0, 1, 0, 1, 0]) + # Verify MOSI + self.assertNotIn(2, gpio.pin_written) + # Verify MISO + self.assertNotIn(3, gpio.pin_written) + # Verify SS + self.assertListEqual(gpio.pin_written[4], [1, 0, 1]) + # Verify result + self.assertEqual(result, bytearray([0x1F])) + + def test_read_assert_deassert_ss_false(self): + gpio = MockGPIO() + device = SPI.BitBang(gpio, 1, 2, 3, 4) + gpio.pin_read[3] = [0, 0, 0, 1, 1, 1, 1, 1] + result = device.read(1, assert_ss=False, deassert_ss=False) + self.assertListEqual(gpio.pin_written[4], [1]) + + def test_read_multiple_bytes(self): + gpio = MockGPIO() + device = SPI.BitBang(gpio, 1, 2, 3, 4) + gpio.pin_read[3] = [0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0, + 0, 0, 0, 1, 1, 1, 1, 1] + result = device.read(3) + # Verify clock + self.assertListEqual(gpio.pin_written[1], [0, 1, 0, 1, 0, 1, 0, 1, + 0, 1, 0, 1, 0, 1, 0, 1, + 0, 1, 0, 1, 0, 1, 0, 1, + 0, 1, 0, 1, 0, 1, 0, 1, + 0, 1, 0, 1, 0, 1, 0, 1, + 0, 1, 0, 1, 0, 1, 0, 1, 0]) + # Verify MOSI + self.assertNotIn(2, gpio.pin_written) + # Verify MISO + self.assertNotIn(3, gpio.pin_written) + # Verify SS + self.assertListEqual(gpio.pin_written[4], [1, 0, 1]) + # Verify result + self.assertEqual(result, bytearray([0x1F, 0xF8, 0x1F])) + + def test_write_multiple_bytes(self): + gpio = MockGPIO() + device = SPI.BitBang(gpio, 1, 2, 3, 4) + device.write([0x1F, 0xF8, 0x1F]) + # Verify clock + self.assertListEqual(gpio.pin_written[1], [0, 1, 0, 1, 0, 1, 0, 1, + 0, 1, 0, 1, 0, 1, 0, 1, + 0, 1, 0, 1, 0, 1, 0, 1, + 0, 1, 0, 1, 0, 1, 0, 1, + 0, 1, 0, 1, 0, 1, 0, 1, + 0, 1, 0, 1, 0, 1, 0, 1, 0]) + # Verify MOSI + self.assertListEqual(gpio.pin_written[2], [0, 0, 0, 1, 1, 1, 1, 1, + 1, 1, 1, 1, 1, 0, 0, 0, + 0, 0, 0, 1, 1, 1, 1, 1]) + # Verify MISO + self.assertNotIn(3, gpio.pin_written) + # Verify SS + self.assertListEqual(gpio.pin_written[4], [1, 0, 1]) + + def test_mode_0_transfer(self): + gpio = MockGPIO() + device = SPI.BitBang(gpio, 1, 2, 3, 4) + gpio.pin_read[3] = [0, 0, 0, 1, 1, 1, 1, 1] + result = device.transfer([0xF8]) + # Verify clock + self.assertListEqual(gpio.pin_written[1], [0, 1, 0, 1, 0, 1, 0, 1, + 0, 1, 0, 1, 0, 1, 0, 1, 0]) + # Verify MOSI + self.assertListEqual(gpio.pin_written[2], [1, 1, 1, 1, 1, 0, 0, 0]) + # Verify MISO + self.assertNotIn(3, gpio.pin_written) + # Verify SS + self.assertListEqual(gpio.pin_written[4], [1, 0, 1]) + # Verify result + self.assertEqual(result, bytearray([0x1F])) + + def test_transfer_multiple_bytes(self): + gpio = MockGPIO() + device = SPI.BitBang(gpio, 1, 2, 3, 4) + gpio.pin_read[3] = [0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0, + 0, 0, 0, 1, 1, 1, 1, 1] + result = device.transfer([0xF8, 0x1F, 0xF8]) + # Verify clock + self.assertListEqual(gpio.pin_written[1], [0, 1, 0, 1, 0, 1, 0, 1, + 0, 1, 0, 1, 0, 1, 0, 1, + 0, 1, 0, 1, 0, 1, 0, 1, + 0, 1, 0, 1, 0, 1, 0, 1, + 0, 1, 0, 1, 0, 1, 0, 1, + 0, 1, 0, 1, 0, 1, 0, 1, 0]) + # Verify MOSI + self.assertListEqual(gpio.pin_written[2], [1, 1, 1, 1, 1, 0, 0, 0, + 0, 0, 0, 1, 1, 1, 1, 1, + 1, 1, 1, 1, 1, 0, 0, 0]) + # Verify MISO + self.assertNotIn(3, gpio.pin_written) + # Verify SS + self.assertListEqual(gpio.pin_written[4], [1, 0, 1]) + # Verify result + self.assertEqual(result, bytearray([0x1F, 0xF8, 0x1F])) + + #TODO: Test mode 1, 2, 3 + + #TODO: Test null MOSI, MISO, SS |