minimal MQTT Library. This version simply doubled the MQTT QUEUE sizes
Dependents: WNCInterface_M2XMQTTdemo
Fork of minimal-mqtt by
Diff: minimal-mqtt.h
- Revision:
- 0:a268b80e3093
- Child:
- 1:311cd16389ff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/minimal-mqtt.h Mon Jun 27 13:16:06 2016 +0000 @@ -0,0 +1,844 @@ +#ifndef MMQTT_H_ +#define MMQTT_H_ + +#include <stdint.h> +#ifndef min +#define min(a, b) ((a) > (b) ? (b) : (a)) +#endif /* min */ + +#define MMQTT_STREAM_MAX_LENGTH 8 +#define MMQTT_QUEUE_MAX_LENGTH 2 + +/* mmqtt_ssize_t must be able to hold MMQTT_STREAM_MAX_LENGTH as well as + * MMQTT_QUEUE_MAX_LENGTH, we might add macro-based detection in the future. + */ +typedef int8_t mmqtt_ssize_t; +typedef int8_t mmqtt_status_t; + +#define MMQTT_STATUS_OK 0 +#define MMQTT_STATUS_UNKNOWN -1 +#define MMQTT_STATUS_DONE -2 +#define MMQTT_STATUS_NOT_PULLABLE -3 +#define MMQTT_STATUS_NOT_PUSHABLE -4 +#define MMQTT_STATUS_NOT_EXPECTED -5 +#define MMQTT_STATUS_BROKEN_CONNECTION -6 +#define MMQTT_STATUS_INVALID_STATE -7 + +#define MMQTT_MESSAGE_TYPE_CONNECT 0x1 +#define MMQTT_MESSAGE_TYPE_CONNACK 0x2 +#define MMQTT_MESSAGE_TYPE_PUBLISH 0x3 +#define MMQTT_MESSAGE_TYPE_PUBACK 0x4 +#define MMQTT_MESSAGE_TYPE_SUBSCRIBE 0x8 +#define MMQTT_MESSAGE_TYPE_SUBACK 0x9 +#define MMQTT_MESSAGE_TYPE_UNSUBSCRIBE 0x10 +#define MMQTT_MESSAGE_TYPE_UNSUBACK 0x11 +#define MMQTT_MESSAGE_TYPE_PINGREQ 0x12 +#define MMQTT_MESSAGE_TYPE_PINGRESP 0x13 +#define MMQTT_MESSAGE_TYPE_DISCONNECT 0x14 + +#define MMQTT_PACK_MESSAGE_TYPE(type_) ((type_) << 4) +#define MMQTT_UNPACK_MESSAGE_TYPE(type_) (((type_) >> 4) & 0xF) + +struct mmqtt_ring_buffer_ { + mmqtt_ssize_t start; + mmqtt_ssize_t length; + mmqtt_ssize_t capacity; +}; + +struct mmqtt_stream { + struct mmqtt_ring_buffer_ ring_buffer; + uint8_t data[MMQTT_STREAM_MAX_LENGTH]; + size_t left; +}; + +struct mmqtt_queue_ { + struct mmqtt_ring_buffer_ ring_buffer; + struct mmqtt_stream* streams[MMQTT_QUEUE_MAX_LENGTH]; +}; + +#define MMQTT_QUEUE_LENGTH(queue) ((queue)->ring_buffer.length) +#define MMQTT_QUEUE_INDEX(queue, i) ((queue)->streams[((queue)->ring_buffer.start + (i)) % ((queue)->ring_buffer.capacity)]) + +struct mmqtt_connection { + void *connection; + struct mmqtt_queue_ read_queue; + struct mmqtt_queue_ write_queue; +}; + +static inline void +mmqtt_ring_buffer_init_(struct mmqtt_ring_buffer_ *buffer, mmqtt_ssize_t capacity) +{ + buffer->start = 0; + buffer->length = 0; + buffer->capacity = capacity; +} + +static inline mmqtt_ssize_t +mmqtt_ring_buffer_pullable_(const struct mmqtt_ring_buffer_ *buffer) +{ + return min(buffer->start + buffer->length, buffer->capacity) - buffer->start; +} + +static inline void +mmqtt_ring_buffer_pull_(struct mmqtt_ring_buffer_ *buffer, mmqtt_ssize_t length) +{ + buffer->start = (buffer->start + length) % buffer->capacity; + buffer->length -= length; +} + +static inline mmqtt_ssize_t +mmqtt_ring_buffer_pushable_(const struct mmqtt_ring_buffer_ *buffer) +{ + if (buffer->start + buffer->length > buffer->capacity) { + return buffer->capacity - buffer->length; + } + return buffer->capacity - buffer->length - buffer->start; +} + +static inline void +mmqtt_ring_buffer_push_(struct mmqtt_ring_buffer_ *buffer, mmqtt_ssize_t length) +{ + buffer->length += length; +} + +static void +mmqtt_stream_init(struct mmqtt_stream *stream, size_t total_size) +{ + mmqtt_ring_buffer_init_(&stream->ring_buffer, MMQTT_STREAM_MAX_LENGTH); + stream->left = total_size; +} + +static mmqtt_status_t +mmqtt_stream_running(struct mmqtt_stream *stream) +{ + if (stream->left > 0 || stream->ring_buffer.length > 0) { + return MMQTT_STATUS_OK; + } else { + return MMQTT_STATUS_DONE; + } +} + +static mmqtt_ssize_t +mmqtt_stream_pull(struct mmqtt_stream *stream, uint8_t* out, mmqtt_ssize_t max_length) +{ + mmqtt_ssize_t pullable, i; + pullable = mmqtt_ring_buffer_pullable_(&stream->ring_buffer); + if (pullable <= 0) { + if (stream->left == 0) { + return MMQTT_STATUS_DONE; + } else { + return MMQTT_STATUS_NOT_PULLABLE; + } + } + pullable = min(pullable, max_length); + + if (out) { + for (i = 0; i < pullable; i++) { + out[i] = stream->data[stream->ring_buffer.start + i]; + } + } + mmqtt_ring_buffer_pull_(&stream->ring_buffer, pullable); + return pullable; +} + +static mmqtt_status_t +mmqtt_stream_external_pullable(struct mmqtt_stream *stream, + const uint8_t** data, mmqtt_ssize_t* max_length) +{ + mmqtt_ssize_t pullable; + pullable = mmqtt_ring_buffer_pullable_(&stream->ring_buffer); + if (pullable <= 0) { + if (stream->left == 0) { + return MMQTT_STATUS_DONE; + } else { + return MMQTT_STATUS_NOT_PULLABLE; + } + } + pullable = mmqtt_ring_buffer_pullable_(&stream->ring_buffer); + if (pullable <= 0) { return MMQTT_STATUS_NOT_PULLABLE; } + *data = stream->data + stream->ring_buffer.start; + *max_length = pullable; + return MMQTT_STATUS_OK; +} + +static mmqtt_status_t +mmqtt_stream_external_pull(struct mmqtt_stream *stream, mmqtt_ssize_t length) +{ + mmqtt_ssize_t pullable; + pullable = mmqtt_ring_buffer_pullable_(&stream->ring_buffer); + if (pullable < length) { + return MMQTT_STATUS_NOT_PULLABLE; + } else { + mmqtt_ring_buffer_pull_(&stream->ring_buffer, length); + return MMQTT_STATUS_OK; + } +} + +static mmqtt_ssize_t +mmqtt_stream_push(struct mmqtt_stream *stream, const uint8_t* in, mmqtt_ssize_t length) +{ + mmqtt_ssize_t pushable, i; + if (stream->left == 0) { return MMQTT_STATUS_DONE; } + pushable = mmqtt_ring_buffer_pushable_(&stream->ring_buffer); + if (pushable <= 0) { return MMQTT_STATUS_NOT_PUSHABLE; } + pushable = min(stream->left, min(pushable, length)); + + for (i = 0; i < pushable; i++) { + stream->data[stream->ring_buffer.start + i] = in[i]; + } + mmqtt_ring_buffer_push_(&stream->ring_buffer, pushable); + if (stream->left > 0) { stream->left -= pushable; } + return pushable; +} + +static mmqtt_status_t +mmqtt_stream_external_pushable(struct mmqtt_stream *stream, + uint8_t** data, mmqtt_ssize_t* max_length) +{ + mmqtt_ssize_t pushable; + if (stream->left == 0) { return MMQTT_STATUS_DONE; } + pushable = mmqtt_ring_buffer_pushable_(&stream->ring_buffer); + if (pushable <= 0) { return MMQTT_STATUS_NOT_PUSHABLE; } + pushable = min(pushable, stream->left); + *data = stream->data + stream->ring_buffer.start; + *max_length = pushable; + return MMQTT_STATUS_OK; +} + +static mmqtt_status_t +mmqtt_stream_external_push(struct mmqtt_stream *stream, mmqtt_ssize_t length) +{ + mmqtt_ssize_t pushable; + if (stream->left == 0) { return MMQTT_STATUS_DONE; } + pushable = mmqtt_ring_buffer_pushable_(&stream->ring_buffer); + if (pushable <= 0) { return MMQTT_STATUS_NOT_PUSHABLE; } + pushable = min(pushable, stream->left); + if (pushable < length) { + return MMQTT_STATUS_NOT_PUSHABLE; + } else { + mmqtt_ring_buffer_push_(&stream->ring_buffer, pushable); + return MMQTT_STATUS_OK; + } +} + +static void +mmqtt_queue_init_(struct mmqtt_queue_ *queue) +{ + mmqtt_ring_buffer_init_(&queue->ring_buffer, MMQTT_QUEUE_MAX_LENGTH); +} + +static mmqtt_status_t +mmqtt_queue_add_(struct mmqtt_queue_ *queue, struct mmqtt_stream *stream) +{ + if (mmqtt_ring_buffer_pushable_(&queue->ring_buffer) <= 0) { + return MMQTT_STATUS_NOT_PUSHABLE; + } + queue->streams[queue->ring_buffer.start] = stream; + mmqtt_ring_buffer_push_(&queue->ring_buffer, 1); + return MMQTT_STATUS_OK; +} + +#define MMQTT_QUEUE_AVAILABLE(queue) ((queue)->ring_buffer.length > 0) +#define MMQTT_QUEUE_FIRST(queue) ((queue)->streams[(queue)->ring_buffer.start]) + +static void +mmqtt_connection_init(struct mmqtt_connection *connection, void *conn) +{ + connection->connection = conn; + mmqtt_queue_init_(&connection->read_queue); + mmqtt_queue_init_(&connection->write_queue); +} + +/* + * Pulls data out of connection's write queue, the returned data is supposed + * to be sent to a TCP socket to remote servers. + */ +static mmqtt_ssize_t +mmqtt_connection_pull(struct mmqtt_connection *connection, + uint8_t *out, mmqtt_ssize_t max_length) +{ + mmqtt_ssize_t length, current; + struct mmqtt_queue_ *queue; + length = 0; + queue = &connection->write_queue; + while(MMQTT_QUEUE_AVAILABLE(queue) && length < max_length) { + current = mmqtt_stream_pull(MMQTT_QUEUE_FIRST(queue), + out + length, + max_length - length); + /* TODO: in case current is 0, should we return non-pullable? */ + if (current >= 0) { + length += current; + } else if (current == MMQTT_STATUS_DONE) { + /* Release stream from write queue since it is done */ + mmqtt_ring_buffer_pull_(&queue->ring_buffer, 1); + } else if (current == MMQTT_STATUS_NOT_PULLABLE) { + return (length > 0) ? length : MMQTT_STATUS_NOT_PULLABLE; + } else { + return MMQTT_STATUS_UNKNOWN; + } + } + return (length > 0) ? length : MMQTT_STATUS_NOT_PULLABLE; +} + +struct mmqtt_stream * +mmqtt_connection_pullable_stream(struct mmqtt_connection *connection) +{ + mmqtt_ssize_t i; + struct mmqtt_queue_ *queue = &connection->write_queue; + struct mmqtt_stream *stream = NULL; + for (i = 0; i < MMQTT_QUEUE_LENGTH(queue); i++) { + stream = MMQTT_QUEUE_INDEX(queue, i); + if (mmqtt_stream_running(stream) == MMQTT_STATUS_OK) { return stream; } + } + return NULL; +} + +/* + * Pushes data to connection's read queue, the data should come from + * a TCP socket to a remote machine. + */ +static mmqtt_ssize_t +mmqtt_connection_push(struct mmqtt_connection *connection, + const uint8_t* in, mmqtt_ssize_t length) +{ + mmqtt_ssize_t i, current, consumed_length; + struct mmqtt_queue_ *queue; + consumed_length = 0; + queue = &connection->read_queue; + while (i < MMQTT_QUEUE_LENGTH(queue) && consumed_length < length) { + current = mmqtt_stream_push(MMQTT_QUEUE_INDEX(queue, i), + in + consumed_length, + length - consumed_length); + if (current >= 0) { + consumed_length += current; + } else if (current == MMQTT_STATUS_DONE) { + i++; + } else if (current == MMQTT_STATUS_NOT_PUSHABLE) { + return (consumed_length > 0) ? consumed_length : MMQTT_STATUS_NOT_PUSHABLE; + } else { + return MMQTT_STATUS_UNKNOWN; + } + } + return (consumed_length > 0) ? consumed_length : MMQTT_STATUS_NOT_PUSHABLE; +} + +struct mmqtt_stream * +mmqtt_connection_pushable_stream(struct mmqtt_connection *connection) +{ + mmqtt_ssize_t i; + struct mmqtt_queue_ *queue = &connection->read_queue; + struct mmqtt_stream *stream = NULL; + for (i = 0; i < MMQTT_QUEUE_LENGTH(queue); i++) { + stream = MMQTT_QUEUE_INDEX(queue, i); + if (mmqtt_stream_running(stream) == MMQTT_STATUS_OK) { return stream; } + } + return NULL; +} + +static mmqtt_status_t +mmqtt_connection_add_read_stream(struct mmqtt_connection *connection, + struct mmqtt_stream *stream) +{ + return mmqtt_queue_add_(&connection->read_queue, stream); +} + +static mmqtt_status_t +mmqtt_connection_add_write_stream(struct mmqtt_connection *connection, + struct mmqtt_stream *stream) +{ + return mmqtt_queue_add_(&connection->write_queue, stream); +} + +static mmqtt_status_t +mmqtt_connection_in_read_stream(struct mmqtt_connection *connection, + struct mmqtt_stream *stream) +{ + mmqtt_ssize_t i; + struct mmqtt_queue_ *queue = &connection->read_queue; + for (i = 0; i < MMQTT_QUEUE_LENGTH(queue); i++) { + if (MMQTT_QUEUE_INDEX(queue, i) == stream) { return MMQTT_STATUS_OK; } + } + return MMQTT_STATUS_NOT_EXPECTED; +} + +static mmqtt_status_t +mmqtt_connection_in_write_stream(struct mmqtt_connection *connection, + struct mmqtt_stream *stream) +{ + mmqtt_ssize_t i; + struct mmqtt_queue_ *queue = &connection->write_queue; + for (i = 0; i < MMQTT_QUEUE_LENGTH(queue); i++) { + if (MMQTT_QUEUE_INDEX(queue, i) == stream) { return MMQTT_STATUS_OK; } + } + return MMQTT_STATUS_NOT_EXPECTED; +} + +static mmqtt_ssize_t +mmqtt_connection_read_stream_length(struct mmqtt_connection *connection) +{ + return connection->read_queue.ring_buffer.length; +} + +static mmqtt_ssize_t +mmqtt_connection_write_stream_length(struct mmqtt_connection *connection) +{ + return connection->write_queue.ring_buffer.length; +} + +static mmqtt_status_t +mmqtt_connection_release_write_stream(struct mmqtt_connection *connection, + struct mmqtt_stream *stream) +{ + if (!MMQTT_QUEUE_AVAILABLE(&connection->write_queue)) { + return MMQTT_STATUS_NOT_PULLABLE; + } + if (stream != MMQTT_QUEUE_FIRST(&connection->write_queue)) { + return MMQTT_STATUS_NOT_EXPECTED; + } + mmqtt_ring_buffer_pull_(&connection->write_queue.ring_buffer, 1); + return MMQTT_STATUS_OK; +} + +static mmqtt_status_t +mmqtt_connection_release_read_stream(struct mmqtt_connection *connection, + struct mmqtt_stream *stream) +{ + if (!MMQTT_QUEUE_AVAILABLE(&connection->read_queue)) { + return MMQTT_STATUS_NOT_PULLABLE; + } + if (stream != MMQTT_QUEUE_FIRST(&connection->read_queue)) { + return MMQTT_STATUS_NOT_EXPECTED; + } + mmqtt_ring_buffer_pull_(&connection->read_queue.ring_buffer, 1); + return MMQTT_STATUS_OK; +} + +static mmqtt_status_t +mmqtt_connection_autorelease_write_streams(struct mmqtt_connection *connection) +{ + struct mmqtt_stream *stream; + while (MMQTT_QUEUE_AVAILABLE(&connection->write_queue)) { + stream = MMQTT_QUEUE_FIRST(&connection->write_queue); + if (mmqtt_stream_running(stream) == MMQTT_STATUS_DONE) { + mmqtt_connection_release_write_stream(connection, stream); + } else { + break; + } + } + return MMQTT_STATUS_OK; +} + +static mmqtt_status_t +mmqtt_connection_autorelease_read_streams(struct mmqtt_connection *connection) +{ + struct mmqtt_stream *stream; + while (MMQTT_QUEUE_AVAILABLE(&connection->read_queue)) { + stream = MMQTT_QUEUE_FIRST(&connection->read_queue); + if (mmqtt_stream_running(stream) == MMQTT_STATUS_DONE) { + mmqtt_connection_release_read_stream(connection, stream); + } else { + break; + } + } + return MMQTT_STATUS_OK; +} + +#ifndef MMQTT_DISABLE_STATIC + +typedef mmqtt_status_t (*mmqtt_s_puller)(struct mmqtt_connection*); +typedef mmqtt_status_t (*mmqtt_s_pusher)(struct mmqtt_connection*, mmqtt_ssize_t); + +mmqtt_status_t +mmqtt_s_encode_buffer(struct mmqtt_connection *conn, mmqtt_s_puller puller, + const uint8_t *buffer, uint16_t length) +{ + struct mmqtt_stream stream; + mmqtt_status_t status; + mmqtt_ssize_t current; + uint16_t consumed_length; + + mmqtt_stream_init(&stream, length); + while ((status = mmqtt_connection_add_write_stream(conn, &stream)) == MMQTT_STATUS_NOT_PUSHABLE) { + status = puller(conn); + if (status != MMQTT_STATUS_OK) { return status; } + } + if (status != MMQTT_STATUS_OK) { return status; } + + consumed_length = 0; + while (consumed_length < length) { + current = mmqtt_stream_push(&stream, buffer + consumed_length, + length - consumed_length); + if (current >= 0) { + consumed_length += current; + } else if (current == MMQTT_STATUS_NOT_PUSHABLE) { + status = puller(conn); + if (status != MMQTT_STATUS_OK) { return status; } + } + } + while (mmqtt_connection_in_write_stream(conn, &stream) == MMQTT_STATUS_OK) { + status = puller(conn); + if (status != MMQTT_STATUS_OK) { return status; } + } + return MMQTT_STATUS_OK; +} + +mmqtt_status_t +mmqtt_s_decode_buffer(struct mmqtt_connection *conn, mmqtt_s_pusher pusher, + uint8_t *buffer, uint16_t buffer_length, + uint16_t consume_length) +{ + struct mmqtt_stream stream; + mmqtt_status_t status; + mmqtt_ssize_t current; + uint16_t length; + uint8_t temp; + + if (mmqtt_connection_read_stream_length(conn) > 0) { + return MMQTT_STATUS_INVALID_STATE; + } + + if (consume_length < buffer_length) { consume_length = buffer_length; } + mmqtt_stream_init(&stream, consume_length); + /* Connection won't remove read stream automatically, pusher won't help here */ + status = mmqtt_connection_add_read_stream(conn, &stream); + if (status != MMQTT_STATUS_OK) { return status; } + + length = 0; + while (length < buffer_length) { + current = mmqtt_stream_pull(&stream, buffer + length, + buffer_length - length); + if (current >= 0) { + length += current; + } else if (current == MMQTT_STATUS_NOT_PULLABLE) { + status = pusher(conn, buffer_length - length); + if (status != MMQTT_STATUS_OK) { + mmqtt_connection_release_read_stream(conn, &stream); + return status; + } + } + } + while (length < consume_length) { + current = mmqtt_stream_pull(&stream, &temp, 1); + if (current >= 0) { + length += current; + } else if (current == MMQTT_STATUS_NOT_PULLABLE) { + status = pusher(conn, consume_length - length); + if (status != MMQTT_STATUS_OK) { + mmqtt_connection_release_read_stream(conn, &stream); + return status; + } + } + } + mmqtt_connection_release_read_stream(conn, &stream); + return MMQTT_STATUS_OK; +} + +/* NOTE: for smaller buffer, we can also use mmqtt_s_decode_buffer with + * an empty buffer and 0 as buffer length. Here this function is for the + * case where we are skipping a whole packet altogether, which could be + * larger than 65535 in theory. + */ +mmqtt_status_t +mmqtt_s_skip_buffer(struct mmqtt_connection *conn, mmqtt_s_pusher pusher, + uint32_t length) +{ + mmqtt_status_t status; + while (length > 65535) { + status = mmqtt_s_decode_buffer(conn, pusher, NULL, 0, 65535); + if (status != MMQTT_STATUS_OK) { + return status; + } + length -= 65535; + } + if (length > 0) { + return mmqtt_s_decode_buffer(conn, pusher, NULL, 0, (uint16_t) length); + } else { + return MMQTT_STATUS_OK; + } +} + +mmqtt_status_t +mmqtt_s_encode_fixed_header(struct mmqtt_connection *conn, mmqtt_s_puller puller, + uint8_t flag, uint32_t length) +{ + mmqtt_status_t status; + mmqtt_ssize_t i; + uint8_t c, buffer[4]; + + buffer[0] = flag; + status = mmqtt_s_encode_buffer(conn, puller, buffer, 1); + if (status != MMQTT_STATUS_OK) { return status; } + + i = 0; + do { + c = (uint8_t) length % 128; + length /= 128; + if (length > 0) { c |= 0x80; } + buffer[i++] = c; + } while (length > 0); + + return mmqtt_s_encode_buffer(conn, puller, buffer, i); +} + +mmqtt_status_t +mmqtt_s_decode_fixed_header(struct mmqtt_connection *conn, mmqtt_s_pusher pusher, + uint8_t *flag, uint32_t *length) +{ + mmqtt_status_t status; + uint8_t c; + uint32_t l, factor; + + status = mmqtt_s_decode_buffer(conn, pusher, &c, 1, 1); + if (status != MMQTT_STATUS_OK) { return status; } + if (flag) { *flag = c; } + + l = 0; + factor = 1; + do { + status = mmqtt_s_decode_buffer(conn, pusher, &c, 1, 1); + if (status != MMQTT_STATUS_OK) { return status; } + l += (c & 0x7F) * factor; + factor *= 128; + } while ((c & 0x80) != 0); + if (length) { *length = l; } + + return MMQTT_STATUS_OK; +} + +void mmqtt_s_pack_uint16_(uint16_t length, uint8_t *buffer) +{ + buffer[0] = (length >> 8) & 0xFF; + buffer[1] = length & 0xFF; +} + +uint16_t mmqtt_s_unpack_uint16_(uint8_t *buffer) +{ + return ((uint16_t) (buffer[0] << 8)) | ((uint16_t) buffer[1]); +} + +/* We don't case uint16_t* directly into uint8_t* because there might be + * endianness differences + */ +mmqtt_status_t +mmqtt_s_encode_uint16(struct mmqtt_connection *conn, mmqtt_s_puller puller, + uint16_t val) +{ + uint8_t buffer[2]; + + mmqtt_s_pack_uint16_(val, buffer); + return mmqtt_s_encode_buffer(conn, puller, buffer, 2); +} + +mmqtt_status_t +mmqtt_s_decode_uint16(struct mmqtt_connection *conn, mmqtt_s_pusher pusher, + uint16_t *out_val) +{ + mmqtt_status_t status; + uint8_t buffer[2]; + + status = mmqtt_s_decode_buffer(conn, pusher, buffer, 2, 2); + if (status != MMQTT_STATUS_OK) { return status; } + + if (out_val) { *out_val = mmqtt_s_unpack_uint16_(buffer); } + return MMQTT_STATUS_OK; +} + +mmqtt_status_t +mmqtt_s_encode_string(struct mmqtt_connection *conn, mmqtt_s_puller puller, + const uint8_t *str, uint16_t length) +{ + mmqtt_status_t status; + uint8_t buffer[2]; + + mmqtt_s_pack_uint16_(length, buffer); + status = mmqtt_s_encode_buffer(conn, puller, buffer, 2); + if (status != MMQTT_STATUS_OK) { return status; } + + return mmqtt_s_encode_buffer(conn, puller, str, length); +} + +mmqtt_status_t +mmqtt_s_decode_string(struct mmqtt_connection *conn, mmqtt_s_pusher pusher, + uint8_t *str, uint16_t max_length, + uint16_t *out_length, uint16_t *out_true_length) { + mmqtt_status_t status; + uint8_t buffer[2]; + uint16_t true_length, length; + + status = mmqtt_s_decode_buffer(conn, pusher, buffer, 2, 2); + if (status != MMQTT_STATUS_OK) { return status; } + true_length = mmqtt_s_unpack_uint16_(buffer); + + length = min(true_length, max_length); + status = mmqtt_s_decode_buffer(conn, pusher, str, length, true_length); + if (status != MMQTT_STATUS_OK) { return status; } + + if (out_length) { *out_length = length; } + if (out_true_length) { *out_true_length = true_length; } + return MMQTT_STATUS_OK; +} + +/* p stands for packet here */ +struct mmqtt_p_connect_header { + uint8_t *name; + uint16_t name_length; + uint16_t name_max_length; + uint8_t protocol_version; + uint8_t flags; + uint16_t keepalive; +}; + +mmqtt_status_t +mmqtt_s_encode_connect_header(struct mmqtt_connection *conn, mmqtt_s_puller puller, + const struct mmqtt_p_connect_header *header) +{ + mmqtt_status_t status; + uint8_t buffer[2]; + + mmqtt_s_pack_uint16_(header->name_length, buffer); + status = mmqtt_s_encode_buffer(conn, puller, buffer, 2); + if (status != MMQTT_STATUS_OK) { return status; } + + status = mmqtt_s_encode_buffer(conn, puller, header->name, header->name_length); + if (status != MMQTT_STATUS_OK) { return status; } + + status = mmqtt_s_encode_buffer(conn, puller, &header->protocol_version, 1); + if (status != MMQTT_STATUS_OK) { return status; } + + status = mmqtt_s_encode_buffer(conn, puller, &header->flags, 1); + if (status != MMQTT_STATUS_OK) { return status; } + + mmqtt_s_pack_uint16_(header->keepalive, buffer); + return mmqtt_s_encode_buffer(conn, puller, buffer, 2); +} + +/* We would rewrite name_length in header to contain returned name length */ +mmqtt_status_t +mmqtt_s_decode_connect_header(struct mmqtt_connection *conn, mmqtt_s_pusher pusher, + struct mmqtt_p_connect_header *header) +{ + mmqtt_status_t status; + uint8_t buffer[2]; + uint16_t length; + + status = mmqtt_s_decode_buffer(conn, pusher, buffer, 2, 2); + if (status != MMQTT_STATUS_OK) { return status; } + length = mmqtt_s_unpack_uint16_(buffer); + + header->name_length = min(header->name_max_length, length); + status = mmqtt_s_decode_buffer(conn, pusher, header->name, + header->name_length, length); + if (status != MMQTT_STATUS_OK) { return status; } + + status = mmqtt_s_decode_buffer(conn, pusher, &header->protocol_version, 1, 1); + if (status != MMQTT_STATUS_OK) { return status; } + + status = mmqtt_s_decode_buffer(conn, pusher, &header->flags, 1, 1); + if (status != MMQTT_STATUS_OK) { return status; } + + status = mmqtt_s_decode_buffer(conn, pusher, buffer, 2, 2); + if (status != MMQTT_STATUS_OK) { return status; } + header->keepalive = mmqtt_s_unpack_uint16_(buffer); + + return MMQTT_STATUS_OK; +} + +struct mmqtt_p_connack_header { + uint8_t reserved; + uint8_t return_code; +}; + +mmqtt_status_t +mmqtt_s_encode_connack_header(struct mmqtt_connection *conn, mmqtt_s_puller puller, + const struct mmqtt_p_connack_header *header) +{ + mmqtt_status_t status; + + status = mmqtt_s_encode_buffer(conn, puller, &header->reserved, 1); + if (status != MMQTT_STATUS_OK) { return status; } + + return mmqtt_s_encode_buffer(conn, puller, &header->return_code, 1); +} + +mmqtt_status_t +mmqtt_s_decode_connack_header(struct mmqtt_connection *conn, mmqtt_s_pusher pusher, + struct mmqtt_p_connack_header *header) +{ + mmqtt_status_t status; + + status = mmqtt_s_decode_buffer(conn, pusher, &header->reserved, 1, 1); + if (status != MMQTT_STATUS_OK) { return status; } + + return mmqtt_s_decode_buffer(conn, pusher, &header->reserved, 1, 1); +} + +/* Works for both subscribe and unsubscribe */ +struct mmqtt_p_subscribe_decode_context { + uint32_t length; + uint8_t has_qos; +}; + +struct mmqtt_p_subscribe_payload_line { + uint8_t *topic; + uint16_t topic_length; + uint16_t topic_max_length; + uint8_t qos; +}; + +void +mmqtt_s_subscribe_decode_context_init(struct mmqtt_p_subscribe_decode_context *context, + uint32_t length) +{ + context->length = length; + context->has_qos = 1; +} + +void +mmqtt_s_unsubscribe_decode_context_init(struct mmqtt_p_subscribe_decode_context *context, + uint32_t length) +{ + context->length = length; + context->has_qos = 0; +} + +mmqtt_status_t +mmqtt_s_decode_subscribe_payload(struct mmqtt_connection *conn, mmqtt_s_pusher pusher, + struct mmqtt_p_subscribe_decode_context *context, + struct mmqtt_p_subscribe_payload_line *line) +{ + mmqtt_status_t status; + uint16_t string_true_length; + + if (context->length <= 0) { return MMQTT_STATUS_DONE; } + status = mmqtt_s_decode_string(conn, pusher, line->topic, line->topic_max_length, + &line->topic_length, &string_true_length); + if (status != MMQTT_STATUS_OK) { return status; } + + context->length -= string_true_length; + context->length -= 2; + + if (context->has_qos != 0) { + status = mmqtt_s_decode_buffer(conn, pusher, &line->qos, 1, 1); + if (status != MMQTT_STATUS_OK) { return status; } + context->length -= 1; + } else { + line->qos = 0; + } + return MMQTT_STATUS_OK; +} + +uint32_t +mmqtt_s_string_encoded_length(uint16_t string_length) +{ + return string_length + 2; +} + +uint32_t +mmqtt_s_connect_header_encoded_length(const struct mmqtt_p_connect_header *header) +{ + return mmqtt_s_string_encoded_length(header->name_length) + 4; +} + +#endif /* MMQTT_DISABLE_STATIC */ + +#endif /* MMQTT_H_ */