diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..89cc49c --- /dev/null +++ b/.gitignore @@ -0,0 +1,5 @@ +.pio +.vscode/.browse.c_cpp.db* +.vscode/c_cpp_properties.json +.vscode/launch.json +.vscode/ipch diff --git a/.vscode/extensions.json b/.vscode/extensions.json new file mode 100644 index 0000000..0f0d740 --- /dev/null +++ b/.vscode/extensions.json @@ -0,0 +1,7 @@ +{ + // See http://go.microsoft.com/fwlink/?LinkId=827846 + // for the documentation about the extensions.json format + "recommendations": [ + "platformio.platformio-ide" + ] +} diff --git a/Demo.gif b/Demo.gif new file mode 100644 index 0000000..7ca67ed Binary files /dev/null and b/Demo.gif differ diff --git a/HW.jpg b/HW.jpg new file mode 100755 index 0000000..4c40099 Binary files /dev/null and b/HW.jpg differ diff --git a/README.md b/README.md index 5cf039a..b584ad8 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,28 @@ # esp8266_NeoPixel -MQTT NeoPixel control using a esp8266 based board + +## About +MQTT NeoPixel control using a esp8266 based board (ESP-01S). Platformio project file for convenience. + +## Requirements ++ ESP-01S board ++ ESP-01 ESP-01S RGB LED Controller Adapter ++ WS2812 WS2812B n Bits Light Ring (I used 16 Bits) ++ 3-5V supply or battery + +![HW](HW.jpg) + ++ [Visual Studio Code](https://code.visualstudio.com/) (or Codium) with: + + [PlatformIO extension](https://platformio.org/) with platform Espressif 8266 installed (configuration for the board in [platformio.ini](platformio.ini)) + +## Demo +(Tested with MQTT_Dashboard) + +![Demo](Demo.gif) + +## References and links ++ Libraries and code based on examples from the Unoficial Develpment Kit for Espressif ESP8266, available at https://programs74.ru/udkew-en.html ++ Adafruit NeoPixel Arduino Library available at: https://github.com/adafruit/Adafruit_NeoPixel + +## TODO ++ Implement initial SSID and password configuration through serial port ++ Standalone light effects diff --git a/include/README b/include/README new file mode 100644 index 0000000..194dcd4 --- /dev/null +++ b/include/README @@ -0,0 +1,39 @@ + +This directory is intended for project header files. + +A header file is a file containing C declarations and macro definitions +to be shared between several project source files. You request the use of a +header file in your project source file (C, C++, etc) located in `src` folder +by including it, with the C preprocessing directive `#include'. + +```src/main.c + +#include "header.h" + +int main (void) +{ + ... +} +``` + +Including a header file produces the same results as copying the header file +into each source file that needs it. Such copying would be time-consuming +and error-prone. With a header file, the related declarations appear +in only one place. If they need to be changed, they can be changed in one +place, and programs that include the header file will automatically use the +new version when next recompiled. The header file eliminates the labor of +finding and changing all the copies as well as the risk that a failure to +find one copy will result in inconsistencies within a program. + +In C, the usual convention is to give header files names that end with `.h'. +It is most portable to use only letters, digits, dashes, and underscores in +header file names, and at most one dot. + +Read more about using header files in official GCC documentation: + +* Include Syntax +* Include Operation +* Once-Only Headers +* Computed Includes + +https://gcc.gnu.org/onlinedocs/cpp/Header-Files.html diff --git a/lib/README b/lib/README new file mode 100644 index 0000000..6debab1 --- /dev/null +++ b/lib/README @@ -0,0 +1,46 @@ + +This directory is intended for project specific (private) libraries. +PlatformIO will compile them to static libraries and link into executable file. + +The source code of each library should be placed in a an own separate directory +("lib/your_library_name/[here are source files]"). + +For example, see a structure of the following two libraries `Foo` and `Bar`: + +|--lib +| | +| |--Bar +| | |--docs +| | |--examples +| | |--src +| | |- Bar.c +| | |- Bar.h +| | |- library.json (optional, custom build options, etc) https://docs.platformio.org/page/librarymanager/config.html +| | +| |--Foo +| | |- Foo.c +| | |- Foo.h +| | +| |- README --> THIS FILE +| +|- platformio.ini +|--src + |- main.c + +and a contents of `src/main.c`: +``` +#include +#include + +int main (void) +{ + ... +} + +``` + +PlatformIO Library Dependency Finder will find automatically dependent +libraries scanning project source files. + +More information about PlatformIO Library Dependency Finder +- https://docs.platformio.org/page/librarymanager/ldf.html diff --git a/lib/httpclient/httpclient.c b/lib/httpclient/httpclient.c new file mode 100755 index 0000000..93f61f5 --- /dev/null +++ b/lib/httpclient/httpclient.c @@ -0,0 +1,498 @@ +/* + * ---------------------------------------------------------------------------- + * "THE BEER-WARE LICENSE" (Revision 42): + * Martin d'Allens wrote this file. As long as you retain + * this notice you can do whatever you want with this stuff. If we meet some day, + * and you think this stuff is worth it, you can buy me a beer in return. + * ---------------------------------------------------------------------------- + */ + +// FIXME: sprintf->snprintf everywhere. +// FIXME: support null characters in responses. + +#include "osapi.h" +#include "user_interface.h" +#include "espconn.h" +#include "mem.h" +#include "limits.h" +#include "httpclient.h" + + +// Debug output. +#ifdef HTTP_DEBUG +#undef HTTP_DEBUG +#define HTTP_DEBUG(...) os_printf(__VA_ARGS__); +#else +#define HTTP_DEBUG(...) +#endif + +// Internal state. +typedef struct { + char * path; + int port; + char * post_data; + char * headers; + char * hostname; + char * buffer; + int buffer_size; + bool secure; + http_callback user_callback; +} request_args; + +static char * ICACHE_FLASH_ATTR esp_strdup(const char * str) +{ + if (str == NULL) { + return NULL; + } + char * new_str = (char *)os_malloc(os_strlen(str) + 1); // 1 for null character + if (new_str == NULL) { + os_printf("esp_strdup: malloc error"); + return NULL; + } + os_strcpy(new_str, str); + return new_str; +} + +static int ICACHE_FLASH_ATTR +esp_isupper(char c) +{ + return (c >= 'A' && c <= 'Z'); +} + +static int ICACHE_FLASH_ATTR +esp_isalpha(char c) +{ + return ((c >= 'A' && c <= 'Z') || (c >= 'a' && c <= 'z')); +} + + +static int ICACHE_FLASH_ATTR +esp_isspace(char c) +{ + return (c == ' ' || c == '\t' || c == '\n' || c == '\12'); +} + +static int ICACHE_FLASH_ATTR +esp_isdigit(char c) +{ + return (c >= '0' && c <= '9'); +} + +/* + * Convert a string to a long integer. + * + * Ignores `locale' stuff. Assumes that the upper and lower case + * alphabets and digits are each contiguous. + */ +long ICACHE_FLASH_ATTR +esp_strtol(nptr, endptr, base) + const char *nptr; + char **endptr; + int base; +{ + const char *s = nptr; + unsigned long acc; + int c; + unsigned long cutoff; + int neg = 0, any, cutlim; + + /* + * Skip white space and pick up leading +/- sign if any. + * If base is 0, allow 0x for hex and 0 for octal, else + * assume decimal; if base is already 16, allow 0x. + */ + do { + c = *s++; + } while (esp_isspace(c)); + if (c == '-') { + neg = 1; + c = *s++; + } else if (c == '+') + c = *s++; + if ((base == 0 || base == 16) && + c == '0' && (*s == 'x' || *s == 'X')) { + c = s[1]; + s += 2; + base = 16; + } else if ((base == 0 || base == 2) && + c == '0' && (*s == 'b' || *s == 'B')) { + c = s[1]; + s += 2; + base = 2; + } + if (base == 0) + base = c == '0' ? 8 : 10; + + /* + * Compute the cutoff value between legal numbers and illegal + * numbers. That is the largest legal value, divided by the + * base. An input number that is greater than this value, if + * followed by a legal input character, is too big. One that + * is equal to this value may be valid or not; the limit + * between valid and invalid numbers is then based on the last + * digit. For instance, if the range for longs is + * [-2147483648..2147483647] and the input base is 10, + * cutoff will be set to 214748364 and cutlim to either + * 7 (neg==0) or 8 (neg==1), meaning that if we have accumulated + * a value > 214748364, or equal but the next digit is > 7 (or 8), + * the number is too big, and we will return a range error. + * + * Set any if any `digits' consumed; make it negative to indicate + * overflow. + */ + cutoff = neg ? -(unsigned long)LONG_MIN : LONG_MAX; + cutlim = cutoff % (unsigned long)base; + cutoff /= (unsigned long)base; + for (acc = 0, any = 0;; c = *s++) { + if (esp_isdigit(c)) + c -= '0'; + else if (esp_isalpha(c)) + c -= esp_isupper(c) ? 'A' - 10 : 'a' - 10; + else + break; + if (c >= base) + break; + if (any < 0 || acc > cutoff || acc == cutoff && c > cutlim) + any = -1; + else { + any = 1; + acc *= base; + acc += c; + } + } + if (any < 0) { + acc = neg ? LONG_MIN : LONG_MAX; +// errno = ERANGE; + } else if (neg) + acc = -acc; + if (endptr != 0) + *endptr = (char *)(any ? s - 1 : nptr); + return (acc); +} + +static int ICACHE_FLASH_ATTR chunked_decode(const char * chunked, char * decode) +{ + int i = 0, j = 0; + int decode_size = 0; + char *str = (char *)chunked; + do + { + char * endstr = NULL; + //[chunk-size] + i = esp_strtol(str + j, endstr, 16); + HTTP_DEBUG("Chunk Size:%d\r\n", i); + if (i <= 0) + break; + //[chunk-size-end-ptr] + endstr = (char *)os_strstr(str + j, "\r\n"); + //[chunk-ext] + j += endstr - (str + j); + //[CRLF] + j += 2; + //[chunk-data] + decode_size += i; + os_memcpy((char *)&decode[decode_size - i], (char *)str + j, i); + j += i; + //[CRLF] + j += 2; + } while(true); + + // + //footer CRLF + // + + return j; +} + +static void ICACHE_FLASH_ATTR receive_callback(void * arg, char * buf, unsigned short len) +{ + struct espconn * conn = (struct espconn *)arg; + request_args * req = (request_args *)conn->reverse; + + if (req->buffer == NULL) { + return; + } + + // Let's do the equivalent of a realloc(). + const int new_size = req->buffer_size + len; + char * new_buffer; + if (new_size > BUFFER_SIZE_MAX || NULL == (new_buffer = (char *)os_malloc(new_size))) { + os_printf("Response too long (%d)\n", new_size); + req->buffer[0] = '\0'; // Discard the buffer to avoid using an incomplete response. + if (req->secure) + espconn_secure_disconnect(conn); + else + espconn_disconnect(conn); + return; // The disconnect callback will be called. + } + + os_memcpy(new_buffer, req->buffer, req->buffer_size); + os_memcpy(new_buffer + req->buffer_size - 1 /*overwrite the null character*/, buf, len); // Append new data. + new_buffer[new_size - 1] = '\0'; // Make sure there is an end of string. + + os_free(req->buffer); + req->buffer = new_buffer; + req->buffer_size = new_size; +} + +static void ICACHE_FLASH_ATTR sent_callback(void * arg) +{ + struct espconn * conn = (struct espconn *)arg; + request_args * req = (request_args *)conn->reverse; + + if (req->post_data == NULL) { + HTTP_DEBUG("All sent\n"); + } + else { + // The headers were sent, now send the contents. + HTTP_DEBUG("Sending request body\n"); + if (req->secure) + espconn_secure_sent(conn, (uint8_t *)req->post_data, strlen(req->post_data)); + else + espconn_sent(conn, (uint8_t *)req->post_data, strlen(req->post_data)); + os_free(req->post_data); + req->post_data = NULL; + } +} + +static void ICACHE_FLASH_ATTR connect_callback(void * arg) +{ + HTTP_DEBUG("Connected\n"); + struct espconn * conn = (struct espconn *)arg; + request_args * req = (request_args *)conn->reverse; + + espconn_regist_recvcb(conn, receive_callback); + espconn_regist_sentcb(conn, sent_callback); + + const char * method = "GET"; + char post_headers[32] = ""; + + if (req->post_data != NULL) { // If there is data this is a POST request. + method = "POST"; + os_sprintf(post_headers, "Content-Length: %d\r\n", strlen(req->post_data)); + } + + char buf[69 + strlen(method) + strlen(req->path) + strlen(req->hostname) + + strlen(req->headers) + strlen(post_headers)]; + int len = os_sprintf(buf, + "%s %s HTTP/1.1\r\n" + "Host: %s:%d\r\n" + "Connection: close\r\n" + "User-Agent: ESP8266\r\n" + "%s" + "%s" + "\r\n", + method, req->path, req->hostname, req->port, req->headers, post_headers); + + if (req->secure) + espconn_secure_sent(conn, (uint8_t *)buf, len); + else + espconn_sent(conn, (uint8_t *)buf, len); + os_free(req->headers); + req->headers = NULL; + HTTP_DEBUG("Sending request header\n"); +} + +static void ICACHE_FLASH_ATTR disconnect_callback(void * arg) +{ + HTTP_DEBUG("Disconnected\n"); + struct espconn *conn = (struct espconn *)arg; + + if(conn == NULL) { + return; + } + + if(conn->reverse != NULL) { + request_args * req = (request_args *)conn->reverse; + int http_status = -1; + char * body = ""; + if (req->buffer == NULL) { + os_printf("Buffer shouldn't be NULL\n"); + } + else if (req->buffer[0] != '\0') { + // FIXME: make sure this is not a partial response, using the Content-Length header. + + const char * version = "HTTP/1.1 "; + if (os_strncmp(req->buffer, version, strlen(version)) != 0) { + os_printf("Invalid version in %s\n", req->buffer); + } + else { + http_status = atoi(req->buffer + strlen(version)); + body = (char *)os_strstr(req->buffer, "\r\n\r\n") + 4; + if(os_strstr(req->buffer, "Transfer-Encoding: chunked")) + { + int body_size = req->buffer_size - (body - req->buffer); + char chunked_decode_buffer[body_size]; + os_memset(chunked_decode_buffer, 0, body_size); + // Chunked data + chunked_decode(body, chunked_decode_buffer); + os_memcpy(body, chunked_decode_buffer, body_size); + } + } + } + + if (req->user_callback != NULL) { // Callback is optional. + req->user_callback(body, http_status, req->buffer); + } + + os_free(req->buffer); + os_free(req->hostname); + os_free(req->path); + os_free(req); + } + espconn_delete(conn); + if(conn->proto.tcp != NULL) { + os_free(conn->proto.tcp); + } + os_free(conn); +} + +static void ICACHE_FLASH_ATTR error_callback(void *arg, sint8 errType) +{ + HTTP_DEBUG("Disconnected with error\n"); + disconnect_callback(arg); +} + +static void ICACHE_FLASH_ATTR dns_callback(const char * hostname, ip_addr_t * addr, void * arg) +{ + request_args * req = (request_args *)arg; + + if (addr == NULL) { + os_printf("DNS failed for %s\n", hostname); + if (req->user_callback != NULL) { + req->user_callback("", -1, ""); + } + os_free(req); + } + else { + HTTP_DEBUG("DNS found %s " IPSTR "\n", hostname, IP2STR(addr)); + + struct espconn * conn = (struct espconn *)os_malloc(sizeof(struct espconn)); + conn->type = ESPCONN_TCP; + conn->state = ESPCONN_NONE; + conn->proto.tcp = (esp_tcp *)os_malloc(sizeof(esp_tcp)); + conn->proto.tcp->local_port = espconn_port(); + conn->proto.tcp->remote_port = req->port; + conn->reverse = req; + + os_memcpy(conn->proto.tcp->remote_ip, addr, 4); + + espconn_regist_connectcb(conn, connect_callback); + espconn_regist_disconcb(conn, disconnect_callback); + espconn_regist_reconcb(conn, error_callback); + + if (req->secure) { + espconn_secure_set_size(ESPCONN_CLIENT,5120); // set SSL buffer size + espconn_secure_connect(conn); + } else { + espconn_connect(conn); + } + } +} + +void ICACHE_FLASH_ATTR http_raw_request(const char * hostname, int port, bool secure, const char * path, const char * post_data, const char * headers, http_callback user_callback) +{ + HTTP_DEBUG("DNS request\n"); + + request_args * req = (request_args *)os_malloc(sizeof(request_args)); + req->hostname = esp_strdup(hostname); + req->path = esp_strdup(path); + req->port = port; + req->secure = secure; + req->headers = esp_strdup(headers); + req->post_data = esp_strdup(post_data); + req->buffer_size = 1; + req->buffer = (char *)os_malloc(1); + req->buffer[0] = '\0'; // Empty string. + req->user_callback = user_callback; + + ip_addr_t addr; + err_t error = espconn_gethostbyname((struct espconn *)req, // It seems we don't need a real espconn pointer here. + hostname, &addr, dns_callback); + + if (error == ESPCONN_INPROGRESS) { + HTTP_DEBUG("DNS pending\n"); + } + else if (error == ESPCONN_OK) { + // Already in the local names table (or hostname was an IP address), execute the callback ourselves. + dns_callback(hostname, &addr, req); + } + else { + if (error == ESPCONN_ARG) { + os_printf("DNS arg error %s\n", hostname); + } + else { + os_printf("DNS error code %d\n", error); + } + dns_callback(hostname, NULL, req); // Handle all DNS errors the same way. + } +} + +/* + * Parse an URL of the form http://host:port/path + * can be a hostname or an IP address + * is optional + */ +void ICACHE_FLASH_ATTR http_post(const char * url, const char * post_data, const char * headers, http_callback user_callback) +{ + // FIXME: handle HTTP auth with http://user:pass@host/ + // FIXME: get rid of the #anchor part if present. + + char hostname[128] = ""; + int port = 80; + bool secure = false; + + bool is_http = os_strncmp(url, "http://", strlen("http://")) == 0; + bool is_https = os_strncmp(url, "https://", strlen("https://")) == 0; + + if (is_http) + url += strlen("http://"); // Get rid of the protocol. + else if (is_https) { + port = 443; + secure = true; + url += strlen("https://"); // Get rid of the protocol. + } else { + os_printf("URL is not HTTP or HTTPS %s\n", url); + return; + } + + char * path = os_strchr(url, '/'); + if (path == NULL) { + path = os_strchr(url, '\0'); // Pointer to end of string. + } + + char * colon = os_strchr(url, ':'); + if (colon > path) { + colon = NULL; // Limit the search to characters before the path. + } + + if (colon == NULL) { // The port is not present. + os_memcpy(hostname, url, path - url); + hostname[path - url] = '\0'; + } + else { + port = atoi(colon + 1); + if (port == 0) { + os_printf("Port error %s\n", url); + return; + } + + os_memcpy(hostname, url, colon - url); + hostname[colon - url] = '\0'; + } + + + if (path[0] == '\0') { // Empty path is not allowed. + path = "/"; + } + + HTTP_DEBUG("hostname=%s\n", hostname); + HTTP_DEBUG("port=%d\n", port); + HTTP_DEBUG("path=%s\n", path); + http_raw_request(hostname, port, secure, path, post_data, headers, user_callback); +} + +void ICACHE_FLASH_ATTR http_get(const char * url, const char * headers, http_callback user_callback) +{ + http_post(url, NULL, headers, user_callback); +} diff --git a/lib/httpclient/httpclient.h b/lib/httpclient/httpclient.h new file mode 100755 index 0000000..58af238 --- /dev/null +++ b/lib/httpclient/httpclient.h @@ -0,0 +1,45 @@ +/* + * ---------------------------------------------------------------------------- + * "THE BEER-WARE LICENSE" (Revision 42): + * Martin d'Allens wrote this file. As long as you retain + * this notice you can do whatever you want with this stuff. If we meet some day, + * and you think this stuff is worth it, you can buy me a beer in return. + * ---------------------------------------------------------------------------- + */ + +#ifndef HTTPCLIENT_H +#define HTTPCLIENT_H + +#define HTTP_STATUS_GENERIC_ERROR -1 // In case of TCP or DNS error the callback is called with this status. +#define BUFFER_SIZE_MAX 5000 // Size of http responses that will cause an error. + +/* + * "full_response" is a string containing all response headers and the response body. + * "response_body and "http_status" are extracted from "full_response" for convenience. + * + * A successful request corresponds to an HTTP status code of 200 (OK). + * More info at http://en.wikipedia.org/wiki/List_of_HTTP_status_codes + */ +typedef void (* http_callback)(char * response_body, int http_status, char * full_response); + +/* + * Download a web page from its URL. + * Try: + * http_get("http://wtfismyip.com/text", http_callback_example); + */ +void ICACHE_FLASH_ATTR http_get(const char * url, const char * headers, http_callback user_callback); + +/* + * Post data to a web form. + * The data should be encoded as application/x-www-form-urlencoded. + * Try: + * http_post("http://httpbin.org/post", "first_word=hello&second_word=world", http_callback_example); + */ +void ICACHE_FLASH_ATTR http_post(const char * url, const char * post_data, const char * headers, http_callback user_callback); + +/* + * Call this function to skip URL parsing if the arguments are already in separate variables. + */ +void ICACHE_FLASH_ATTR http_raw_request(const char * hostname, int port, bool secure, const char * path, const char * post_data, const char * headers, http_callback user_callback); + +#endif diff --git a/lib/mqtt/debug.h b/lib/mqtt/debug.h new file mode 100755 index 0000000..7494543 --- /dev/null +++ b/lib/mqtt/debug.h @@ -0,0 +1,15 @@ +/* + * debug.h + * + * Created on: Dec 4, 2014 + * Author: Minh + */ + +#ifndef USER_DEBUG_H_ +#define USER_DEBUG_H_ + +#ifndef INFO +#define INFO os_printf +#endif + +#endif /* USER_DEBUG_H_ */ diff --git a/lib/mqtt/mqtt.c b/lib/mqtt/mqtt.c new file mode 100755 index 0000000..0d2c6a9 --- /dev/null +++ b/lib/mqtt/mqtt.c @@ -0,0 +1,653 @@ +/* mqtt.c +* Protocol: http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html +* +* Copyright (c) 2014-2015, Tuan PM +* All rights reserved. +* +* Redistribution and use in source and binary forms, with or without +* modification, are permitted provided that the following conditions are met: +* +* * Redistributions of source code must retain the above copyright notice, +* this list of conditions and the following disclaimer. +* * Redistributions in binary form must reproduce the above copyright +* notice, this list of conditions and the following disclaimer in the +* documentation and/or other materials provided with the distribution. +* * Neither the name of Redis nor the names of its contributors may be used +* to endorse or promote products derived from this software without +* specific prior written permission. +* +* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE +* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +* POSSIBILITY OF SUCH DAMAGE. +*/ + +#include "user_interface.h" +#include "osapi.h" +#include "espconn.h" +#include "os_type.h" +#include "mem.h" +#include "mqtt_msg.h" +#include "debug.h" +#include "user_config.h" +#include "mqtt.h" +#include "queue.h" + +#define MQTT_TASK_PRIO 0 +#define MQTT_TASK_QUEUE_SIZE 1 +#define MQTT_SEND_TIMOUT 5 + +#ifndef QUEUE_BUFFER_SIZE +#define QUEUE_BUFFER_SIZE 2048 +#endif + +unsigned char *default_certificate; +unsigned int default_certificate_len = 0; +unsigned char *default_private_key; +unsigned int default_private_key_len = 0; + +os_event_t mqtt_procTaskQueue[MQTT_TASK_QUEUE_SIZE]; + +LOCAL void ICACHE_FLASH_ATTR +mqtt_dns_found(const char *name, ip_addr_t *ipaddr, void *arg) +{ + struct espconn *pConn = (struct espconn *)arg; + MQTT_Client* client = (MQTT_Client *)pConn->reverse; + + + if(ipaddr == NULL) + { + INFO("DNS: Found, but got no ip, try to reconnect\r\n"); + client->connState = TCP_RECONNECT_REQ; + return; + } + + INFO("DNS: found ip %d.%d.%d.%d\n", + *((uint8 *) &ipaddr->addr), + *((uint8 *) &ipaddr->addr + 1), + *((uint8 *) &ipaddr->addr + 2), + *((uint8 *) &ipaddr->addr + 3)); + + if(client->ip.addr == 0 && ipaddr->addr != 0) + { + os_memcpy(client->pCon->proto.tcp->remote_ip, &ipaddr->addr, 4); + if(client->security){ + espconn_secure_connect(client->pCon); + } + else { + espconn_connect(client->pCon); + } + + client->connState = TCP_CONNECTING; + INFO("TCP: connecting...\r\n"); + } + + system_os_post(MQTT_TASK_PRIO, 0, (os_param_t)client); +} + + + +LOCAL void ICACHE_FLASH_ATTR +deliver_publish(MQTT_Client* client, uint8_t* message, int length) +{ + mqtt_event_data_t event_data; + + event_data.topic_length = length; + event_data.topic = mqtt_get_publish_topic(message, &event_data.topic_length); + event_data.data_length = length; + event_data.data = mqtt_get_publish_data(message, &event_data.data_length); + + if(client->dataCb) + client->dataCb((uint32_t*)client, event_data.topic, event_data.topic_length, event_data.data, event_data.data_length); + +} + + +/** + * @brief Client received callback function. + * @param arg: contain the ip link information + * @param pdata: received data + * @param len: the lenght of received data + * @retval None + */ +void ICACHE_FLASH_ATTR +mqtt_tcpclient_recv(void *arg, char *pdata, unsigned short len) +{ + uint8_t msg_type; + uint8_t msg_qos; + uint16_t msg_id; + + struct espconn *pCon = (struct espconn*)arg; + MQTT_Client *client = (MQTT_Client *)pCon->reverse; + +READPACKET: + INFO("TCP: data received %d bytes\r\n", len); + if(len < MQTT_BUF_SIZE && len > 0){ + os_memcpy(client->mqtt_state.in_buffer, pdata, len); + + msg_type = mqtt_get_type(client->mqtt_state.in_buffer); + msg_qos = mqtt_get_qos(client->mqtt_state.in_buffer); + msg_id = mqtt_get_id(client->mqtt_state.in_buffer, client->mqtt_state.in_buffer_length); + switch(client->connState){ + case MQTT_CONNECT_SENDING: + if(msg_type == MQTT_MSG_TYPE_CONNACK){ + if(client->mqtt_state.pending_msg_type != MQTT_MSG_TYPE_CONNECT){ + INFO("MQTT: Invalid packet\r\n"); + if(client->security){ + espconn_secure_disconnect(client->pCon); + } + else { + espconn_disconnect(client->pCon); + } + } else { + INFO("MQTT: Connected to %s:%d\r\n", client->host, client->port); + client->connState = MQTT_DATA; + if(client->connectedCb) + client->connectedCb((uint32_t*)client); + } + + } + break; + case MQTT_DATA: + client->mqtt_state.message_length_read = len; + client->mqtt_state.message_length = mqtt_get_total_length(client->mqtt_state.in_buffer, client->mqtt_state.message_length_read); + + + switch(msg_type) + { + + case MQTT_MSG_TYPE_SUBACK: + if(client->mqtt_state.pending_msg_type == MQTT_MSG_TYPE_SUBSCRIBE && client->mqtt_state.pending_msg_id == msg_id) + INFO("MQTT: Subscribe successful\r\n"); + break; + case MQTT_MSG_TYPE_UNSUBACK: + if(client->mqtt_state.pending_msg_type == MQTT_MSG_TYPE_UNSUBSCRIBE && client->mqtt_state.pending_msg_id == msg_id) + INFO("MQTT: UnSubscribe successful\r\n"); + break; + case MQTT_MSG_TYPE_PUBLISH: + if(msg_qos == 1) + client->mqtt_state.outbound_message = mqtt_msg_puback(&client->mqtt_state.mqtt_connection, msg_id); + else if(msg_qos == 2) + client->mqtt_state.outbound_message = mqtt_msg_pubrec(&client->mqtt_state.mqtt_connection, msg_id); + if(msg_qos == 1 || msg_qos == 2){ + INFO("MQTT: Queue response QoS: %d\r\n", msg_qos); + if(QUEUE_Puts(&client->msgQueue, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length) == -1){ + INFO("MQTT: Queue full\r\n"); + } + } + + deliver_publish(client, client->mqtt_state.in_buffer, client->mqtt_state.message_length_read); + break; + case MQTT_MSG_TYPE_PUBACK: + if(client->mqtt_state.pending_msg_type == MQTT_MSG_TYPE_PUBLISH && client->mqtt_state.pending_msg_id == msg_id){ + INFO("MQTT: received MQTT_MSG_TYPE_PUBACK, finish QoS1 publish\r\n"); + } + + break; + case MQTT_MSG_TYPE_PUBREC: + client->mqtt_state.outbound_message = mqtt_msg_pubrel(&client->mqtt_state.mqtt_connection, msg_id); + if(QUEUE_Puts(&client->msgQueue, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length) == -1){ + INFO("MQTT: Queue full\r\n"); + } + break; + case MQTT_MSG_TYPE_PUBREL: + client->mqtt_state.outbound_message = mqtt_msg_pubcomp(&client->mqtt_state.mqtt_connection, msg_id); + if(QUEUE_Puts(&client->msgQueue, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length) == -1){ + INFO("MQTT: Queue full\r\n"); + } + break; + case MQTT_MSG_TYPE_PUBCOMP: + if(client->mqtt_state.pending_msg_type == MQTT_MSG_TYPE_PUBLISH && client->mqtt_state.pending_msg_id == msg_id){ + INFO("MQTT: receive MQTT_MSG_TYPE_PUBCOMP, finish QoS2 publish\r\n"); + } + break; + case MQTT_MSG_TYPE_PINGREQ: + client->mqtt_state.outbound_message = mqtt_msg_pingresp(&client->mqtt_state.mqtt_connection); + if(QUEUE_Puts(&client->msgQueue, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length) == -1){ + INFO("MQTT: Queue full\r\n"); + } + break; + case MQTT_MSG_TYPE_PINGRESP: + // Ignore + break; + } + // NOTE: this is done down here and not in the switch case above + // because the PSOCK_READBUF_LEN() won't work inside a switch + // statement due to the way protothreads resume. + if(msg_type == MQTT_MSG_TYPE_PUBLISH) + { + len = client->mqtt_state.message_length_read; + + if(client->mqtt_state.message_length < client->mqtt_state.message_length_read) + { + //client->connState = MQTT_PUBLISH_RECV; + //Not Implement yet + len -= client->mqtt_state.message_length; + pdata += client->mqtt_state.message_length; + + INFO("Get another published message\r\n"); + goto READPACKET; + } + + } + break; + } + } else { + INFO("ERROR: Message too long\r\n"); + } + system_os_post(MQTT_TASK_PRIO, 0, (os_param_t)client); +} + +/** + * @brief Client send over callback function. + * @param arg: contain the ip link information + * @retval None + */ +void ICACHE_FLASH_ATTR +mqtt_tcpclient_sent_cb(void *arg) +{ + struct espconn *pCon = (struct espconn *)arg; + MQTT_Client* client = (MQTT_Client *)pCon->reverse; + INFO("TCP: Sent\r\n"); + client->sendTimeout = 0; + if(client->connState == MQTT_DATA && client->mqtt_state.pending_msg_type == MQTT_MSG_TYPE_PUBLISH){ + if(client->publishedCb) + client->publishedCb((uint32_t*)client); + } + system_os_post(MQTT_TASK_PRIO, 0, (os_param_t)client); +} + +void ICACHE_FLASH_ATTR mqtt_timer(void *arg) +{ + MQTT_Client* client = (MQTT_Client*)arg; + + if(client->connState == MQTT_DATA){ + client->keepAliveTick ++; + if(client->keepAliveTick > client->mqtt_state.connect_info->keepalive){ + + INFO("\r\nMQTT: Send keepalive packet to %s:%d!\r\n", client->host, client->port); + client->mqtt_state.outbound_message = mqtt_msg_pingreq(&client->mqtt_state.mqtt_connection); + client->mqtt_state.pending_msg_type = MQTT_MSG_TYPE_PINGREQ; + client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.outbound_message->data); + client->mqtt_state.pending_msg_id = mqtt_get_id(client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length); + + + client->sendTimeout = MQTT_SEND_TIMOUT; + INFO("MQTT: Sending, type: %d, id: %04X\r\n",client->mqtt_state.pending_msg_type, client->mqtt_state.pending_msg_id); + if(client->security){ + espconn_secure_sent(client->pCon, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length); + } + else{ + espconn_sent(client->pCon, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length); + } + + client->mqtt_state.outbound_message = NULL; + + client->keepAliveTick = 0; + system_os_post(MQTT_TASK_PRIO, 0, (os_param_t)client); + } + + } else if(client->connState == TCP_RECONNECT_REQ){ + client->reconnectTick ++; + if(client->reconnectTick > MQTT_RECONNECT_TIMEOUT) { + client->reconnectTick = 0; + client->connState = TCP_RECONNECT; + system_os_post(MQTT_TASK_PRIO, 0, (os_param_t)client); + } + } + if(client->sendTimeout > 0) + client->sendTimeout --; +} + +void ICACHE_FLASH_ATTR +mqtt_tcpclient_discon_cb(void *arg) +{ + + struct espconn *pespconn = (struct espconn *)arg; + MQTT_Client* client = (MQTT_Client *)pespconn->reverse; + INFO("TCP: Disconnected callback\r\n"); + client->connState = TCP_RECONNECT_REQ; + if(client->disconnectedCb) + client->disconnectedCb((uint32_t*)client); + + system_os_post(MQTT_TASK_PRIO, 0, (os_param_t)client); +} + + + +/** + * @brief Tcp client connect success callback function. + * @param arg: contain the ip link information + * @retval None + */ +void ICACHE_FLASH_ATTR +mqtt_tcpclient_connect_cb(void *arg) +{ + struct espconn *pCon = (struct espconn *)arg; + MQTT_Client* client = (MQTT_Client *)pCon->reverse; + + espconn_regist_disconcb(client->pCon, mqtt_tcpclient_discon_cb); + espconn_regist_recvcb(client->pCon, mqtt_tcpclient_recv);//////// + espconn_regist_sentcb(client->pCon, mqtt_tcpclient_sent_cb);/////// + INFO("MQTT: Connected to broker %s:%d\r\n", client->host, client->port); + + mqtt_msg_init(&client->mqtt_state.mqtt_connection, client->mqtt_state.out_buffer, client->mqtt_state.out_buffer_length); + client->mqtt_state.outbound_message = mqtt_msg_connect(&client->mqtt_state.mqtt_connection, client->mqtt_state.connect_info); + client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.outbound_message->data); + client->mqtt_state.pending_msg_id = mqtt_get_id(client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length); + + + client->sendTimeout = MQTT_SEND_TIMOUT; + INFO("MQTT: Sending, type: %d, id: %04X\r\n",client->mqtt_state.pending_msg_type, client->mqtt_state.pending_msg_id); + if(client->security){ + espconn_secure_sent(client->pCon, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length); + } + else{ + espconn_sent(client->pCon, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length); + } + + client->mqtt_state.outbound_message = NULL; + client->connState = MQTT_CONNECT_SENDING; + system_os_post(MQTT_TASK_PRIO, 0, (os_param_t)client); +} + +/** + * @brief Tcp client connect repeat callback function. + * @param arg: contain the ip link information + * @retval None + */ +void ICACHE_FLASH_ATTR +mqtt_tcpclient_recon_cb(void *arg, sint8 errType) +{ + struct espconn *pCon = (struct espconn *)arg; + MQTT_Client* client = (MQTT_Client *)pCon->reverse; + + INFO("TCP: Reconnect to %s:%d\r\n", client->host, client->port); + + client->connState = TCP_RECONNECT_REQ; + + system_os_post(MQTT_TASK_PRIO, 0, (os_param_t)client); + +} + +/** + * @brief MQTT publish function. + * @param client: MQTT_Client reference + * @param topic: string topic will publish to + * @param data: buffer data send point to + * @param data_length: length of data + * @param qos: qos + * @param retain: retain + * @retval TRUE if success queue + */ +BOOL ICACHE_FLASH_ATTR +MQTT_Publish(MQTT_Client *client, const char* topic, const char* data, int data_length, int qos, int retain) +{ + uint8_t dataBuffer[MQTT_BUF_SIZE]; + uint16_t dataLen; + client->mqtt_state.outbound_message = mqtt_msg_publish(&client->mqtt_state.mqtt_connection, + topic, data, data_length, + qos, retain, + &client->mqtt_state.pending_msg_id); + if(client->mqtt_state.outbound_message->length == 0){ + INFO("MQTT: Queuing publish failed\r\n"); + return FALSE; + } + INFO("MQTT: queuing publish, length: %d, queue size(%d/%d)\r\n", client->mqtt_state.outbound_message->length, client->msgQueue.rb.fill_cnt, client->msgQueue.rb.size); + while(QUEUE_Puts(&client->msgQueue, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length) == -1){ + INFO("MQTT: Queue full\r\n"); + if(QUEUE_Gets(&client->msgQueue, dataBuffer, &dataLen, MQTT_BUF_SIZE) == -1) { + INFO("MQTT: Serious buffer error\r\n"); + return FALSE; + } + } + system_os_post(MQTT_TASK_PRIO, 0, (os_param_t)client); + return TRUE; +} + +/** + * @brief MQTT subscibe function. + * @param client: MQTT_Client reference + * @param topic: string topic will subscribe + * @param qos: qos + * @retval TRUE if success queue + */ +BOOL ICACHE_FLASH_ATTR +MQTT_Subscribe(MQTT_Client *client, char* topic, uint8_t qos) +{ + uint8_t dataBuffer[MQTT_BUF_SIZE]; + uint16_t dataLen; + + client->mqtt_state.outbound_message = mqtt_msg_subscribe(&client->mqtt_state.mqtt_connection, + topic, 0, + &client->mqtt_state.pending_msg_id); + INFO("MQTT: queue subscribe, topic\"%s\", id: %d\r\n",topic, client->mqtt_state.pending_msg_id); + while(QUEUE_Puts(&client->msgQueue, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length) == -1){ + INFO("MQTT: Queue full\r\n"); + if(QUEUE_Gets(&client->msgQueue, dataBuffer, &dataLen, MQTT_BUF_SIZE) == -1) { + INFO("MQTT: Serious buffer error\r\n"); + return FALSE; + } + } + system_os_post(MQTT_TASK_PRIO, 0, (os_param_t)client); + return TRUE; +} + +void ICACHE_FLASH_ATTR +MQTT_Task(os_event_t *e) +{ + MQTT_Client* client = (MQTT_Client*)e->par; + uint8_t dataBuffer[MQTT_BUF_SIZE]; + uint16_t dataLen; + if(e->par == 0) + return; + switch(client->connState){ + + case TCP_RECONNECT_REQ: + break; + case TCP_RECONNECT: + MQTT_Connect(client); + INFO("TCP: Reconnect to: %s:%d\r\n", client->host, client->port); + client->connState = TCP_CONNECTING; + break; + case MQTT_DATA: + if(QUEUE_IsEmpty(&client->msgQueue) || client->sendTimeout != 0) { + break; + } + if(QUEUE_Gets(&client->msgQueue, dataBuffer, &dataLen, MQTT_BUF_SIZE) == 0){ + client->mqtt_state.pending_msg_type = mqtt_get_type(dataBuffer); + client->mqtt_state.pending_msg_id = mqtt_get_id(dataBuffer, dataLen); + + + client->sendTimeout = MQTT_SEND_TIMOUT; + INFO("MQTT: Sending, type: %d, id: %04X\r\n",client->mqtt_state.pending_msg_type, client->mqtt_state.pending_msg_id); + if(client->security){ + espconn_secure_sent(client->pCon, dataBuffer, dataLen); + } + else{ + espconn_sent(client->pCon, dataBuffer, dataLen); + } + + client->mqtt_state.outbound_message = NULL; + break; + } + break; + } +} + +/** + * @brief MQTT initialization connection function + * @param client: MQTT_Client reference + * @param host: Domain or IP string + * @param port: Port to connect + * @param security: 1 for ssl, 0 for none + * @retval None + */ +void ICACHE_FLASH_ATTR +MQTT_InitConnection(MQTT_Client *mqttClient, uint8_t* host, uint32 port, uint8_t security) +{ + uint32_t temp; + INFO("MQTT_InitConnection\r\n"); + os_memset(mqttClient, 0, sizeof(MQTT_Client)); + temp = os_strlen(host); + mqttClient->host = (uint8_t*)os_zalloc(temp + 1); + os_strcpy(mqttClient->host, host); + mqttClient->host[temp] = 0; + mqttClient->port = port; + mqttClient->security = security; + +} + +/** + * @brief MQTT initialization mqtt client function + * @param client: MQTT_Client reference + * @param clientid: MQTT client id + * @param client_user:MQTT client user + * @param client_pass:MQTT client password + * @param client_pass:MQTT keep alive timer, in second + * @retval None + */ +void ICACHE_FLASH_ATTR +MQTT_InitClient(MQTT_Client *mqttClient, uint8_t* client_id, uint8_t* client_user, uint8_t* client_pass, uint32_t keepAliveTime, uint8_t cleanSession) +{ + uint32_t temp; + INFO("MQTT_InitClient\r\n"); + + os_memset(&mqttClient->connect_info, 0, sizeof(mqtt_connect_info_t)); + + temp = os_strlen(client_id); + mqttClient->connect_info.client_id = (uint8_t*)os_zalloc(temp + 1); + os_strcpy(mqttClient->connect_info.client_id, client_id); + mqttClient->connect_info.client_id[temp] = 0; + + temp = os_strlen(client_user); + mqttClient->connect_info.username = (uint8_t*)os_zalloc(temp + 1); + os_strcpy(mqttClient->connect_info.username, client_user); + mqttClient->connect_info.username[temp] = 0; + + temp = os_strlen(client_pass); + mqttClient->connect_info.password = (uint8_t*)os_zalloc(temp + 1); + os_strcpy(mqttClient->connect_info.password, client_pass); + mqttClient->connect_info.password[temp] = 0; + + + mqttClient->connect_info.keepalive = keepAliveTime; + mqttClient->connect_info.clean_session = cleanSession; + + mqttClient->mqtt_state.in_buffer = (uint8_t *)os_zalloc(MQTT_BUF_SIZE); + mqttClient->mqtt_state.in_buffer_length = MQTT_BUF_SIZE; + mqttClient->mqtt_state.out_buffer = (uint8_t *)os_zalloc(MQTT_BUF_SIZE); + mqttClient->mqtt_state.out_buffer_length = MQTT_BUF_SIZE; + mqttClient->mqtt_state.connect_info = &mqttClient->connect_info; + + mqtt_msg_init(&mqttClient->mqtt_state.mqtt_connection, mqttClient->mqtt_state.out_buffer, mqttClient->mqtt_state.out_buffer_length); + + QUEUE_Init(&mqttClient->msgQueue, QUEUE_BUFFER_SIZE); + + system_os_task(MQTT_Task, MQTT_TASK_PRIO, mqtt_procTaskQueue, MQTT_TASK_QUEUE_SIZE); + system_os_post(MQTT_TASK_PRIO, 0, (os_param_t)mqttClient); +} +void ICACHE_FLASH_ATTR +MQTT_InitLWT(MQTT_Client *mqttClient, uint8_t* will_topic, uint8_t* will_msg, uint8_t will_qos, uint8_t will_retain) +{ + uint32_t temp; + temp = os_strlen(will_topic); + mqttClient->connect_info.will_topic = (uint8_t*)os_zalloc(temp + 1); + os_strcpy(mqttClient->connect_info.will_topic, will_topic); + mqttClient->connect_info.will_topic[temp] = 0; + + temp = os_strlen(will_msg); + mqttClient->connect_info.will_message = (uint8_t*)os_zalloc(temp + 1); + os_strcpy(mqttClient->connect_info.will_message, will_msg); + mqttClient->connect_info.will_message[temp] = 0; + + + mqttClient->connect_info.will_qos = will_qos; + mqttClient->connect_info.will_retain = will_retain; +} +/** + * @brief Begin connect to MQTT broker + * @param client: MQTT_Client reference + * @retval None + */ +void ICACHE_FLASH_ATTR +MQTT_Connect(MQTT_Client *mqttClient) +{ + MQTT_Disconnect(mqttClient); + mqttClient->pCon = (struct espconn *)os_zalloc(sizeof(struct espconn)); + mqttClient->pCon->type = ESPCONN_TCP; + mqttClient->pCon->state = ESPCONN_NONE; + mqttClient->pCon->proto.tcp = (esp_tcp *)os_zalloc(sizeof(esp_tcp)); + mqttClient->pCon->proto.tcp->local_port = espconn_port(); + mqttClient->pCon->proto.tcp->remote_port = mqttClient->port; + mqttClient->pCon->reverse = mqttClient; + espconn_regist_connectcb(mqttClient->pCon, mqtt_tcpclient_connect_cb); + espconn_regist_reconcb(mqttClient->pCon, mqtt_tcpclient_recon_cb); + + mqttClient->keepAliveTick = 0; + mqttClient->reconnectTick = 0; + + + os_timer_disarm(&mqttClient->mqttTimer); + os_timer_setfn(&mqttClient->mqttTimer, (os_timer_func_t *)mqtt_timer, mqttClient); + os_timer_arm(&mqttClient->mqttTimer, 1000, 1); + + if(UTILS_StrToIP(mqttClient->host, &mqttClient->pCon->proto.tcp->remote_ip)) { + INFO("TCP: Connect to ip %s:%d\r\n", mqttClient->host, mqttClient->port); + if(mqttClient->security){ + espconn_secure_connect(mqttClient->pCon); + } + else { + espconn_connect(mqttClient->pCon); + } + } + else { + INFO("TCP: Connect to domain %s:%d\r\n", mqttClient->host, mqttClient->port); + espconn_gethostbyname(mqttClient->pCon, mqttClient->host, &mqttClient->ip, mqtt_dns_found); + } + mqttClient->connState = TCP_CONNECTING; +} + +void ICACHE_FLASH_ATTR +MQTT_Disconnect(MQTT_Client *mqttClient) +{ + if(mqttClient->pCon){ + INFO("Free memory\r\n"); + if(mqttClient->pCon->proto.tcp) + os_free(mqttClient->pCon->proto.tcp); + os_free(mqttClient->pCon); + mqttClient->pCon = NULL; + } + + os_timer_disarm(&mqttClient->mqttTimer); +} +void ICACHE_FLASH_ATTR +MQTT_OnConnected(MQTT_Client *mqttClient, MqttCallback connectedCb) +{ + mqttClient->connectedCb = connectedCb; +} + +void ICACHE_FLASH_ATTR +MQTT_OnDisconnected(MQTT_Client *mqttClient, MqttCallback disconnectedCb) +{ + mqttClient->disconnectedCb = disconnectedCb; +} + +void ICACHE_FLASH_ATTR +MQTT_OnData(MQTT_Client *mqttClient, MqttDataCallback dataCb) +{ + mqttClient->dataCb = dataCb; +} + +void ICACHE_FLASH_ATTR +MQTT_OnPublished(MQTT_Client *mqttClient, MqttCallback publishedCb) +{ + mqttClient->publishedCb = publishedCb; +} diff --git a/lib/mqtt/mqtt.h b/lib/mqtt/mqtt.h new file mode 100755 index 0000000..0b9c4f9 --- /dev/null +++ b/lib/mqtt/mqtt.h @@ -0,0 +1,138 @@ +/* mqtt.h +* +* Copyright (c) 2014-2015, Tuan PM +* All rights reserved. +* +* Redistribution and use in source and binary forms, with or without +* modification, are permitted provided that the following conditions are met: +* +* * Redistributions of source code must retain the above copyright notice, +* this list of conditions and the following disclaimer. +* * Redistributions in binary form must reproduce the above copyright +* notice, this list of conditions and the following disclaimer in the +* documentation and/or other materials provided with the distribution. +* * Neither the name of Redis nor the names of its contributors may be used +* to endorse or promote products derived from this software without +* specific prior written permission. +* +* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE +* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +* POSSIBILITY OF SUCH DAMAGE. +*/ +#ifndef USER_AT_MQTT_H_ +#define USER_AT_MQTT_H_ +#include "mqtt_msg.h" +#include "user_interface.h" + +#include "queue.h" +typedef struct mqtt_event_data_t +{ + uint8_t type; + const char* topic; + const char* data; + uint16_t topic_length; + uint16_t data_length; + uint16_t data_offset; +} mqtt_event_data_t; + +typedef struct mqtt_state_t +{ + uint16_t port; + int auto_reconnect; + mqtt_connect_info_t* connect_info; + uint8_t* in_buffer; + uint8_t* out_buffer; + int in_buffer_length; + int out_buffer_length; + uint16_t message_length; + uint16_t message_length_read; + mqtt_message_t* outbound_message; + mqtt_connection_t mqtt_connection; + uint16_t pending_msg_id; + int pending_msg_type; + int pending_publish_qos; +} mqtt_state_t; + +typedef enum { + WIFI_INIT, + WIFI_CONNECTING, + WIFI_CONNECTING_ERROR, + WIFI_CONNECTED, + DNS_RESOLVE, + TCP_DISCONNECTED, + TCP_RECONNECT_REQ, + TCP_RECONNECT, + TCP_CONNECTING, + TCP_CONNECTING_ERROR, + TCP_CONNECTED, + MQTT_CONNECT_SEND, + MQTT_CONNECT_SENDING, + MQTT_SUBSCIBE_SEND, + MQTT_SUBSCIBE_SENDING, + MQTT_DATA, + MQTT_PUBLISH_RECV, + MQTT_PUBLISHING +} tConnState; + +typedef void (*MqttCallback)(uint32_t *args); +typedef void (*MqttDataCallback)(uint32_t *args, const char* topic, uint32_t topic_len, const char *data, uint32_t lengh); + +typedef struct { + struct espconn *pCon; + uint8_t security; + uint8_t* host; + uint32_t port; + ip_addr_t ip; + mqtt_state_t mqtt_state; + mqtt_connect_info_t connect_info; + MqttCallback connectedCb; + MqttCallback disconnectedCb; + MqttCallback publishedCb; + MqttDataCallback dataCb; + ETSTimer mqttTimer; + uint32_t keepAliveTick; + uint32_t reconnectTick; + uint32_t sendTimeout; + tConnState connState; + QUEUE msgQueue; + void* user_data; +} MQTT_Client; + +#define SEC_NONSSL 0 +#define SEC_SSL 1 + +#define MQTT_FLAG_CONNECTED 1 +#define MQTT_FLAG_READY 2 +#define MQTT_FLAG_EXIT 4 + +#define MQTT_EVENT_TYPE_NONE 0 +#define MQTT_EVENT_TYPE_CONNECTED 1 +#define MQTT_EVENT_TYPE_DISCONNECTED 2 +#define MQTT_EVENT_TYPE_SUBSCRIBED 3 +#define MQTT_EVENT_TYPE_UNSUBSCRIBED 4 +#define MQTT_EVENT_TYPE_PUBLISH 5 +#define MQTT_EVENT_TYPE_PUBLISHED 6 +#define MQTT_EVENT_TYPE_EXITED 7 +#define MQTT_EVENT_TYPE_PUBLISH_CONTINUATION 8 + +void ICACHE_FLASH_ATTR MQTT_InitConnection(MQTT_Client *mqttClient, uint8_t* host, uint32 port, uint8_t security); +void ICACHE_FLASH_ATTR MQTT_InitClient(MQTT_Client *mqttClient, uint8_t* client_id, uint8_t* client_user, uint8_t* client_pass, uint32_t keepAliveTime, uint8_t cleanSession); +void ICACHE_FLASH_ATTR MQTT_InitLWT(MQTT_Client *mqttClient, uint8_t* will_topic, uint8_t* will_msg, uint8_t will_qos, uint8_t will_retain); +void ICACHE_FLASH_ATTR MQTT_OnConnected(MQTT_Client *mqttClient, MqttCallback connectedCb); +void ICACHE_FLASH_ATTR MQTT_OnDisconnected(MQTT_Client *mqttClient, MqttCallback disconnectedCb); +void ICACHE_FLASH_ATTR MQTT_OnPublished(MQTT_Client *mqttClient, MqttCallback publishedCb); +void ICACHE_FLASH_ATTR MQTT_OnData(MQTT_Client *mqttClient, MqttDataCallback dataCb); +BOOL ICACHE_FLASH_ATTR MQTT_Subscribe(MQTT_Client *client, char* topic, uint8_t qos); +void ICACHE_FLASH_ATTR MQTT_Connect(MQTT_Client *mqttClient); +void ICACHE_FLASH_ATTR MQTT_Disconnect(MQTT_Client *mqttClient); +BOOL ICACHE_FLASH_ATTR MQTT_Publish(MQTT_Client *client, const char* topic, const char* data, int data_length, int qos, int retain); + +#endif /* USER_AT_MQTT_H_ */ diff --git a/lib/mqtt/mqtt_msg.c b/lib/mqtt/mqtt_msg.c new file mode 100755 index 0000000..4c28ac4 --- /dev/null +++ b/lib/mqtt/mqtt_msg.c @@ -0,0 +1,472 @@ +/* +* Copyright (c) 2014, Stephen Robinson +* All rights reserved. +* +* Redistribution and use in source and binary forms, with or without +* modification, are permitted provided that the following conditions +* are met: +* +* 1. Redistributions of source code must retain the above copyright +* notice, this list of conditions and the following disclaimer. +* 2. Redistributions in binary form must reproduce the above copyright +* notice, this list of conditions and the following disclaimer in the +* documentation and/or other materials provided with the distribution. +* 3. Neither the name of the copyright holder nor the names of its +* contributors may be used to endorse or promote products derived +* from this software without specific prior written permission. +* +* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE +* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +* POSSIBILITY OF SUCH DAMAGE. +* +*/ + +#include +#include "mqtt_msg.h" +#include "user_config.h" +#define MQTT_MAX_FIXED_HEADER_SIZE 3 + +enum mqtt_connect_flag +{ + MQTT_CONNECT_FLAG_USERNAME = 1 << 7, + MQTT_CONNECT_FLAG_PASSWORD = 1 << 6, + MQTT_CONNECT_FLAG_WILL_RETAIN = 1 << 5, + MQTT_CONNECT_FLAG_WILL = 1 << 2, + MQTT_CONNECT_FLAG_CLEAN_SESSION = 1 << 1 +}; + +struct __attribute((__packed__)) mqtt_connect_variable_header +{ + uint8_t lengthMsb; + uint8_t lengthLsb; +#if defined(PROTOCOL_NAMEv31) + uint8_t magic[6]; +#elif defined(PROTOCOL_NAMEv311) + uint8_t magic[4]; +#else +#error "Please define protocol name" +#endif + uint8_t version; + uint8_t flags; + uint8_t keepaliveMsb; + uint8_t keepaliveLsb; +}; + +static int ICACHE_FLASH_ATTR append_string(mqtt_connection_t* connection, const char* string, int len) +{ + if(connection->message.length + len + 2 > connection->buffer_length) + return -1; + + connection->buffer[connection->message.length++] = len >> 8; + connection->buffer[connection->message.length++] = len & 0xff; + memcpy(connection->buffer + connection->message.length, string, len); + connection->message.length += len; + + return len + 2; +} + +static uint16_t ICACHE_FLASH_ATTR append_message_id(mqtt_connection_t* connection, uint16_t message_id) +{ + // If message_id is zero then we should assign one, otherwise + // we'll use the one supplied by the caller + while(message_id == 0) + message_id = ++connection->message_id; + + if(connection->message.length + 2 > connection->buffer_length) + return 0; + + connection->buffer[connection->message.length++] = message_id >> 8; + connection->buffer[connection->message.length++] = message_id & 0xff; + + return message_id; +} + +static int ICACHE_FLASH_ATTR init_message(mqtt_connection_t* connection) +{ + connection->message.length = MQTT_MAX_FIXED_HEADER_SIZE; + return MQTT_MAX_FIXED_HEADER_SIZE; +} + +static mqtt_message_t* ICACHE_FLASH_ATTR fail_message(mqtt_connection_t* connection) +{ + connection->message.data = connection->buffer; + connection->message.length = 0; + return &connection->message; +} + +static mqtt_message_t* ICACHE_FLASH_ATTR fini_message(mqtt_connection_t* connection, int type, int dup, int qos, int retain) +{ + int remaining_length = connection->message.length - MQTT_MAX_FIXED_HEADER_SIZE; + + if(remaining_length > 127) + { + connection->buffer[0] = ((type & 0x0f) << 4) | ((dup & 1) << 3) | ((qos & 3) << 1) | (retain & 1); + connection->buffer[1] = 0x80 | (remaining_length % 128); + connection->buffer[2] = remaining_length / 128; + connection->message.length = remaining_length + 3; + connection->message.data = connection->buffer; + } + else + { + connection->buffer[1] = ((type & 0x0f) << 4) | ((dup & 1) << 3) | ((qos & 3) << 1) | (retain & 1); + connection->buffer[2] = remaining_length; + connection->message.length = remaining_length + 2; + connection->message.data = connection->buffer + 1; + } + + return &connection->message; +} + +void ICACHE_FLASH_ATTR mqtt_msg_init(mqtt_connection_t* connection, uint8_t* buffer, uint16_t buffer_length) +{ + memset(connection, 0, sizeof(connection)); + connection->buffer = buffer; + connection->buffer_length = buffer_length; +} + +int ICACHE_FLASH_ATTR mqtt_get_total_length(uint8_t* buffer, uint16_t length) +{ + int i; + int totlen = 0; + + for(i = 1; i < length; ++i) + { + totlen += (buffer[i] & 0x7f) << (7 * (i - 1)); + if((buffer[i] & 0x80) == 0) + { + ++i; + break; + } + } + totlen += i; + + return totlen; +} + +const char* ICACHE_FLASH_ATTR mqtt_get_publish_topic(uint8_t* buffer, uint16_t* length) +{ + int i; + int totlen = 0; + int topiclen; + + for(i = 1; i < *length; ++i) + { + totlen += (buffer[i] & 0x7f) << (7 * (i -1)); + if((buffer[i] & 0x80) == 0) + { + ++i; + break; + } + } + totlen += i; + + if(i + 2 >= *length) + return NULL; + topiclen = buffer[i++] << 8; + topiclen |= buffer[i++]; + + if(i + topiclen > *length) + return NULL; + + *length = topiclen; + return (const char*)(buffer + i); +} + +const char* ICACHE_FLASH_ATTR mqtt_get_publish_data(uint8_t* buffer, uint16_t* length) +{ + int i; + int totlen = 0; + int topiclen; + + for(i = 1; i < *length; ++i) + { + totlen += (buffer[i] & 0x7f) << (7 * (i - 1)); + if((buffer[i] & 0x80) == 0) + { + ++i; + break; + } + } + totlen += i; + + if(i + 2 >= *length) + return NULL; + topiclen = buffer[i++] << 8; + topiclen |= buffer[i++]; + + if(i + topiclen >= *length){ + *length = 0; + return NULL; + } + i += topiclen; + + if(mqtt_get_qos(buffer) > 0) + { + if(i + 2 >= *length) + return NULL; + i += 2; + } + + if(totlen < i) + return NULL; + + if(totlen <= *length) + *length = totlen - i; + else + *length = *length - i; + return (const char*)(buffer + i); +} + +uint16_t ICACHE_FLASH_ATTR mqtt_get_id(uint8_t* buffer, uint16_t length) +{ + if(length < 1) + return 0; + + switch(mqtt_get_type(buffer)) + { + case MQTT_MSG_TYPE_PUBLISH: + { + int i; + int topiclen; + + for(i = 1; i < length; ++i) + { + if((buffer[i] & 0x80) == 0) + { + ++i; + break; + } + } + + if(i + 2 >= length) + return 0; + topiclen = buffer[i++] << 8; + topiclen |= buffer[i++]; + + if(i + topiclen >= length) + return 0; + i += topiclen; + + if(mqtt_get_qos(buffer) > 0) + { + if(i + 2 >= length) + return 0; + //i += 2; + } else { + return 0; + } + + return (buffer[i] << 8) | buffer[i + 1]; + } + case MQTT_MSG_TYPE_PUBACK: + case MQTT_MSG_TYPE_PUBREC: + case MQTT_MSG_TYPE_PUBREL: + case MQTT_MSG_TYPE_PUBCOMP: + case MQTT_MSG_TYPE_SUBACK: + case MQTT_MSG_TYPE_UNSUBACK: + case MQTT_MSG_TYPE_SUBSCRIBE: + { + // This requires the remaining length to be encoded in 1 byte, + // which it should be. + if(length >= 4 && (buffer[1] & 0x80) == 0) + return (buffer[2] << 8) | buffer[3]; + else + return 0; + } + + default: + return 0; + } +} + +mqtt_message_t* ICACHE_FLASH_ATTR mqtt_msg_connect(mqtt_connection_t* connection, mqtt_connect_info_t* info) +{ + struct mqtt_connect_variable_header* variable_header; + + init_message(connection); + + if(connection->message.length + sizeof(*variable_header) > connection->buffer_length) + return fail_message(connection); + variable_header = (void*)(connection->buffer + connection->message.length); + connection->message.length += sizeof(*variable_header); + + variable_header->lengthMsb = 0; +#if defined(PROTOCOL_NAMEv31) + variable_header->lengthLsb = 6; + memcpy(variable_header->magic, "MQIsdp", 6); + variable_header->version = 3; +#elif defined(PROTOCOL_NAMEv311) + variable_header->lengthLsb = 4; + memcpy(variable_header->magic, "MQTT", 4); + variable_header->version = 4; +#else +#error "Please define protocol name" +#endif + + variable_header->flags = 0; + variable_header->keepaliveMsb = info->keepalive >> 8; + variable_header->keepaliveLsb = info->keepalive & 0xff; + + if(info->clean_session) + variable_header->flags |= MQTT_CONNECT_FLAG_CLEAN_SESSION; + + if(info->client_id != NULL && info->client_id[0] != '\0') + { + if(append_string(connection, info->client_id, strlen(info->client_id)) < 0) + return fail_message(connection); + } + else + return fail_message(connection); + + if(info->will_topic != NULL && info->will_topic[0] != '\0') + { + if(append_string(connection, info->will_topic, strlen(info->will_topic)) < 0) + return fail_message(connection); + + if(append_string(connection, info->will_message, strlen(info->will_message)) < 0) + return fail_message(connection); + + variable_header->flags |= MQTT_CONNECT_FLAG_WILL; + if(info->will_retain) + variable_header->flags |= MQTT_CONNECT_FLAG_WILL_RETAIN; + variable_header->flags |= (info->will_qos & 3) << 3; + } + + if(info->username != NULL && info->username[0] != '\0') + { + if(append_string(connection, info->username, strlen(info->username)) < 0) + return fail_message(connection); + + variable_header->flags |= MQTT_CONNECT_FLAG_USERNAME; + } + + if(info->password != NULL && info->password[0] != '\0') + { + if(append_string(connection, info->password, strlen(info->password)) < 0) + return fail_message(connection); + + variable_header->flags |= MQTT_CONNECT_FLAG_PASSWORD; + } + + return fini_message(connection, MQTT_MSG_TYPE_CONNECT, 0, 0, 0); +} + +mqtt_message_t* ICACHE_FLASH_ATTR mqtt_msg_publish(mqtt_connection_t* connection, const char* topic, const char* data, int data_length, int qos, int retain, uint16_t* message_id) +{ + init_message(connection); + + if(topic == NULL || topic[0] == '\0') + return fail_message(connection); + + if(append_string(connection, topic, strlen(topic)) < 0) + return fail_message(connection); + + if(qos > 0) + { + if((*message_id = append_message_id(connection, 0)) == 0) + return fail_message(connection); + } + else + *message_id = 0; + + if(connection->message.length + data_length > connection->buffer_length) + return fail_message(connection); + memcpy(connection->buffer + connection->message.length, data, data_length); + connection->message.length += data_length; + + return fini_message(connection, MQTT_MSG_TYPE_PUBLISH, 0, qos, retain); +} + +mqtt_message_t* ICACHE_FLASH_ATTR mqtt_msg_puback(mqtt_connection_t* connection, uint16_t message_id) +{ + init_message(connection); + if(append_message_id(connection, message_id) == 0) + return fail_message(connection); + return fini_message(connection, MQTT_MSG_TYPE_PUBACK, 0, 0, 0); +} + +mqtt_message_t* ICACHE_FLASH_ATTR mqtt_msg_pubrec(mqtt_connection_t* connection, uint16_t message_id) +{ + init_message(connection); + if(append_message_id(connection, message_id) == 0) + return fail_message(connection); + return fini_message(connection, MQTT_MSG_TYPE_PUBREC, 0, 0, 0); +} + +mqtt_message_t* ICACHE_FLASH_ATTR mqtt_msg_pubrel(mqtt_connection_t* connection, uint16_t message_id) +{ + init_message(connection); + if(append_message_id(connection, message_id) == 0) + return fail_message(connection); + return fini_message(connection, MQTT_MSG_TYPE_PUBREL, 0, 1, 0); +} + +mqtt_message_t* ICACHE_FLASH_ATTR mqtt_msg_pubcomp(mqtt_connection_t* connection, uint16_t message_id) +{ + init_message(connection); + if(append_message_id(connection, message_id) == 0) + return fail_message(connection); + return fini_message(connection, MQTT_MSG_TYPE_PUBCOMP, 0, 0, 0); +} + +mqtt_message_t* ICACHE_FLASH_ATTR mqtt_msg_subscribe(mqtt_connection_t* connection, const char* topic, int qos, uint16_t* message_id) +{ + init_message(connection); + + if(topic == NULL || topic[0] == '\0') + return fail_message(connection); + + if((*message_id = append_message_id(connection, 0)) == 0) + return fail_message(connection); + + if(append_string(connection, topic, strlen(topic)) < 0) + return fail_message(connection); + + if(connection->message.length + 1 > connection->buffer_length) + return fail_message(connection); + connection->buffer[connection->message.length++] = qos; + + return fini_message(connection, MQTT_MSG_TYPE_SUBSCRIBE, 0, 1, 0); +} + +mqtt_message_t* ICACHE_FLASH_ATTR mqtt_msg_unsubscribe(mqtt_connection_t* connection, const char* topic, uint16_t* message_id) +{ + init_message(connection); + + if(topic == NULL || topic[0] == '\0') + return fail_message(connection); + + if((*message_id = append_message_id(connection, 0)) == 0) + return fail_message(connection); + + if(append_string(connection, topic, strlen(topic)) < 0) + return fail_message(connection); + + return fini_message(connection, MQTT_MSG_TYPE_UNSUBSCRIBE, 0, 1, 0); +} + +mqtt_message_t* ICACHE_FLASH_ATTR mqtt_msg_pingreq(mqtt_connection_t* connection) +{ + init_message(connection); + return fini_message(connection, MQTT_MSG_TYPE_PINGREQ, 0, 0, 0); +} + +mqtt_message_t* ICACHE_FLASH_ATTR mqtt_msg_pingresp(mqtt_connection_t* connection) +{ + init_message(connection); + return fini_message(connection, MQTT_MSG_TYPE_PINGRESP, 0, 0, 0); +} + +mqtt_message_t* ICACHE_FLASH_ATTR mqtt_msg_disconnect(mqtt_connection_t* connection) +{ + init_message(connection); + return fini_message(connection, MQTT_MSG_TYPE_DISCONNECT, 0, 0, 0); +} diff --git a/lib/mqtt/mqtt_msg.h b/lib/mqtt/mqtt_msg.h new file mode 100755 index 0000000..a3f25ad --- /dev/null +++ b/lib/mqtt/mqtt_msg.h @@ -0,0 +1,129 @@ +/* + * File: mqtt_msg.h + * Author: Minh Tuan + * + * Created on July 12, 2014, 1:05 PM + */ + +#ifndef MQTT_MSG_H +#define MQTT_MSG_H +#include "c_types.h" +#ifdef __cplusplus +extern "C" { +#endif + +/* +* Copyright (c) 2014, Stephen Robinson +* All rights reserved. +* +* Redistribution and use in source and binary forms, with or without +* modification, are permitted provided that the following conditions +* are met: +* +* 1. Redistributions of source code must retain the above copyright +* notice, this list of conditions and the following disclaimer. +* 2. Redistributions in binary form must reproduce the above copyright +* notice, this list of conditions and the following disclaimer in the +* documentation and/or other materials provided with the distribution. +* 3. Neither the name of the copyright holder nor the names of its +* contributors may be used to endorse or promote products derived +* from this software without specific prior written permission. +* +* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE +* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +* POSSIBILITY OF SUCH DAMAGE. +* +*/ +/* 7 6 5 4 3 2 1 0*/ +/*| --- Message Type---- | DUP Flag | QoS Level | Retain | +/* Remaining Length */ + + +enum mqtt_message_type +{ + MQTT_MSG_TYPE_CONNECT = 1, + MQTT_MSG_TYPE_CONNACK = 2, + MQTT_MSG_TYPE_PUBLISH = 3, + MQTT_MSG_TYPE_PUBACK = 4, + MQTT_MSG_TYPE_PUBREC = 5, + MQTT_MSG_TYPE_PUBREL = 6, + MQTT_MSG_TYPE_PUBCOMP = 7, + MQTT_MSG_TYPE_SUBSCRIBE = 8, + MQTT_MSG_TYPE_SUBACK = 9, + MQTT_MSG_TYPE_UNSUBSCRIBE = 10, + MQTT_MSG_TYPE_UNSUBACK = 11, + MQTT_MSG_TYPE_PINGREQ = 12, + MQTT_MSG_TYPE_PINGRESP = 13, + MQTT_MSG_TYPE_DISCONNECT = 14 +}; + +typedef struct mqtt_message +{ + uint8_t* data; + uint16_t length; + +} mqtt_message_t; + +typedef struct mqtt_connection +{ + mqtt_message_t message; + + uint16_t message_id; + uint8_t* buffer; + uint16_t buffer_length; + +} mqtt_connection_t; + +typedef struct mqtt_connect_info +{ + char* client_id; + char* username; + char* password; + char* will_topic; + char* will_message; + int keepalive; + int will_qos; + int will_retain; + int clean_session; + +} mqtt_connect_info_t; + + +static inline int ICACHE_FLASH_ATTR mqtt_get_type(uint8_t* buffer) { return (buffer[0] & 0xf0) >> 4; } +static inline int ICACHE_FLASH_ATTR mqtt_get_dup(uint8_t* buffer) { return (buffer[0] & 0x08) >> 3; } +static inline int ICACHE_FLASH_ATTR mqtt_get_qos(uint8_t* buffer) { return (buffer[0] & 0x06) >> 1; } +static inline int ICACHE_FLASH_ATTR mqtt_get_retain(uint8_t* buffer) { return (buffer[0] & 0x01); } + +void ICACHE_FLASH_ATTR mqtt_msg_init(mqtt_connection_t* connection, uint8_t* buffer, uint16_t buffer_length); +int ICACHE_FLASH_ATTR mqtt_get_total_length(uint8_t* buffer, uint16_t length); +const char* ICACHE_FLASH_ATTR mqtt_get_publish_topic(uint8_t* buffer, uint16_t* length); +const char* ICACHE_FLASH_ATTR mqtt_get_publish_data(uint8_t* buffer, uint16_t* length); +uint16_t ICACHE_FLASH_ATTR mqtt_get_id(uint8_t* buffer, uint16_t length); + +mqtt_message_t* ICACHE_FLASH_ATTR mqtt_msg_connect(mqtt_connection_t* connection, mqtt_connect_info_t* info); +mqtt_message_t* ICACHE_FLASH_ATTR mqtt_msg_publish(mqtt_connection_t* connection, const char* topic, const char* data, int data_length, int qos, int retain, uint16_t* message_id); +mqtt_message_t* ICACHE_FLASH_ATTR mqtt_msg_puback(mqtt_connection_t* connection, uint16_t message_id); +mqtt_message_t* ICACHE_FLASH_ATTR mqtt_msg_pubrec(mqtt_connection_t* connection, uint16_t message_id); +mqtt_message_t* ICACHE_FLASH_ATTR mqtt_msg_pubrel(mqtt_connection_t* connection, uint16_t message_id); +mqtt_message_t* ICACHE_FLASH_ATTR mqtt_msg_pubcomp(mqtt_connection_t* connection, uint16_t message_id); +mqtt_message_t* ICACHE_FLASH_ATTR mqtt_msg_subscribe(mqtt_connection_t* connection, const char* topic, int qos, uint16_t* message_id); +mqtt_message_t* ICACHE_FLASH_ATTR mqtt_msg_unsubscribe(mqtt_connection_t* connection, const char* topic, uint16_t* message_id); +mqtt_message_t* ICACHE_FLASH_ATTR mqtt_msg_pingreq(mqtt_connection_t* connection); +mqtt_message_t* ICACHE_FLASH_ATTR mqtt_msg_pingresp(mqtt_connection_t* connection); +mqtt_message_t* ICACHE_FLASH_ATTR mqtt_msg_disconnect(mqtt_connection_t* connection); + + +#ifdef __cplusplus +} +#endif + +#endif /* MQTT_MSG_H */ + diff --git a/lib/mqtt/proto.c b/lib/mqtt/proto.c new file mode 100755 index 0000000..3e6b102 --- /dev/null +++ b/lib/mqtt/proto.c @@ -0,0 +1,129 @@ +#include "proto.h" +#include "ringbuf.h" +I8 ICACHE_FLASH_ATTR PROTO_Init(PROTO_PARSER *parser, PROTO_PARSE_CALLBACK *completeCallback, U8 *buf, U16 bufSize) +{ + parser->buf = buf; + parser->bufSize = bufSize; + parser->dataLen = 0; + parser->callback = completeCallback; + parser->isEsc = 0; + return 0; +} + +I8 ICACHE_FLASH_ATTR PROTO_ParseByte(PROTO_PARSER *parser, U8 value) +{ + switch(value){ + case 0x7D: + parser->isEsc = 1; + break; + + case 0x7E: + parser->dataLen = 0; + parser->isEsc = 0; + parser->isBegin = 1; + break; + + case 0x7F: + if (parser->callback != NULL) + parser->callback(); + parser->isBegin = 0; + return 0; + break; + + default: + if(parser->isBegin == 0) break; + + if(parser->isEsc){ + value ^= 0x20; + parser->isEsc = 0; + } + + if(parser->dataLen < parser->bufSize) + parser->buf[parser->dataLen++] = value; + + break; + } + return -1; +} + +I8 ICACHE_FLASH_ATTR PROTO_Parse(PROTO_PARSER *parser, U8 *buf, U16 len) +{ + while(len--) + PROTO_ParseByte(parser, *buf++); + + return 0; +} +I16 ICACHE_FLASH_ATTR PROTO_ParseRb(RINGBUF* rb, U8 *bufOut, U16* len, U16 maxBufLen) +{ + U8 c; + + PROTO_PARSER proto; + PROTO_Init(&proto, NULL, bufOut, maxBufLen); + while(RINGBUF_Get(rb, &c) == 0){ + if(PROTO_ParseByte(&proto, c) == 0){ + *len = proto.dataLen; + return 0; + } + } + return -1; +} +I16 ICACHE_FLASH_ATTR PROTO_Add(U8 *buf, const U8 *packet, I16 bufSize) +{ + U16 i = 2; + U16 len = *(U16*) packet; + + if (bufSize < 1) return -1; + + *buf++ = 0x7E; + bufSize--; + + while (len--) { + switch (*packet) { + case 0x7D: + case 0x7E: + case 0x7F: + if (bufSize < 2) return -1; + *buf++ = 0x7D; + *buf++ = *packet++ ^ 0x20; + i += 2; + bufSize -= 2; + break; + default: + if (bufSize < 1) return -1; + *buf++ = *packet++; + i++; + bufSize--; + break; + } + } + + if (bufSize < 1) return -1; + *buf++ = 0x7F; + + return i; +} + +I16 ICACHE_FLASH_ATTR PROTO_AddRb(RINGBUF *rb, const U8 *packet, I16 len) +{ + U16 i = 2; + if(RINGBUF_Put(rb, 0x7E) == -1) return -1; + while (len--) { + switch (*packet) { + case 0x7D: + case 0x7E: + case 0x7F: + if(RINGBUF_Put(rb, 0x7D) == -1) return -1; + if(RINGBUF_Put(rb, *packet++ ^ 0x20) == -1) return -1; + i += 2; + break; + default: + if(RINGBUF_Put(rb, *packet++) == -1) return -1; + i++; + break; + } + } + if(RINGBUF_Put(rb, 0x7F) == -1) return -1; + + return i; +} + diff --git a/lib/mqtt/proto.h b/lib/mqtt/proto.h new file mode 100755 index 0000000..e5d4446 --- /dev/null +++ b/lib/mqtt/proto.h @@ -0,0 +1,32 @@ +/* + * File: proto.h + * Author: ThuHien + * + * Created on November 23, 2012, 8:57 AM + */ + +#ifndef _PROTO_H_ +#define _PROTO_H_ +#include +#include "typedef.h" +#include "ringbuf.h" + +typedef void(PROTO_PARSE_CALLBACK)(); + +typedef struct{ + U8 *buf; + U16 bufSize; + U16 dataLen; + U8 isEsc; + U8 isBegin; + PROTO_PARSE_CALLBACK* callback; +}PROTO_PARSER; + +I8 ICACHE_FLASH_ATTR PROTO_Init(PROTO_PARSER *parser, PROTO_PARSE_CALLBACK *completeCallback, U8 *buf, U16 bufSize); +I8 ICACHE_FLASH_ATTR PROTO_Parse(PROTO_PARSER *parser, U8 *buf, U16 len); +I16 ICACHE_FLASH_ATTR PROTO_Add(U8 *buf, const U8 *packet, I16 bufSize); +I16 ICACHE_FLASH_ATTR PROTO_AddRb(RINGBUF *rb, const U8 *packet, I16 len); +I8 ICACHE_FLASH_ATTR PROTO_ParseByte(PROTO_PARSER *parser, U8 value); +I16 ICACHE_FLASH_ATTR PROTO_ParseRb(RINGBUF *rb, U8 *bufOut, U16* len, U16 maxBufLen); +#endif + diff --git a/lib/mqtt/queue.c b/lib/mqtt/queue.c new file mode 100755 index 0000000..51e60d7 --- /dev/null +++ b/lib/mqtt/queue.c @@ -0,0 +1,57 @@ +/* str_queue.c +* +* Copyright (c) 2014-2015, Tuan PM +* All rights reserved. +* +* Redistribution and use in source and binary forms, with or without +* modification, are permitted provided that the following conditions are met: +* +* * Redistributions of source code must retain the above copyright notice, +* this list of conditions and the following disclaimer. +* * Redistributions in binary form must reproduce the above copyright +* notice, this list of conditions and the following disclaimer in the +* documentation and/or other materials provided with the distribution. +* * Neither the name of Redis nor the names of its contributors may be used +* to endorse or promote products derived from this software without +* specific prior written permission. +* +* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE +* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +* POSSIBILITY OF SUCH DAMAGE. +*/ +#include "queue.h" + +#include "user_interface.h" +#include "osapi.h" +#include "os_type.h" +#include "mem.h" +#include "proto.h" +void ICACHE_FLASH_ATTR QUEUE_Init(QUEUE *queue, int bufferSize) +{ + queue->buf = (uint8_t*)os_zalloc(bufferSize); + RINGBUF_Init(&queue->rb, queue->buf, bufferSize); +} +int32_t ICACHE_FLASH_ATTR QUEUE_Puts(QUEUE *queue, uint8_t* buffer, uint16_t len) +{ + return PROTO_AddRb(&queue->rb, buffer, len); +} +int32_t ICACHE_FLASH_ATTR QUEUE_Gets(QUEUE *queue, uint8_t* buffer, uint16_t* len, uint16_t maxLen) +{ + + return PROTO_ParseRb(&queue->rb, buffer, len, maxLen); +} + +BOOL ICACHE_FLASH_ATTR QUEUE_IsEmpty(QUEUE *queue) +{ + if(queue->rb.fill_cnt<=0) + return TRUE; + return FALSE; +} diff --git a/lib/mqtt/queue.h b/lib/mqtt/queue.h new file mode 100755 index 0000000..40f4207 --- /dev/null +++ b/lib/mqtt/queue.h @@ -0,0 +1,44 @@ +/* str_queue.h -- +* +* Copyright (c) 2014-2015, Tuan PM +* All rights reserved. +* +* Redistribution and use in source and binary forms, with or without +* modification, are permitted provided that the following conditions are met: +* +* * Redistributions of source code must retain the above copyright notice, +* this list of conditions and the following disclaimer. +* * Redistributions in binary form must reproduce the above copyright +* notice, this list of conditions and the following disclaimer in the +* documentation and/or other materials provided with the distribution. +* * Neither the name of Redis nor the names of its contributors may be used +* to endorse or promote products derived from this software without +* specific prior written permission. +* +* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE +* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +* POSSIBILITY OF SUCH DAMAGE. +*/ + +#ifndef USER_QUEUE_H_ +#define USER_QUEUE_H_ +#include "os_type.h" +#include "ringbuf.h" +typedef struct { + uint8_t *buf; + RINGBUF rb; +} QUEUE; + +void ICACHE_FLASH_ATTR QUEUE_Init(QUEUE *queue, int bufferSize); +int32_t ICACHE_FLASH_ATTR QUEUE_Puts(QUEUE *queue, uint8_t* buffer, uint16_t len); +int32_t ICACHE_FLASH_ATTR QUEUE_Gets(QUEUE *queue, uint8_t* buffer, uint16_t* len, uint16_t maxLen); +BOOL ICACHE_FLASH_ATTR QUEUE_IsEmpty(QUEUE *queue); +#endif /* USER_QUEUE_H_ */ diff --git a/lib/mqtt/ringbuf.c b/lib/mqtt/ringbuf.c new file mode 100755 index 0000000..e017758 --- /dev/null +++ b/lib/mqtt/ringbuf.c @@ -0,0 +1,67 @@ +/** +* \file +* Ring Buffer library +*/ + +#include "ringbuf.h" + + +/** +* \brief init a RINGBUF object +* \param r pointer to a RINGBUF object +* \param buf pointer to a byte array +* \param size size of buf +* \return 0 if successfull, otherwise failed +*/ +I16 ICACHE_FLASH_ATTR RINGBUF_Init(RINGBUF *r, U8* buf, I32 size) +{ + if(r == NULL || buf == NULL || size < 2) return -1; + + r->p_o = r->p_r = r->p_w = buf; + r->fill_cnt = 0; + r->size = size; + + return 0; +} +/** +* \brief put a character into ring buffer +* \param r pointer to a ringbuf object +* \param c character to be put +* \return 0 if successfull, otherwise failed +*/ +I16 ICACHE_FLASH_ATTR RINGBUF_Put(RINGBUF *r, U8 c) +{ + if(r->fill_cnt>=r->size)return -1; // ring buffer is full, this should be atomic operation + + + r->fill_cnt++; // increase filled slots count, this should be atomic operation + + + *r->p_w++ = c; // put character into buffer + + if(r->p_w >= r->p_o + r->size) // rollback if write pointer go pass + r->p_w = r->p_o; // the physical boundary + + return 0; +} +/** +* \brief get a character from ring buffer +* \param r pointer to a ringbuf object +* \param c read character +* \return 0 if successfull, otherwise failed +*/ +I16 ICACHE_FLASH_ATTR RINGBUF_Get(RINGBUF *r, U8* c) +{ + if(r->fill_cnt<=0)return -1; // ring buffer is empty, this should be atomic operation + + + r->fill_cnt--; // decrease filled slots count + + + *c = *r->p_r++; // get the character out + + if(r->p_r >= r->p_o + r->size) // rollback if write pointer go pass + r->p_r = r->p_o; // the physical boundary + + return 0; +} diff --git a/lib/mqtt/ringbuf.h b/lib/mqtt/ringbuf.h new file mode 100755 index 0000000..431a06b --- /dev/null +++ b/lib/mqtt/ringbuf.h @@ -0,0 +1,19 @@ +#ifndef _RING_BUF_H_ +#define _RING_BUF_H_ + +#include +#include +#include "typedef.h" + +typedef struct{ + U8* p_o; /**< Original pointer */ + U8* volatile p_r; /**< Read pointer */ + U8* volatile p_w; /**< Write pointer */ + volatile I32 fill_cnt; /**< Number of filled slots */ + I32 size; /**< Buffer size */ +}RINGBUF; + +I16 ICACHE_FLASH_ATTR RINGBUF_Init(RINGBUF *r, U8* buf, I32 size); +I16 ICACHE_FLASH_ATTR RINGBUF_Put(RINGBUF *r, U8 c); +I16 ICACHE_FLASH_ATTR RINGBUF_Get(RINGBUF *r, U8* c); +#endif diff --git a/lib/mqtt/typedef.h b/lib/mqtt/typedef.h new file mode 100755 index 0000000..17a99b2 --- /dev/null +++ b/lib/mqtt/typedef.h @@ -0,0 +1,17 @@ +/** +* \file +* Standard Types definition +*/ + +#ifndef _TYPE_DEF_H_ +#define _TYPE_DEF_H_ + +typedef char I8; +typedef unsigned char U8; +typedef short I16; +typedef unsigned short U16; +typedef long I32; +typedef unsigned long U32; +typedef unsigned long long U64; + +#endif diff --git a/lib/mqtt/utils.c b/lib/mqtt/utils.c new file mode 100755 index 0000000..e84171d --- /dev/null +++ b/lib/mqtt/utils.c @@ -0,0 +1,149 @@ +/* +* Copyright (c) 2014, Tuan PM +* Email: tuanpm@live.com +* +* All rights reserved. +* +* Redistribution and use in source and binary forms, with or without +* modification, are permitted provided that the following conditions +* are met: +* +* 1. Redistributions of source code must retain the above copyright +* notice, this list of conditions and the following disclaimer. +* 2. Redistributions in binary form must reproduce the above copyright +* notice, this list of conditions and the following disclaimer in the +* documentation and/or other materials provided with the distribution. +* 3. Neither the name of the copyright holder nor the names of its +* contributors may be used to endorse or promote products derived +* from this software without specific prior written permission. +* +* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE +* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +* POSSIBILITY OF SUCH DAMAGE. +* +*/ +#include +#include +#include +#include +#include +#include "utils.h" + + +uint8_t ICACHE_FLASH_ATTR UTILS_IsIPV4 (int8_t *str) +{ + uint8_t segs = 0; /* Segment count. */ + uint8_t chcnt = 0; /* Character count within segment. */ + uint8_t accum = 0; /* Accumulator for segment. */ + /* Catch NULL pointer. */ + if (str == 0) + return 0; + /* Process every character in string. */ + + while (*str != '\0') { + /* Segment changeover. */ + + if (*str == '.') { + /* Must have some digits in segment. */ + if (chcnt == 0) + return 0; + /* Limit number of segments. */ + if (++segs == 4) + return 0; + /* Reset segment values and restart loop. */ + chcnt = accum = 0; + str++; + continue; + } + + /* Check numeric. */ + if ((*str < '0') || (*str > '9')) + return 0; + + /* Accumulate and check segment. */ + + if ((accum = accum * 10 + *str - '0') > 255) + return 0; + /* Advance other segment specific stuff and continue loop. */ + + chcnt++; + str++; + } + + /* Check enough segments and enough characters in last segment. */ + + if (segs != 3) + return 0; + if (chcnt == 0) + return 0; + /* Address okay. */ + + return 1; +} +uint8_t ICACHE_FLASH_ATTR UTILS_StrToIP(const int8_t* str, void *ip) +{ + + /* The count of the number of bytes processed. */ + int i; + /* A pointer to the next digit to process. */ + const char * start; + + start = str; + for (i = 0; i < 4; i++) { + /* The digit being processed. */ + char c; + /* The value of this byte. */ + int n = 0; + while (1) { + c = * start; + start++; + if (c >= '0' && c <= '9') { + n *= 10; + n += c - '0'; + } + /* We insist on stopping at "." if we are still parsing + the first, second, or third numbers. If we have reached + the end of the numbers, we will allow any character. */ + else if ((i < 3 && c == '.') || i == 3) { + break; + } + else { + return 0; + } + } + if (n >= 256) { + return 0; + } + ((uint8_t*)ip)[i] = n; + } + return 1; + +} +uint32_t ICACHE_FLASH_ATTR UTILS_Atoh(const int8_t *s) +{ + uint32_t value = 0, digit; + int8_t c; + + while((c = *s++)){ + if('0' <= c && c <= '9') + digit = c - '0'; + else if('A' <= c && c <= 'F') + digit = c - 'A' + 10; + else if('a' <= c && c<= 'f') + digit = c - 'a' + 10; + else break; + + value = (value << 4) | digit; + } + + return value; +} + diff --git a/lib/mqtt/utils.h b/lib/mqtt/utils.h new file mode 100755 index 0000000..16ff85b --- /dev/null +++ b/lib/mqtt/utils.h @@ -0,0 +1,9 @@ +#ifndef _UTILS_H_ +#define _UTILS_H_ + +#include "c_types.h" + +uint32_t ICACHE_FLASH_ATTR UTILS_Atoh(const int8_t *s); +uint8_t ICACHE_FLASH_ATTR UTILS_StrToIP(const int8_t* str, void *ip); +uint8_t ICACHE_FLASH_ATTR UTILS_IsIPV4 (int8_t *str); +#endif diff --git a/platformio.ini b/platformio.ini new file mode 100644 index 0000000..e5a1515 --- /dev/null +++ b/platformio.ini @@ -0,0 +1,15 @@ +; PlatformIO Project Configuration File +; +; Build options: build flags, source filter +; Upload options: custom upload port, speed and extra flags +; Library options: dependencies, extra library storages +; Advanced options: extra scripting +; +; Please visit documentation for the other options and examples +; https://docs.platformio.org/page/projectconf.html + +[env:esp01_1m] +platform = espressif8266 +board = esp01_1m +framework = esp8266-nonos-sdk +build_flags = -I src/ diff --git a/src/user_config.h b/src/user_config.h new file mode 100755 index 0000000..8855d6c --- /dev/null +++ b/src/user_config.h @@ -0,0 +1,36 @@ +#ifndef _USER_CONFIG_H_ +#define _USER_CONFIG_H_ + +#define INFO_MSG true + +#define WIFI_CLIENTSSID "" +#define WIFI_CLIENTPASSWORD "" + +#define WIFI_CHECK_DELAY 3000 /* milliseconds */ //TODO:change to check every 5 minutes? +#define FADE_DELAY 2000 + +/*MQTT stuff*/ +#define MQTT_HOST "" //Host IP address, Raspberry Pi or "mqtt.yourdomain.com" +#define MQTT_PORT 1883 +#define MQTT_BUF_SIZE 1024 +#define MQTT_KEEPALIVE 120 /*second*/ + +#define MQTT_CLIENT_ID "ESP_NeoPixel" //Name for your client, no duplicates +#define MQTT_USER "" +#define MQTT_PASS "" +#define MQTT_RECONNECT_TIMEOUT 5 /*second*/ +#define DEFAULT_SECURITY 0 +#define QUEUE_BUFFER_SIZE 2048 +#define PROTOCOL_NAMEv31 /*MQTT version 3.1 compatible with Mosquitto v0.15*/ +//PROTOCOL_NAMEv311 /*MQTT version 3.11 compatible with https://eclipse.org/paho/clients/testing/*/ +#define CLIENT_SSL_ENABLE + +//Adafruit_NeoPixel Stuff +#define PIN 2 +#define NUMPIXELS 16 +#define BRIGHTNESS 50 +// RGB NeoPixel permutations; white and red offsets are always same +// Offset: W R G B +#define NEO_GRB ((1 << 6) | (1 << 4) | (0 << 2) | (2)) +#define _BV(b) (1UL << (b)) +#endif diff --git a/src/user_main.c b/src/user_main.c new file mode 100755 index 0000000..e2d29a8 --- /dev/null +++ b/src/user_main.c @@ -0,0 +1,699 @@ +#include +#include +#include +#include +#include +#include "driver/uart.h" +#include "user_config.h" +#include "mqtt.h" +#include +#include + +//typedef void (*WifiCallback)(uint8_t); + +extern int ets_uart_printf(const char *fmt, ...); +int (*console_printf)(const char *fmt, ...) = ets_uart_printf; + +// Debug output. +#ifdef INFO_MSG +#undef INFO_MSG +#define INFO_MSG(...) console_printf(__VA_ARGS__); +#else +#define INFO_MSG(...) +#endif + +typedef uint8_t neoPixelType; + +static uint32_t _getCycleCount(void) __attribute__((always_inline)); +static inline uint32_t _getCycleCount(void) { + uint32_t ccount; + __asm__ __volatile__("rsr %0,ccount":"=a" (ccount)); + return ccount; +} + +struct Adafruit_NeoPixel{ + bool + begun; // true if begin() previously called + uint16_t + numLEDs, // Number of RGB LEDs in strip + numBytes; // Size of 'pixels' buffer below (3 or 4 bytes/pixel) + int8_t + pin; // Output pin number (-1 if not yet set) + uint8_t + brightness, + *pixels, // Holds LED color values (3 or 4 bytes each) + rOffset, // Index of red byte within each 3- or 4-byte pixel + gOffset, // Index of green byte + bOffset, // Index of blue byte + wOffset; // Index of white byte (same as rOffset if no white) + uint32_t + endTime; +}; + +bool neopixel_canShow(struct Adafruit_NeoPixel *an){ + return (system_get_time() - an->endTime) >= 300L; +} + +LOCAL void espShow(uint8_t pin, uint8_t *pixels, uint32_t numBytes) { + +#define CYCLES_800_T0H (APB_CLK_FREQ / 2500000) // 0.4us //F_CPU changed for the equivalent in eagle_soc.h : CPU_CLK_FREQ +#define CYCLES_800_T1H (APB_CLK_FREQ / 1250000) // 0.8us +#define CYCLES_800 (APB_CLK_FREQ / 800000) // 1.25us per bit +#define CYCLES_400_T0H (APB_CLK_FREQ / 2000000) // 0.5uS +#define CYCLES_400_T1H (APB_CLK_FREQ / 833333) // 1.2us +#define CYCLES_400 (APB_CLK_FREQ / 400000) // 2.5us per bit + + uint8_t *p, *end, pix, mask; + uint32_t t, time0, time1, period, c, startTime, pinMask; + + pinMask = _BV(pin); //_BV is in arduino library had to include the define here + p = pixels; + end = p + numBytes; + pix = *p++; + mask = 0x80; + startTime = 0; + + time0 = CYCLES_800_T0H; + time1 = CYCLES_800_T1H; + period = CYCLES_800; + + for(t = time0;; t = time0) { + if(pix & mask) t = time1; // Bit high duration + while(((c = _getCycleCount()) - startTime) < period); // Wait for bit start + GPIO_REG_WRITE(GPIO_OUT_W1TS_ADDRESS, pinMask); // Set high + startTime = c; // Save start time + while(((c = _getCycleCount()) - startTime) < t); // Wait high duration + GPIO_REG_WRITE(GPIO_OUT_W1TC_ADDRESS, pinMask); // Set low + if(!(mask >>= 1)) { // Next bit/byte + if(p >= end) break; + pix = *p++; + mask = 0x80; + } + } + while((_getCycleCount() - startTime) < period); // Wait for last bit +} + +void begin(struct Adafruit_NeoPixel *an) { + if(an->pin >= 0) { //these functions depend on Arduino, declared using equivalents from + // pinMode(pin, OUTPUT); + // digitalWrite(pin, LOW); + PIN_FUNC_SELECT(PERIPHS_IO_MUX_GPIO2_U,FUNC_GPIO2); //hardcoded GPIO 2 + GPIO_OUTPUT_SET(GPIO_ID_PIN(an->pin),0); // assuming GPIO2 = pin2 and the only argument passed is pin 2, otherwise this won't work + } + an->begun = true; +} + +void setPin(struct Adafruit_NeoPixel *an, uint8_t p){ + if(an->begun && (an->pin >= 0)) PIN_FUNC_SELECT(PERIPHS_IO_MUX_GPIO2_U,FUNC_GPIO2); //pinMode(pin, INPUT); //again hardcoded, this function may be useless + an->pin = p; + if(an->begun) { + //pinMode(p, OUTPUT); + //digitalWrite(p, LOW); + PIN_FUNC_SELECT(PERIPHS_IO_MUX_GPIO2_U,FUNC_GPIO2); //hardcoded GPIO 2 + GPIO_OUTPUT_SET(GPIO_ID_PIN(an->pin),0); + } +} + +void updateLength(struct Adafruit_NeoPixel *an, uint16_t n){ + if(an->pixels){ os_free(an->pixels);} // Free existing data (if any) + // Allocate new data -- note: ALL PIXELS ARE CLEARED + an->numBytes = n * ((an->wOffset == an->rOffset) ? 3 : 4); + if((an->pixels = (uint8_t *)os_malloc(an->numBytes))) { + os_memset(an->pixels, 0, an->numBytes); + an->numLEDs = n; + } else { + an->numLEDs = an->numBytes = 0; + } +} + +void updateType(struct Adafruit_NeoPixel *an, neoPixelType t){ + bool oldThreeBytesPerPixel = (an->wOffset == an->rOffset); // false if RGBW + an->wOffset = (t >> 6) & 0b11; // See notes in header file + an->rOffset = (t >> 4) & 0b11; // regarding R/G/B/W offsets + an->gOffset = (t >> 2) & 0b11; + an->bOffset = t & 0b11; + + // If bytes-per-pixel has changed (and pixel data was previously + // allocated), re-allocate to new size. Will clear any data. + if(an->pixels) { + bool newThreeBytesPerPixel = (an->wOffset == an->rOffset); + if(newThreeBytesPerPixel != oldThreeBytesPerPixel) updateLength(an ,an->numLEDs); + } +} + +uint32_t color_rgb(uint8_t r, uint8_t g, uint8_t b){ + return ((uint32_t)r << 16) | ((uint32_t)g << 8) | b; +} + +//Set pixel color from 'packed' 32-bit RGB color: +void setPixelColor(struct Adafruit_NeoPixel *an, uint16_t n, uint32_t c){ + if(n < an->numLEDs) { + uint8_t *p, + r = (uint8_t)(c >> 16), + g = (uint8_t)(c >> 8), + b = (uint8_t)c; + if(an->brightness) { // See notes in setBrightness() + r = (r * an->brightness) >> 8; + g = (g * an->brightness) >> 8; + b = (b * an->brightness) >> 8; + } + if(an->wOffset == an->rOffset) { + p = &(an->pixels[n * 3]); + } else { + p = &(an->pixels[n * 4]); + uint8_t w = (uint8_t)(c >> 24); + p[an->wOffset] = an->brightness ? ((w * an->brightness) >> 8) : w; + } + p[an->rOffset] = r; + p[an->gOffset] = g; + p[an->bOffset] = b; + } +} + +void show(struct Adafruit_NeoPixel *an) { + if(!an->pixels) return; + while(!neopixel_canShow(an)); + espShow(an->pin, an->pixels, an->numBytes); + an->endTime = system_get_time(); +} + +void constructor (struct Adafruit_NeoPixel *an, uint16_t n, uint8_t p, neoPixelType t){ + an->begun = false; + an->brightness = 50; + an->pixels = NULL; + an->endTime = 0; + + updateType(an, t); + updateLength(an, n); + setPin(an, p); +} + +struct Adafruit_NeoPixel ledstrip; + +MQTT_Client mqttClient; +LOCAL bool ICACHE_FLASH_ATTR setup_wifi_st_mode(); +static void ICACHE_FLASH_ATTR wifi_check_ip(void *arg); +static struct ip_info ipConfig; +static ETSTimer WiFiLinker; +//static ETSTimer fadeTimer; +//static void ICACHE_FLASH_ATTR fadeFunction(void *arg); +static tConnState connState = WIFI_CONNECTING; +bool firstTime = true; +uint8_t received_red; +uint8_t received_green; +uint8_t received_blue; +bool fade = false; +bool solid = false; + +/*void setStripColor(uint8_t red, uint8_t green, uint8_t blue){ + uint8_t i = 0; + for(i=0;issid); + INFO_MSG("SECURITY: %s\r\n", authMapping[bssInfo->authmode]); + INFO_MSG("RSSI: %d dB\r\n\n", bssInfo->rssi); + bssInfo = STAILQ_NEXT(bssInfo, next); + } + + INFO_MSG("Scan done, setting WiFi check timer...\r\n"); + // Wait for Wi-Fi connection + os_timer_disarm(&WiFiLinker); + os_timer_setfn(&WiFiLinker, (os_timer_func_t *)wifi_check_ip, NULL); + os_timer_arm(&WiFiLinker, WIFI_CHECK_DELAY, 0); + }else{ + INFO_MSG("There is a problem scanning nearby networks \r\n"); + INFO_MSG("Status is: %s \r\n", statusMapping[status]); + } +} + +LOCAL void ICACHE_FLASH_ATTR to_scan(void) { wifi_station_scan(NULL,wifi_show_scan_info); } + +LOCAL bool ICACHE_FLASH_ATTR setup_wifi_st_mode() +{ + struct station_config stconfig; + wifi_station_disconnect(); + wifi_station_dhcpc_stop(); + if(wifi_station_get_config(&stconfig)) + { + os_memset(stconfig.ssid, 0, sizeof(stconfig.ssid)); + os_memset(stconfig.password, 0, sizeof(stconfig.password)); + os_sprintf(stconfig.ssid, "%s", WIFI_CLIENTSSID); + os_sprintf(stconfig.password, "%s", WIFI_CLIENTPASSWORD); + if(!wifi_station_set_config(&stconfig)) + { + INFO_MSG("ESP8266 not set station config!\r\n"); + return false; + } + } + wifi_station_connect(); + wifi_station_dhcpc_start(); + INFO_MSG("ESP8266 in STA mode configured.\r\n"); + return true; +} + +/****************************************************************************** + * FunctionName : user_rf_cal_sector_set + * Description : SDK just reversed 4 sectors, used for rf init data and paramters. + * We add this function to force users to set rf cal sector, since + * we don't know which sector is free in user's application. + * sector map for last several sectors : ABBBCDDD + * A : rf cal + * B : at parameters + * C : rf init data + * D : sdk parameters + * Parameters : none + * Returns : rf cal sector +*******************************************************************************/ +uint32 ICACHE_FLASH_ATTR user_rf_cal_sector_set(void) +{ + enum flash_size_map size_map = system_get_flash_size_map(); + uint32 rf_cal_sec = 0; + + switch (size_map) { + case FLASH_SIZE_4M_MAP_256_256: + rf_cal_sec = 128 - 8; + break; + + case FLASH_SIZE_8M_MAP_512_512: + rf_cal_sec = 256 - 5; + break; + + case FLASH_SIZE_16M_MAP_512_512: + case FLASH_SIZE_16M_MAP_1024_1024: + rf_cal_sec = 512 - 5; + break; + + case FLASH_SIZE_32M_MAP_512_512: + case FLASH_SIZE_32M_MAP_1024_1024: + rf_cal_sec = 1024 - 5; + break; + + default: + rf_cal_sec = 0; + break; + } + + return rf_cal_sec; +} + +void ICACHE_FLASH_ATTR user_rf_pre_init(void) +{ +} + +void ICACHE_FLASH_ATTR user_init(void) +{ + // Configure the UART + uart_init(BIT_RATE_115200, BIT_RATE_115200); + // Enable system messages + system_set_os_print(1); + + //To print available networks + wifi_set_opmode(STATION_MODE); + system_init_done_cb(to_scan); + + INFO_MSG("\n==== System info: ====\n"); + INFO_MSG("SDK version:%s rom %d\n", system_get_sdk_version(), system_upgrade_userbin_check()); + INFO_MSG("Time = %ld\n", system_get_time()); + INFO_MSG("Chip id = 0x%x\n", system_get_chip_id()); + INFO_MSG("CPU freq = %d MHz\n", system_get_cpu_freq()); + //INFO_MSG("Flash size map = %s\n", system_get_flash_size_map()); //doesn't work for some reason + INFO_MSG("Free heap size = %d\n", system_get_free_heap_size()); + INFO_MSG("==== End System info ====\n"); + INFO_MSG("==== MQTT client setup ====\n"); + MQTT_InitConnection(&mqttClient, MQTT_HOST, MQTT_PORT, DEFAULT_SECURITY); + INFO_MSG("MQTT settings:\r\n Host: %s\r\n Port: %d\r\n Security: %d\r\n", MQTT_HOST, MQTT_PORT, DEFAULT_SECURITY); + MQTT_InitClient(&mqttClient, MQTT_CLIENT_ID, MQTT_USER, MQTT_PASS, MQTT_KEEPALIVE, 1); + INFO_MSG("MQTT client settings:\r\n Device ID: %s\r\n MQTT_User: %s\r\n MQTT_Password: %s\r\n MQTT_Keepalive: %d\r\n Uses clean session\r\n", MQTT_CLIENT_ID, MQTT_USER, MQTT_PASS, MQTT_KEEPALIVE); + MQTT_InitLWT(&mqttClient, "lwt", "offline", 0, 0); //last will topic + MQTT_OnConnected(&mqttClient, mqttConnectedCb); + MQTT_OnDisconnected(&mqttClient, mqttDisconnectedCb); + MQTT_OnPublished(&mqttClient, mqttPublishedCb); + MQTT_OnData(&mqttClient, mqttDataCb); + INFO_MSG("==== End MQTT client setup ====\n"); + os_delay_us(10000); + INFO_MSG("System init...\r\n"); + if(setup_wifi_st_mode()){ + if(wifi_get_phy_mode() != PHY_MODE_11N) + wifi_set_phy_mode(PHY_MODE_11N); + if(wifi_station_get_auto_connect() == 0) + wifi_station_set_auto_connect(1); + wifi_station_set_reconnect_policy(TRUE); + } + // Init NeoPixel ledstrip + INFO_MSG("Initializing NeoPixel led Strip ...\r\n"); + constructor(&ledstrip, NUMPIXELS, PIN, NEO_GRB); + INFO_MSG("Updated NeoPixel led Strip struct...\r\n"); + os_delay_us(50000); + begin(&ledstrip); //pin initialization + int i; + for(i=0;i