Minimal MQTT library. This version simply doubles the MQTT queue sizes.
Dependents: WNCInterface_M2XMQTTdemo
Fork of minimal-mqtt by
minimal-mqtt.h
#ifndef MMQTT_H_
#define MMQTT_H_

/*
 * Minimal MQTT encoding/decoding helpers.
 *
 * Data flows through small fixed-size streams (struct mmqtt_stream) that are
 * queued on a connection (struct mmqtt_connection). The write queue holds data
 * waiting to be pulled out and sent; the read queue holds streams waiting to
 * be filled with received data. The mmqtt_s_* functions are a synchronous
 * layer on top that drives a caller-supplied puller/pusher callback.
 *
 * NOTE(review): several functions below are defined without `static`, so this
 * header can only be included from one translation unit per program.
 */

#include <stddef.h> /* size_t, NULL */
#include <stdint.h>

#ifndef min
#define min(a, b) ((a) > (b) ? (b) : (a))
#endif /* min */

#define MMQTT_STREAM_MAX_LENGTH 16
#define MMQTT_QUEUE_MAX_LENGTH 4

/* mmqtt_ssize_t must be able to hold MMQTT_STREAM_MAX_LENGTH as well as
 * MMQTT_QUEUE_MAX_LENGTH, we might add macro-based detection in the future.
 */
typedef int8_t mmqtt_ssize_t;
typedef int8_t mmqtt_status_t;

/* Largest value representable by mmqtt_ssize_t; keep in sync with the typedef */
#define MMQTT_SSIZE_MAX INT8_MAX

/* Status codes. Functions returning mmqtt_ssize_t return one of these negative
 * values on failure and a non-negative byte count on success.
 */
#define MMQTT_STATUS_OK 0
#define MMQTT_STATUS_UNKNOWN -1
#define MMQTT_STATUS_DONE -2
#define MMQTT_STATUS_NOT_PULLABLE -3
#define MMQTT_STATUS_NOT_PUSHABLE -4
#define MMQTT_STATUS_NOT_EXPECTED -5
#define MMQTT_STATUS_BROKEN_CONNECTION -6
#define MMQTT_STATUS_INVALID_STATE -7

/* MQTT control packet types. They occupy the upper 4 bits of the first byte
 * of the fixed header, so every value must fit in the range 0..15.
 */
#define MMQTT_MESSAGE_TYPE_CONNECT 0x1
#define MMQTT_MESSAGE_TYPE_CONNACK 0x2
#define MMQTT_MESSAGE_TYPE_PUBLISH 0x3
#define MMQTT_MESSAGE_TYPE_PUBACK 0x4
#define MMQTT_MESSAGE_TYPE_SUBSCRIBE 0x8
#define MMQTT_MESSAGE_TYPE_SUBACK 0x9
#define MMQTT_MESSAGE_TYPE_UNSUBSCRIBE 0xA
#define MMQTT_MESSAGE_TYPE_UNSUBACK 0xB
#define MMQTT_MESSAGE_TYPE_PINGREQ 0xC
#define MMQTT_MESSAGE_TYPE_PINGRESP 0xD
#define MMQTT_MESSAGE_TYPE_DISCONNECT 0xE

#define MMQTT_PACK_MESSAGE_TYPE(type_) ((type_) << 4)
#define MMQTT_UNPACK_MESSAGE_TYPE(type_) (((type_) >> 4) & 0xF)

/* Bookkeeping shared by streams and queues: `length` used slots beginning at
 * index `start`, out of `capacity` slots in total.
 */
struct mmqtt_ring_buffer_ {
  mmqtt_ssize_t start;
  mmqtt_ssize_t length;
  mmqtt_ssize_t capacity;
};

/* A bounded byte stream: `left` counts the bytes that still have to be pushed
 * into it, `data` buffers the bytes pushed but not yet pulled.
 */
struct mmqtt_stream {
  struct mmqtt_ring_buffer_ ring_buffer;
  uint8_t data[MMQTT_STREAM_MAX_LENGTH];
  size_t left;
};

/* A queue of stream pointers; the queue does not own the streams. */
struct mmqtt_queue_ {
  struct mmqtt_ring_buffer_ ring_buffer;
  struct mmqtt_stream* streams[MMQTT_QUEUE_MAX_LENGTH];
};

#define MMQTT_QUEUE_LENGTH(queue) ((queue)->ring_buffer.length)
#define MMQTT_QUEUE_INDEX(queue, i) ((queue)->streams[((queue)->ring_buffer.start + (i)) % ((queue)->ring_buffer.capacity)])

struct mmqtt_connection {
  void *connection;
  struct mmqtt_queue_ read_queue;
  struct mmqtt_queue_ write_queue;
};

/* Resets a ring buffer to empty with the given capacity. */
static inline void
mmqtt_ring_buffer_init_(struct mmqtt_ring_buffer_ *buffer, mmqtt_ssize_t capacity)
{
  buffer->start = 0;
  buffer->length = 0;
  buffer->capacity = capacity;
}

/* Number of used slots that can be read contiguously starting at `start`. */
static inline mmqtt_ssize_t
mmqtt_ring_buffer_pullable_(const struct mmqtt_ring_buffer_ *buffer)
{
  return min(buffer->start + buffer->length, buffer->capacity) - buffer->start;
}

/* Consumes `length` slots from the front. */
static inline void
mmqtt_ring_buffer_pull_(struct mmqtt_ring_buffer_ *buffer, mmqtt_ssize_t length)
{
  buffer->start = (buffer->start + length) % buffer->capacity;
  buffer->length -= length;
}

/* Number of free slots available after the used region. */
static inline mmqtt_ssize_t
mmqtt_ring_buffer_pushable_(const struct mmqtt_ring_buffer_ *buffer)
{
  if (buffer->start + buffer->length > buffer->capacity) {
    return buffer->capacity - buffer->length;
  }
  return buffer->capacity - buffer->length - buffer->start;
}

/* Marks `length` more slots as used. */
static inline void
mmqtt_ring_buffer_push_(struct mmqtt_ring_buffer_ *buffer, mmqtt_ssize_t length)
{
  buffer->length += length;
}

/* Prepares a stream that will carry exactly `total_size` bytes. */
static void
mmqtt_stream_init(struct mmqtt_stream *stream, size_t total_size)
{
  mmqtt_ring_buffer_init_(&stream->ring_buffer, MMQTT_STREAM_MAX_LENGTH);
  stream->left = total_size;
}

/* Returns MMQTT_STATUS_OK while bytes remain to be pushed or pulled,
 * MMQTT_STATUS_DONE once the stream is fully pushed and fully drained.
 */
static mmqtt_status_t
mmqtt_stream_running(struct mmqtt_stream *stream)
{
  if (stream->left > 0 || stream->ring_buffer.length > 0) {
    return MMQTT_STATUS_OK;
  } else {
    return MMQTT_STATUS_DONE;
  }
}

/* Copies up to `max_length` buffered bytes into `out` (the copy is skipped
 * when `out` is NULL, the bytes are still consumed) and returns the count.
 * Returns MMQTT_STATUS_DONE when nothing is buffered and nothing is left to
 * arrive, MMQTT_STATUS_NOT_PULLABLE when nothing is buffered yet.
 */
static mmqtt_ssize_t
mmqtt_stream_pull(struct mmqtt_stream *stream, uint8_t* out, mmqtt_ssize_t max_length)
{
  mmqtt_ssize_t pullable, i;
  pullable = mmqtt_ring_buffer_pullable_(&stream->ring_buffer);
  if (pullable <= 0) {
    if (stream->left == 0) {
      return MMQTT_STATUS_DONE;
    } else {
      return MMQTT_STATUS_NOT_PULLABLE;
    }
  }
  pullable = min(pullable, max_length);

  if (out) {
    for (i = 0; i < pullable; i++) {
      out[i] = stream->data[stream->ring_buffer.start + i];
    }
  }
  mmqtt_ring_buffer_pull_(&stream->ring_buffer, pullable);
  return pullable;
}

/* Exposes the buffered bytes in place: on MMQTT_STATUS_OK, `*data` points at
 * the first buffered byte and `*max_length` is the number of readable bytes.
 * Consume them afterwards with mmqtt_stream_external_pull().
 */
static mmqtt_status_t
mmqtt_stream_external_pullable(struct mmqtt_stream *stream,
                               const uint8_t** data, mmqtt_ssize_t* max_length)
{
  mmqtt_ssize_t pullable;
  pullable = mmqtt_ring_buffer_pullable_(&stream->ring_buffer);
  if (pullable <= 0) {
    if (stream->left == 0) {
      return MMQTT_STATUS_DONE;
    } else {
      return MMQTT_STATUS_NOT_PULLABLE;
    }
  }
  *data = stream->data + stream->ring_buffer.start;
  *max_length = pullable;
  return MMQTT_STATUS_OK;
}

/* Consumes `length` bytes previously read through
 * mmqtt_stream_external_pullable().
 */
static mmqtt_status_t
mmqtt_stream_external_pull(struct mmqtt_stream *stream, mmqtt_ssize_t length)
{
  mmqtt_ssize_t pullable;
  pullable = mmqtt_ring_buffer_pullable_(&stream->ring_buffer);
  if (pullable < length) {
    return MMQTT_STATUS_NOT_PULLABLE;
  } else {
    mmqtt_ring_buffer_pull_(&stream->ring_buffer, length);
    return MMQTT_STATUS_OK;
  }
}

/* Appends up to `length` bytes from `in` to the stream and returns how many
 * were accepted. Returns MMQTT_STATUS_DONE when the stream already received
 * all of its bytes, MMQTT_STATUS_NOT_PUSHABLE when the buffer has no room.
 */
static mmqtt_ssize_t
mmqtt_stream_push(struct mmqtt_stream *stream, const uint8_t* in, mmqtt_ssize_t length)
{
  mmqtt_ssize_t pushable, i, offset;
  if (stream->left == 0) { return MMQTT_STATUS_DONE; }
  pushable = mmqtt_ring_buffer_pushable_(&stream->ring_buffer);
  if (pushable <= 0) { return MMQTT_STATUS_NOT_PUSHABLE; }
  pushable = min(stream->left, min(pushable, length));

  /* New bytes go after the bytes that are buffered but not yet pulled;
   * writing at `start` would overwrite unread data.
   */
  offset = stream->ring_buffer.start + stream->ring_buffer.length;
  for (i = 0; i < pushable; i++) {
    stream->data[(offset + i) % stream->ring_buffer.capacity] = in[i];
  }
  mmqtt_ring_buffer_push_(&stream->ring_buffer, pushable);
  if (stream->left > 0) { stream->left -= pushable; }
  return pushable;
}

/* Exposes the free region in place: on MMQTT_STATUS_OK, `*data` points at the
 * first free byte (right after the buffered bytes) and `*max_length` is how
 * many bytes may be written. Commit them with mmqtt_stream_external_push().
 */
static mmqtt_status_t
mmqtt_stream_external_pushable(struct mmqtt_stream *stream,
                               uint8_t** data, mmqtt_ssize_t* max_length)
{
  mmqtt_ssize_t pushable;
  if (stream->left == 0) { return MMQTT_STATUS_DONE; }
  pushable = mmqtt_ring_buffer_pushable_(&stream->ring_buffer);
  if (pushable <= 0) { return MMQTT_STATUS_NOT_PUSHABLE; }
  pushable = min(pushable, stream->left);
  *data = stream->data + stream->ring_buffer.start + stream->ring_buffer.length;
  *max_length = pushable;
  return MMQTT_STATUS_OK;
}

/* Commits `length` bytes previously written through the pointer returned by
 * mmqtt_stream_external_pushable().
 */
static mmqtt_status_t
mmqtt_stream_external_push(struct mmqtt_stream *stream, mmqtt_ssize_t length)
{
  mmqtt_ssize_t pushable;
  if (stream->left == 0) { return MMQTT_STATUS_DONE; }
  pushable = mmqtt_ring_buffer_pushable_(&stream->ring_buffer);
  if (pushable <= 0) { return MMQTT_STATUS_NOT_PUSHABLE; }
  pushable = min(pushable, stream->left);
  if (pushable < length) {
    return MMQTT_STATUS_NOT_PUSHABLE;
  } else {
    /* Commit exactly what the caller wrote, and account for it in `left`
     * the same way mmqtt_stream_push() does.
     */
    mmqtt_ring_buffer_push_(&stream->ring_buffer, length);
    stream->left -= length;
    return MMQTT_STATUS_OK;
  }
}

/* Resets a queue to empty. */
static void
mmqtt_queue_init_(struct mmqtt_queue_ *queue)
{
  mmqtt_ring_buffer_init_(&queue->ring_buffer, MMQTT_QUEUE_MAX_LENGTH);
}

/* Appends `stream` to the back of the queue, or returns
 * MMQTT_STATUS_NOT_PUSHABLE when the queue has no free slot.
 */
static mmqtt_status_t
mmqtt_queue_add_(struct mmqtt_queue_ *queue, struct mmqtt_stream *stream)
{
  mmqtt_ssize_t slot;
  if (mmqtt_ring_buffer_pushable_(&queue->ring_buffer) <= 0) {
    return MMQTT_STATUS_NOT_PUSHABLE;
  }
  /* Store behind the streams already queued, matching MMQTT_QUEUE_INDEX */
  slot = (queue->ring_buffer.start + queue->ring_buffer.length) %
         queue->ring_buffer.capacity;
  queue->streams[slot] = stream;
  mmqtt_ring_buffer_push_(&queue->ring_buffer, 1);
  return MMQTT_STATUS_OK;
}

#define MMQTT_QUEUE_AVAILABLE(queue) ((queue)->ring_buffer.length > 0)
#define MMQTT_QUEUE_FIRST(queue) ((queue)->streams[(queue)->ring_buffer.start])

/* Initializes a connection with empty queues; `conn` is stored untouched. */
static void
mmqtt_connection_init(struct mmqtt_connection *connection, void *conn)
{
  connection->connection = conn;
  mmqtt_queue_init_(&connection->read_queue);
  mmqtt_queue_init_(&connection->write_queue);
}

/*
 * Pulls data out of connection's write queue, the returned data is supposed
 * to be sent to a TCP socket to remote servers.
 *
 * Returns the number of bytes pulled, or MMQTT_STATUS_NOT_PULLABLE when no
 * byte could be pulled. Streams that are done are released from the queue.
 */
static mmqtt_ssize_t
mmqtt_connection_pull(struct mmqtt_connection *connection,
                      uint8_t *out, mmqtt_ssize_t max_length)
{
  mmqtt_ssize_t length, current;
  struct mmqtt_queue_ *queue;
  length = 0;
  queue = &connection->write_queue;
  while(MMQTT_QUEUE_AVAILABLE(queue) && length < max_length) {
    /* Avoid pointer arithmetic on NULL when the caller only discards data */
    current = mmqtt_stream_pull(MMQTT_QUEUE_FIRST(queue),
                                (out != NULL) ? out + length : NULL,
                                max_length - length);
    /* TODO: in case current is 0, should we return non-pullable? */
    if (current >= 0) {
      length += current;
    } else if (current == MMQTT_STATUS_DONE) {
      /* Release stream from write queue since it is done */
      mmqtt_ring_buffer_pull_(&queue->ring_buffer, 1);
    } else if (current == MMQTT_STATUS_NOT_PULLABLE) {
      return (length > 0) ? length : MMQTT_STATUS_NOT_PULLABLE;
    } else {
      return MMQTT_STATUS_UNKNOWN;
    }
  }
  return (length > 0) ? length : MMQTT_STATUS_NOT_PULLABLE;
}

/* Returns the first stream in the write queue that is still running, or NULL. */
struct mmqtt_stream *
mmqtt_connection_pullable_stream(struct mmqtt_connection *connection)
{
  mmqtt_ssize_t i;
  struct mmqtt_queue_ *queue = &connection->write_queue;
  struct mmqtt_stream *stream = NULL;
  for (i = 0; i < MMQTT_QUEUE_LENGTH(queue); i++) {
    stream = MMQTT_QUEUE_INDEX(queue, i);
    if (mmqtt_stream_running(stream) == MMQTT_STATUS_OK) { return stream; }
  }
  return NULL;
}

/*
 * Pushes data to connection's read queue, the data should come from
 * a TCP socket to a remote machine.
 *
 * Streams are filled in queue order. Returns the number of bytes consumed
 * from `in`, or MMQTT_STATUS_NOT_PUSHABLE when no byte could be consumed.
 */
static mmqtt_ssize_t
mmqtt_connection_push(struct mmqtt_connection *connection,
                      const uint8_t* in, mmqtt_ssize_t length)
{
  mmqtt_ssize_t i, current, consumed_length;
  struct mmqtt_queue_ *queue;
  i = 0; /* start with the first queued stream */
  consumed_length = 0;
  queue = &connection->read_queue;
  while (i < MMQTT_QUEUE_LENGTH(queue) && consumed_length < length) {
    current = mmqtt_stream_push(MMQTT_QUEUE_INDEX(queue, i),
                                in + consumed_length,
                                length - consumed_length);
    if (current >= 0) {
      consumed_length += current;
    } else if (current == MMQTT_STATUS_DONE) {
      /* This stream got all of its bytes, move on to the next one */
      i++;
    } else if (current == MMQTT_STATUS_NOT_PUSHABLE) {
      return (consumed_length > 0) ? consumed_length : MMQTT_STATUS_NOT_PUSHABLE;
    } else {
      return MMQTT_STATUS_UNKNOWN;
    }
  }
  return (consumed_length > 0) ? consumed_length : MMQTT_STATUS_NOT_PUSHABLE;
}

/* Returns the first stream in the read queue that is still running, or NULL. */
struct mmqtt_stream *
mmqtt_connection_pushable_stream(struct mmqtt_connection *connection)
{
  mmqtt_ssize_t i;
  struct mmqtt_queue_ *queue = &connection->read_queue;
  struct mmqtt_stream *stream = NULL;
  for (i = 0; i < MMQTT_QUEUE_LENGTH(queue); i++) {
    stream = MMQTT_QUEUE_INDEX(queue, i);
    if (mmqtt_stream_running(stream) == MMQTT_STATUS_OK) { return stream; }
  }
  return NULL;
}

/* Queues `stream` to be filled by mmqtt_connection_push(). */
static mmqtt_status_t
mmqtt_connection_add_read_stream(struct mmqtt_connection *connection,
                                 struct mmqtt_stream *stream)
{
  return mmqtt_queue_add_(&connection->read_queue, stream);
}

/* Queues `stream` to be drained by mmqtt_connection_pull(). */
static mmqtt_status_t
mmqtt_connection_add_write_stream(struct mmqtt_connection *connection,
                                  struct mmqtt_stream *stream)
{
  return mmqtt_queue_add_(&connection->write_queue, stream);
}

/* Returns MMQTT_STATUS_OK when `stream` is currently in the read queue,
 * MMQTT_STATUS_NOT_EXPECTED otherwise.
 */
static mmqtt_status_t
mmqtt_connection_in_read_stream(struct mmqtt_connection *connection,
                                struct mmqtt_stream *stream)
{
  mmqtt_ssize_t i;
  struct mmqtt_queue_ *queue = &connection->read_queue;
  for (i = 0; i < MMQTT_QUEUE_LENGTH(queue); i++) {
    if (MMQTT_QUEUE_INDEX(queue, i) == stream) { return MMQTT_STATUS_OK; }
  }
  return MMQTT_STATUS_NOT_EXPECTED;
}

/* Returns MMQTT_STATUS_OK when `stream` is currently in the write queue,
 * MMQTT_STATUS_NOT_EXPECTED otherwise.
 */
static mmqtt_status_t
mmqtt_connection_in_write_stream(struct mmqtt_connection *connection,
                                 struct mmqtt_stream *stream)
{
  mmqtt_ssize_t i;
  struct mmqtt_queue_ *queue = &connection->write_queue;
  for (i = 0; i < MMQTT_QUEUE_LENGTH(queue); i++) {
    if (MMQTT_QUEUE_INDEX(queue, i) == stream) { return MMQTT_STATUS_OK; }
  }
  return MMQTT_STATUS_NOT_EXPECTED;
}

/* Number of streams currently in the read queue. */
static mmqtt_ssize_t
mmqtt_connection_read_stream_length(struct mmqtt_connection *connection)
{
  return connection->read_queue.ring_buffer.length;
}

/* Number of streams currently in the write queue. */
static mmqtt_ssize_t
mmqtt_connection_write_stream_length(struct mmqtt_connection *connection)
{
  return connection->write_queue.ring_buffer.length;
}

/* Removes `stream` from the write queue; it must be the first queued stream,
 * otherwise MMQTT_STATUS_NOT_EXPECTED is returned and nothing changes.
 */
static mmqtt_status_t
mmqtt_connection_release_write_stream(struct mmqtt_connection *connection,
                                      struct mmqtt_stream *stream)
{
  if (!MMQTT_QUEUE_AVAILABLE(&connection->write_queue)) {
    return MMQTT_STATUS_NOT_PULLABLE;
  }
  if (stream != MMQTT_QUEUE_FIRST(&connection->write_queue)) {
    return MMQTT_STATUS_NOT_EXPECTED;
  }
  mmqtt_ring_buffer_pull_(&connection->write_queue.ring_buffer, 1);
  return MMQTT_STATUS_OK;
}

/* Removes `stream` from the read queue; it must be the first queued stream,
 * otherwise MMQTT_STATUS_NOT_EXPECTED is returned and nothing changes.
 */
static mmqtt_status_t
mmqtt_connection_release_read_stream(struct mmqtt_connection *connection,
                                     struct mmqtt_stream *stream)
{
  if (!MMQTT_QUEUE_AVAILABLE(&connection->read_queue)) {
    return MMQTT_STATUS_NOT_PULLABLE;
  }
  if (stream != MMQTT_QUEUE_FIRST(&connection->read_queue)) {
    return MMQTT_STATUS_NOT_EXPECTED;
  }
  mmqtt_ring_buffer_pull_(&connection->read_queue.ring_buffer, 1);
  return MMQTT_STATUS_OK;
}

/* Releases every finished stream at the front of the write queue. */
static mmqtt_status_t
mmqtt_connection_autorelease_write_streams(struct mmqtt_connection *connection)
{
  struct mmqtt_stream *stream;
  while (MMQTT_QUEUE_AVAILABLE(&connection->write_queue)) {
    stream = MMQTT_QUEUE_FIRST(&connection->write_queue);
    if (mmqtt_stream_running(stream) == MMQTT_STATUS_DONE) {
      mmqtt_connection_release_write_stream(connection, stream);
    } else {
      break;
    }
  }
  return MMQTT_STATUS_OK;
}

/* Releases every finished stream at the front of the read queue. */
static mmqtt_status_t
mmqtt_connection_autorelease_read_streams(struct mmqtt_connection *connection)
{
  struct mmqtt_stream *stream;
  while (MMQTT_QUEUE_AVAILABLE(&connection->read_queue)) {
    stream = MMQTT_QUEUE_FIRST(&connection->read_queue);
    if (mmqtt_stream_running(stream) == MMQTT_STATUS_DONE) {
      mmqtt_connection_release_read_stream(connection, stream);
    } else {
      break;
    }
  }
  return MMQTT_STATUS_OK;
}

#ifndef MMQTT_DISABLE_STATIC

/* Largest "remaining length" the MQTT fixed header can encode (4 bytes of
 * 7 bits each).
 */
#define MMQTT_MAX_REMAINING_LENGTH 0x0FFFFFFFUL

/* Callback that drains the connection's write queue (e.g. to a socket) */
typedef mmqtt_status_t (*mmqtt_s_puller)(struct mmqtt_connection*);
/* Callback that feeds the connection's read queue with up to the given
 * number of bytes (e.g. from a socket)
 */
typedef mmqtt_status_t (*mmqtt_s_pusher)(struct mmqtt_connection*, mmqtt_ssize_t);

/* Narrows a byte count to mmqtt_ssize_t without wrapping: counts larger than
 * MMQTT_SSIZE_MAX are clamped, which is fine for "at most this many bytes"
 * arguments.
 */
static inline mmqtt_ssize_t
mmqtt_s_clamp_length_(uint32_t length)
{
  if (length > (uint32_t) MMQTT_SSIZE_MAX) {
    return (mmqtt_ssize_t) MMQTT_SSIZE_MAX;
  }
  return (mmqtt_ssize_t) length;
}

/* Sends `length` bytes of `buffer` through the write queue, calling `puller`
 * whenever the queue or the stream is full, and returns only after the whole
 * buffer left the write queue. Returns the puller's status on failure.
 */
mmqtt_status_t
mmqtt_s_encode_buffer(struct mmqtt_connection *conn, mmqtt_s_puller puller,
                      const uint8_t *buffer, uint16_t length)
{
  struct mmqtt_stream stream;
  mmqtt_status_t status;
  mmqtt_ssize_t current;
  uint16_t consumed_length;

  mmqtt_stream_init(&stream, length);
  while ((status = mmqtt_connection_add_write_stream(conn, &stream)) == MMQTT_STATUS_NOT_PUSHABLE) {
    status = puller(conn);
    if (status != MMQTT_STATUS_OK) { return status; }
  }
  if (status != MMQTT_STATUS_OK) { return status; }

  consumed_length = 0;
  while (consumed_length < length) {
    /* The remainder may exceed mmqtt_ssize_t, clamp instead of truncating */
    current = mmqtt_stream_push(&stream, buffer + consumed_length,
                                mmqtt_s_clamp_length_((uint32_t) (length - consumed_length)));
    if (current >= 0) {
      consumed_length += current;
    } else if (current == MMQTT_STATUS_NOT_PUSHABLE) {
      status = puller(conn);
      if (status != MMQTT_STATUS_OK) {
        /* `stream` lives on this stack frame: take it out of the queue
         * before returning. This only succeeds when it is the first queued
         * stream, which is the case when only this synchronous API is used.
         */
        (void) mmqtt_connection_release_write_stream(conn, &stream);
        return status;
      }
    }
  }
  while (mmqtt_connection_in_write_stream(conn, &stream) == MMQTT_STATUS_OK) {
    status = puller(conn);
    if (status != MMQTT_STATUS_OK) {
      (void) mmqtt_connection_release_write_stream(conn, &stream);
      return status;
    }
  }
  return MMQTT_STATUS_OK;
}

/* Receives `consume_length` bytes through the read queue, storing the first
 * `buffer_length` of them in `buffer` and discarding the rest. When
 * `consume_length` is smaller than `buffer_length` it is raised to it.
 * `pusher` is called whenever more input is needed. Fails with
 * MMQTT_STATUS_INVALID_STATE when the read queue is not empty.
 */
mmqtt_status_t
mmqtt_s_decode_buffer(struct mmqtt_connection *conn, mmqtt_s_pusher pusher,
                      uint8_t *buffer, uint16_t buffer_length,
                      uint16_t consume_length)
{
  struct mmqtt_stream stream;
  mmqtt_status_t status;
  mmqtt_ssize_t current;
  uint16_t length;
  uint8_t temp;

  if (mmqtt_connection_read_stream_length(conn) > 0) {
    return MMQTT_STATUS_INVALID_STATE;
  }

  if (consume_length < buffer_length) { consume_length = buffer_length; }
  mmqtt_stream_init(&stream, consume_length);
  /* Connection won't remove read stream automatically, pusher won't help here */
  status = mmqtt_connection_add_read_stream(conn, &stream);
  if (status != MMQTT_STATUS_OK) { return status; }

  length = 0;
  while (length < buffer_length) {
    /* Remainders may exceed mmqtt_ssize_t, clamp instead of truncating */
    current = mmqtt_stream_pull(&stream, buffer + length,
                                mmqtt_s_clamp_length_((uint32_t) (buffer_length - length)));
    if (current >= 0) {
      length += current;
    } else if (current == MMQTT_STATUS_NOT_PULLABLE) {
      status = pusher(conn, mmqtt_s_clamp_length_((uint32_t) (buffer_length - length)));
      if (status != MMQTT_STATUS_OK) {
        mmqtt_connection_release_read_stream(conn, &stream);
        return status;
      }
    }
  }
  /* Discard the bytes the caller has no room for, one at a time */
  while (length < consume_length) {
    current = mmqtt_stream_pull(&stream, &temp, 1);
    if (current >= 0) {
      length += current;
    } else if (current == MMQTT_STATUS_NOT_PULLABLE) {
      status = pusher(conn, mmqtt_s_clamp_length_((uint32_t) (consume_length - length)));
      if (status != MMQTT_STATUS_OK) {
        mmqtt_connection_release_read_stream(conn, &stream);
        return status;
      }
    }
  }
  mmqtt_connection_release_read_stream(conn, &stream);
  return MMQTT_STATUS_OK;
}

/* NOTE: for smaller buffer, we can also use mmqtt_s_decode_buffer with
 * an empty buffer and 0 as buffer length. Here this function is for the
 * case where we are skipping a whole packet altogether, which could be
 * larger than 65535 in theory.
 */
mmqtt_status_t
mmqtt_s_skip_buffer(struct mmqtt_connection *conn, mmqtt_s_pusher pusher,
                    uint32_t length)
{
  mmqtt_status_t status;
  while (length > 65535) {
    status = mmqtt_s_decode_buffer(conn, pusher, NULL, 0, 65535);
    if (status != MMQTT_STATUS_OK) {
      return status;
    }
    length -= 65535;
  }
  if (length > 0) {
    return mmqtt_s_decode_buffer(conn, pusher, NULL, 0, (uint16_t) length);
  } else {
    return MMQTT_STATUS_OK;
  }
}

/* Sends a fixed header: the `flag` byte followed by `length` in MQTT's
 * variable-length encoding (7 bits per byte, high bit set on every byte but
 * the last). Returns MMQTT_STATUS_NOT_EXPECTED, without sending anything,
 * when `length` is larger than MMQTT_MAX_REMAINING_LENGTH.
 */
mmqtt_status_t
mmqtt_s_encode_fixed_header(struct mmqtt_connection *conn, mmqtt_s_puller puller,
                            uint8_t flag, uint32_t length)
{
  mmqtt_status_t status;
  mmqtt_ssize_t i;
  uint8_t c, buffer[4];

  /* Larger values need a 5th byte, which would overflow `buffer` */
  if (length > MMQTT_MAX_REMAINING_LENGTH) { return MMQTT_STATUS_NOT_EXPECTED; }

  buffer[0] = flag;
  status = mmqtt_s_encode_buffer(conn, puller, buffer, 1);
  if (status != MMQTT_STATUS_OK) { return status; }

  i = 0;
  do {
    c = (uint8_t) (length % 128);
    length /= 128;
    if (length > 0) { c |= 0x80; }
    buffer[i++] = c;
  } while (length > 0);

  return mmqtt_s_encode_buffer(conn, puller, buffer, i);
}

/* Receives a fixed header. `flag` and `length` may each be NULL when the
 * caller does not need that value. Returns MMQTT_STATUS_NOT_EXPECTED when
 * the length field uses more than 4 bytes.
 */
mmqtt_status_t
mmqtt_s_decode_fixed_header(struct mmqtt_connection *conn, mmqtt_s_pusher pusher,
                            uint8_t *flag, uint32_t *length)
{
  mmqtt_status_t status;
  uint8_t c, count;
  uint32_t l, factor;

  status = mmqtt_s_decode_buffer(conn, pusher, &c, 1, 1);
  if (status != MMQTT_STATUS_OK) { return status; }
  if (flag) { *flag = c; }

  l = 0;
  factor = 1;
  count = 0;
  do {
    /* Don't keep consuming continuation bytes from a malformed peer */
    if (count >= 4) { return MMQTT_STATUS_NOT_EXPECTED; }
    status = mmqtt_s_decode_buffer(conn, pusher, &c, 1, 1);
    if (status != MMQTT_STATUS_OK) { return status; }
    l += (uint32_t) (c & 0x7F) * factor;
    factor *= 128;
    count++;
  } while ((c & 0x80) != 0);
  if (length) { *length = l; }

  return MMQTT_STATUS_OK;
}

/* Stores `length` into buffer[0..1], most significant byte first. */
void mmqtt_s_pack_uint16_(uint16_t length, uint8_t *buffer)
{
  buffer[0] = (length >> 8) & 0xFF;
  buffer[1] = length & 0xFF;
}

/* Reads a 16-bit value from buffer[0..1], most significant byte first. */
uint16_t mmqtt_s_unpack_uint16_(uint8_t *buffer)
{
  return ((uint16_t) (buffer[0] << 8)) | ((uint16_t) buffer[1]);
}

/* We don't cast uint16_t* directly into uint8_t* because there might be
 * endianness differences
 */
mmqtt_status_t
mmqtt_s_encode_uint16(struct mmqtt_connection *conn, mmqtt_s_puller puller,
                      uint16_t val)
{
  uint8_t buffer[2];

  mmqtt_s_pack_uint16_(val, buffer);
  return mmqtt_s_encode_buffer(conn, puller, buffer, 2);
}

/* Receives a 16-bit value; `out_val` may be NULL to discard it. */
mmqtt_status_t
mmqtt_s_decode_uint16(struct mmqtt_connection *conn, mmqtt_s_pusher pusher,
                      uint16_t *out_val)
{
  mmqtt_status_t status;
  uint8_t buffer[2];

  status = mmqtt_s_decode_buffer(conn, pusher, buffer, 2, 2);
  if (status != MMQTT_STATUS_OK) { return status; }

  if (out_val) { *out_val = mmqtt_s_unpack_uint16_(buffer); }
  return MMQTT_STATUS_OK;
}

/* Sends a string as a 16-bit length followed by `length` bytes of `str`. */
mmqtt_status_t
mmqtt_s_encode_string(struct mmqtt_connection *conn, mmqtt_s_puller puller,
                      const uint8_t *str, uint16_t length)
{
  mmqtt_status_t status;
  uint8_t buffer[2];

  mmqtt_s_pack_uint16_(length, buffer);
  status = mmqtt_s_encode_buffer(conn, puller, buffer, 2);
  if (status != MMQTT_STATUS_OK) { return status; }

  return mmqtt_s_encode_buffer(conn, puller, str, length);
}

/* Receives a length-prefixed string. At most `max_length` bytes are stored in
 * `str`; the rest of the string is consumed and dropped. `*out_length` gets
 * the number of bytes stored, `*out_true_length` the length announced on the
 * wire; both may be NULL.
 */
mmqtt_status_t
mmqtt_s_decode_string(struct mmqtt_connection *conn, mmqtt_s_pusher pusher,
                      uint8_t *str, uint16_t max_length,
                      uint16_t *out_length, uint16_t *out_true_length) {
  mmqtt_status_t status;
  uint8_t buffer[2];
  uint16_t true_length, length;

  status = mmqtt_s_decode_buffer(conn, pusher, buffer, 2, 2);
  if (status != MMQTT_STATUS_OK) { return status; }
  true_length = mmqtt_s_unpack_uint16_(buffer);

  length = min(true_length, max_length);
  status = mmqtt_s_decode_buffer(conn, pusher, str, length, true_length);
  if (status != MMQTT_STATUS_OK) { return status; }

  if (out_length) { *out_length = length; }
  if (out_true_length) { *out_true_length = true_length; }
  return MMQTT_STATUS_OK;
}

/* p stands for packet here */
struct mmqtt_p_connect_header {
  uint8_t *name;
  uint16_t name_length;
  uint16_t name_max_length;
  uint8_t protocol_version;
  uint8_t flags;
  uint16_t keepalive;
};

/* Sends the connect header fields in order: length-prefixed name, protocol
 * version, flags, keepalive.
 */
mmqtt_status_t
mmqtt_s_encode_connect_header(struct mmqtt_connection *conn, mmqtt_s_puller puller,
                              const struct mmqtt_p_connect_header *header)
{
  mmqtt_status_t status;
  uint8_t buffer[2];

  mmqtt_s_pack_uint16_(header->name_length, buffer);
  status = mmqtt_s_encode_buffer(conn, puller, buffer, 2);
  if (status != MMQTT_STATUS_OK) { return status; }

  status = mmqtt_s_encode_buffer(conn, puller, header->name, header->name_length);
  if (status != MMQTT_STATUS_OK) { return status; }

  status = mmqtt_s_encode_buffer(conn, puller, &header->protocol_version, 1);
  if (status != MMQTT_STATUS_OK) { return status; }

  status = mmqtt_s_encode_buffer(conn, puller, &header->flags, 1);
  if (status != MMQTT_STATUS_OK) { return status; }

  mmqtt_s_pack_uint16_(header->keepalive, buffer);
  return mmqtt_s_encode_buffer(conn, puller, buffer, 2);
}

/* We would rewrite name_length in header to contain returned name length */
mmqtt_status_t
mmqtt_s_decode_connect_header(struct mmqtt_connection *conn, mmqtt_s_pusher pusher,
                              struct mmqtt_p_connect_header *header)
{
  mmqtt_status_t status;
  uint8_t buffer[2];
  uint16_t length;

  status = mmqtt_s_decode_buffer(conn, pusher, buffer, 2, 2);
  if (status != MMQTT_STATUS_OK) { return status; }
  length = mmqtt_s_unpack_uint16_(buffer);

  /* Store what fits in header->name, consume the whole name from the wire */
  header->name_length = min(header->name_max_length, length);
  status = mmqtt_s_decode_buffer(conn, pusher, header->name,
                                 header->name_length, length);
  if (status != MMQTT_STATUS_OK) { return status; }

  status = mmqtt_s_decode_buffer(conn, pusher, &header->protocol_version, 1, 1);
  if (status != MMQTT_STATUS_OK) { return status; }

  status = mmqtt_s_decode_buffer(conn, pusher, &header->flags, 1, 1);
  if (status != MMQTT_STATUS_OK) { return status; }

  status = mmqtt_s_decode_buffer(conn, pusher, buffer, 2, 2);
  if (status != MMQTT_STATUS_OK) { return status; }
  header->keepalive = mmqtt_s_unpack_uint16_(buffer);

  return MMQTT_STATUS_OK;
}

struct mmqtt_p_connack_header {
  uint8_t reserved;
  uint8_t return_code;
};

/* Sends the two connack header bytes: reserved, then return code. */
mmqtt_status_t
mmqtt_s_encode_connack_header(struct mmqtt_connection *conn, mmqtt_s_puller puller,
                              const struct mmqtt_p_connack_header *header)
{
  mmqtt_status_t status;

  status = mmqtt_s_encode_buffer(conn, puller, &header->reserved, 1);
  if (status != MMQTT_STATUS_OK) { return status; }

  return mmqtt_s_encode_buffer(conn, puller, &header->return_code, 1);
}

/* Receives the two connack header bytes: reserved, then return code. */
mmqtt_status_t
mmqtt_s_decode_connack_header(struct mmqtt_connection *conn, mmqtt_s_pusher pusher,
                              struct mmqtt_p_connack_header *header)
{
  mmqtt_status_t status;

  status = mmqtt_s_decode_buffer(conn, pusher, &header->reserved, 1, 1);
  if (status != MMQTT_STATUS_OK) { return status; }

  return mmqtt_s_decode_buffer(conn, pusher, &header->return_code, 1, 1);
}

/* Works for both subscribe and unsubscribe */
struct mmqtt_p_subscribe_decode_context {
  uint32_t length;  /* payload bytes not decoded yet */
  uint8_t has_qos;  /* non-zero when each topic is followed by a QoS byte */
};

struct mmqtt_p_subscribe_payload_line {
  uint8_t *topic;
  uint16_t topic_length;
  uint16_t topic_max_length;
  uint8_t qos;
};

/* Prepares a context for a subscribe payload of `length` bytes. */
void
mmqtt_s_subscribe_decode_context_init(struct mmqtt_p_subscribe_decode_context *context,
                                      uint32_t length)
{
  context->length = length;
  context->has_qos = 1;
}

/* Prepares a context for an unsubscribe payload of `length` bytes. */
void
mmqtt_s_unsubscribe_decode_context_init(struct mmqtt_p_subscribe_decode_context *context,
                                        uint32_t length)
{
  context->length = length;
  context->has_qos = 0;
}

/* Decodes the next payload entry into `line`. Returns MMQTT_STATUS_DONE when
 * the context has no payload bytes left. `line->qos` is set to 0 when the
 * context carries no QoS bytes.
 */
mmqtt_status_t
mmqtt_s_decode_subscribe_payload(struct mmqtt_connection *conn, mmqtt_s_pusher pusher,
                                 struct mmqtt_p_subscribe_decode_context *context,
                                 struct mmqtt_p_subscribe_payload_line *line)
{
  mmqtt_status_t status;
  uint16_t string_true_length;
  uint32_t consumed;

  if (context->length == 0) { return MMQTT_STATUS_DONE; }
  status = mmqtt_s_decode_string(conn, pusher, line->topic, line->topic_max_length,
                                 &line->topic_length, &string_true_length);
  if (status != MMQTT_STATUS_OK) { return status; }

  /* Topic bytes plus the 2-byte length prefix. Saturate at 0 so that a
   * malformed packet cannot wrap the unsigned counter around.
   */
  consumed = (uint32_t) string_true_length + 2;
  context->length = (context->length > consumed) ? context->length - consumed : 0;

  if (context->has_qos != 0) {
    status = mmqtt_s_decode_buffer(conn, pusher, &line->qos, 1, 1);
    if (status != MMQTT_STATUS_OK) { return status; }
    if (context->length > 0) { context->length -= 1; }
  } else {
    line->qos = 0;
  }
  return MMQTT_STATUS_OK;
}

/* Encoded size of a string: its bytes plus the 2-byte length prefix. */
uint32_t
mmqtt_s_string_encoded_length(uint16_t string_length)
{
  return string_length + 2;
}

/* Encoded size of a connect header: the name string plus 4 bytes
 * (protocol version, flags and the 2-byte keepalive).
 */
uint32_t
mmqtt_s_connect_header_encoded_length(const struct mmqtt_p_connect_header *header)
{
  return mmqtt_s_string_encoded_length(header->name_length) + 4;
}

#endif /* MMQTT_DISABLE_STATIC */

#endif /* MMQTT_H_ */
Generated on Wed Jul 20 2022 02:51:36 by
![doxygen](doxygen.png)