minimal MQTT Library. This version simply doubled the MQTT QUEUE sizes
Dependents: WNCInterface_M2XMQTTdemo
Fork of minimal-mqtt by
minimal-mqtt.h
- Committer:
- JMF
- Date:
- 2016-10-08
- Revision:
- 2:65f85aa601db
- Parent:
- 1:311cd16389ff
File content as of revision 2:65f85aa601db:
#ifndef MMQTT_H_
#define MMQTT_H_
#include <stdint.h>
#ifndef min
#define min(a, b) ((a) > (b) ? (b) : (a))
#endif /* min */
#define MMQTT_STREAM_MAX_LENGTH 16
#define MMQTT_QUEUE_MAX_LENGTH 4
/* mmqtt_ssize_t must be able to hold MMQTT_STREAM_MAX_LENGTH as well as
* MMQTT_QUEUE_MAX_LENGTH, we might add macro-based detection in the future.
*/
typedef int8_t mmqtt_ssize_t;
typedef int8_t mmqtt_status_t;
#define MMQTT_STATUS_OK 0
#define MMQTT_STATUS_UNKNOWN -1
#define MMQTT_STATUS_DONE -2
#define MMQTT_STATUS_NOT_PULLABLE -3
#define MMQTT_STATUS_NOT_PUSHABLE -4
#define MMQTT_STATUS_NOT_EXPECTED -5
#define MMQTT_STATUS_BROKEN_CONNECTION -6
#define MMQTT_STATUS_INVALID_STATE -7
#define MMQTT_MESSAGE_TYPE_CONNECT 0x1
#define MMQTT_MESSAGE_TYPE_CONNACK 0x2
#define MMQTT_MESSAGE_TYPE_PUBLISH 0x3
#define MMQTT_MESSAGE_TYPE_PUBACK 0x4
#define MMQTT_MESSAGE_TYPE_SUBSCRIBE 0x8
#define MMQTT_MESSAGE_TYPE_SUBACK 0x9
#define MMQTT_MESSAGE_TYPE_UNSUBSCRIBE 0x10
#define MMQTT_MESSAGE_TYPE_UNSUBACK 0x11
#define MMQTT_MESSAGE_TYPE_PINGREQ 0x12
#define MMQTT_MESSAGE_TYPE_PINGRESP 0x13
#define MMQTT_MESSAGE_TYPE_DISCONNECT 0x14
#define MMQTT_PACK_MESSAGE_TYPE(type_) ((type_) << 4)
#define MMQTT_UNPACK_MESSAGE_TYPE(type_) (((type_) >> 4) & 0xF)
struct mmqtt_ring_buffer_ {
mmqtt_ssize_t start;
mmqtt_ssize_t length;
mmqtt_ssize_t capacity;
};
struct mmqtt_stream {
struct mmqtt_ring_buffer_ ring_buffer;
uint8_t data[MMQTT_STREAM_MAX_LENGTH];
size_t left;
};
struct mmqtt_queue_ {
struct mmqtt_ring_buffer_ ring_buffer;
struct mmqtt_stream* streams[MMQTT_QUEUE_MAX_LENGTH];
};
#define MMQTT_QUEUE_LENGTH(queue) ((queue)->ring_buffer.length)
#define MMQTT_QUEUE_INDEX(queue, i) ((queue)->streams[((queue)->ring_buffer.start + (i)) % ((queue)->ring_buffer.capacity)])
struct mmqtt_connection {
void *connection;
struct mmqtt_queue_ read_queue;
struct mmqtt_queue_ write_queue;
};
static inline void
mmqtt_ring_buffer_init_(struct mmqtt_ring_buffer_ *buffer, mmqtt_ssize_t capacity)
{
buffer->start = 0;
buffer->length = 0;
buffer->capacity = capacity;
}
static inline mmqtt_ssize_t
mmqtt_ring_buffer_pullable_(const struct mmqtt_ring_buffer_ *buffer)
{
return min(buffer->start + buffer->length, buffer->capacity) - buffer->start;
}
static inline void
mmqtt_ring_buffer_pull_(struct mmqtt_ring_buffer_ *buffer, mmqtt_ssize_t length)
{
buffer->start = (buffer->start + length) % buffer->capacity;
buffer->length -= length;
}
static inline mmqtt_ssize_t
mmqtt_ring_buffer_pushable_(const struct mmqtt_ring_buffer_ *buffer)
{
if (buffer->start + buffer->length > buffer->capacity) {
return buffer->capacity - buffer->length;
}
return buffer->capacity - buffer->length - buffer->start;
}
static inline void
mmqtt_ring_buffer_push_(struct mmqtt_ring_buffer_ *buffer, mmqtt_ssize_t length)
{
buffer->length += length;
}
static void
mmqtt_stream_init(struct mmqtt_stream *stream, size_t total_size)
{
mmqtt_ring_buffer_init_(&stream->ring_buffer, MMQTT_STREAM_MAX_LENGTH);
stream->left = total_size;
}
static mmqtt_status_t
mmqtt_stream_running(struct mmqtt_stream *stream)
{
if (stream->left > 0 || stream->ring_buffer.length > 0) {
return MMQTT_STATUS_OK;
} else {
return MMQTT_STATUS_DONE;
}
}
static mmqtt_ssize_t
mmqtt_stream_pull(struct mmqtt_stream *stream, uint8_t* out, mmqtt_ssize_t max_length)
{
mmqtt_ssize_t pullable, i;
pullable = mmqtt_ring_buffer_pullable_(&stream->ring_buffer);
if (pullable <= 0) {
if (stream->left == 0) {
return MMQTT_STATUS_DONE;
} else {
return MMQTT_STATUS_NOT_PULLABLE;
}
}
pullable = min(pullable, max_length);
if (out) {
for (i = 0; i < pullable; i++) {
out[i] = stream->data[stream->ring_buffer.start + i];
}
}
mmqtt_ring_buffer_pull_(&stream->ring_buffer, pullable);
return pullable;
}
static mmqtt_status_t
mmqtt_stream_external_pullable(struct mmqtt_stream *stream,
const uint8_t** data, mmqtt_ssize_t* max_length)
{
mmqtt_ssize_t pullable;
pullable = mmqtt_ring_buffer_pullable_(&stream->ring_buffer);
if (pullable <= 0) {
if (stream->left == 0) {
return MMQTT_STATUS_DONE;
} else {
return MMQTT_STATUS_NOT_PULLABLE;
}
}
pullable = mmqtt_ring_buffer_pullable_(&stream->ring_buffer);
if (pullable <= 0) { return MMQTT_STATUS_NOT_PULLABLE; }
*data = stream->data + stream->ring_buffer.start;
*max_length = pullable;
return MMQTT_STATUS_OK;
}
static mmqtt_status_t
mmqtt_stream_external_pull(struct mmqtt_stream *stream, mmqtt_ssize_t length)
{
mmqtt_ssize_t pullable;
pullable = mmqtt_ring_buffer_pullable_(&stream->ring_buffer);
if (pullable < length) {
return MMQTT_STATUS_NOT_PULLABLE;
} else {
mmqtt_ring_buffer_pull_(&stream->ring_buffer, length);
return MMQTT_STATUS_OK;
}
}
static mmqtt_ssize_t
mmqtt_stream_push(struct mmqtt_stream *stream, const uint8_t* in, mmqtt_ssize_t length)
{
mmqtt_ssize_t pushable, i;
if (stream->left == 0) { return MMQTT_STATUS_DONE; }
pushable = mmqtt_ring_buffer_pushable_(&stream->ring_buffer);
if (pushable <= 0) { return MMQTT_STATUS_NOT_PUSHABLE; }
pushable = min(stream->left, min(pushable, length));
for (i = 0; i < pushable; i++) {
stream->data[stream->ring_buffer.start + i] = in[i];
}
mmqtt_ring_buffer_push_(&stream->ring_buffer, pushable);
if (stream->left > 0) { stream->left -= pushable; }
return pushable;
}
static mmqtt_status_t
mmqtt_stream_external_pushable(struct mmqtt_stream *stream,
uint8_t** data, mmqtt_ssize_t* max_length)
{
mmqtt_ssize_t pushable;
if (stream->left == 0) { return MMQTT_STATUS_DONE; }
pushable = mmqtt_ring_buffer_pushable_(&stream->ring_buffer);
if (pushable <= 0) { return MMQTT_STATUS_NOT_PUSHABLE; }
pushable = min(pushable, stream->left);
*data = stream->data + stream->ring_buffer.start;
*max_length = pushable;
return MMQTT_STATUS_OK;
}
static mmqtt_status_t
mmqtt_stream_external_push(struct mmqtt_stream *stream, mmqtt_ssize_t length)
{
mmqtt_ssize_t pushable;
if (stream->left == 0) { return MMQTT_STATUS_DONE; }
pushable = mmqtt_ring_buffer_pushable_(&stream->ring_buffer);
if (pushable <= 0) { return MMQTT_STATUS_NOT_PUSHABLE; }
pushable = min(pushable, stream->left);
if (pushable < length) {
return MMQTT_STATUS_NOT_PUSHABLE;
} else {
mmqtt_ring_buffer_push_(&stream->ring_buffer, pushable);
return MMQTT_STATUS_OK;
}
}
static void
mmqtt_queue_init_(struct mmqtt_queue_ *queue)
{
mmqtt_ring_buffer_init_(&queue->ring_buffer, MMQTT_QUEUE_MAX_LENGTH);
}
static mmqtt_status_t
mmqtt_queue_add_(struct mmqtt_queue_ *queue, struct mmqtt_stream *stream)
{
if (mmqtt_ring_buffer_pushable_(&queue->ring_buffer) <= 0) {
return MMQTT_STATUS_NOT_PUSHABLE;
}
queue->streams[queue->ring_buffer.start] = stream;
mmqtt_ring_buffer_push_(&queue->ring_buffer, 1);
return MMQTT_STATUS_OK;
}
#define MMQTT_QUEUE_AVAILABLE(queue) ((queue)->ring_buffer.length > 0)
#define MMQTT_QUEUE_FIRST(queue) ((queue)->streams[(queue)->ring_buffer.start])
static void
mmqtt_connection_init(struct mmqtt_connection *connection, void *conn)
{
connection->connection = conn;
mmqtt_queue_init_(&connection->read_queue);
mmqtt_queue_init_(&connection->write_queue);
}
/*
* Pulls data out of connection's write queue, the returned data is supposed
* to be sent to a TCP socket to remote servers.
*/
static mmqtt_ssize_t
mmqtt_connection_pull(struct mmqtt_connection *connection,
uint8_t *out, mmqtt_ssize_t max_length)
{
mmqtt_ssize_t length, current;
struct mmqtt_queue_ *queue;
length = 0;
queue = &connection->write_queue;
while(MMQTT_QUEUE_AVAILABLE(queue) && length < max_length) {
current = mmqtt_stream_pull(MMQTT_QUEUE_FIRST(queue),
out + length,
max_length - length);
/* TODO: in case current is 0, should we return non-pullable? */
if (current >= 0) {
length += current;
} else if (current == MMQTT_STATUS_DONE) {
/* Release stream from write queue since it is done */
mmqtt_ring_buffer_pull_(&queue->ring_buffer, 1);
} else if (current == MMQTT_STATUS_NOT_PULLABLE) {
return (length > 0) ? length : MMQTT_STATUS_NOT_PULLABLE;
} else {
return MMQTT_STATUS_UNKNOWN;
}
}
return (length > 0) ? length : MMQTT_STATUS_NOT_PULLABLE;
}
struct mmqtt_stream *
mmqtt_connection_pullable_stream(struct mmqtt_connection *connection)
{
mmqtt_ssize_t i;
struct mmqtt_queue_ *queue = &connection->write_queue;
struct mmqtt_stream *stream = NULL;
for (i = 0; i < MMQTT_QUEUE_LENGTH(queue); i++) {
stream = MMQTT_QUEUE_INDEX(queue, i);
if (mmqtt_stream_running(stream) == MMQTT_STATUS_OK) { return stream; }
}
return NULL;
}
/*
* Pushes data to connection's read queue, the data should come from
* a TCP socket to a remote machine.
*/
static mmqtt_ssize_t
mmqtt_connection_push(struct mmqtt_connection *connection,
const uint8_t* in, mmqtt_ssize_t length)
{
mmqtt_ssize_t i, current, consumed_length;
struct mmqtt_queue_ *queue;
consumed_length = 0;
queue = &connection->read_queue;
while (i < MMQTT_QUEUE_LENGTH(queue) && consumed_length < length) {
current = mmqtt_stream_push(MMQTT_QUEUE_INDEX(queue, i),
in + consumed_length,
length - consumed_length);
if (current >= 0) {
consumed_length += current;
} else if (current == MMQTT_STATUS_DONE) {
i++;
} else if (current == MMQTT_STATUS_NOT_PUSHABLE) {
return (consumed_length > 0) ? consumed_length : MMQTT_STATUS_NOT_PUSHABLE;
} else {
return MMQTT_STATUS_UNKNOWN;
}
}
return (consumed_length > 0) ? consumed_length : MMQTT_STATUS_NOT_PUSHABLE;
}
struct mmqtt_stream *
mmqtt_connection_pushable_stream(struct mmqtt_connection *connection)
{
mmqtt_ssize_t i;
struct mmqtt_queue_ *queue = &connection->read_queue;
struct mmqtt_stream *stream = NULL;
for (i = 0; i < MMQTT_QUEUE_LENGTH(queue); i++) {
stream = MMQTT_QUEUE_INDEX(queue, i);
if (mmqtt_stream_running(stream) == MMQTT_STATUS_OK) { return stream; }
}
return NULL;
}
static mmqtt_status_t
mmqtt_connection_add_read_stream(struct mmqtt_connection *connection,
struct mmqtt_stream *stream)
{
return mmqtt_queue_add_(&connection->read_queue, stream);
}
static mmqtt_status_t
mmqtt_connection_add_write_stream(struct mmqtt_connection *connection,
struct mmqtt_stream *stream)
{
return mmqtt_queue_add_(&connection->write_queue, stream);
}
static mmqtt_status_t
mmqtt_connection_in_read_stream(struct mmqtt_connection *connection,
struct mmqtt_stream *stream)
{
mmqtt_ssize_t i;
struct mmqtt_queue_ *queue = &connection->read_queue;
for (i = 0; i < MMQTT_QUEUE_LENGTH(queue); i++) {
if (MMQTT_QUEUE_INDEX(queue, i) == stream) { return MMQTT_STATUS_OK; }
}
return MMQTT_STATUS_NOT_EXPECTED;
}
static mmqtt_status_t
mmqtt_connection_in_write_stream(struct mmqtt_connection *connection,
struct mmqtt_stream *stream)
{
mmqtt_ssize_t i;
struct mmqtt_queue_ *queue = &connection->write_queue;
for (i = 0; i < MMQTT_QUEUE_LENGTH(queue); i++) {
if (MMQTT_QUEUE_INDEX(queue, i) == stream) { return MMQTT_STATUS_OK; }
}
return MMQTT_STATUS_NOT_EXPECTED;
}
static mmqtt_ssize_t
mmqtt_connection_read_stream_length(struct mmqtt_connection *connection)
{
return connection->read_queue.ring_buffer.length;
}
static mmqtt_ssize_t
mmqtt_connection_write_stream_length(struct mmqtt_connection *connection)
{
return connection->write_queue.ring_buffer.length;
}
static mmqtt_status_t
mmqtt_connection_release_write_stream(struct mmqtt_connection *connection,
struct mmqtt_stream *stream)
{
if (!MMQTT_QUEUE_AVAILABLE(&connection->write_queue)) {
return MMQTT_STATUS_NOT_PULLABLE;
}
if (stream != MMQTT_QUEUE_FIRST(&connection->write_queue)) {
return MMQTT_STATUS_NOT_EXPECTED;
}
mmqtt_ring_buffer_pull_(&connection->write_queue.ring_buffer, 1);
return MMQTT_STATUS_OK;
}
static mmqtt_status_t
mmqtt_connection_release_read_stream(struct mmqtt_connection *connection,
struct mmqtt_stream *stream)
{
if (!MMQTT_QUEUE_AVAILABLE(&connection->read_queue)) {
return MMQTT_STATUS_NOT_PULLABLE;
}
if (stream != MMQTT_QUEUE_FIRST(&connection->read_queue)) {
return MMQTT_STATUS_NOT_EXPECTED;
}
mmqtt_ring_buffer_pull_(&connection->read_queue.ring_buffer, 1);
return MMQTT_STATUS_OK;
}
static mmqtt_status_t
mmqtt_connection_autorelease_write_streams(struct mmqtt_connection *connection)
{
struct mmqtt_stream *stream;
while (MMQTT_QUEUE_AVAILABLE(&connection->write_queue)) {
stream = MMQTT_QUEUE_FIRST(&connection->write_queue);
if (mmqtt_stream_running(stream) == MMQTT_STATUS_DONE) {
mmqtt_connection_release_write_stream(connection, stream);
} else {
break;
}
}
return MMQTT_STATUS_OK;
}
static mmqtt_status_t
mmqtt_connection_autorelease_read_streams(struct mmqtt_connection *connection)
{
struct mmqtt_stream *stream;
while (MMQTT_QUEUE_AVAILABLE(&connection->read_queue)) {
stream = MMQTT_QUEUE_FIRST(&connection->read_queue);
if (mmqtt_stream_running(stream) == MMQTT_STATUS_DONE) {
mmqtt_connection_release_read_stream(connection, stream);
} else {
break;
}
}
return MMQTT_STATUS_OK;
}
#ifndef MMQTT_DISABLE_STATIC
typedef mmqtt_status_t (*mmqtt_s_puller)(struct mmqtt_connection*);
typedef mmqtt_status_t (*mmqtt_s_pusher)(struct mmqtt_connection*, mmqtt_ssize_t);
mmqtt_status_t
mmqtt_s_encode_buffer(struct mmqtt_connection *conn, mmqtt_s_puller puller,
const uint8_t *buffer, uint16_t length)
{
struct mmqtt_stream stream;
mmqtt_status_t status;
mmqtt_ssize_t current;
uint16_t consumed_length;
mmqtt_stream_init(&stream, length);
while ((status = mmqtt_connection_add_write_stream(conn, &stream)) == MMQTT_STATUS_NOT_PUSHABLE) {
status = puller(conn);
if (status != MMQTT_STATUS_OK) { return status; }
}
if (status != MMQTT_STATUS_OK) { return status; }
consumed_length = 0;
while (consumed_length < length) {
current = mmqtt_stream_push(&stream, buffer + consumed_length,
length - consumed_length);
if (current >= 0) {
consumed_length += current;
} else if (current == MMQTT_STATUS_NOT_PUSHABLE) {
status = puller(conn);
if (status != MMQTT_STATUS_OK) { return status; }
}
}
while (mmqtt_connection_in_write_stream(conn, &stream) == MMQTT_STATUS_OK) {
status = puller(conn);
if (status != MMQTT_STATUS_OK) { return status; }
}
return MMQTT_STATUS_OK;
}
mmqtt_status_t
mmqtt_s_decode_buffer(struct mmqtt_connection *conn, mmqtt_s_pusher pusher,
uint8_t *buffer, uint16_t buffer_length,
uint16_t consume_length)
{
struct mmqtt_stream stream;
mmqtt_status_t status;
mmqtt_ssize_t current;
uint16_t length;
uint8_t temp;
if (mmqtt_connection_read_stream_length(conn) > 0) {
return MMQTT_STATUS_INVALID_STATE;
}
if (consume_length < buffer_length) { consume_length = buffer_length; }
mmqtt_stream_init(&stream, consume_length);
/* Connection won't remove read stream automatically, pusher won't help here */
status = mmqtt_connection_add_read_stream(conn, &stream);
if (status != MMQTT_STATUS_OK) { return status; }
length = 0;
while (length < buffer_length) {
current = mmqtt_stream_pull(&stream, buffer + length,
buffer_length - length);
if (current >= 0) {
length += current;
} else if (current == MMQTT_STATUS_NOT_PULLABLE) {
status = pusher(conn, buffer_length - length);
if (status != MMQTT_STATUS_OK) {
mmqtt_connection_release_read_stream(conn, &stream);
return status;
}
}
}
while (length < consume_length) {
current = mmqtt_stream_pull(&stream, &temp, 1);
if (current >= 0) {
length += current;
} else if (current == MMQTT_STATUS_NOT_PULLABLE) {
status = pusher(conn, consume_length - length);
if (status != MMQTT_STATUS_OK) {
mmqtt_connection_release_read_stream(conn, &stream);
return status;
}
}
}
mmqtt_connection_release_read_stream(conn, &stream);
return MMQTT_STATUS_OK;
}
/* NOTE: for smaller buffer, we can also use mmqtt_s_decode_buffer with
* an empty buffer and 0 as buffer length. Here this function is for the
* case where we are skipping a whole packet altogether, which could be
* larger than 65535 in theory.
*/
mmqtt_status_t
mmqtt_s_skip_buffer(struct mmqtt_connection *conn, mmqtt_s_pusher pusher,
uint32_t length)
{
mmqtt_status_t status;
while (length > 65535) {
status = mmqtt_s_decode_buffer(conn, pusher, NULL, 0, 65535);
if (status != MMQTT_STATUS_OK) {
return status;
}
length -= 65535;
}
if (length > 0) {
return mmqtt_s_decode_buffer(conn, pusher, NULL, 0, (uint16_t) length);
} else {
return MMQTT_STATUS_OK;
}
}
mmqtt_status_t
mmqtt_s_encode_fixed_header(struct mmqtt_connection *conn, mmqtt_s_puller puller,
uint8_t flag, uint32_t length)
{
mmqtt_status_t status;
mmqtt_ssize_t i;
uint8_t c, buffer[4];
buffer[0] = flag;
status = mmqtt_s_encode_buffer(conn, puller, buffer, 1);
if (status != MMQTT_STATUS_OK) { return status; }
i = 0;
do {
c = (uint8_t) length % 128;
length /= 128;
if (length > 0) { c |= 0x80; }
buffer[i++] = c;
} while (length > 0);
return mmqtt_s_encode_buffer(conn, puller, buffer, i);
}
mmqtt_status_t
mmqtt_s_decode_fixed_header(struct mmqtt_connection *conn, mmqtt_s_pusher pusher,
uint8_t *flag, uint32_t *length)
{
mmqtt_status_t status;
uint8_t c;
uint32_t l, factor;
status = mmqtt_s_decode_buffer(conn, pusher, &c, 1, 1);
if (status != MMQTT_STATUS_OK) { return status; }
if (flag) { *flag = c; }
l = 0;
factor = 1;
do {
status = mmqtt_s_decode_buffer(conn, pusher, &c, 1, 1);
if (status != MMQTT_STATUS_OK) { return status; }
l += (c & 0x7F) * factor;
factor *= 128;
} while ((c & 0x80) != 0);
if (length) { *length = l; }
return MMQTT_STATUS_OK;
}
void mmqtt_s_pack_uint16_(uint16_t length, uint8_t *buffer)
{
buffer[0] = (length >> 8) & 0xFF;
buffer[1] = length & 0xFF;
}
uint16_t mmqtt_s_unpack_uint16_(uint8_t *buffer)
{
return ((uint16_t) (buffer[0] << 8)) | ((uint16_t) buffer[1]);
}
/* We don't case uint16_t* directly into uint8_t* because there might be
* endianness differences
*/
mmqtt_status_t
mmqtt_s_encode_uint16(struct mmqtt_connection *conn, mmqtt_s_puller puller,
uint16_t val)
{
uint8_t buffer[2];
mmqtt_s_pack_uint16_(val, buffer);
return mmqtt_s_encode_buffer(conn, puller, buffer, 2);
}
mmqtt_status_t
mmqtt_s_decode_uint16(struct mmqtt_connection *conn, mmqtt_s_pusher pusher,
uint16_t *out_val)
{
mmqtt_status_t status;
uint8_t buffer[2];
status = mmqtt_s_decode_buffer(conn, pusher, buffer, 2, 2);
if (status != MMQTT_STATUS_OK) { return status; }
if (out_val) { *out_val = mmqtt_s_unpack_uint16_(buffer); }
return MMQTT_STATUS_OK;
}
mmqtt_status_t
mmqtt_s_encode_string(struct mmqtt_connection *conn, mmqtt_s_puller puller,
const uint8_t *str, uint16_t length)
{
mmqtt_status_t status;
uint8_t buffer[2];
mmqtt_s_pack_uint16_(length, buffer);
status = mmqtt_s_encode_buffer(conn, puller, buffer, 2);
if (status != MMQTT_STATUS_OK) { return status; }
return mmqtt_s_encode_buffer(conn, puller, str, length);
}
mmqtt_status_t
mmqtt_s_decode_string(struct mmqtt_connection *conn, mmqtt_s_pusher pusher,
uint8_t *str, uint16_t max_length,
uint16_t *out_length, uint16_t *out_true_length) {
mmqtt_status_t status;
uint8_t buffer[2];
uint16_t true_length, length;
status = mmqtt_s_decode_buffer(conn, pusher, buffer, 2, 2);
if (status != MMQTT_STATUS_OK) { return status; }
true_length = mmqtt_s_unpack_uint16_(buffer);
length = min(true_length, max_length);
status = mmqtt_s_decode_buffer(conn, pusher, str, length, true_length);
if (status != MMQTT_STATUS_OK) { return status; }
if (out_length) { *out_length = length; }
if (out_true_length) { *out_true_length = true_length; }
return MMQTT_STATUS_OK;
}
/* p stands for packet here */
struct mmqtt_p_connect_header {
uint8_t *name;
uint16_t name_length;
uint16_t name_max_length;
uint8_t protocol_version;
uint8_t flags;
uint16_t keepalive;
};
mmqtt_status_t
mmqtt_s_encode_connect_header(struct mmqtt_connection *conn, mmqtt_s_puller puller,
const struct mmqtt_p_connect_header *header)
{
mmqtt_status_t status;
uint8_t buffer[2];
mmqtt_s_pack_uint16_(header->name_length, buffer);
status = mmqtt_s_encode_buffer(conn, puller, buffer, 2);
if (status != MMQTT_STATUS_OK) { return status; }
status = mmqtt_s_encode_buffer(conn, puller, header->name, header->name_length);
if (status != MMQTT_STATUS_OK) { return status; }
status = mmqtt_s_encode_buffer(conn, puller, &header->protocol_version, 1);
if (status != MMQTT_STATUS_OK) { return status; }
status = mmqtt_s_encode_buffer(conn, puller, &header->flags, 1);
if (status != MMQTT_STATUS_OK) { return status; }
mmqtt_s_pack_uint16_(header->keepalive, buffer);
return mmqtt_s_encode_buffer(conn, puller, buffer, 2);
}
/* We would rewrite name_length in header to contain returned name length */
mmqtt_status_t
mmqtt_s_decode_connect_header(struct mmqtt_connection *conn, mmqtt_s_pusher pusher,
struct mmqtt_p_connect_header *header)
{
mmqtt_status_t status;
uint8_t buffer[2];
uint16_t length;
status = mmqtt_s_decode_buffer(conn, pusher, buffer, 2, 2);
if (status != MMQTT_STATUS_OK) { return status; }
length = mmqtt_s_unpack_uint16_(buffer);
header->name_length = min(header->name_max_length, length);
status = mmqtt_s_decode_buffer(conn, pusher, header->name,
header->name_length, length);
if (status != MMQTT_STATUS_OK) { return status; }
status = mmqtt_s_decode_buffer(conn, pusher, &header->protocol_version, 1, 1);
if (status != MMQTT_STATUS_OK) { return status; }
status = mmqtt_s_decode_buffer(conn, pusher, &header->flags, 1, 1);
if (status != MMQTT_STATUS_OK) { return status; }
status = mmqtt_s_decode_buffer(conn, pusher, buffer, 2, 2);
if (status != MMQTT_STATUS_OK) { return status; }
header->keepalive = mmqtt_s_unpack_uint16_(buffer);
return MMQTT_STATUS_OK;
}
struct mmqtt_p_connack_header {
uint8_t reserved;
uint8_t return_code;
};
mmqtt_status_t
mmqtt_s_encode_connack_header(struct mmqtt_connection *conn, mmqtt_s_puller puller,
const struct mmqtt_p_connack_header *header)
{
mmqtt_status_t status;
status = mmqtt_s_encode_buffer(conn, puller, &header->reserved, 1);
if (status != MMQTT_STATUS_OK) { return status; }
return mmqtt_s_encode_buffer(conn, puller, &header->return_code, 1);
}
mmqtt_status_t
mmqtt_s_decode_connack_header(struct mmqtt_connection *conn, mmqtt_s_pusher pusher,
struct mmqtt_p_connack_header *header)
{
mmqtt_status_t status;
status = mmqtt_s_decode_buffer(conn, pusher, &header->reserved, 1, 1);
if (status != MMQTT_STATUS_OK) { return status; }
return mmqtt_s_decode_buffer(conn, pusher, &header->return_code, 1, 1);
}
/* Works for both subscribe and unsubscribe */
struct mmqtt_p_subscribe_decode_context {
uint32_t length;
uint8_t has_qos;
};
struct mmqtt_p_subscribe_payload_line {
uint8_t *topic;
uint16_t topic_length;
uint16_t topic_max_length;
uint8_t qos;
};
void
mmqtt_s_subscribe_decode_context_init(struct mmqtt_p_subscribe_decode_context *context,
uint32_t length)
{
context->length = length;
context->has_qos = 1;
}
void
mmqtt_s_unsubscribe_decode_context_init(struct mmqtt_p_subscribe_decode_context *context,
uint32_t length)
{
context->length = length;
context->has_qos = 0;
}
mmqtt_status_t
mmqtt_s_decode_subscribe_payload(struct mmqtt_connection *conn, mmqtt_s_pusher pusher,
struct mmqtt_p_subscribe_decode_context *context,
struct mmqtt_p_subscribe_payload_line *line)
{
mmqtt_status_t status;
uint16_t string_true_length;
if (context->length <= 0) { return MMQTT_STATUS_DONE; }
status = mmqtt_s_decode_string(conn, pusher, line->topic, line->topic_max_length,
&line->topic_length, &string_true_length);
if (status != MMQTT_STATUS_OK) { return status; }
context->length -= string_true_length;
context->length -= 2;
if (context->has_qos != 0) {
status = mmqtt_s_decode_buffer(conn, pusher, &line->qos, 1, 1);
if (status != MMQTT_STATUS_OK) { return status; }
context->length -= 1;
} else {
line->qos = 0;
}
return MMQTT_STATUS_OK;
}
uint32_t
mmqtt_s_string_encoded_length(uint16_t string_length)
{
return string_length + 2;
}
uint32_t
mmqtt_s_connect_header_encoded_length(const struct mmqtt_p_connect_header *header)
{
return mmqtt_s_string_encoded_length(header->name_length) + 4;
}
#endif /* MMQTT_DISABLE_STATIC */
#endif /* MMQTT_H_ */
