minimal MQTT Library. This version simply doubled the MQTT QUEUE sizes
Dependents: WNCInterface_M2XMQTTdemo
Fork of minimal-mqtt by
minimal-mqtt.h
- Committer:
- defmacro
- Date:
- 2016-06-27
- Revision:
- 1:311cd16389ff
- Parent:
- 0:a268b80e3093
- Child:
- 2:65f85aa601db
File content as of revision 1:311cd16389ff:
#ifndef MMQTT_H_ #define MMQTT_H_ #include <stdint.h> #ifndef min #define min(a, b) ((a) > (b) ? (b) : (a)) #endif /* min */ #define MMQTT_STREAM_MAX_LENGTH 8 #define MMQTT_QUEUE_MAX_LENGTH 2 /* mmqtt_ssize_t must be able to hold MMQTT_STREAM_MAX_LENGTH as well as * MMQTT_QUEUE_MAX_LENGTH, we might add macro-based detection in the future. */ typedef int8_t mmqtt_ssize_t; typedef int8_t mmqtt_status_t; #define MMQTT_STATUS_OK 0 #define MMQTT_STATUS_UNKNOWN -1 #define MMQTT_STATUS_DONE -2 #define MMQTT_STATUS_NOT_PULLABLE -3 #define MMQTT_STATUS_NOT_PUSHABLE -4 #define MMQTT_STATUS_NOT_EXPECTED -5 #define MMQTT_STATUS_BROKEN_CONNECTION -6 #define MMQTT_STATUS_INVALID_STATE -7 #define MMQTT_MESSAGE_TYPE_CONNECT 0x1 #define MMQTT_MESSAGE_TYPE_CONNACK 0x2 #define MMQTT_MESSAGE_TYPE_PUBLISH 0x3 #define MMQTT_MESSAGE_TYPE_PUBACK 0x4 #define MMQTT_MESSAGE_TYPE_SUBSCRIBE 0x8 #define MMQTT_MESSAGE_TYPE_SUBACK 0x9 #define MMQTT_MESSAGE_TYPE_UNSUBSCRIBE 0x10 #define MMQTT_MESSAGE_TYPE_UNSUBACK 0x11 #define MMQTT_MESSAGE_TYPE_PINGREQ 0x12 #define MMQTT_MESSAGE_TYPE_PINGRESP 0x13 #define MMQTT_MESSAGE_TYPE_DISCONNECT 0x14 #define MMQTT_PACK_MESSAGE_TYPE(type_) ((type_) << 4) #define MMQTT_UNPACK_MESSAGE_TYPE(type_) (((type_) >> 4) & 0xF) struct mmqtt_ring_buffer_ { mmqtt_ssize_t start; mmqtt_ssize_t length; mmqtt_ssize_t capacity; }; struct mmqtt_stream { struct mmqtt_ring_buffer_ ring_buffer; uint8_t data[MMQTT_STREAM_MAX_LENGTH]; size_t left; }; struct mmqtt_queue_ { struct mmqtt_ring_buffer_ ring_buffer; struct mmqtt_stream* streams[MMQTT_QUEUE_MAX_LENGTH]; }; #define MMQTT_QUEUE_LENGTH(queue) ((queue)->ring_buffer.length) #define MMQTT_QUEUE_INDEX(queue, i) ((queue)->streams[((queue)->ring_buffer.start + (i)) % ((queue)->ring_buffer.capacity)]) struct mmqtt_connection { void *connection; struct mmqtt_queue_ read_queue; struct mmqtt_queue_ write_queue; }; static inline void mmqtt_ring_buffer_init_(struct mmqtt_ring_buffer_ *buffer, mmqtt_ssize_t capacity) { buffer->start = 0; buffer->length = 0; buffer->capacity = capacity; } static inline mmqtt_ssize_t mmqtt_ring_buffer_pullable_(const struct mmqtt_ring_buffer_ *buffer) { return min(buffer->start + buffer->length, buffer->capacity) - buffer->start; } static inline void mmqtt_ring_buffer_pull_(struct mmqtt_ring_buffer_ *buffer, mmqtt_ssize_t length) { buffer->start = (buffer->start + length) % buffer->capacity; buffer->length -= length; } static inline mmqtt_ssize_t mmqtt_ring_buffer_pushable_(const struct mmqtt_ring_buffer_ *buffer) { if (buffer->start + buffer->length > buffer->capacity) { return buffer->capacity - buffer->length; } return buffer->capacity - buffer->length - buffer->start; } static inline void mmqtt_ring_buffer_push_(struct mmqtt_ring_buffer_ *buffer, mmqtt_ssize_t length) { buffer->length += length; } static void mmqtt_stream_init(struct mmqtt_stream *stream, size_t total_size) { mmqtt_ring_buffer_init_(&stream->ring_buffer, MMQTT_STREAM_MAX_LENGTH); stream->left = total_size; } static mmqtt_status_t mmqtt_stream_running(struct mmqtt_stream *stream) { if (stream->left > 0 || stream->ring_buffer.length > 0) { return MMQTT_STATUS_OK; } else { return MMQTT_STATUS_DONE; } } static mmqtt_ssize_t mmqtt_stream_pull(struct mmqtt_stream *stream, uint8_t* out, mmqtt_ssize_t max_length) { mmqtt_ssize_t pullable, i; pullable = mmqtt_ring_buffer_pullable_(&stream->ring_buffer); if (pullable <= 0) { if (stream->left == 0) { return MMQTT_STATUS_DONE; } else { return MMQTT_STATUS_NOT_PULLABLE; } } pullable = min(pullable, max_length); if (out) { for (i = 0; i < pullable; i++) { out[i] = stream->data[stream->ring_buffer.start + i]; } } mmqtt_ring_buffer_pull_(&stream->ring_buffer, pullable); return pullable; } static mmqtt_status_t mmqtt_stream_external_pullable(struct mmqtt_stream *stream, const uint8_t** data, mmqtt_ssize_t* max_length) { mmqtt_ssize_t pullable; pullable = mmqtt_ring_buffer_pullable_(&stream->ring_buffer); if (pullable <= 0) { if (stream->left == 0) { return MMQTT_STATUS_DONE; } else { return MMQTT_STATUS_NOT_PULLABLE; } } pullable = mmqtt_ring_buffer_pullable_(&stream->ring_buffer); if (pullable <= 0) { return MMQTT_STATUS_NOT_PULLABLE; } *data = stream->data + stream->ring_buffer.start; *max_length = pullable; return MMQTT_STATUS_OK; } static mmqtt_status_t mmqtt_stream_external_pull(struct mmqtt_stream *stream, mmqtt_ssize_t length) { mmqtt_ssize_t pullable; pullable = mmqtt_ring_buffer_pullable_(&stream->ring_buffer); if (pullable < length) { return MMQTT_STATUS_NOT_PULLABLE; } else { mmqtt_ring_buffer_pull_(&stream->ring_buffer, length); return MMQTT_STATUS_OK; } } static mmqtt_ssize_t mmqtt_stream_push(struct mmqtt_stream *stream, const uint8_t* in, mmqtt_ssize_t length) { mmqtt_ssize_t pushable, i; if (stream->left == 0) { return MMQTT_STATUS_DONE; } pushable = mmqtt_ring_buffer_pushable_(&stream->ring_buffer); if (pushable <= 0) { return MMQTT_STATUS_NOT_PUSHABLE; } pushable = min(stream->left, min(pushable, length)); for (i = 0; i < pushable; i++) { stream->data[stream->ring_buffer.start + i] = in[i]; } mmqtt_ring_buffer_push_(&stream->ring_buffer, pushable); if (stream->left > 0) { stream->left -= pushable; } return pushable; } static mmqtt_status_t mmqtt_stream_external_pushable(struct mmqtt_stream *stream, uint8_t** data, mmqtt_ssize_t* max_length) { mmqtt_ssize_t pushable; if (stream->left == 0) { return MMQTT_STATUS_DONE; } pushable = mmqtt_ring_buffer_pushable_(&stream->ring_buffer); if (pushable <= 0) { return MMQTT_STATUS_NOT_PUSHABLE; } pushable = min(pushable, stream->left); *data = stream->data + stream->ring_buffer.start; *max_length = pushable; return MMQTT_STATUS_OK; } static mmqtt_status_t mmqtt_stream_external_push(struct mmqtt_stream *stream, mmqtt_ssize_t length) { mmqtt_ssize_t pushable; if (stream->left == 0) { return MMQTT_STATUS_DONE; } pushable = mmqtt_ring_buffer_pushable_(&stream->ring_buffer); if (pushable <= 0) { return MMQTT_STATUS_NOT_PUSHABLE; } pushable = min(pushable, stream->left); if (pushable < length) { return MMQTT_STATUS_NOT_PUSHABLE; } else { mmqtt_ring_buffer_push_(&stream->ring_buffer, pushable); return MMQTT_STATUS_OK; } } static void mmqtt_queue_init_(struct mmqtt_queue_ *queue) { mmqtt_ring_buffer_init_(&queue->ring_buffer, MMQTT_QUEUE_MAX_LENGTH); } static mmqtt_status_t mmqtt_queue_add_(struct mmqtt_queue_ *queue, struct mmqtt_stream *stream) { if (mmqtt_ring_buffer_pushable_(&queue->ring_buffer) <= 0) { return MMQTT_STATUS_NOT_PUSHABLE; } queue->streams[queue->ring_buffer.start] = stream; mmqtt_ring_buffer_push_(&queue->ring_buffer, 1); return MMQTT_STATUS_OK; } #define MMQTT_QUEUE_AVAILABLE(queue) ((queue)->ring_buffer.length > 0) #define MMQTT_QUEUE_FIRST(queue) ((queue)->streams[(queue)->ring_buffer.start]) static void mmqtt_connection_init(struct mmqtt_connection *connection, void *conn) { connection->connection = conn; mmqtt_queue_init_(&connection->read_queue); mmqtt_queue_init_(&connection->write_queue); } /* * Pulls data out of connection's write queue, the returned data is supposed * to be sent to a TCP socket to remote servers. */ static mmqtt_ssize_t mmqtt_connection_pull(struct mmqtt_connection *connection, uint8_t *out, mmqtt_ssize_t max_length) { mmqtt_ssize_t length, current; struct mmqtt_queue_ *queue; length = 0; queue = &connection->write_queue; while(MMQTT_QUEUE_AVAILABLE(queue) && length < max_length) { current = mmqtt_stream_pull(MMQTT_QUEUE_FIRST(queue), out + length, max_length - length); /* TODO: in case current is 0, should we return non-pullable? */ if (current >= 0) { length += current; } else if (current == MMQTT_STATUS_DONE) { /* Release stream from write queue since it is done */ mmqtt_ring_buffer_pull_(&queue->ring_buffer, 1); } else if (current == MMQTT_STATUS_NOT_PULLABLE) { return (length > 0) ? length : MMQTT_STATUS_NOT_PULLABLE; } else { return MMQTT_STATUS_UNKNOWN; } } return (length > 0) ? length : MMQTT_STATUS_NOT_PULLABLE; } struct mmqtt_stream * mmqtt_connection_pullable_stream(struct mmqtt_connection *connection) { mmqtt_ssize_t i; struct mmqtt_queue_ *queue = &connection->write_queue; struct mmqtt_stream *stream = NULL; for (i = 0; i < MMQTT_QUEUE_LENGTH(queue); i++) { stream = MMQTT_QUEUE_INDEX(queue, i); if (mmqtt_stream_running(stream) == MMQTT_STATUS_OK) { return stream; } } return NULL; } /* * Pushes data to connection's read queue, the data should come from * a TCP socket to a remote machine. */ static mmqtt_ssize_t mmqtt_connection_push(struct mmqtt_connection *connection, const uint8_t* in, mmqtt_ssize_t length) { mmqtt_ssize_t i, current, consumed_length; struct mmqtt_queue_ *queue; consumed_length = 0; queue = &connection->read_queue; while (i < MMQTT_QUEUE_LENGTH(queue) && consumed_length < length) { current = mmqtt_stream_push(MMQTT_QUEUE_INDEX(queue, i), in + consumed_length, length - consumed_length); if (current >= 0) { consumed_length += current; } else if (current == MMQTT_STATUS_DONE) { i++; } else if (current == MMQTT_STATUS_NOT_PUSHABLE) { return (consumed_length > 0) ? consumed_length : MMQTT_STATUS_NOT_PUSHABLE; } else { return MMQTT_STATUS_UNKNOWN; } } return (consumed_length > 0) ? consumed_length : MMQTT_STATUS_NOT_PUSHABLE; } struct mmqtt_stream * mmqtt_connection_pushable_stream(struct mmqtt_connection *connection) { mmqtt_ssize_t i; struct mmqtt_queue_ *queue = &connection->read_queue; struct mmqtt_stream *stream = NULL; for (i = 0; i < MMQTT_QUEUE_LENGTH(queue); i++) { stream = MMQTT_QUEUE_INDEX(queue, i); if (mmqtt_stream_running(stream) == MMQTT_STATUS_OK) { return stream; } } return NULL; } static mmqtt_status_t mmqtt_connection_add_read_stream(struct mmqtt_connection *connection, struct mmqtt_stream *stream) { return mmqtt_queue_add_(&connection->read_queue, stream); } static mmqtt_status_t mmqtt_connection_add_write_stream(struct mmqtt_connection *connection, struct mmqtt_stream *stream) { return mmqtt_queue_add_(&connection->write_queue, stream); } static mmqtt_status_t mmqtt_connection_in_read_stream(struct mmqtt_connection *connection, struct mmqtt_stream *stream) { mmqtt_ssize_t i; struct mmqtt_queue_ *queue = &connection->read_queue; for (i = 0; i < MMQTT_QUEUE_LENGTH(queue); i++) { if (MMQTT_QUEUE_INDEX(queue, i) == stream) { return MMQTT_STATUS_OK; } } return MMQTT_STATUS_NOT_EXPECTED; } static mmqtt_status_t mmqtt_connection_in_write_stream(struct mmqtt_connection *connection, struct mmqtt_stream *stream) { mmqtt_ssize_t i; struct mmqtt_queue_ *queue = &connection->write_queue; for (i = 0; i < MMQTT_QUEUE_LENGTH(queue); i++) { if (MMQTT_QUEUE_INDEX(queue, i) == stream) { return MMQTT_STATUS_OK; } } return MMQTT_STATUS_NOT_EXPECTED; } static mmqtt_ssize_t mmqtt_connection_read_stream_length(struct mmqtt_connection *connection) { return connection->read_queue.ring_buffer.length; } static mmqtt_ssize_t mmqtt_connection_write_stream_length(struct mmqtt_connection *connection) { return connection->write_queue.ring_buffer.length; } static mmqtt_status_t mmqtt_connection_release_write_stream(struct mmqtt_connection *connection, struct mmqtt_stream *stream) { if (!MMQTT_QUEUE_AVAILABLE(&connection->write_queue)) { return MMQTT_STATUS_NOT_PULLABLE; } if (stream != MMQTT_QUEUE_FIRST(&connection->write_queue)) { return MMQTT_STATUS_NOT_EXPECTED; } mmqtt_ring_buffer_pull_(&connection->write_queue.ring_buffer, 1); return MMQTT_STATUS_OK; } static mmqtt_status_t mmqtt_connection_release_read_stream(struct mmqtt_connection *connection, struct mmqtt_stream *stream) { if (!MMQTT_QUEUE_AVAILABLE(&connection->read_queue)) { return MMQTT_STATUS_NOT_PULLABLE; } if (stream != MMQTT_QUEUE_FIRST(&connection->read_queue)) { return MMQTT_STATUS_NOT_EXPECTED; } mmqtt_ring_buffer_pull_(&connection->read_queue.ring_buffer, 1); return MMQTT_STATUS_OK; } static mmqtt_status_t mmqtt_connection_autorelease_write_streams(struct mmqtt_connection *connection) { struct mmqtt_stream *stream; while (MMQTT_QUEUE_AVAILABLE(&connection->write_queue)) { stream = MMQTT_QUEUE_FIRST(&connection->write_queue); if (mmqtt_stream_running(stream) == MMQTT_STATUS_DONE) { mmqtt_connection_release_write_stream(connection, stream); } else { break; } } return MMQTT_STATUS_OK; } static mmqtt_status_t mmqtt_connection_autorelease_read_streams(struct mmqtt_connection *connection) { struct mmqtt_stream *stream; while (MMQTT_QUEUE_AVAILABLE(&connection->read_queue)) { stream = MMQTT_QUEUE_FIRST(&connection->read_queue); if (mmqtt_stream_running(stream) == MMQTT_STATUS_DONE) { mmqtt_connection_release_read_stream(connection, stream); } else { break; } } return MMQTT_STATUS_OK; } #ifndef MMQTT_DISABLE_STATIC typedef mmqtt_status_t (*mmqtt_s_puller)(struct mmqtt_connection*); typedef mmqtt_status_t (*mmqtt_s_pusher)(struct mmqtt_connection*, mmqtt_ssize_t); mmqtt_status_t mmqtt_s_encode_buffer(struct mmqtt_connection *conn, mmqtt_s_puller puller, const uint8_t *buffer, uint16_t length) { struct mmqtt_stream stream; mmqtt_status_t status; mmqtt_ssize_t current; uint16_t consumed_length; mmqtt_stream_init(&stream, length); while ((status = mmqtt_connection_add_write_stream(conn, &stream)) == MMQTT_STATUS_NOT_PUSHABLE) { status = puller(conn); if (status != MMQTT_STATUS_OK) { return status; } } if (status != MMQTT_STATUS_OK) { return status; } consumed_length = 0; while (consumed_length < length) { current = mmqtt_stream_push(&stream, buffer + consumed_length, length - consumed_length); if (current >= 0) { consumed_length += current; } else if (current == MMQTT_STATUS_NOT_PUSHABLE) { status = puller(conn); if (status != MMQTT_STATUS_OK) { return status; } } } while (mmqtt_connection_in_write_stream(conn, &stream) == MMQTT_STATUS_OK) { status = puller(conn); if (status != MMQTT_STATUS_OK) { return status; } } return MMQTT_STATUS_OK; } mmqtt_status_t mmqtt_s_decode_buffer(struct mmqtt_connection *conn, mmqtt_s_pusher pusher, uint8_t *buffer, uint16_t buffer_length, uint16_t consume_length) { struct mmqtt_stream stream; mmqtt_status_t status; mmqtt_ssize_t current; uint16_t length; uint8_t temp; if (mmqtt_connection_read_stream_length(conn) > 0) { return MMQTT_STATUS_INVALID_STATE; } if (consume_length < buffer_length) { consume_length = buffer_length; } mmqtt_stream_init(&stream, consume_length); /* Connection won't remove read stream automatically, pusher won't help here */ status = mmqtt_connection_add_read_stream(conn, &stream); if (status != MMQTT_STATUS_OK) { return status; } length = 0; while (length < buffer_length) { current = mmqtt_stream_pull(&stream, buffer + length, buffer_length - length); if (current >= 0) { length += current; } else if (current == MMQTT_STATUS_NOT_PULLABLE) { status = pusher(conn, buffer_length - length); if (status != MMQTT_STATUS_OK) { mmqtt_connection_release_read_stream(conn, &stream); return status; } } } while (length < consume_length) { current = mmqtt_stream_pull(&stream, &temp, 1); if (current >= 0) { length += current; } else if (current == MMQTT_STATUS_NOT_PULLABLE) { status = pusher(conn, consume_length - length); if (status != MMQTT_STATUS_OK) { mmqtt_connection_release_read_stream(conn, &stream); return status; } } } mmqtt_connection_release_read_stream(conn, &stream); return MMQTT_STATUS_OK; } /* NOTE: for smaller buffer, we can also use mmqtt_s_decode_buffer with * an empty buffer and 0 as buffer length. Here this function is for the * case where we are skipping a whole packet altogether, which could be * larger than 65535 in theory. */ mmqtt_status_t mmqtt_s_skip_buffer(struct mmqtt_connection *conn, mmqtt_s_pusher pusher, uint32_t length) { mmqtt_status_t status; while (length > 65535) { status = mmqtt_s_decode_buffer(conn, pusher, NULL, 0, 65535); if (status != MMQTT_STATUS_OK) { return status; } length -= 65535; } if (length > 0) { return mmqtt_s_decode_buffer(conn, pusher, NULL, 0, (uint16_t) length); } else { return MMQTT_STATUS_OK; } } mmqtt_status_t mmqtt_s_encode_fixed_header(struct mmqtt_connection *conn, mmqtt_s_puller puller, uint8_t flag, uint32_t length) { mmqtt_status_t status; mmqtt_ssize_t i; uint8_t c, buffer[4]; buffer[0] = flag; status = mmqtt_s_encode_buffer(conn, puller, buffer, 1); if (status != MMQTT_STATUS_OK) { return status; } i = 0; do { c = (uint8_t) length % 128; length /= 128; if (length > 0) { c |= 0x80; } buffer[i++] = c; } while (length > 0); return mmqtt_s_encode_buffer(conn, puller, buffer, i); } mmqtt_status_t mmqtt_s_decode_fixed_header(struct mmqtt_connection *conn, mmqtt_s_pusher pusher, uint8_t *flag, uint32_t *length) { mmqtt_status_t status; uint8_t c; uint32_t l, factor; status = mmqtt_s_decode_buffer(conn, pusher, &c, 1, 1); if (status != MMQTT_STATUS_OK) { return status; } if (flag) { *flag = c; } l = 0; factor = 1; do { status = mmqtt_s_decode_buffer(conn, pusher, &c, 1, 1); if (status != MMQTT_STATUS_OK) { return status; } l += (c & 0x7F) * factor; factor *= 128; } while ((c & 0x80) != 0); if (length) { *length = l; } return MMQTT_STATUS_OK; } void mmqtt_s_pack_uint16_(uint16_t length, uint8_t *buffer) { buffer[0] = (length >> 8) & 0xFF; buffer[1] = length & 0xFF; } uint16_t mmqtt_s_unpack_uint16_(uint8_t *buffer) { return ((uint16_t) (buffer[0] << 8)) | ((uint16_t) buffer[1]); } /* We don't case uint16_t* directly into uint8_t* because there might be * endianness differences */ mmqtt_status_t mmqtt_s_encode_uint16(struct mmqtt_connection *conn, mmqtt_s_puller puller, uint16_t val) { uint8_t buffer[2]; mmqtt_s_pack_uint16_(val, buffer); return mmqtt_s_encode_buffer(conn, puller, buffer, 2); } mmqtt_status_t mmqtt_s_decode_uint16(struct mmqtt_connection *conn, mmqtt_s_pusher pusher, uint16_t *out_val) { mmqtt_status_t status; uint8_t buffer[2]; status = mmqtt_s_decode_buffer(conn, pusher, buffer, 2, 2); if (status != MMQTT_STATUS_OK) { return status; } if (out_val) { *out_val = mmqtt_s_unpack_uint16_(buffer); } return MMQTT_STATUS_OK; } mmqtt_status_t mmqtt_s_encode_string(struct mmqtt_connection *conn, mmqtt_s_puller puller, const uint8_t *str, uint16_t length) { mmqtt_status_t status; uint8_t buffer[2]; mmqtt_s_pack_uint16_(length, buffer); status = mmqtt_s_encode_buffer(conn, puller, buffer, 2); if (status != MMQTT_STATUS_OK) { return status; } return mmqtt_s_encode_buffer(conn, puller, str, length); } mmqtt_status_t mmqtt_s_decode_string(struct mmqtt_connection *conn, mmqtt_s_pusher pusher, uint8_t *str, uint16_t max_length, uint16_t *out_length, uint16_t *out_true_length) { mmqtt_status_t status; uint8_t buffer[2]; uint16_t true_length, length; status = mmqtt_s_decode_buffer(conn, pusher, buffer, 2, 2); if (status != MMQTT_STATUS_OK) { return status; } true_length = mmqtt_s_unpack_uint16_(buffer); length = min(true_length, max_length); status = mmqtt_s_decode_buffer(conn, pusher, str, length, true_length); if (status != MMQTT_STATUS_OK) { return status; } if (out_length) { *out_length = length; } if (out_true_length) { *out_true_length = true_length; } return MMQTT_STATUS_OK; } /* p stands for packet here */ struct mmqtt_p_connect_header { uint8_t *name; uint16_t name_length; uint16_t name_max_length; uint8_t protocol_version; uint8_t flags; uint16_t keepalive; }; mmqtt_status_t mmqtt_s_encode_connect_header(struct mmqtt_connection *conn, mmqtt_s_puller puller, const struct mmqtt_p_connect_header *header) { mmqtt_status_t status; uint8_t buffer[2]; mmqtt_s_pack_uint16_(header->name_length, buffer); status = mmqtt_s_encode_buffer(conn, puller, buffer, 2); if (status != MMQTT_STATUS_OK) { return status; } status = mmqtt_s_encode_buffer(conn, puller, header->name, header->name_length); if (status != MMQTT_STATUS_OK) { return status; } status = mmqtt_s_encode_buffer(conn, puller, &header->protocol_version, 1); if (status != MMQTT_STATUS_OK) { return status; } status = mmqtt_s_encode_buffer(conn, puller, &header->flags, 1); if (status != MMQTT_STATUS_OK) { return status; } mmqtt_s_pack_uint16_(header->keepalive, buffer); return mmqtt_s_encode_buffer(conn, puller, buffer, 2); } /* We would rewrite name_length in header to contain returned name length */ mmqtt_status_t mmqtt_s_decode_connect_header(struct mmqtt_connection *conn, mmqtt_s_pusher pusher, struct mmqtt_p_connect_header *header) { mmqtt_status_t status; uint8_t buffer[2]; uint16_t length; status = mmqtt_s_decode_buffer(conn, pusher, buffer, 2, 2); if (status != MMQTT_STATUS_OK) { return status; } length = mmqtt_s_unpack_uint16_(buffer); header->name_length = min(header->name_max_length, length); status = mmqtt_s_decode_buffer(conn, pusher, header->name, header->name_length, length); if (status != MMQTT_STATUS_OK) { return status; } status = mmqtt_s_decode_buffer(conn, pusher, &header->protocol_version, 1, 1); if (status != MMQTT_STATUS_OK) { return status; } status = mmqtt_s_decode_buffer(conn, pusher, &header->flags, 1, 1); if (status != MMQTT_STATUS_OK) { return status; } status = mmqtt_s_decode_buffer(conn, pusher, buffer, 2, 2); if (status != MMQTT_STATUS_OK) { return status; } header->keepalive = mmqtt_s_unpack_uint16_(buffer); return MMQTT_STATUS_OK; } struct mmqtt_p_connack_header { uint8_t reserved; uint8_t return_code; }; mmqtt_status_t mmqtt_s_encode_connack_header(struct mmqtt_connection *conn, mmqtt_s_puller puller, const struct mmqtt_p_connack_header *header) { mmqtt_status_t status; status = mmqtt_s_encode_buffer(conn, puller, &header->reserved, 1); if (status != MMQTT_STATUS_OK) { return status; } return mmqtt_s_encode_buffer(conn, puller, &header->return_code, 1); } mmqtt_status_t mmqtt_s_decode_connack_header(struct mmqtt_connection *conn, mmqtt_s_pusher pusher, struct mmqtt_p_connack_header *header) { mmqtt_status_t status; status = mmqtt_s_decode_buffer(conn, pusher, &header->reserved, 1, 1); if (status != MMQTT_STATUS_OK) { return status; } return mmqtt_s_decode_buffer(conn, pusher, &header->return_code, 1, 1); } /* Works for both subscribe and unsubscribe */ struct mmqtt_p_subscribe_decode_context { uint32_t length; uint8_t has_qos; }; struct mmqtt_p_subscribe_payload_line { uint8_t *topic; uint16_t topic_length; uint16_t topic_max_length; uint8_t qos; }; void mmqtt_s_subscribe_decode_context_init(struct mmqtt_p_subscribe_decode_context *context, uint32_t length) { context->length = length; context->has_qos = 1; } void mmqtt_s_unsubscribe_decode_context_init(struct mmqtt_p_subscribe_decode_context *context, uint32_t length) { context->length = length; context->has_qos = 0; } mmqtt_status_t mmqtt_s_decode_subscribe_payload(struct mmqtt_connection *conn, mmqtt_s_pusher pusher, struct mmqtt_p_subscribe_decode_context *context, struct mmqtt_p_subscribe_payload_line *line) { mmqtt_status_t status; uint16_t string_true_length; if (context->length <= 0) { return MMQTT_STATUS_DONE; } status = mmqtt_s_decode_string(conn, pusher, line->topic, line->topic_max_length, &line->topic_length, &string_true_length); if (status != MMQTT_STATUS_OK) { return status; } context->length -= string_true_length; context->length -= 2; if (context->has_qos != 0) { status = mmqtt_s_decode_buffer(conn, pusher, &line->qos, 1, 1); if (status != MMQTT_STATUS_OK) { return status; } context->length -= 1; } else { line->qos = 0; } return MMQTT_STATUS_OK; } uint32_t mmqtt_s_string_encoded_length(uint16_t string_length) { return string_length + 2; } uint32_t mmqtt_s_connect_header_encoded_length(const struct mmqtt_p_connect_header *header) { return mmqtt_s_string_encoded_length(header->name_length) + 4; } #endif /* MMQTT_DISABLE_STATIC */ #endif /* MMQTT_H_ */