minimal MQTT Library. This version simply doubled the MQTT QUEUE sizes

Dependents:   WNCInterface_M2XMQTTdemo

Fork of minimal-mqtt by Xuejie Xiao

Committer:
JMF
Date:
Sat Oct 08 00:46:33 2016 +0000
Revision:
2:65f85aa601db
Parent:
1:311cd16389ff
Doubled MMQTT_STREAM_MAX_LENGTH and MMQTT_QUEUE_MAX_LENGTH to accommodate larger MQTT messages.

Who changed what in which revision?

User | Revision | Line number | New contents of line
defmacro 0:a268b80e3093 1 #ifndef MMQTT_H_
defmacro 0:a268b80e3093 2 #define MMQTT_H_
defmacro 0:a268b80e3093 3
#include <stddef.h>
#include <stdint.h>
#ifndef min
/* Yields the smaller of two values. Each argument may be evaluated twice,
 * so do not pass expressions with side effects.
 */
#define min(a, b) ((b) < (a) ? (b) : (a))
#endif /* min */
defmacro 0:a268b80e3093 8
/* Number of bytes buffered inside one stream. */
#define MMQTT_STREAM_MAX_LENGTH 16
/* Number of streams one connection queue can hold. */
#define MMQTT_QUEUE_MAX_LENGTH 4

/* mmqtt_ssize_t must be able to hold MMQTT_STREAM_MAX_LENGTH as well as
 * MMQTT_QUEUE_MAX_LENGTH; the checks below reject values that do not fit.
 * Negative values of this type carry MMQTT_STATUS_* codes.
 */
typedef int8_t mmqtt_ssize_t;
typedef int8_t mmqtt_status_t;

#if (MMQTT_STREAM_MAX_LENGTH < 1) || (MMQTT_STREAM_MAX_LENGTH > INT8_MAX)
#error "MMQTT_STREAM_MAX_LENGTH must be in 1..INT8_MAX to fit in mmqtt_ssize_t"
#endif
#if (MMQTT_QUEUE_MAX_LENGTH < 1) || (MMQTT_QUEUE_MAX_LENGTH > INT8_MAX)
#error "MMQTT_QUEUE_MAX_LENGTH must be in 1..INT8_MAX to fit in mmqtt_ssize_t"
#endif
defmacro 0:a268b80e3093 17
/* Status codes. Zero means success; every failure code is negative so it
 * can share a return value with a non-negative byte or stream count.
 * Negative values are parenthesized so they expand safely inside larger
 * expressions.
 */
#define MMQTT_STATUS_OK 0
#define MMQTT_STATUS_UNKNOWN (-1)
#define MMQTT_STATUS_DONE (-2)
#define MMQTT_STATUS_NOT_PULLABLE (-3)
#define MMQTT_STATUS_NOT_PUSHABLE (-4)
#define MMQTT_STATUS_NOT_EXPECTED (-5)
#define MMQTT_STATUS_BROKEN_CONNECTION (-6)
#define MMQTT_STATUS_INVALID_STATE (-7)
defmacro 0:a268b80e3093 26
/* MQTT control packet types. The type occupies the upper four bits of the
 * first byte of the fixed header, so every value must be within 0x0..0xF.
 */
#define MMQTT_MESSAGE_TYPE_CONNECT 0x1
#define MMQTT_MESSAGE_TYPE_CONNACK 0x2
#define MMQTT_MESSAGE_TYPE_PUBLISH 0x3
#define MMQTT_MESSAGE_TYPE_PUBACK 0x4
#define MMQTT_MESSAGE_TYPE_SUBSCRIBE 0x8
#define MMQTT_MESSAGE_TYPE_SUBACK 0x9
#define MMQTT_MESSAGE_TYPE_UNSUBSCRIBE 0xA
#define MMQTT_MESSAGE_TYPE_UNSUBACK 0xB
#define MMQTT_MESSAGE_TYPE_PINGREQ 0xC
#define MMQTT_MESSAGE_TYPE_PINGRESP 0xD
#define MMQTT_MESSAGE_TYPE_DISCONNECT 0xE

/* Moves a packet type into / out of the upper nibble of the header byte. */
#define MMQTT_PACK_MESSAGE_TYPE(type_) ((type_) << 4)
#define MMQTT_UNPACK_MESSAGE_TYPE(type_) (((type_) >> 4) & 0xF)
defmacro 0:a268b80e3093 41
defmacro 0:a268b80e3093 42 struct mmqtt_ring_buffer_ {
defmacro 0:a268b80e3093 43 mmqtt_ssize_t start;
defmacro 0:a268b80e3093 44 mmqtt_ssize_t length;
defmacro 0:a268b80e3093 45 mmqtt_ssize_t capacity;
defmacro 0:a268b80e3093 46 };
defmacro 0:a268b80e3093 47
defmacro 0:a268b80e3093 48 struct mmqtt_stream {
defmacro 0:a268b80e3093 49 struct mmqtt_ring_buffer_ ring_buffer;
defmacro 0:a268b80e3093 50 uint8_t data[MMQTT_STREAM_MAX_LENGTH];
defmacro 0:a268b80e3093 51 size_t left;
defmacro 0:a268b80e3093 52 };
defmacro 0:a268b80e3093 53
defmacro 0:a268b80e3093 54 struct mmqtt_queue_ {
defmacro 0:a268b80e3093 55 struct mmqtt_ring_buffer_ ring_buffer;
defmacro 0:a268b80e3093 56 struct mmqtt_stream* streams[MMQTT_QUEUE_MAX_LENGTH];
defmacro 0:a268b80e3093 57 };
defmacro 0:a268b80e3093 58
/* Number of streams currently held by `queue`. */
#define MMQTT_QUEUE_LENGTH(queue) ((queue)->ring_buffer.length)
/* The i-th queued stream counted from the oldest one, wrapping around the
 * end of the storage. `queue` is evaluated more than once.
 */
#define MMQTT_QUEUE_INDEX(queue, i) \
  ((queue)->streams[((queue)->ring_buffer.start + (i)) % ((queue)->ring_buffer.capacity)])
defmacro 0:a268b80e3093 61
defmacro 0:a268b80e3093 62 struct mmqtt_connection {
defmacro 0:a268b80e3093 63 void *connection;
defmacro 0:a268b80e3093 64 struct mmqtt_queue_ read_queue;
defmacro 0:a268b80e3093 65 struct mmqtt_queue_ write_queue;
defmacro 0:a268b80e3093 66 };
defmacro 0:a268b80e3093 67
defmacro 0:a268b80e3093 68 static inline void
defmacro 0:a268b80e3093 69 mmqtt_ring_buffer_init_(struct mmqtt_ring_buffer_ *buffer, mmqtt_ssize_t capacity)
defmacro 0:a268b80e3093 70 {
defmacro 0:a268b80e3093 71 buffer->start = 0;
defmacro 0:a268b80e3093 72 buffer->length = 0;
defmacro 0:a268b80e3093 73 buffer->capacity = capacity;
defmacro 0:a268b80e3093 74 }
defmacro 0:a268b80e3093 75
defmacro 0:a268b80e3093 76 static inline mmqtt_ssize_t
defmacro 0:a268b80e3093 77 mmqtt_ring_buffer_pullable_(const struct mmqtt_ring_buffer_ *buffer)
defmacro 0:a268b80e3093 78 {
defmacro 0:a268b80e3093 79 return min(buffer->start + buffer->length, buffer->capacity) - buffer->start;
defmacro 0:a268b80e3093 80 }
defmacro 0:a268b80e3093 81
defmacro 0:a268b80e3093 82 static inline void
defmacro 0:a268b80e3093 83 mmqtt_ring_buffer_pull_(struct mmqtt_ring_buffer_ *buffer, mmqtt_ssize_t length)
defmacro 0:a268b80e3093 84 {
defmacro 0:a268b80e3093 85 buffer->start = (buffer->start + length) % buffer->capacity;
defmacro 0:a268b80e3093 86 buffer->length -= length;
defmacro 0:a268b80e3093 87 }
defmacro 0:a268b80e3093 88
defmacro 0:a268b80e3093 89 static inline mmqtt_ssize_t
defmacro 0:a268b80e3093 90 mmqtt_ring_buffer_pushable_(const struct mmqtt_ring_buffer_ *buffer)
defmacro 0:a268b80e3093 91 {
defmacro 0:a268b80e3093 92 if (buffer->start + buffer->length > buffer->capacity) {
defmacro 0:a268b80e3093 93 return buffer->capacity - buffer->length;
defmacro 0:a268b80e3093 94 }
defmacro 0:a268b80e3093 95 return buffer->capacity - buffer->length - buffer->start;
defmacro 0:a268b80e3093 96 }
defmacro 0:a268b80e3093 97
defmacro 0:a268b80e3093 98 static inline void
defmacro 0:a268b80e3093 99 mmqtt_ring_buffer_push_(struct mmqtt_ring_buffer_ *buffer, mmqtt_ssize_t length)
defmacro 0:a268b80e3093 100 {
defmacro 0:a268b80e3093 101 buffer->length += length;
defmacro 0:a268b80e3093 102 }
defmacro 0:a268b80e3093 103
defmacro 0:a268b80e3093 104 static void
defmacro 0:a268b80e3093 105 mmqtt_stream_init(struct mmqtt_stream *stream, size_t total_size)
defmacro 0:a268b80e3093 106 {
defmacro 0:a268b80e3093 107 mmqtt_ring_buffer_init_(&stream->ring_buffer, MMQTT_STREAM_MAX_LENGTH);
defmacro 0:a268b80e3093 108 stream->left = total_size;
defmacro 0:a268b80e3093 109 }
defmacro 0:a268b80e3093 110
defmacro 0:a268b80e3093 111 static mmqtt_status_t
defmacro 0:a268b80e3093 112 mmqtt_stream_running(struct mmqtt_stream *stream)
defmacro 0:a268b80e3093 113 {
defmacro 0:a268b80e3093 114 if (stream->left > 0 || stream->ring_buffer.length > 0) {
defmacro 0:a268b80e3093 115 return MMQTT_STATUS_OK;
defmacro 0:a268b80e3093 116 } else {
defmacro 0:a268b80e3093 117 return MMQTT_STATUS_DONE;
defmacro 0:a268b80e3093 118 }
defmacro 0:a268b80e3093 119 }
defmacro 0:a268b80e3093 120
defmacro 0:a268b80e3093 121 static mmqtt_ssize_t
defmacro 0:a268b80e3093 122 mmqtt_stream_pull(struct mmqtt_stream *stream, uint8_t* out, mmqtt_ssize_t max_length)
defmacro 0:a268b80e3093 123 {
defmacro 0:a268b80e3093 124 mmqtt_ssize_t pullable, i;
defmacro 0:a268b80e3093 125 pullable = mmqtt_ring_buffer_pullable_(&stream->ring_buffer);
defmacro 0:a268b80e3093 126 if (pullable <= 0) {
defmacro 0:a268b80e3093 127 if (stream->left == 0) {
defmacro 0:a268b80e3093 128 return MMQTT_STATUS_DONE;
defmacro 0:a268b80e3093 129 } else {
defmacro 0:a268b80e3093 130 return MMQTT_STATUS_NOT_PULLABLE;
defmacro 0:a268b80e3093 131 }
defmacro 0:a268b80e3093 132 }
defmacro 0:a268b80e3093 133 pullable = min(pullable, max_length);
defmacro 0:a268b80e3093 134
defmacro 0:a268b80e3093 135 if (out) {
defmacro 0:a268b80e3093 136 for (i = 0; i < pullable; i++) {
defmacro 0:a268b80e3093 137 out[i] = stream->data[stream->ring_buffer.start + i];
defmacro 0:a268b80e3093 138 }
defmacro 0:a268b80e3093 139 }
defmacro 0:a268b80e3093 140 mmqtt_ring_buffer_pull_(&stream->ring_buffer, pullable);
defmacro 0:a268b80e3093 141 return pullable;
defmacro 0:a268b80e3093 142 }
defmacro 0:a268b80e3093 143
defmacro 0:a268b80e3093 144 static mmqtt_status_t
defmacro 0:a268b80e3093 145 mmqtt_stream_external_pullable(struct mmqtt_stream *stream,
defmacro 0:a268b80e3093 146 const uint8_t** data, mmqtt_ssize_t* max_length)
defmacro 0:a268b80e3093 147 {
defmacro 0:a268b80e3093 148 mmqtt_ssize_t pullable;
defmacro 0:a268b80e3093 149 pullable = mmqtt_ring_buffer_pullable_(&stream->ring_buffer);
defmacro 0:a268b80e3093 150 if (pullable <= 0) {
defmacro 0:a268b80e3093 151 if (stream->left == 0) {
defmacro 0:a268b80e3093 152 return MMQTT_STATUS_DONE;
defmacro 0:a268b80e3093 153 } else {
defmacro 0:a268b80e3093 154 return MMQTT_STATUS_NOT_PULLABLE;
defmacro 0:a268b80e3093 155 }
defmacro 0:a268b80e3093 156 }
defmacro 0:a268b80e3093 157 pullable = mmqtt_ring_buffer_pullable_(&stream->ring_buffer);
defmacro 0:a268b80e3093 158 if (pullable <= 0) { return MMQTT_STATUS_NOT_PULLABLE; }
defmacro 0:a268b80e3093 159 *data = stream->data + stream->ring_buffer.start;
defmacro 0:a268b80e3093 160 *max_length = pullable;
defmacro 0:a268b80e3093 161 return MMQTT_STATUS_OK;
defmacro 0:a268b80e3093 162 }
defmacro 0:a268b80e3093 163
defmacro 0:a268b80e3093 164 static mmqtt_status_t
defmacro 0:a268b80e3093 165 mmqtt_stream_external_pull(struct mmqtt_stream *stream, mmqtt_ssize_t length)
defmacro 0:a268b80e3093 166 {
defmacro 0:a268b80e3093 167 mmqtt_ssize_t pullable;
defmacro 0:a268b80e3093 168 pullable = mmqtt_ring_buffer_pullable_(&stream->ring_buffer);
defmacro 0:a268b80e3093 169 if (pullable < length) {
defmacro 0:a268b80e3093 170 return MMQTT_STATUS_NOT_PULLABLE;
defmacro 0:a268b80e3093 171 } else {
defmacro 0:a268b80e3093 172 mmqtt_ring_buffer_pull_(&stream->ring_buffer, length);
defmacro 0:a268b80e3093 173 return MMQTT_STATUS_OK;
defmacro 0:a268b80e3093 174 }
defmacro 0:a268b80e3093 175 }
defmacro 0:a268b80e3093 176
defmacro 0:a268b80e3093 177 static mmqtt_ssize_t
defmacro 0:a268b80e3093 178 mmqtt_stream_push(struct mmqtt_stream *stream, const uint8_t* in, mmqtt_ssize_t length)
defmacro 0:a268b80e3093 179 {
defmacro 0:a268b80e3093 180 mmqtt_ssize_t pushable, i;
defmacro 0:a268b80e3093 181 if (stream->left == 0) { return MMQTT_STATUS_DONE; }
defmacro 0:a268b80e3093 182 pushable = mmqtt_ring_buffer_pushable_(&stream->ring_buffer);
defmacro 0:a268b80e3093 183 if (pushable <= 0) { return MMQTT_STATUS_NOT_PUSHABLE; }
defmacro 0:a268b80e3093 184 pushable = min(stream->left, min(pushable, length));
defmacro 0:a268b80e3093 185
defmacro 0:a268b80e3093 186 for (i = 0; i < pushable; i++) {
defmacro 0:a268b80e3093 187 stream->data[stream->ring_buffer.start + i] = in[i];
defmacro 0:a268b80e3093 188 }
defmacro 0:a268b80e3093 189 mmqtt_ring_buffer_push_(&stream->ring_buffer, pushable);
defmacro 0:a268b80e3093 190 if (stream->left > 0) { stream->left -= pushable; }
defmacro 0:a268b80e3093 191 return pushable;
defmacro 0:a268b80e3093 192 }
defmacro 0:a268b80e3093 193
defmacro 0:a268b80e3093 194 static mmqtt_status_t
defmacro 0:a268b80e3093 195 mmqtt_stream_external_pushable(struct mmqtt_stream *stream,
defmacro 0:a268b80e3093 196 uint8_t** data, mmqtt_ssize_t* max_length)
defmacro 0:a268b80e3093 197 {
defmacro 0:a268b80e3093 198 mmqtt_ssize_t pushable;
defmacro 0:a268b80e3093 199 if (stream->left == 0) { return MMQTT_STATUS_DONE; }
defmacro 0:a268b80e3093 200 pushable = mmqtt_ring_buffer_pushable_(&stream->ring_buffer);
defmacro 0:a268b80e3093 201 if (pushable <= 0) { return MMQTT_STATUS_NOT_PUSHABLE; }
defmacro 0:a268b80e3093 202 pushable = min(pushable, stream->left);
defmacro 0:a268b80e3093 203 *data = stream->data + stream->ring_buffer.start;
defmacro 0:a268b80e3093 204 *max_length = pushable;
defmacro 0:a268b80e3093 205 return MMQTT_STATUS_OK;
defmacro 0:a268b80e3093 206 }
defmacro 0:a268b80e3093 207
defmacro 0:a268b80e3093 208 static mmqtt_status_t
defmacro 0:a268b80e3093 209 mmqtt_stream_external_push(struct mmqtt_stream *stream, mmqtt_ssize_t length)
defmacro 0:a268b80e3093 210 {
defmacro 0:a268b80e3093 211 mmqtt_ssize_t pushable;
defmacro 0:a268b80e3093 212 if (stream->left == 0) { return MMQTT_STATUS_DONE; }
defmacro 0:a268b80e3093 213 pushable = mmqtt_ring_buffer_pushable_(&stream->ring_buffer);
defmacro 0:a268b80e3093 214 if (pushable <= 0) { return MMQTT_STATUS_NOT_PUSHABLE; }
defmacro 0:a268b80e3093 215 pushable = min(pushable, stream->left);
defmacro 0:a268b80e3093 216 if (pushable < length) {
defmacro 0:a268b80e3093 217 return MMQTT_STATUS_NOT_PUSHABLE;
defmacro 0:a268b80e3093 218 } else {
defmacro 0:a268b80e3093 219 mmqtt_ring_buffer_push_(&stream->ring_buffer, pushable);
defmacro 0:a268b80e3093 220 return MMQTT_STATUS_OK;
defmacro 0:a268b80e3093 221 }
defmacro 0:a268b80e3093 222 }
defmacro 0:a268b80e3093 223
defmacro 0:a268b80e3093 224 static void
defmacro 0:a268b80e3093 225 mmqtt_queue_init_(struct mmqtt_queue_ *queue)
defmacro 0:a268b80e3093 226 {
defmacro 0:a268b80e3093 227 mmqtt_ring_buffer_init_(&queue->ring_buffer, MMQTT_QUEUE_MAX_LENGTH);
defmacro 0:a268b80e3093 228 }
defmacro 0:a268b80e3093 229
defmacro 0:a268b80e3093 230 static mmqtt_status_t
defmacro 0:a268b80e3093 231 mmqtt_queue_add_(struct mmqtt_queue_ *queue, struct mmqtt_stream *stream)
defmacro 0:a268b80e3093 232 {
defmacro 0:a268b80e3093 233 if (mmqtt_ring_buffer_pushable_(&queue->ring_buffer) <= 0) {
defmacro 0:a268b80e3093 234 return MMQTT_STATUS_NOT_PUSHABLE;
defmacro 0:a268b80e3093 235 }
defmacro 0:a268b80e3093 236 queue->streams[queue->ring_buffer.start] = stream;
defmacro 0:a268b80e3093 237 mmqtt_ring_buffer_push_(&queue->ring_buffer, 1);
defmacro 0:a268b80e3093 238 return MMQTT_STATUS_OK;
defmacro 0:a268b80e3093 239 }
defmacro 0:a268b80e3093 240
/* Non-zero when `queue` holds at least one stream. */
#define MMQTT_QUEUE_AVAILABLE(queue) (0 < (queue)->ring_buffer.length)
/* The oldest stream in `queue`; only meaningful when the queue is not empty. */
#define MMQTT_QUEUE_FIRST(queue) \
  ((queue)->streams[(queue)->ring_buffer.start])
defmacro 0:a268b80e3093 243
defmacro 0:a268b80e3093 244 static void
defmacro 0:a268b80e3093 245 mmqtt_connection_init(struct mmqtt_connection *connection, void *conn)
defmacro 0:a268b80e3093 246 {
defmacro 0:a268b80e3093 247 connection->connection = conn;
defmacro 0:a268b80e3093 248 mmqtt_queue_init_(&connection->read_queue);
defmacro 0:a268b80e3093 249 mmqtt_queue_init_(&connection->write_queue);
defmacro 0:a268b80e3093 250 }
defmacro 0:a268b80e3093 251
defmacro 0:a268b80e3093 252 /*
defmacro 0:a268b80e3093 253 * Pulls data out of connection's write queue, the returned data is supposed
defmacro 0:a268b80e3093 254 * to be sent to a TCP socket to remote servers.
defmacro 0:a268b80e3093 255 */
defmacro 0:a268b80e3093 256 static mmqtt_ssize_t
defmacro 0:a268b80e3093 257 mmqtt_connection_pull(struct mmqtt_connection *connection,
defmacro 0:a268b80e3093 258 uint8_t *out, mmqtt_ssize_t max_length)
defmacro 0:a268b80e3093 259 {
defmacro 0:a268b80e3093 260 mmqtt_ssize_t length, current;
defmacro 0:a268b80e3093 261 struct mmqtt_queue_ *queue;
defmacro 0:a268b80e3093 262 length = 0;
defmacro 0:a268b80e3093 263 queue = &connection->write_queue;
defmacro 0:a268b80e3093 264 while(MMQTT_QUEUE_AVAILABLE(queue) && length < max_length) {
defmacro 0:a268b80e3093 265 current = mmqtt_stream_pull(MMQTT_QUEUE_FIRST(queue),
defmacro 0:a268b80e3093 266 out + length,
defmacro 0:a268b80e3093 267 max_length - length);
defmacro 0:a268b80e3093 268 /* TODO: in case current is 0, should we return non-pullable? */
defmacro 0:a268b80e3093 269 if (current >= 0) {
defmacro 0:a268b80e3093 270 length += current;
defmacro 0:a268b80e3093 271 } else if (current == MMQTT_STATUS_DONE) {
defmacro 0:a268b80e3093 272 /* Release stream from write queue since it is done */
defmacro 0:a268b80e3093 273 mmqtt_ring_buffer_pull_(&queue->ring_buffer, 1);
defmacro 0:a268b80e3093 274 } else if (current == MMQTT_STATUS_NOT_PULLABLE) {
defmacro 0:a268b80e3093 275 return (length > 0) ? length : MMQTT_STATUS_NOT_PULLABLE;
defmacro 0:a268b80e3093 276 } else {
defmacro 0:a268b80e3093 277 return MMQTT_STATUS_UNKNOWN;
defmacro 0:a268b80e3093 278 }
defmacro 0:a268b80e3093 279 }
defmacro 0:a268b80e3093 280 return (length > 0) ? length : MMQTT_STATUS_NOT_PULLABLE;
defmacro 0:a268b80e3093 281 }
defmacro 0:a268b80e3093 282
defmacro 0:a268b80e3093 283 struct mmqtt_stream *
defmacro 0:a268b80e3093 284 mmqtt_connection_pullable_stream(struct mmqtt_connection *connection)
defmacro 0:a268b80e3093 285 {
defmacro 0:a268b80e3093 286 mmqtt_ssize_t i;
defmacro 0:a268b80e3093 287 struct mmqtt_queue_ *queue = &connection->write_queue;
defmacro 0:a268b80e3093 288 struct mmqtt_stream *stream = NULL;
defmacro 0:a268b80e3093 289 for (i = 0; i < MMQTT_QUEUE_LENGTH(queue); i++) {
defmacro 0:a268b80e3093 290 stream = MMQTT_QUEUE_INDEX(queue, i);
defmacro 0:a268b80e3093 291 if (mmqtt_stream_running(stream) == MMQTT_STATUS_OK) { return stream; }
defmacro 0:a268b80e3093 292 }
defmacro 0:a268b80e3093 293 return NULL;
defmacro 0:a268b80e3093 294 }
defmacro 0:a268b80e3093 295
defmacro 0:a268b80e3093 296 /*
defmacro 0:a268b80e3093 297 * Pushes data to connection's read queue, the data should come from
defmacro 0:a268b80e3093 298 * a TCP socket to a remote machine.
defmacro 0:a268b80e3093 299 */
defmacro 0:a268b80e3093 300 static mmqtt_ssize_t
defmacro 0:a268b80e3093 301 mmqtt_connection_push(struct mmqtt_connection *connection,
defmacro 0:a268b80e3093 302 const uint8_t* in, mmqtt_ssize_t length)
defmacro 0:a268b80e3093 303 {
defmacro 0:a268b80e3093 304 mmqtt_ssize_t i, current, consumed_length;
defmacro 0:a268b80e3093 305 struct mmqtt_queue_ *queue;
defmacro 0:a268b80e3093 306 consumed_length = 0;
defmacro 0:a268b80e3093 307 queue = &connection->read_queue;
defmacro 0:a268b80e3093 308 while (i < MMQTT_QUEUE_LENGTH(queue) && consumed_length < length) {
defmacro 0:a268b80e3093 309 current = mmqtt_stream_push(MMQTT_QUEUE_INDEX(queue, i),
defmacro 0:a268b80e3093 310 in + consumed_length,
defmacro 0:a268b80e3093 311 length - consumed_length);
defmacro 0:a268b80e3093 312 if (current >= 0) {
defmacro 0:a268b80e3093 313 consumed_length += current;
defmacro 0:a268b80e3093 314 } else if (current == MMQTT_STATUS_DONE) {
defmacro 0:a268b80e3093 315 i++;
defmacro 0:a268b80e3093 316 } else if (current == MMQTT_STATUS_NOT_PUSHABLE) {
defmacro 0:a268b80e3093 317 return (consumed_length > 0) ? consumed_length : MMQTT_STATUS_NOT_PUSHABLE;
defmacro 0:a268b80e3093 318 } else {
defmacro 0:a268b80e3093 319 return MMQTT_STATUS_UNKNOWN;
defmacro 0:a268b80e3093 320 }
defmacro 0:a268b80e3093 321 }
defmacro 0:a268b80e3093 322 return (consumed_length > 0) ? consumed_length : MMQTT_STATUS_NOT_PUSHABLE;
defmacro 0:a268b80e3093 323 }
defmacro 0:a268b80e3093 324
defmacro 0:a268b80e3093 325 struct mmqtt_stream *
defmacro 0:a268b80e3093 326 mmqtt_connection_pushable_stream(struct mmqtt_connection *connection)
defmacro 0:a268b80e3093 327 {
defmacro 0:a268b80e3093 328 mmqtt_ssize_t i;
defmacro 0:a268b80e3093 329 struct mmqtt_queue_ *queue = &connection->read_queue;
defmacro 0:a268b80e3093 330 struct mmqtt_stream *stream = NULL;
defmacro 0:a268b80e3093 331 for (i = 0; i < MMQTT_QUEUE_LENGTH(queue); i++) {
defmacro 0:a268b80e3093 332 stream = MMQTT_QUEUE_INDEX(queue, i);
defmacro 0:a268b80e3093 333 if (mmqtt_stream_running(stream) == MMQTT_STATUS_OK) { return stream; }
defmacro 0:a268b80e3093 334 }
defmacro 0:a268b80e3093 335 return NULL;
defmacro 0:a268b80e3093 336 }
defmacro 0:a268b80e3093 337
defmacro 0:a268b80e3093 338 static mmqtt_status_t
defmacro 0:a268b80e3093 339 mmqtt_connection_add_read_stream(struct mmqtt_connection *connection,
defmacro 0:a268b80e3093 340 struct mmqtt_stream *stream)
defmacro 0:a268b80e3093 341 {
defmacro 0:a268b80e3093 342 return mmqtt_queue_add_(&connection->read_queue, stream);
defmacro 0:a268b80e3093 343 }
defmacro 0:a268b80e3093 344
defmacro 0:a268b80e3093 345 static mmqtt_status_t
defmacro 0:a268b80e3093 346 mmqtt_connection_add_write_stream(struct mmqtt_connection *connection,
defmacro 0:a268b80e3093 347 struct mmqtt_stream *stream)
defmacro 0:a268b80e3093 348 {
defmacro 0:a268b80e3093 349 return mmqtt_queue_add_(&connection->write_queue, stream);
defmacro 0:a268b80e3093 350 }
defmacro 0:a268b80e3093 351
defmacro 0:a268b80e3093 352 static mmqtt_status_t
defmacro 0:a268b80e3093 353 mmqtt_connection_in_read_stream(struct mmqtt_connection *connection,
defmacro 0:a268b80e3093 354 struct mmqtt_stream *stream)
defmacro 0:a268b80e3093 355 {
defmacro 0:a268b80e3093 356 mmqtt_ssize_t i;
defmacro 0:a268b80e3093 357 struct mmqtt_queue_ *queue = &connection->read_queue;
defmacro 0:a268b80e3093 358 for (i = 0; i < MMQTT_QUEUE_LENGTH(queue); i++) {
defmacro 0:a268b80e3093 359 if (MMQTT_QUEUE_INDEX(queue, i) == stream) { return MMQTT_STATUS_OK; }
defmacro 0:a268b80e3093 360 }
defmacro 0:a268b80e3093 361 return MMQTT_STATUS_NOT_EXPECTED;
defmacro 0:a268b80e3093 362 }
defmacro 0:a268b80e3093 363
defmacro 0:a268b80e3093 364 static mmqtt_status_t
defmacro 0:a268b80e3093 365 mmqtt_connection_in_write_stream(struct mmqtt_connection *connection,
defmacro 0:a268b80e3093 366 struct mmqtt_stream *stream)
defmacro 0:a268b80e3093 367 {
defmacro 0:a268b80e3093 368 mmqtt_ssize_t i;
defmacro 0:a268b80e3093 369 struct mmqtt_queue_ *queue = &connection->write_queue;
defmacro 0:a268b80e3093 370 for (i = 0; i < MMQTT_QUEUE_LENGTH(queue); i++) {
defmacro 0:a268b80e3093 371 if (MMQTT_QUEUE_INDEX(queue, i) == stream) { return MMQTT_STATUS_OK; }
defmacro 0:a268b80e3093 372 }
defmacro 0:a268b80e3093 373 return MMQTT_STATUS_NOT_EXPECTED;
defmacro 0:a268b80e3093 374 }
defmacro 0:a268b80e3093 375
defmacro 0:a268b80e3093 376 static mmqtt_ssize_t
defmacro 0:a268b80e3093 377 mmqtt_connection_read_stream_length(struct mmqtt_connection *connection)
defmacro 0:a268b80e3093 378 {
defmacro 0:a268b80e3093 379 return connection->read_queue.ring_buffer.length;
defmacro 0:a268b80e3093 380 }
defmacro 0:a268b80e3093 381
defmacro 0:a268b80e3093 382 static mmqtt_ssize_t
defmacro 0:a268b80e3093 383 mmqtt_connection_write_stream_length(struct mmqtt_connection *connection)
defmacro 0:a268b80e3093 384 {
defmacro 0:a268b80e3093 385 return connection->write_queue.ring_buffer.length;
defmacro 0:a268b80e3093 386 }
defmacro 0:a268b80e3093 387
defmacro 0:a268b80e3093 388 static mmqtt_status_t
defmacro 0:a268b80e3093 389 mmqtt_connection_release_write_stream(struct mmqtt_connection *connection,
defmacro 0:a268b80e3093 390 struct mmqtt_stream *stream)
defmacro 0:a268b80e3093 391 {
defmacro 0:a268b80e3093 392 if (!MMQTT_QUEUE_AVAILABLE(&connection->write_queue)) {
defmacro 0:a268b80e3093 393 return MMQTT_STATUS_NOT_PULLABLE;
defmacro 0:a268b80e3093 394 }
defmacro 0:a268b80e3093 395 if (stream != MMQTT_QUEUE_FIRST(&connection->write_queue)) {
defmacro 0:a268b80e3093 396 return MMQTT_STATUS_NOT_EXPECTED;
defmacro 0:a268b80e3093 397 }
defmacro 0:a268b80e3093 398 mmqtt_ring_buffer_pull_(&connection->write_queue.ring_buffer, 1);
defmacro 0:a268b80e3093 399 return MMQTT_STATUS_OK;
defmacro 0:a268b80e3093 400 }
defmacro 0:a268b80e3093 401
defmacro 0:a268b80e3093 402 static mmqtt_status_t
defmacro 0:a268b80e3093 403 mmqtt_connection_release_read_stream(struct mmqtt_connection *connection,
defmacro 0:a268b80e3093 404 struct mmqtt_stream *stream)
defmacro 0:a268b80e3093 405 {
defmacro 0:a268b80e3093 406 if (!MMQTT_QUEUE_AVAILABLE(&connection->read_queue)) {
defmacro 0:a268b80e3093 407 return MMQTT_STATUS_NOT_PULLABLE;
defmacro 0:a268b80e3093 408 }
defmacro 0:a268b80e3093 409 if (stream != MMQTT_QUEUE_FIRST(&connection->read_queue)) {
defmacro 0:a268b80e3093 410 return MMQTT_STATUS_NOT_EXPECTED;
defmacro 0:a268b80e3093 411 }
defmacro 0:a268b80e3093 412 mmqtt_ring_buffer_pull_(&connection->read_queue.ring_buffer, 1);
defmacro 0:a268b80e3093 413 return MMQTT_STATUS_OK;
defmacro 0:a268b80e3093 414 }
defmacro 0:a268b80e3093 415
defmacro 0:a268b80e3093 416 static mmqtt_status_t
defmacro 0:a268b80e3093 417 mmqtt_connection_autorelease_write_streams(struct mmqtt_connection *connection)
defmacro 0:a268b80e3093 418 {
defmacro 0:a268b80e3093 419 struct mmqtt_stream *stream;
defmacro 0:a268b80e3093 420 while (MMQTT_QUEUE_AVAILABLE(&connection->write_queue)) {
defmacro 0:a268b80e3093 421 stream = MMQTT_QUEUE_FIRST(&connection->write_queue);
defmacro 0:a268b80e3093 422 if (mmqtt_stream_running(stream) == MMQTT_STATUS_DONE) {
defmacro 0:a268b80e3093 423 mmqtt_connection_release_write_stream(connection, stream);
defmacro 0:a268b80e3093 424 } else {
defmacro 0:a268b80e3093 425 break;
defmacro 0:a268b80e3093 426 }
defmacro 0:a268b80e3093 427 }
defmacro 0:a268b80e3093 428 return MMQTT_STATUS_OK;
defmacro 0:a268b80e3093 429 }
defmacro 0:a268b80e3093 430
defmacro 0:a268b80e3093 431 static mmqtt_status_t
defmacro 0:a268b80e3093 432 mmqtt_connection_autorelease_read_streams(struct mmqtt_connection *connection)
defmacro 0:a268b80e3093 433 {
defmacro 0:a268b80e3093 434 struct mmqtt_stream *stream;
defmacro 0:a268b80e3093 435 while (MMQTT_QUEUE_AVAILABLE(&connection->read_queue)) {
defmacro 0:a268b80e3093 436 stream = MMQTT_QUEUE_FIRST(&connection->read_queue);
defmacro 0:a268b80e3093 437 if (mmqtt_stream_running(stream) == MMQTT_STATUS_DONE) {
defmacro 0:a268b80e3093 438 mmqtt_connection_release_read_stream(connection, stream);
defmacro 0:a268b80e3093 439 } else {
defmacro 0:a268b80e3093 440 break;
defmacro 0:a268b80e3093 441 }
defmacro 0:a268b80e3093 442 }
defmacro 0:a268b80e3093 443 return MMQTT_STATUS_OK;
defmacro 0:a268b80e3093 444 }
defmacro 0:a268b80e3093 445
defmacro 0:a268b80e3093 446 #ifndef MMQTT_DISABLE_STATIC
defmacro 0:a268b80e3093 447
defmacro 0:a268b80e3093 448 typedef mmqtt_status_t (*mmqtt_s_puller)(struct mmqtt_connection*);
defmacro 0:a268b80e3093 449 typedef mmqtt_status_t (*mmqtt_s_pusher)(struct mmqtt_connection*, mmqtt_ssize_t);
defmacro 0:a268b80e3093 450
defmacro 0:a268b80e3093 451 mmqtt_status_t
defmacro 0:a268b80e3093 452 mmqtt_s_encode_buffer(struct mmqtt_connection *conn, mmqtt_s_puller puller,
defmacro 0:a268b80e3093 453 const uint8_t *buffer, uint16_t length)
defmacro 0:a268b80e3093 454 {
defmacro 0:a268b80e3093 455 struct mmqtt_stream stream;
defmacro 0:a268b80e3093 456 mmqtt_status_t status;
defmacro 0:a268b80e3093 457 mmqtt_ssize_t current;
defmacro 0:a268b80e3093 458 uint16_t consumed_length;
defmacro 0:a268b80e3093 459
defmacro 0:a268b80e3093 460 mmqtt_stream_init(&stream, length);
defmacro 0:a268b80e3093 461 while ((status = mmqtt_connection_add_write_stream(conn, &stream)) == MMQTT_STATUS_NOT_PUSHABLE) {
defmacro 0:a268b80e3093 462 status = puller(conn);
defmacro 0:a268b80e3093 463 if (status != MMQTT_STATUS_OK) { return status; }
defmacro 0:a268b80e3093 464 }
defmacro 0:a268b80e3093 465 if (status != MMQTT_STATUS_OK) { return status; }
defmacro 0:a268b80e3093 466
defmacro 0:a268b80e3093 467 consumed_length = 0;
defmacro 0:a268b80e3093 468 while (consumed_length < length) {
defmacro 0:a268b80e3093 469 current = mmqtt_stream_push(&stream, buffer + consumed_length,
defmacro 0:a268b80e3093 470 length - consumed_length);
defmacro 0:a268b80e3093 471 if (current >= 0) {
defmacro 0:a268b80e3093 472 consumed_length += current;
defmacro 0:a268b80e3093 473 } else if (current == MMQTT_STATUS_NOT_PUSHABLE) {
defmacro 0:a268b80e3093 474 status = puller(conn);
defmacro 0:a268b80e3093 475 if (status != MMQTT_STATUS_OK) { return status; }
defmacro 0:a268b80e3093 476 }
defmacro 0:a268b80e3093 477 }
defmacro 0:a268b80e3093 478 while (mmqtt_connection_in_write_stream(conn, &stream) == MMQTT_STATUS_OK) {
defmacro 0:a268b80e3093 479 status = puller(conn);
defmacro 0:a268b80e3093 480 if (status != MMQTT_STATUS_OK) { return status; }
defmacro 0:a268b80e3093 481 }
defmacro 0:a268b80e3093 482 return MMQTT_STATUS_OK;
defmacro 0:a268b80e3093 483 }
defmacro 0:a268b80e3093 484
defmacro 0:a268b80e3093 485 mmqtt_status_t
defmacro 0:a268b80e3093 486 mmqtt_s_decode_buffer(struct mmqtt_connection *conn, mmqtt_s_pusher pusher,
defmacro 0:a268b80e3093 487 uint8_t *buffer, uint16_t buffer_length,
defmacro 0:a268b80e3093 488 uint16_t consume_length)
defmacro 0:a268b80e3093 489 {
defmacro 0:a268b80e3093 490 struct mmqtt_stream stream;
defmacro 0:a268b80e3093 491 mmqtt_status_t status;
defmacro 0:a268b80e3093 492 mmqtt_ssize_t current;
defmacro 0:a268b80e3093 493 uint16_t length;
defmacro 0:a268b80e3093 494 uint8_t temp;
defmacro 0:a268b80e3093 495
defmacro 0:a268b80e3093 496 if (mmqtt_connection_read_stream_length(conn) > 0) {
defmacro 0:a268b80e3093 497 return MMQTT_STATUS_INVALID_STATE;
defmacro 0:a268b80e3093 498 }
defmacro 0:a268b80e3093 499
defmacro 0:a268b80e3093 500 if (consume_length < buffer_length) { consume_length = buffer_length; }
defmacro 0:a268b80e3093 501 mmqtt_stream_init(&stream, consume_length);
defmacro 0:a268b80e3093 502 /* Connection won't remove read stream automatically, pusher won't help here */
defmacro 0:a268b80e3093 503 status = mmqtt_connection_add_read_stream(conn, &stream);
defmacro 0:a268b80e3093 504 if (status != MMQTT_STATUS_OK) { return status; }
defmacro 0:a268b80e3093 505
defmacro 0:a268b80e3093 506 length = 0;
defmacro 0:a268b80e3093 507 while (length < buffer_length) {
defmacro 0:a268b80e3093 508 current = mmqtt_stream_pull(&stream, buffer + length,
defmacro 0:a268b80e3093 509 buffer_length - length);
defmacro 0:a268b80e3093 510 if (current >= 0) {
defmacro 0:a268b80e3093 511 length += current;
defmacro 0:a268b80e3093 512 } else if (current == MMQTT_STATUS_NOT_PULLABLE) {
defmacro 0:a268b80e3093 513 status = pusher(conn, buffer_length - length);
defmacro 0:a268b80e3093 514 if (status != MMQTT_STATUS_OK) {
defmacro 0:a268b80e3093 515 mmqtt_connection_release_read_stream(conn, &stream);
defmacro 0:a268b80e3093 516 return status;
defmacro 0:a268b80e3093 517 }
defmacro 0:a268b80e3093 518 }
defmacro 0:a268b80e3093 519 }
defmacro 0:a268b80e3093 520 while (length < consume_length) {
defmacro 0:a268b80e3093 521 current = mmqtt_stream_pull(&stream, &temp, 1);
defmacro 0:a268b80e3093 522 if (current >= 0) {
defmacro 0:a268b80e3093 523 length += current;
defmacro 0:a268b80e3093 524 } else if (current == MMQTT_STATUS_NOT_PULLABLE) {
defmacro 0:a268b80e3093 525 status = pusher(conn, consume_length - length);
defmacro 0:a268b80e3093 526 if (status != MMQTT_STATUS_OK) {
defmacro 0:a268b80e3093 527 mmqtt_connection_release_read_stream(conn, &stream);
defmacro 0:a268b80e3093 528 return status;
defmacro 0:a268b80e3093 529 }
defmacro 0:a268b80e3093 530 }
defmacro 0:a268b80e3093 531 }
defmacro 0:a268b80e3093 532 mmqtt_connection_release_read_stream(conn, &stream);
defmacro 0:a268b80e3093 533 return MMQTT_STATUS_OK;
defmacro 0:a268b80e3093 534 }
defmacro 0:a268b80e3093 535
defmacro 0:a268b80e3093 536 /* NOTE: for smaller buffer, we can also use mmqtt_s_decode_buffer with
defmacro 0:a268b80e3093 537 * an empty buffer and 0 as buffer length. Here this function is for the
defmacro 0:a268b80e3093 538 * case where we are skipping a whole packet altogether, which could be
defmacro 0:a268b80e3093 539 * larger than 65535 in theory.
defmacro 0:a268b80e3093 540 */
defmacro 0:a268b80e3093 541 mmqtt_status_t
defmacro 0:a268b80e3093 542 mmqtt_s_skip_buffer(struct mmqtt_connection *conn, mmqtt_s_pusher pusher,
defmacro 0:a268b80e3093 543 uint32_t length)
defmacro 0:a268b80e3093 544 {
defmacro 0:a268b80e3093 545 mmqtt_status_t status;
defmacro 0:a268b80e3093 546 while (length > 65535) {
defmacro 0:a268b80e3093 547 status = mmqtt_s_decode_buffer(conn, pusher, NULL, 0, 65535);
defmacro 0:a268b80e3093 548 if (status != MMQTT_STATUS_OK) {
defmacro 0:a268b80e3093 549 return status;
defmacro 0:a268b80e3093 550 }
defmacro 0:a268b80e3093 551 length -= 65535;
defmacro 0:a268b80e3093 552 }
defmacro 0:a268b80e3093 553 if (length > 0) {
defmacro 0:a268b80e3093 554 return mmqtt_s_decode_buffer(conn, pusher, NULL, 0, (uint16_t) length);
defmacro 0:a268b80e3093 555 } else {
defmacro 0:a268b80e3093 556 return MMQTT_STATUS_OK;
defmacro 0:a268b80e3093 557 }
defmacro 0:a268b80e3093 558 }
defmacro 0:a268b80e3093 559
defmacro 0:a268b80e3093 560 mmqtt_status_t
defmacro 0:a268b80e3093 561 mmqtt_s_encode_fixed_header(struct mmqtt_connection *conn, mmqtt_s_puller puller,
defmacro 0:a268b80e3093 562 uint8_t flag, uint32_t length)
defmacro 0:a268b80e3093 563 {
defmacro 0:a268b80e3093 564 mmqtt_status_t status;
defmacro 0:a268b80e3093 565 mmqtt_ssize_t i;
defmacro 0:a268b80e3093 566 uint8_t c, buffer[4];
defmacro 0:a268b80e3093 567
defmacro 0:a268b80e3093 568 buffer[0] = flag;
defmacro 0:a268b80e3093 569 status = mmqtt_s_encode_buffer(conn, puller, buffer, 1);
defmacro 0:a268b80e3093 570 if (status != MMQTT_STATUS_OK) { return status; }
defmacro 0:a268b80e3093 571
defmacro 0:a268b80e3093 572 i = 0;
defmacro 0:a268b80e3093 573 do {
defmacro 0:a268b80e3093 574 c = (uint8_t) length % 128;
defmacro 0:a268b80e3093 575 length /= 128;
defmacro 0:a268b80e3093 576 if (length > 0) { c |= 0x80; }
defmacro 0:a268b80e3093 577 buffer[i++] = c;
defmacro 0:a268b80e3093 578 } while (length > 0);
defmacro 0:a268b80e3093 579
defmacro 0:a268b80e3093 580 return mmqtt_s_encode_buffer(conn, puller, buffer, i);
defmacro 0:a268b80e3093 581 }
defmacro 0:a268b80e3093 582
defmacro 0:a268b80e3093 583 mmqtt_status_t
defmacro 0:a268b80e3093 584 mmqtt_s_decode_fixed_header(struct mmqtt_connection *conn, mmqtt_s_pusher pusher,
defmacro 0:a268b80e3093 585 uint8_t *flag, uint32_t *length)
defmacro 0:a268b80e3093 586 {
defmacro 0:a268b80e3093 587 mmqtt_status_t status;
defmacro 0:a268b80e3093 588 uint8_t c;
defmacro 0:a268b80e3093 589 uint32_t l, factor;
defmacro 0:a268b80e3093 590
defmacro 0:a268b80e3093 591 status = mmqtt_s_decode_buffer(conn, pusher, &c, 1, 1);
defmacro 0:a268b80e3093 592 if (status != MMQTT_STATUS_OK) { return status; }
defmacro 0:a268b80e3093 593 if (flag) { *flag = c; }
defmacro 0:a268b80e3093 594
defmacro 0:a268b80e3093 595 l = 0;
defmacro 0:a268b80e3093 596 factor = 1;
defmacro 0:a268b80e3093 597 do {
defmacro 0:a268b80e3093 598 status = mmqtt_s_decode_buffer(conn, pusher, &c, 1, 1);
defmacro 0:a268b80e3093 599 if (status != MMQTT_STATUS_OK) { return status; }
defmacro 0:a268b80e3093 600 l += (c & 0x7F) * factor;
defmacro 0:a268b80e3093 601 factor *= 128;
defmacro 0:a268b80e3093 602 } while ((c & 0x80) != 0);
defmacro 0:a268b80e3093 603 if (length) { *length = l; }
defmacro 0:a268b80e3093 604
defmacro 0:a268b80e3093 605 return MMQTT_STATUS_OK;
defmacro 0:a268b80e3093 606 }
defmacro 0:a268b80e3093 607
/* Stores `length` into buffer[0] and buffer[1] in big-endian order
 * (most significant byte first). `buffer` must have room for 2 bytes.
 */
void mmqtt_s_pack_uint16_(uint16_t length, uint8_t *buffer)
{
  uint8_t *out = buffer;

  *out++ = (uint8_t) (length >> 8);
  *out = (uint8_t) (length & 0xFF);
}
defmacro 0:a268b80e3093 613
/* Reads a big-endian 16-bit value: buffer[0] is the most significant byte,
 * buffer[1] the least significant one.
 *
 * The bytes are widened to uint16_t before shifting. Shifting the promoted
 * byte directly is done in `int`, which overflows a signed 16-bit int when
 * buffer[0] >= 0x80 on targets where int is 16 bits wide.
 */
uint16_t mmqtt_s_unpack_uint16_(uint8_t *buffer)
{
  uint16_t high = buffer[0];
  uint16_t low = buffer[1];

  return (uint16_t) ((uint16_t) (high << 8) | low);
}
defmacro 0:a268b80e3093 618
defmacro 0:a268b80e3093 619 /* We don't case uint16_t* directly into uint8_t* because there might be
defmacro 0:a268b80e3093 620 * endianness differences
defmacro 0:a268b80e3093 621 */
defmacro 0:a268b80e3093 622 mmqtt_status_t
defmacro 0:a268b80e3093 623 mmqtt_s_encode_uint16(struct mmqtt_connection *conn, mmqtt_s_puller puller,
defmacro 0:a268b80e3093 624 uint16_t val)
defmacro 0:a268b80e3093 625 {
defmacro 0:a268b80e3093 626 uint8_t buffer[2];
defmacro 0:a268b80e3093 627
defmacro 0:a268b80e3093 628 mmqtt_s_pack_uint16_(val, buffer);
defmacro 0:a268b80e3093 629 return mmqtt_s_encode_buffer(conn, puller, buffer, 2);
defmacro 0:a268b80e3093 630 }
defmacro 0:a268b80e3093 631
defmacro 0:a268b80e3093 632 mmqtt_status_t
defmacro 0:a268b80e3093 633 mmqtt_s_decode_uint16(struct mmqtt_connection *conn, mmqtt_s_pusher pusher,
defmacro 0:a268b80e3093 634 uint16_t *out_val)
defmacro 0:a268b80e3093 635 {
defmacro 0:a268b80e3093 636 mmqtt_status_t status;
defmacro 0:a268b80e3093 637 uint8_t buffer[2];
defmacro 0:a268b80e3093 638
defmacro 0:a268b80e3093 639 status = mmqtt_s_decode_buffer(conn, pusher, buffer, 2, 2);
defmacro 0:a268b80e3093 640 if (status != MMQTT_STATUS_OK) { return status; }
defmacro 0:a268b80e3093 641
defmacro 0:a268b80e3093 642 if (out_val) { *out_val = mmqtt_s_unpack_uint16_(buffer); }
defmacro 0:a268b80e3093 643 return MMQTT_STATUS_OK;
defmacro 0:a268b80e3093 644 }
defmacro 0:a268b80e3093 645
defmacro 0:a268b80e3093 646 mmqtt_status_t
defmacro 0:a268b80e3093 647 mmqtt_s_encode_string(struct mmqtt_connection *conn, mmqtt_s_puller puller,
defmacro 0:a268b80e3093 648 const uint8_t *str, uint16_t length)
defmacro 0:a268b80e3093 649 {
defmacro 0:a268b80e3093 650 mmqtt_status_t status;
defmacro 0:a268b80e3093 651 uint8_t buffer[2];
defmacro 0:a268b80e3093 652
defmacro 0:a268b80e3093 653 mmqtt_s_pack_uint16_(length, buffer);
defmacro 0:a268b80e3093 654 status = mmqtt_s_encode_buffer(conn, puller, buffer, 2);
defmacro 0:a268b80e3093 655 if (status != MMQTT_STATUS_OK) { return status; }
defmacro 0:a268b80e3093 656
defmacro 0:a268b80e3093 657 return mmqtt_s_encode_buffer(conn, puller, str, length);
defmacro 0:a268b80e3093 658 }
defmacro 0:a268b80e3093 659
defmacro 0:a268b80e3093 660 mmqtt_status_t
defmacro 0:a268b80e3093 661 mmqtt_s_decode_string(struct mmqtt_connection *conn, mmqtt_s_pusher pusher,
defmacro 0:a268b80e3093 662 uint8_t *str, uint16_t max_length,
defmacro 0:a268b80e3093 663 uint16_t *out_length, uint16_t *out_true_length) {
defmacro 0:a268b80e3093 664 mmqtt_status_t status;
defmacro 0:a268b80e3093 665 uint8_t buffer[2];
defmacro 0:a268b80e3093 666 uint16_t true_length, length;
defmacro 0:a268b80e3093 667
defmacro 0:a268b80e3093 668 status = mmqtt_s_decode_buffer(conn, pusher, buffer, 2, 2);
defmacro 0:a268b80e3093 669 if (status != MMQTT_STATUS_OK) { return status; }
defmacro 0:a268b80e3093 670 true_length = mmqtt_s_unpack_uint16_(buffer);
defmacro 0:a268b80e3093 671
defmacro 0:a268b80e3093 672 length = min(true_length, max_length);
defmacro 0:a268b80e3093 673 status = mmqtt_s_decode_buffer(conn, pusher, str, length, true_length);
defmacro 0:a268b80e3093 674 if (status != MMQTT_STATUS_OK) { return status; }
defmacro 0:a268b80e3093 675
defmacro 0:a268b80e3093 676 if (out_length) { *out_length = length; }
defmacro 0:a268b80e3093 677 if (out_true_length) { *out_true_length = true_length; }
defmacro 0:a268b80e3093 678 return MMQTT_STATUS_OK;
defmacro 0:a268b80e3093 679 }
defmacro 0:a268b80e3093 680
/* The "p" in mmqtt_p_* stands for "packet".
 *
 * Variable header of a CONNECT packet, in the order the fields are
 * written by mmqtt_s_encode_connect_header.
 */
struct mmqtt_p_connect_header {
  uint8_t *name;            /* name bytes; sent after a 2-byte length prefix
                             * (presumably the MQTT protocol name) */
  uint16_t name_length;     /* number of valid bytes in `name`; overwritten
                             * by mmqtt_s_decode_connect_header */
  uint16_t name_max_length; /* capacity of `name`, used as the limit when
                             * decoding */
  uint8_t protocol_version; /* single byte following the name */
  uint8_t flags;            /* single byte following protocol_version */
  uint16_t keepalive;       /* sent as a big-endian 16-bit value */
};
defmacro 0:a268b80e3093 690
defmacro 0:a268b80e3093 691 mmqtt_status_t
defmacro 0:a268b80e3093 692 mmqtt_s_encode_connect_header(struct mmqtt_connection *conn, mmqtt_s_puller puller,
defmacro 0:a268b80e3093 693 const struct mmqtt_p_connect_header *header)
defmacro 0:a268b80e3093 694 {
defmacro 0:a268b80e3093 695 mmqtt_status_t status;
defmacro 0:a268b80e3093 696 uint8_t buffer[2];
defmacro 0:a268b80e3093 697
defmacro 0:a268b80e3093 698 mmqtt_s_pack_uint16_(header->name_length, buffer);
defmacro 0:a268b80e3093 699 status = mmqtt_s_encode_buffer(conn, puller, buffer, 2);
defmacro 0:a268b80e3093 700 if (status != MMQTT_STATUS_OK) { return status; }
defmacro 0:a268b80e3093 701
defmacro 0:a268b80e3093 702 status = mmqtt_s_encode_buffer(conn, puller, header->name, header->name_length);
defmacro 0:a268b80e3093 703 if (status != MMQTT_STATUS_OK) { return status; }
defmacro 0:a268b80e3093 704
defmacro 0:a268b80e3093 705 status = mmqtt_s_encode_buffer(conn, puller, &header->protocol_version, 1);
defmacro 0:a268b80e3093 706 if (status != MMQTT_STATUS_OK) { return status; }
defmacro 0:a268b80e3093 707
defmacro 0:a268b80e3093 708 status = mmqtt_s_encode_buffer(conn, puller, &header->flags, 1);
defmacro 0:a268b80e3093 709 if (status != MMQTT_STATUS_OK) { return status; }
defmacro 0:a268b80e3093 710
defmacro 0:a268b80e3093 711 mmqtt_s_pack_uint16_(header->keepalive, buffer);
defmacro 0:a268b80e3093 712 return mmqtt_s_encode_buffer(conn, puller, buffer, 2);
defmacro 0:a268b80e3093 713 }
defmacro 0:a268b80e3093 714
defmacro 0:a268b80e3093 715 /* We would rewrite name_length in header to contain returned name length */
defmacro 0:a268b80e3093 716 mmqtt_status_t
defmacro 0:a268b80e3093 717 mmqtt_s_decode_connect_header(struct mmqtt_connection *conn, mmqtt_s_pusher pusher,
defmacro 0:a268b80e3093 718 struct mmqtt_p_connect_header *header)
defmacro 0:a268b80e3093 719 {
defmacro 0:a268b80e3093 720 mmqtt_status_t status;
defmacro 0:a268b80e3093 721 uint8_t buffer[2];
defmacro 0:a268b80e3093 722 uint16_t length;
defmacro 0:a268b80e3093 723
defmacro 0:a268b80e3093 724 status = mmqtt_s_decode_buffer(conn, pusher, buffer, 2, 2);
defmacro 0:a268b80e3093 725 if (status != MMQTT_STATUS_OK) { return status; }
defmacro 0:a268b80e3093 726 length = mmqtt_s_unpack_uint16_(buffer);
defmacro 0:a268b80e3093 727
defmacro 0:a268b80e3093 728 header->name_length = min(header->name_max_length, length);
defmacro 0:a268b80e3093 729 status = mmqtt_s_decode_buffer(conn, pusher, header->name,
defmacro 0:a268b80e3093 730 header->name_length, length);
defmacro 0:a268b80e3093 731 if (status != MMQTT_STATUS_OK) { return status; }
defmacro 0:a268b80e3093 732
defmacro 0:a268b80e3093 733 status = mmqtt_s_decode_buffer(conn, pusher, &header->protocol_version, 1, 1);
defmacro 0:a268b80e3093 734 if (status != MMQTT_STATUS_OK) { return status; }
defmacro 0:a268b80e3093 735
defmacro 0:a268b80e3093 736 status = mmqtt_s_decode_buffer(conn, pusher, &header->flags, 1, 1);
defmacro 0:a268b80e3093 737 if (status != MMQTT_STATUS_OK) { return status; }
defmacro 0:a268b80e3093 738
defmacro 0:a268b80e3093 739 status = mmqtt_s_decode_buffer(conn, pusher, buffer, 2, 2);
defmacro 0:a268b80e3093 740 if (status != MMQTT_STATUS_OK) { return status; }
defmacro 0:a268b80e3093 741 header->keepalive = mmqtt_s_unpack_uint16_(buffer);
defmacro 0:a268b80e3093 742
defmacro 0:a268b80e3093 743 return MMQTT_STATUS_OK;
defmacro 0:a268b80e3093 744 }
defmacro 0:a268b80e3093 745
/* Variable header of a CONNACK packet: two single bytes, transferred in
 * declaration order.
 */
struct mmqtt_p_connack_header {
  uint8_t reserved;    /* first byte on the wire */
  uint8_t return_code; /* second byte on the wire */
};
defmacro 0:a268b80e3093 750
defmacro 0:a268b80e3093 751 mmqtt_status_t
defmacro 0:a268b80e3093 752 mmqtt_s_encode_connack_header(struct mmqtt_connection *conn, mmqtt_s_puller puller,
defmacro 0:a268b80e3093 753 const struct mmqtt_p_connack_header *header)
defmacro 0:a268b80e3093 754 {
defmacro 0:a268b80e3093 755 mmqtt_status_t status;
defmacro 0:a268b80e3093 756
defmacro 0:a268b80e3093 757 status = mmqtt_s_encode_buffer(conn, puller, &header->reserved, 1);
defmacro 0:a268b80e3093 758 if (status != MMQTT_STATUS_OK) { return status; }
defmacro 0:a268b80e3093 759
defmacro 0:a268b80e3093 760 return mmqtt_s_encode_buffer(conn, puller, &header->return_code, 1);
defmacro 0:a268b80e3093 761 }
defmacro 0:a268b80e3093 762
defmacro 0:a268b80e3093 763 mmqtt_status_t
defmacro 0:a268b80e3093 764 mmqtt_s_decode_connack_header(struct mmqtt_connection *conn, mmqtt_s_pusher pusher,
defmacro 0:a268b80e3093 765 struct mmqtt_p_connack_header *header)
defmacro 0:a268b80e3093 766 {
defmacro 0:a268b80e3093 767 mmqtt_status_t status;
defmacro 0:a268b80e3093 768
defmacro 0:a268b80e3093 769 status = mmqtt_s_decode_buffer(conn, pusher, &header->reserved, 1, 1);
defmacro 0:a268b80e3093 770 if (status != MMQTT_STATUS_OK) { return status; }
defmacro 0:a268b80e3093 771
defmacro 1:311cd16389ff 772 return mmqtt_s_decode_buffer(conn, pusher, &header->return_code, 1, 1);
defmacro 0:a268b80e3093 773 }
defmacro 0:a268b80e3093 774
/* Progress state for walking the payload of a SUBSCRIBE or UNSUBSCRIBE
 * packet with mmqtt_s_decode_subscribe_payload. Set it up with
 * mmqtt_s_subscribe_decode_context_init or
 * mmqtt_s_unsubscribe_decode_context_init.
 */
struct mmqtt_p_subscribe_decode_context {
  uint32_t length; /* payload bytes not yet decoded */
  uint8_t has_qos; /* non-zero: every topic is followed by a QoS byte
                    * (subscribe); zero: topics only (unsubscribe) */
};
defmacro 0:a268b80e3093 780
/* One entry of a SUBSCRIBE/UNSUBSCRIBE payload as filled in by
 * mmqtt_s_decode_subscribe_payload.
 */
struct mmqtt_p_subscribe_payload_line {
  uint8_t *topic;            /* caller-provided buffer for the topic bytes */
  uint16_t topic_length;     /* out: number of topic bytes requested to be
                              * stored in `topic` */
  uint16_t topic_max_length; /* in: capacity of `topic` */
  uint8_t qos;               /* out: QoS byte of the entry, or 0 when the
                              * payload carries no QoS bytes */
};
defmacro 0:a268b80e3093 787
defmacro 0:a268b80e3093 788 void
defmacro 0:a268b80e3093 789 mmqtt_s_subscribe_decode_context_init(struct mmqtt_p_subscribe_decode_context *context,
defmacro 0:a268b80e3093 790 uint32_t length)
defmacro 0:a268b80e3093 791 {
defmacro 0:a268b80e3093 792 context->length = length;
defmacro 0:a268b80e3093 793 context->has_qos = 1;
defmacro 0:a268b80e3093 794 }
defmacro 0:a268b80e3093 795
defmacro 0:a268b80e3093 796 void
defmacro 0:a268b80e3093 797 mmqtt_s_unsubscribe_decode_context_init(struct mmqtt_p_subscribe_decode_context *context,
defmacro 0:a268b80e3093 798 uint32_t length)
defmacro 0:a268b80e3093 799 {
defmacro 0:a268b80e3093 800 context->length = length;
defmacro 0:a268b80e3093 801 context->has_qos = 0;
defmacro 0:a268b80e3093 802 }
defmacro 0:a268b80e3093 803
defmacro 0:a268b80e3093 804 mmqtt_status_t
defmacro 0:a268b80e3093 805 mmqtt_s_decode_subscribe_payload(struct mmqtt_connection *conn, mmqtt_s_pusher pusher,
defmacro 0:a268b80e3093 806 struct mmqtt_p_subscribe_decode_context *context,
defmacro 0:a268b80e3093 807 struct mmqtt_p_subscribe_payload_line *line)
defmacro 0:a268b80e3093 808 {
defmacro 0:a268b80e3093 809 mmqtt_status_t status;
defmacro 0:a268b80e3093 810 uint16_t string_true_length;
defmacro 0:a268b80e3093 811
defmacro 0:a268b80e3093 812 if (context->length <= 0) { return MMQTT_STATUS_DONE; }
defmacro 0:a268b80e3093 813 status = mmqtt_s_decode_string(conn, pusher, line->topic, line->topic_max_length,
defmacro 0:a268b80e3093 814 &line->topic_length, &string_true_length);
defmacro 0:a268b80e3093 815 if (status != MMQTT_STATUS_OK) { return status; }
defmacro 0:a268b80e3093 816
defmacro 0:a268b80e3093 817 context->length -= string_true_length;
defmacro 0:a268b80e3093 818 context->length -= 2;
defmacro 0:a268b80e3093 819
defmacro 0:a268b80e3093 820 if (context->has_qos != 0) {
defmacro 0:a268b80e3093 821 status = mmqtt_s_decode_buffer(conn, pusher, &line->qos, 1, 1);
defmacro 0:a268b80e3093 822 if (status != MMQTT_STATUS_OK) { return status; }
defmacro 0:a268b80e3093 823 context->length -= 1;
defmacro 0:a268b80e3093 824 } else {
defmacro 0:a268b80e3093 825 line->qos = 0;
defmacro 0:a268b80e3093 826 }
defmacro 0:a268b80e3093 827 return MMQTT_STATUS_OK;
defmacro 0:a268b80e3093 828 }
defmacro 0:a268b80e3093 829
/* Returns the number of bytes a string of `string_length` bytes occupies
 * once encoded: the string itself plus its 2-byte length prefix.
 *
 * The length is widened to uint32_t before the addition; adding to the
 * promoted uint16_t directly would wrap for lengths of 65534 and above on
 * targets where int is only 16 bits wide.
 */
uint32_t
mmqtt_s_string_encoded_length(uint16_t string_length)
{
  return (uint32_t) string_length + 2;
}
defmacro 0:a268b80e3093 835
defmacro 0:a268b80e3093 836 uint32_t
defmacro 0:a268b80e3093 837 mmqtt_s_connect_header_encoded_length(const struct mmqtt_p_connect_header *header)
defmacro 0:a268b80e3093 838 {
defmacro 0:a268b80e3093 839 return mmqtt_s_string_encoded_length(header->name_length) + 4;
defmacro 0:a268b80e3093 840 }
defmacro 0:a268b80e3093 841
defmacro 0:a268b80e3093 842 #endif /* MMQTT_DISABLE_STATIC */
defmacro 0:a268b80e3093 843
defmacro 0:a268b80e3093 844 #endif /* MMQTT_H_ */