|  | 
|  | 1 | +# SPDX-FileCopyrightText: 2019 ladyada for Adafruit Industries | 
|  | 2 | +# | 
|  | 3 | +# SPDX-License-Identifier: MIT | 
|  | 4 | + | 
|  | 5 | +import os | 
|  | 6 | +import board | 
|  | 7 | +import busio | 
|  | 8 | +from digitalio import DigitalInOut | 
|  | 9 | +import neopixel | 
|  | 10 | + | 
|  | 11 | +from adafruit_esp32spi import adafruit_esp32spi | 
|  | 12 | +import adafruit_esp32spi.adafruit_esp32spi_wifimanager as wifimanager | 
|  | 13 | +import adafruit_wsgi.esp32spi_wsgiserver as server | 
|  | 14 | + | 
|  | 15 | +# This example depends on the 'static' folder in the examples folder | 
|  | 16 | +# being copied to the root of the circuitpython filesystem. | 
|  | 17 | +# This is where our static assets like html, js, and css live. | 
|  | 18 | + | 
|  | 19 | +# Get wifi details and more from a secrets.py file | 
|  | 20 | +try: | 
|  | 21 | +    from secrets import secrets | 
|  | 22 | +except ImportError: | 
|  | 23 | +    print("WiFi secrets are kept in secrets.py, please add them there!") | 
|  | 24 | +    raise | 
|  | 25 | + | 
|  | 26 | +try: | 
|  | 27 | +    import json as json_module | 
|  | 28 | +except ImportError: | 
|  | 29 | +    import ujson as json_module | 
|  | 30 | + | 
|  | 31 | +print("ESP32 SPI simple web server test!") | 
|  | 32 | + | 
|  | 33 | +# If you are using a board with pre-defined ESP32 Pins: | 
|  | 34 | +esp32_cs = DigitalInOut(board.ESP_CS) | 
|  | 35 | +esp32_ready = DigitalInOut(board.ESP_BUSY) | 
|  | 36 | +esp32_reset = DigitalInOut(board.ESP_RESET) | 
|  | 37 | + | 
|  | 38 | +# If you have an externally connected ESP32: | 
|  | 39 | +# esp32_cs = DigitalInOut(board.D9) | 
|  | 40 | +# esp32_ready = DigitalInOut(board.D10) | 
|  | 41 | +# esp32_reset = DigitalInOut(board.D5) | 
|  | 42 | + | 
|  | 43 | +spi = busio.SPI(board.SCK, board.MOSI, board.MISO) | 
|  | 44 | +esp = adafruit_esp32spi.ESP_SPIcontrol( | 
|  | 45 | +    spi, esp32_cs, esp32_ready, esp32_reset | 
|  | 46 | +)  # pylint: disable=line-too-long | 
|  | 47 | + | 
|  | 48 | +print("MAC addr:", [hex(i) for i in esp.MAC_address]) | 
|  | 49 | +print("MAC addr actual:", [hex(i) for i in esp.MAC_address_actual]) | 
|  | 50 | + | 
|  | 51 | +# Use below for Most Boards | 
|  | 52 | +status_light = neopixel.NeoPixel( | 
|  | 53 | +    board.NEOPIXEL, 1, brightness=0.2 | 
|  | 54 | +)  # Uncomment for Most Boards | 
|  | 55 | +# Uncomment below for ItsyBitsy M4 | 
|  | 56 | +# import adafruit_dotstar as dotstar | 
|  | 57 | +# status_light = dotstar.DotStar(board.APA102_SCK, board.APA102_MOSI, 1, brightness=1) | 
|  | 58 | + | 
|  | 59 | +## If you want to connect to wifi with secrets: | 
|  | 60 | +wifi = wifimanager.ESPSPI_WiFiManager(esp, secrets, status_light) | 
|  | 61 | +wifi.connect() | 
|  | 62 | + | 
|  | 63 | +## If you want to create a WIFI hotspot to connect to with secrets: | 
|  | 64 | +# secrets = {"ssid": "My ESP32 AP!", "password": "supersecret"} | 
|  | 65 | +# wifi = wifimanager.ESPSPI_WiFiManager(esp, secrets, status_light) | 
|  | 66 | +# wifi.create_ap() | 
|  | 67 | + | 
|  | 68 | +## To you want to create an un-protected WIFI hotspot to connect to with secrets:" | 
|  | 69 | +# secrets = {"ssid": "My ESP32 AP!"} | 
|  | 70 | +# wifi = wifimanager.ESPSPI_WiFiManager(esp, secrets, status_light) | 
|  | 71 | +# wifi.create_ap() | 
|  | 72 | + | 
|  | 73 | + | 
|  | 74 | +class SimpleWSGIApplication: | 
|  | 75 | +    """ | 
|  | 76 | +    An example of a simple WSGI Application that supports | 
|  | 77 | +    basic route handling and static asset file serving for common file types | 
|  | 78 | +    """ | 
|  | 79 | + | 
|  | 80 | +    INDEX = "/index.html" | 
|  | 81 | +    CHUNK_SIZE = 8912  # max number of bytes to read at once when reading files | 
|  | 82 | + | 
|  | 83 | +    def __init__(self, static_dir=None, debug=False): | 
|  | 84 | +        self._debug = debug | 
|  | 85 | +        self._listeners = {} | 
|  | 86 | +        self._start_response = None | 
|  | 87 | +        self._static = static_dir | 
|  | 88 | +        if self._static: | 
|  | 89 | +            self._static_files = ["/" + file for file in os.listdir(self._static)] | 
|  | 90 | + | 
|  | 91 | +    def __call__(self, environ, start_response): | 
|  | 92 | +        """ | 
|  | 93 | +        Called whenever the server gets a request. | 
|  | 94 | +        The environ dict has details about the request per wsgi specification. | 
|  | 95 | +        Call start_response with the response status string and headers as a list of tuples. | 
|  | 96 | +        Return a single item list with the item being your response data string. | 
|  | 97 | +        """ | 
|  | 98 | +        if self._debug: | 
|  | 99 | +            self._log_environ(environ) | 
|  | 100 | + | 
|  | 101 | +        self._start_response = start_response | 
|  | 102 | +        status = "" | 
|  | 103 | +        headers = [] | 
|  | 104 | +        resp_data = [] | 
|  | 105 | + | 
|  | 106 | +        key = self._get_listener_key( | 
|  | 107 | +            environ["REQUEST_METHOD"].lower(), environ["PATH_INFO"] | 
|  | 108 | +        ) | 
|  | 109 | +        if key in self._listeners: | 
|  | 110 | +            status, headers, resp_data = self._listeners[key](environ) | 
|  | 111 | +        if environ["REQUEST_METHOD"].lower() == "get" and self._static: | 
|  | 112 | +            path = environ["PATH_INFO"] | 
|  | 113 | +            if path in self._static_files: | 
|  | 114 | +                status, headers, resp_data = self.serve_file( | 
|  | 115 | +                    path, directory=self._static | 
|  | 116 | +                ) | 
|  | 117 | +            elif path == "/" and self.INDEX in self._static_files: | 
|  | 118 | +                status, headers, resp_data = self.serve_file( | 
|  | 119 | +                    self.INDEX, directory=self._static | 
|  | 120 | +                ) | 
|  | 121 | + | 
|  | 122 | +        self._start_response(status, headers) | 
|  | 123 | +        return resp_data | 
|  | 124 | + | 
|  | 125 | +    def on(self, method, path, request_handler): | 
|  | 126 | +        """ | 
|  | 127 | +        Register a Request Handler for a particular HTTP method and path. | 
|  | 128 | +        request_handler will be called whenever a matching HTTP request is received. | 
|  | 129 | +
 | 
|  | 130 | +        request_handler should accept the following args: | 
|  | 131 | +            (Dict environ) | 
|  | 132 | +        request_handler should return a tuple in the shape of: | 
|  | 133 | +            (status, header_list, data_iterable) | 
|  | 134 | +
 | 
|  | 135 | +        :param str method: the method of the HTTP request | 
|  | 136 | +        :param str path: the path of the HTTP request | 
|  | 137 | +        :param func request_handler: the function to call | 
|  | 138 | +        """ | 
|  | 139 | +        self._listeners[self._get_listener_key(method, path)] = request_handler | 
|  | 140 | + | 
|  | 141 | +    def serve_file(self, file_path, directory=None): | 
|  | 142 | +        status = "200 OK" | 
|  | 143 | +        headers = [("Content-Type", self._get_content_type(file_path))] | 
|  | 144 | + | 
|  | 145 | +        full_path = file_path if not directory else directory + file_path | 
|  | 146 | + | 
|  | 147 | +        def resp_iter(): | 
|  | 148 | +            with open(full_path, "rb") as file: | 
|  | 149 | +                while True: | 
|  | 150 | +                    chunk = file.read(self.CHUNK_SIZE) | 
|  | 151 | +                    if chunk: | 
|  | 152 | +                        yield chunk | 
|  | 153 | +                    else: | 
|  | 154 | +                        break | 
|  | 155 | + | 
|  | 156 | +        return (status, headers, resp_iter()) | 
|  | 157 | + | 
|  | 158 | +    def _log_environ(self, environ):  # pylint: disable=no-self-use | 
|  | 159 | +        print("environ map:") | 
|  | 160 | +        for name, value in environ.items(): | 
|  | 161 | +            print(name, value) | 
|  | 162 | + | 
|  | 163 | +    def _get_listener_key(self, method, path):  # pylint: disable=no-self-use | 
|  | 164 | +        return "{0}|{1}".format(method.lower(), path) | 
|  | 165 | + | 
|  | 166 | +    def _get_content_type(self, file):  # pylint: disable=no-self-use | 
|  | 167 | +        ext = file.split(".")[-1] | 
|  | 168 | +        if ext in ("html", "htm"): | 
|  | 169 | +            return "text/html" | 
|  | 170 | +        if ext == "js": | 
|  | 171 | +            return "application/javascript" | 
|  | 172 | +        if ext == "css": | 
|  | 173 | +            return "text/css" | 
|  | 174 | +        if ext in ("jpg", "jpeg"): | 
|  | 175 | +            return "image/jpeg" | 
|  | 176 | +        if ext == "png": | 
|  | 177 | +            return "image/png" | 
|  | 178 | +        return "text/plain" | 
|  | 179 | + | 
|  | 180 | + | 
|  | 181 | +# Our HTTP Request handlers | 
|  | 182 | +def led_on(environ):  # pylint: disable=unused-argument | 
|  | 183 | +    print("led on!") | 
|  | 184 | +    status_light.fill((0, 0, 100)) | 
|  | 185 | +    return web_app.serve_file("static/index.html") | 
|  | 186 | + | 
|  | 187 | + | 
|  | 188 | +def led_off(environ):  # pylint: disable=unused-argument | 
|  | 189 | +    print("led off!") | 
|  | 190 | +    status_light.fill(0) | 
|  | 191 | +    return web_app.serve_file("static/index.html") | 
|  | 192 | + | 
|  | 193 | + | 
|  | 194 | +def led_color(environ):  # pylint: disable=unused-argument | 
|  | 195 | +    json = json_module.loads(environ["wsgi.input"].getvalue()) | 
|  | 196 | +    print(json) | 
|  | 197 | +    rgb_tuple = (json.get("r"), json.get("g"), json.get("b")) | 
|  | 198 | +    status_light.fill(rgb_tuple) | 
|  | 199 | +    return ("200 OK", [], []) | 
|  | 200 | + | 
|  | 201 | + | 
|  | 202 | +# Here we create our application, setting the static directory location | 
|  | 203 | +# and registering the above request_handlers for specific HTTP requests | 
|  | 204 | +# we want to listen and respond to. | 
|  | 205 | +static = "/static" | 
|  | 206 | +try: | 
|  | 207 | +    static_files = os.listdir(static) | 
|  | 208 | +    if "index.html" not in static_files: | 
|  | 209 | +        raise RuntimeError( | 
|  | 210 | +            """ | 
|  | 211 | +            This example depends on an index.html, but it isn't present. | 
|  | 212 | +            Please add it to the {0} directory""".format( | 
|  | 213 | +                static | 
|  | 214 | +            ) | 
|  | 215 | +        ) | 
|  | 216 | +except OSError as e: | 
|  | 217 | +    raise RuntimeError( | 
|  | 218 | +        """ | 
|  | 219 | +        This example depends on a static asset directory. | 
|  | 220 | +        Please create one named {0} in the root of the device filesystem.""".format( | 
|  | 221 | +            static | 
|  | 222 | +        ) | 
|  | 223 | +    ) from e | 
|  | 224 | + | 
|  | 225 | +web_app = SimpleWSGIApplication(static_dir=static) | 
|  | 226 | +web_app.on("GET", "/led_on", led_on) | 
|  | 227 | +web_app.on("GET", "/led_off", led_off) | 
|  | 228 | +web_app.on("POST", "/ajax/ledcolor", led_color) | 
|  | 229 | + | 
|  | 230 | +# Here we setup our server, passing in our web_app as the application | 
|  | 231 | +server.set_interface(esp) | 
|  | 232 | +wsgiServer = server.WSGIServer(80, application=web_app) | 
|  | 233 | + | 
|  | 234 | +print("open this IP in your browser: ", esp.pretty_ip(esp.ip_address)) | 
|  | 235 | + | 
|  | 236 | +# Start the server | 
|  | 237 | +wsgiServer.start() | 
|  | 238 | +while True: | 
|  | 239 | +    # Our main loop where we have the server poll for incoming requests | 
|  | 240 | +    try: | 
|  | 241 | +        wsgiServer.update_poll() | 
|  | 242 | +        # Could do any other background tasks here, like reading sensors | 
|  | 243 | +    except OSError as e: | 
|  | 244 | +        print("Failed to update server, restarting ESP32\n", e) | 
|  | 245 | +        wifi.reset() | 
|  | 246 | +        continue | 
0 commit comments