From 7919ebfa9c998e64ba9f29bcac13f38fe557abf3 Mon Sep 17 00:00:00 2001 From: Josh Pieper Date: Mon, 27 Jun 2022 09:38:38 -0400 Subject: [PATCH] Add more convenient python bindings for GPIO pins --- lib/python/examples/gpio.py | 67 +++++++++++++++++++++ lib/python/moteus/moteus.py | 83 ++++++++++++++++++++++++++- lib/python/moteus/test/moteus_test.py | 14 +++++ 3 files changed, 162 insertions(+), 2 deletions(-) create mode 100755 lib/python/examples/gpio.py diff --git a/lib/python/examples/gpio.py b/lib/python/examples/gpio.py new file mode 100755 index 00000000..c2b217b1 --- /dev/null +++ b/lib/python/examples/gpio.py @@ -0,0 +1,67 @@ +#!/usr/bin/python3 -B + +# Copyright 2022 Josh Pieper, jjp@pobox.com. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +""" +This example commands a single servo at ID #1 using the default +transport to hold the current position indefinitely, and prints the +state of the servo to the console. +""" + +import asyncio +import moteus + +async def main(): + # Communicate with controller ID 1 on the default transport. + c = moteus.Controller() + + # This returns the GPIO digital inputs as an array of bytes, one + # for each auxiliary port. + gpio_inputs = await c.read_gpio() + + # Expand these as individal pin values for display purposes. + def display(aux_num = 1, value = None, count = None): + print(f"AUX{aux_num}") + for i in range(count): + print(f" Pin {i} - {value & (1 << i)}") + + display(1, gpio_inputs[0], 5) + print() + display(2, gpio_inputs[1], 4) + + + # And to write the values to gpio pins, the + # `(make|set)_write_gpio` function can be used. Here, we'll set + # all possible GPIO digital outputs to 1. + await c.set_write_gpio(aux1=0x7f, aux2=0x7f) + + + # Finally, GPIO values can be included in general query results. + qr = moteus.QueryResolution() + qr.aux1_gpio = moteus.INT8 + qr.aux2_gpio = moteus.INT8 + c2 = moteus.Controller(query_resolution=qr) + result = await c2.query() + + print() + print("From moteus.Controller.query()") + display(1, result.values[moteus.Register.AUX1_GPIO_STATUS], 5) + print() + display(2, result.values[moteus.Register.AUX2_GPIO_STATUS], 4) + + +if __name__ == '__main__': + asyncio.run(main()) diff --git a/lib/python/moteus/moteus.py b/lib/python/moteus/moteus.py index 6b3e4e4d..fe8e6e1c 100644 --- a/lib/python/moteus/moteus.py +++ b/lib/python/moteus/moteus.py @@ -271,6 +271,9 @@ class QueryResolution: temperature = mp.INT8 fault = mp.INT8 + aux1_gpio = mp.IGNORE + aux2_gpio = mp.IGNORE + # Additional registers can be queried by enumerating them as keys # in this dictionary, with the resolution as the matching value. _extra = { @@ -595,13 +598,20 @@ def _make_query_data(self): for i in range(4): c2.maybe_write() + c3 = mp.WriteCombiner(writer, 0x10, int(Register.AUX1_GPIO_STATUS), [ + qr.aux1_gpio, + qr.aux2_gpio, + ]) + for i in range(2): + c3.maybe_write() + if len(qr._extra): - c3 = mp.WriteCombiner( + c4 = mp.WriteCombiner( writer, 0x10, int(min(qr._extra.keys())), [qr._extra[y] for y in sorted(list([int(x) for x in qr._extra.keys()]))]) for _ in qr._extra.keys(): - c3.maybe_write() + c4.maybe_write() return buf.getvalue() @@ -955,6 +965,75 @@ async def set_stay_within(self, *args, **kwargs): return self._extract(await self._get_transport().cycle( [self.make_stay_within(**kwargs)])) + def make_write_gpio(self, aux1=None, aux2=None, query=False): + """Return a moteus.Command structure with data necessary to set one or + more GPIO registers. + + aux1/aux2 are an optional integer bitfield, where the least + significant bit is pin 0 on the respective port. + """ + + result = self._make_command(query=query) + + data_buf = io.BytesIO() + writer = Writer(data_buf) + + combiner = mp.WriteCombiner( + writer, 0x00, int(Register.AUX1_GPIO_COMMAND), [ + mp.INT8 if aux1 else mp.IGNORE, + mp.INT8 if aux2 else mp.IGNORE, + ]) + + if combiner.maybe_write(): + writer.write_int8(aux1) + if combiner.maybe_write(): + writer.write_int8(aux2) + + if query: + data_buf.write(self._query_data) + + result.data = data_buf.getvalue() + return result + + async def set_write_gpio(self, *args, **kwargs): + return self._extract(await self._get_transport().cycle( + [self.make_write_gpio(**kwargs)])) + + def make_read_gpio(self): + """Return a moteus.Command structure with data necessary to read all + GPIO digital inputs.""" + + result = self._make_command(query=True) + data_buf = io.BytesIO() + writer = Writer(data_buf) + + combiner = mp.WriteCombiner( + writer, 0x10, int(Register.AUX1_GPIO_STATUS), [ + mp.INT8, + mp.INT8, + ]) + + for i in range(2): + combiner.maybe_write() + + result.data = data_buf.getvalue() + return result + + async def read_gpio(self): + """Return a bytes() object with an int8 for each auxiliary port. The + pins for each port are represented as bits, with the least significant + bit being pin 0. + + None can be returned if no response is received. + """ + + results = await self._get_transport().cycle([self.make_read_gpio()]) + if len(results) == 0: + return None + result = results[0] + return bytes([result.values[Register.AUX1_GPIO_STATUS], + result.values[Register.AUX2_GPIO_STATUS]]) + def make_diagnostic_write(self, data): result = self._make_command(query=False) diff --git a/lib/python/moteus/test/moteus_test.py b/lib/python/moteus/test/moteus_test.py index a55e2b5e..e3ffccec 100644 --- a/lib/python/moteus/test/moteus_test.py +++ b/lib/python/moteus/test/moteus_test.py @@ -192,6 +192,20 @@ def test_make_rezero(self): result.data, bytes([0x0d, 0xb0, 0x02, 0x00, 0x00, 0x00, 0x40])) + def test_make_write_gpio(self): + dut = mot.Controller() + result = dut.make_write_gpio(aux1=3, aux2=5) + self.assertEqual( + result.data, + bytes([0x02, 0x5c, 0x03, 0x05])) + + def test_make_read_gpio(self): + dut = mot.Controller() + result = dut.make_read_gpio() + self.assertEqual( + result.data, + bytes([0x12, 0x5e])) + if __name__ == '__main__': unittest.main()