minimal MQTT Library. This version simply doubled the MQTT QUEUE sizes

Dependents:   WNCInterface_M2XMQTTdemo

Fork of minimal-mqtt by Xuejie Xiao

Embed: (wiki syntax)

« Back to documentation index

Show/hide line numbers minimal-mqtt.h Source File

minimal-mqtt.h

00001 #ifndef MMQTT_H_
00002 #define MMQTT_H_
00003 
00004 #include <stdint.h>
00005 #ifndef min
00006 #define min(a, b) ((a) > (b) ? (b) : (a))
00007 #endif  /* min */
00008 
00009 #define MMQTT_STREAM_MAX_LENGTH 16
00010 #define MMQTT_QUEUE_MAX_LENGTH 4
00011 
00012 /* mmqtt_ssize_t must be able to hold MMQTT_STREAM_MAX_LENGTH as well as
00013  * MMQTT_QUEUE_MAX_LENGTH, we might add macro-based detection in the future.
00014  */
00015 typedef int8_t mmqtt_ssize_t;
00016 typedef int8_t mmqtt_status_t;
00017 
00018 #define MMQTT_STATUS_OK 0
00019 #define MMQTT_STATUS_UNKNOWN -1
00020 #define MMQTT_STATUS_DONE -2
00021 #define MMQTT_STATUS_NOT_PULLABLE -3
00022 #define MMQTT_STATUS_NOT_PUSHABLE -4
00023 #define MMQTT_STATUS_NOT_EXPECTED -5
00024 #define MMQTT_STATUS_BROKEN_CONNECTION -6
00025 #define MMQTT_STATUS_INVALID_STATE -7
00026 
00027 #define MMQTT_MESSAGE_TYPE_CONNECT 0x1
00028 #define MMQTT_MESSAGE_TYPE_CONNACK 0x2
00029 #define MMQTT_MESSAGE_TYPE_PUBLISH 0x3
00030 #define MMQTT_MESSAGE_TYPE_PUBACK 0x4
00031 #define MMQTT_MESSAGE_TYPE_SUBSCRIBE 0x8
00032 #define MMQTT_MESSAGE_TYPE_SUBACK 0x9
00033 #define MMQTT_MESSAGE_TYPE_UNSUBSCRIBE 0x10
00034 #define MMQTT_MESSAGE_TYPE_UNSUBACK 0x11
00035 #define MMQTT_MESSAGE_TYPE_PINGREQ 0x12
00036 #define MMQTT_MESSAGE_TYPE_PINGRESP 0x13
00037 #define MMQTT_MESSAGE_TYPE_DISCONNECT 0x14
00038 
00039 #define MMQTT_PACK_MESSAGE_TYPE(type_) ((type_) << 4)
00040 #define MMQTT_UNPACK_MESSAGE_TYPE(type_) (((type_) >> 4) & 0xF)
00041 
00042 struct mmqtt_ring_buffer_ {
00043   mmqtt_ssize_t start;
00044   mmqtt_ssize_t length;
00045   mmqtt_ssize_t capacity;
00046 };
00047 
00048 struct mmqtt_stream {
00049   struct mmqtt_ring_buffer_ ring_buffer;
00050   uint8_t data[MMQTT_STREAM_MAX_LENGTH];
00051   size_t left;
00052 };
00053 
00054 struct mmqtt_queue_ {
00055   struct mmqtt_ring_buffer_ ring_buffer;
00056   struct mmqtt_stream* streams[MMQTT_QUEUE_MAX_LENGTH];
00057 };
00058 
00059 #define MMQTT_QUEUE_LENGTH(queue) ((queue)->ring_buffer.length)
00060 #define MMQTT_QUEUE_INDEX(queue, i) ((queue)->streams[((queue)->ring_buffer.start + (i)) % ((queue)->ring_buffer.capacity)])
00061 
00062 struct mmqtt_connection {
00063   void *connection;
00064   struct mmqtt_queue_ read_queue;
00065   struct mmqtt_queue_ write_queue;
00066 };
00067 
00068 static inline void
00069 mmqtt_ring_buffer_init_(struct mmqtt_ring_buffer_ *buffer, mmqtt_ssize_t capacity)
00070 {
00071   buffer->start = 0;
00072   buffer->length = 0;
00073   buffer->capacity = capacity;
00074 }
00075 
00076 static inline mmqtt_ssize_t
00077 mmqtt_ring_buffer_pullable_(const struct mmqtt_ring_buffer_ *buffer)
00078 {
00079   return min(buffer->start + buffer->length, buffer->capacity) - buffer->start;
00080 }
00081 
00082 static inline void
00083 mmqtt_ring_buffer_pull_(struct mmqtt_ring_buffer_ *buffer, mmqtt_ssize_t length)
00084 {
00085   buffer->start = (buffer->start + length) % buffer->capacity;
00086   buffer->length -= length;
00087 }
00088 
00089 static inline mmqtt_ssize_t
00090 mmqtt_ring_buffer_pushable_(const struct mmqtt_ring_buffer_ *buffer)
00091 {
00092     if (buffer->start + buffer->length > buffer->capacity) {
00093         return buffer->capacity - buffer->length;
00094     }
00095     return buffer->capacity - buffer->length - buffer->start;
00096 }
00097 
00098 static inline void
00099 mmqtt_ring_buffer_push_(struct mmqtt_ring_buffer_ *buffer, mmqtt_ssize_t length)
00100 {
00101   buffer->length += length;
00102 }
00103 
00104 static void
00105 mmqtt_stream_init(struct mmqtt_stream *stream, size_t total_size)
00106 {
00107   mmqtt_ring_buffer_init_(&stream->ring_buffer, MMQTT_STREAM_MAX_LENGTH);
00108   stream->left = total_size;
00109 }
00110 
00111 static mmqtt_status_t
00112 mmqtt_stream_running(struct mmqtt_stream *stream)
00113 {
00114   if (stream->left > 0 || stream->ring_buffer.length > 0) {
00115     return MMQTT_STATUS_OK;
00116   } else {
00117     return MMQTT_STATUS_DONE;
00118   }
00119 }
00120 
00121 static mmqtt_ssize_t
00122 mmqtt_stream_pull(struct mmqtt_stream *stream, uint8_t* out, mmqtt_ssize_t max_length)
00123 {
00124   mmqtt_ssize_t pullable, i;
00125   pullable = mmqtt_ring_buffer_pullable_(&stream->ring_buffer);
00126   if (pullable <= 0) {
00127     if (stream->left == 0) {
00128       return MMQTT_STATUS_DONE;
00129     } else {
00130       return MMQTT_STATUS_NOT_PULLABLE;
00131     }
00132   }
00133   pullable = min(pullable, max_length);
00134 
00135   if (out) {
00136     for (i = 0; i < pullable; i++) {
00137       out[i] = stream->data[stream->ring_buffer.start + i];
00138     }
00139   }
00140   mmqtt_ring_buffer_pull_(&stream->ring_buffer, pullable);
00141   return pullable;
00142 }
00143 
00144 static mmqtt_status_t
00145 mmqtt_stream_external_pullable(struct mmqtt_stream *stream,
00146                                const uint8_t** data, mmqtt_ssize_t* max_length)
00147 {
00148   mmqtt_ssize_t pullable;
00149   pullable = mmqtt_ring_buffer_pullable_(&stream->ring_buffer);
00150   if (pullable <= 0) {
00151     if (stream->left == 0) {
00152       return MMQTT_STATUS_DONE;
00153     } else {
00154       return MMQTT_STATUS_NOT_PULLABLE;
00155     }
00156   }
00157   pullable = mmqtt_ring_buffer_pullable_(&stream->ring_buffer);
00158   if (pullable <= 0) { return MMQTT_STATUS_NOT_PULLABLE; }
00159   *data = stream->data + stream->ring_buffer.start;
00160   *max_length = pullable;
00161   return MMQTT_STATUS_OK;
00162 }
00163 
00164 static mmqtt_status_t
00165 mmqtt_stream_external_pull(struct mmqtt_stream *stream, mmqtt_ssize_t length)
00166 {
00167   mmqtt_ssize_t pullable;
00168   pullable = mmqtt_ring_buffer_pullable_(&stream->ring_buffer);
00169   if (pullable < length) {
00170     return MMQTT_STATUS_NOT_PULLABLE;
00171   } else {
00172     mmqtt_ring_buffer_pull_(&stream->ring_buffer, length);
00173     return MMQTT_STATUS_OK;
00174   }
00175 }
00176 
00177 static mmqtt_ssize_t
00178 mmqtt_stream_push(struct mmqtt_stream *stream, const uint8_t* in, mmqtt_ssize_t length)
00179 {
00180   mmqtt_ssize_t pushable, i;
00181   if (stream->left == 0) { return MMQTT_STATUS_DONE; }
00182   pushable = mmqtt_ring_buffer_pushable_(&stream->ring_buffer);
00183   if (pushable <= 0) { return MMQTT_STATUS_NOT_PUSHABLE; }
00184   pushable = min(stream->left, min(pushable, length));
00185 
00186   for (i = 0; i < pushable; i++) {
00187     stream->data[stream->ring_buffer.start + i] = in[i];
00188   }
00189   mmqtt_ring_buffer_push_(&stream->ring_buffer, pushable);
00190   if (stream->left > 0) { stream->left -= pushable; }
00191   return pushable;
00192 }
00193 
00194 static mmqtt_status_t
00195 mmqtt_stream_external_pushable(struct mmqtt_stream *stream,
00196                                uint8_t** data, mmqtt_ssize_t* max_length)
00197 {
00198   mmqtt_ssize_t pushable;
00199   if (stream->left == 0) { return MMQTT_STATUS_DONE; }
00200   pushable = mmqtt_ring_buffer_pushable_(&stream->ring_buffer);
00201   if (pushable <= 0) { return MMQTT_STATUS_NOT_PUSHABLE; }
00202   pushable = min(pushable, stream->left);
00203   *data = stream->data + stream->ring_buffer.start;
00204   *max_length = pushable;
00205   return MMQTT_STATUS_OK;
00206 }
00207 
00208 static mmqtt_status_t
00209 mmqtt_stream_external_push(struct mmqtt_stream *stream, mmqtt_ssize_t length)
00210 {
00211   mmqtt_ssize_t pushable;
00212   if (stream->left == 0) { return MMQTT_STATUS_DONE; }
00213   pushable = mmqtt_ring_buffer_pushable_(&stream->ring_buffer);
00214   if (pushable <= 0) { return MMQTT_STATUS_NOT_PUSHABLE; }
00215   pushable = min(pushable, stream->left);
00216   if (pushable < length) {
00217     return MMQTT_STATUS_NOT_PUSHABLE;
00218   } else {
00219     mmqtt_ring_buffer_push_(&stream->ring_buffer, pushable);
00220     return MMQTT_STATUS_OK;
00221   }
00222 }
00223 
00224 static void
00225 mmqtt_queue_init_(struct mmqtt_queue_ *queue)
00226 {
00227   mmqtt_ring_buffer_init_(&queue->ring_buffer, MMQTT_QUEUE_MAX_LENGTH);
00228 }
00229 
00230 static mmqtt_status_t
00231 mmqtt_queue_add_(struct mmqtt_queue_ *queue, struct mmqtt_stream *stream)
00232 {
00233   if (mmqtt_ring_buffer_pushable_(&queue->ring_buffer) <= 0) {
00234     return MMQTT_STATUS_NOT_PUSHABLE;
00235   }
00236   queue->streams[queue->ring_buffer.start] = stream;
00237   mmqtt_ring_buffer_push_(&queue->ring_buffer, 1);
00238   return MMQTT_STATUS_OK;
00239 }
00240 
00241 #define MMQTT_QUEUE_AVAILABLE(queue) ((queue)->ring_buffer.length > 0)
00242 #define MMQTT_QUEUE_FIRST(queue) ((queue)->streams[(queue)->ring_buffer.start])
00243 
00244 static void
00245 mmqtt_connection_init(struct mmqtt_connection *connection, void *conn)
00246 {
00247   connection->connection = conn;
00248   mmqtt_queue_init_(&connection->read_queue);
00249   mmqtt_queue_init_(&connection->write_queue);
00250 }
00251 
00252 /*
00253  * Pulls data out of connection's write queue, the returned data is supposed
00254  * to be sent to a TCP socket to remote servers.
00255  */
00256 static mmqtt_ssize_t
00257 mmqtt_connection_pull(struct mmqtt_connection *connection,
00258                       uint8_t *out, mmqtt_ssize_t max_length)
00259 {
00260   mmqtt_ssize_t length, current;
00261   struct mmqtt_queue_ *queue;
00262   length = 0;
00263   queue = &connection->write_queue;
00264   while(MMQTT_QUEUE_AVAILABLE(queue) && length < max_length) {
00265     current = mmqtt_stream_pull(MMQTT_QUEUE_FIRST(queue),
00266                                 out + length,
00267                                 max_length - length);
00268     /* TODO: in case current is 0, should we return non-pullable? */
00269     if (current >= 0) {
00270       length += current;
00271     } else if (current == MMQTT_STATUS_DONE) {
00272       /* Release stream from write queue since it is done */
00273       mmqtt_ring_buffer_pull_(&queue->ring_buffer, 1);
00274     } else if (current == MMQTT_STATUS_NOT_PULLABLE) {
00275       return (length > 0) ? length : MMQTT_STATUS_NOT_PULLABLE;
00276     } else {
00277       return MMQTT_STATUS_UNKNOWN;
00278     }
00279   }
00280   return (length > 0) ? length : MMQTT_STATUS_NOT_PULLABLE;
00281 }
00282 
00283 struct mmqtt_stream *
00284 mmqtt_connection_pullable_stream(struct mmqtt_connection *connection)
00285 {
00286   mmqtt_ssize_t i;
00287   struct mmqtt_queue_ *queue = &connection->write_queue;
00288   struct mmqtt_stream *stream = NULL;
00289   for (i = 0; i < MMQTT_QUEUE_LENGTH(queue); i++) {
00290     stream = MMQTT_QUEUE_INDEX(queue, i);
00291     if (mmqtt_stream_running(stream) == MMQTT_STATUS_OK) { return stream; }
00292   }
00293   return NULL;
00294 }
00295 
00296 /*
00297  * Pushes data to connection's read queue, the data should come from
00298  * a TCP socket to a remote machine.
00299  */
00300 static mmqtt_ssize_t
00301 mmqtt_connection_push(struct mmqtt_connection *connection,
00302                       const uint8_t* in, mmqtt_ssize_t length)
00303 {
00304   mmqtt_ssize_t i, current, consumed_length;
00305   struct mmqtt_queue_ *queue;
00306   consumed_length = 0;
00307   queue = &connection->read_queue;
00308   while (i < MMQTT_QUEUE_LENGTH(queue) && consumed_length < length) {
00309     current = mmqtt_stream_push(MMQTT_QUEUE_INDEX(queue, i),
00310                                 in + consumed_length,
00311                                 length - consumed_length);
00312     if (current >= 0) {
00313       consumed_length += current;
00314     } else if (current == MMQTT_STATUS_DONE) {
00315       i++;
00316     } else if (current == MMQTT_STATUS_NOT_PUSHABLE) {
00317       return (consumed_length > 0) ? consumed_length : MMQTT_STATUS_NOT_PUSHABLE;
00318     } else {
00319       return MMQTT_STATUS_UNKNOWN;
00320     }
00321   }
00322   return (consumed_length > 0) ? consumed_length : MMQTT_STATUS_NOT_PUSHABLE;
00323 }
00324 
00325 struct mmqtt_stream *
00326 mmqtt_connection_pushable_stream(struct mmqtt_connection *connection)
00327 {
00328   mmqtt_ssize_t i;
00329   struct mmqtt_queue_ *queue = &connection->read_queue;
00330   struct mmqtt_stream *stream = NULL;
00331   for (i = 0; i < MMQTT_QUEUE_LENGTH(queue); i++) {
00332     stream = MMQTT_QUEUE_INDEX(queue, i);
00333     if (mmqtt_stream_running(stream) == MMQTT_STATUS_OK) { return stream; }
00334   }
00335   return NULL;
00336 }
00337 
00338 static mmqtt_status_t
00339 mmqtt_connection_add_read_stream(struct mmqtt_connection *connection,
00340                                  struct mmqtt_stream *stream)
00341 {
00342   return mmqtt_queue_add_(&connection->read_queue, stream);
00343 }
00344 
00345 static mmqtt_status_t
00346 mmqtt_connection_add_write_stream(struct mmqtt_connection *connection,
00347                                   struct mmqtt_stream *stream)
00348 {
00349   return mmqtt_queue_add_(&connection->write_queue, stream);
00350 }
00351 
00352 static mmqtt_status_t
00353 mmqtt_connection_in_read_stream(struct mmqtt_connection *connection,
00354                                 struct mmqtt_stream *stream)
00355 {
00356   mmqtt_ssize_t i;
00357   struct mmqtt_queue_ *queue = &connection->read_queue;
00358   for (i = 0; i < MMQTT_QUEUE_LENGTH(queue); i++) {
00359     if (MMQTT_QUEUE_INDEX(queue, i) == stream) { return MMQTT_STATUS_OK; }
00360   }
00361   return MMQTT_STATUS_NOT_EXPECTED;
00362 }
00363 
00364 static mmqtt_status_t
00365 mmqtt_connection_in_write_stream(struct mmqtt_connection *connection,
00366                                  struct mmqtt_stream *stream)
00367 {
00368   mmqtt_ssize_t i;
00369   struct mmqtt_queue_ *queue = &connection->write_queue;
00370   for (i = 0; i < MMQTT_QUEUE_LENGTH(queue); i++) {
00371     if (MMQTT_QUEUE_INDEX(queue, i) == stream) { return MMQTT_STATUS_OK; }
00372   }
00373   return MMQTT_STATUS_NOT_EXPECTED;
00374 }
00375 
00376 static mmqtt_ssize_t
00377 mmqtt_connection_read_stream_length(struct mmqtt_connection *connection)
00378 {
00379   return connection->read_queue.ring_buffer.length;
00380 }
00381 
00382 static mmqtt_ssize_t
00383 mmqtt_connection_write_stream_length(struct mmqtt_connection *connection)
00384 {
00385   return connection->write_queue.ring_buffer.length;
00386 }
00387 
00388 static mmqtt_status_t
00389 mmqtt_connection_release_write_stream(struct mmqtt_connection *connection,
00390                                       struct mmqtt_stream *stream)
00391 {
00392   if (!MMQTT_QUEUE_AVAILABLE(&connection->write_queue)) {
00393     return MMQTT_STATUS_NOT_PULLABLE;
00394   }
00395   if (stream != MMQTT_QUEUE_FIRST(&connection->write_queue)) {
00396     return MMQTT_STATUS_NOT_EXPECTED;
00397   }
00398   mmqtt_ring_buffer_pull_(&connection->write_queue.ring_buffer, 1);
00399   return MMQTT_STATUS_OK;
00400 }
00401 
00402 static mmqtt_status_t
00403 mmqtt_connection_release_read_stream(struct mmqtt_connection *connection,
00404                                      struct mmqtt_stream *stream)
00405 {
00406   if (!MMQTT_QUEUE_AVAILABLE(&connection->read_queue)) {
00407     return MMQTT_STATUS_NOT_PULLABLE;
00408   }
00409   if (stream != MMQTT_QUEUE_FIRST(&connection->read_queue)) {
00410     return MMQTT_STATUS_NOT_EXPECTED;
00411   }
00412   mmqtt_ring_buffer_pull_(&connection->read_queue.ring_buffer, 1);
00413   return MMQTT_STATUS_OK;
00414 }
00415 
00416 static mmqtt_status_t
00417 mmqtt_connection_autorelease_write_streams(struct mmqtt_connection *connection)
00418 {
00419   struct mmqtt_stream *stream;
00420   while (MMQTT_QUEUE_AVAILABLE(&connection->write_queue)) {
00421     stream = MMQTT_QUEUE_FIRST(&connection->write_queue);
00422     if (mmqtt_stream_running(stream) == MMQTT_STATUS_DONE) {
00423       mmqtt_connection_release_write_stream(connection, stream);
00424     } else {
00425       break;
00426     }
00427   }
00428   return MMQTT_STATUS_OK;
00429 }
00430 
00431 static mmqtt_status_t
00432 mmqtt_connection_autorelease_read_streams(struct mmqtt_connection *connection)
00433 {
00434   struct mmqtt_stream *stream;
00435   while (MMQTT_QUEUE_AVAILABLE(&connection->read_queue)) {
00436     stream = MMQTT_QUEUE_FIRST(&connection->read_queue);
00437     if (mmqtt_stream_running(stream) == MMQTT_STATUS_DONE) {
00438       mmqtt_connection_release_read_stream(connection, stream);
00439     } else {
00440       break;
00441     }
00442   }
00443   return MMQTT_STATUS_OK;
00444 }
00445 
00446 #ifndef MMQTT_DISABLE_STATIC
00447 
00448 typedef mmqtt_status_t (*mmqtt_s_puller)(struct mmqtt_connection*);
00449 typedef mmqtt_status_t (*mmqtt_s_pusher)(struct mmqtt_connection*, mmqtt_ssize_t);
00450 
00451 mmqtt_status_t
00452 mmqtt_s_encode_buffer(struct mmqtt_connection *conn, mmqtt_s_puller puller,
00453                       const uint8_t *buffer, uint16_t length)
00454 {
00455   struct mmqtt_stream stream;
00456   mmqtt_status_t status;
00457   mmqtt_ssize_t current;
00458   uint16_t consumed_length;
00459 
00460   mmqtt_stream_init(&stream, length);
00461   while ((status = mmqtt_connection_add_write_stream(conn, &stream)) == MMQTT_STATUS_NOT_PUSHABLE) {
00462     status = puller(conn);
00463     if (status != MMQTT_STATUS_OK) { return status; }
00464   }
00465   if (status != MMQTT_STATUS_OK) { return status; }
00466 
00467   consumed_length = 0;
00468   while (consumed_length < length) {
00469     current = mmqtt_stream_push(&stream, buffer + consumed_length,
00470                                 length - consumed_length);
00471     if (current >= 0) {
00472       consumed_length += current;
00473     } else if (current == MMQTT_STATUS_NOT_PUSHABLE) {
00474       status = puller(conn);
00475       if (status != MMQTT_STATUS_OK) { return status; }
00476     }
00477   }
00478   while (mmqtt_connection_in_write_stream(conn, &stream) == MMQTT_STATUS_OK) {
00479     status = puller(conn);
00480     if (status != MMQTT_STATUS_OK) { return status; }
00481   }
00482   return MMQTT_STATUS_OK;
00483 }
00484 
00485 mmqtt_status_t
00486 mmqtt_s_decode_buffer(struct mmqtt_connection *conn, mmqtt_s_pusher pusher,
00487                       uint8_t *buffer, uint16_t buffer_length,
00488                       uint16_t consume_length)
00489 {
00490   struct mmqtt_stream stream;
00491   mmqtt_status_t status;
00492   mmqtt_ssize_t current;
00493   uint16_t length;
00494   uint8_t temp;
00495 
00496   if (mmqtt_connection_read_stream_length(conn) > 0) {
00497     return MMQTT_STATUS_INVALID_STATE;
00498   }
00499 
00500   if (consume_length < buffer_length) { consume_length = buffer_length; }
00501   mmqtt_stream_init(&stream, consume_length);
00502   /* Connection won't remove read stream automatically, pusher won't help here */
00503   status = mmqtt_connection_add_read_stream(conn, &stream);
00504   if (status != MMQTT_STATUS_OK) { return status; }
00505 
00506   length = 0;
00507   while (length < buffer_length) {
00508     current = mmqtt_stream_pull(&stream, buffer + length,
00509                                 buffer_length - length);
00510     if (current >= 0) {
00511       length += current;
00512     } else if (current == MMQTT_STATUS_NOT_PULLABLE) {
00513       status = pusher(conn, buffer_length - length);
00514       if (status != MMQTT_STATUS_OK) {
00515         mmqtt_connection_release_read_stream(conn, &stream);
00516         return status;
00517       }
00518     }
00519   }
00520   while (length < consume_length) {
00521     current = mmqtt_stream_pull(&stream, &temp, 1);
00522     if (current >= 0) {
00523       length += current;
00524     } else if (current == MMQTT_STATUS_NOT_PULLABLE) {
00525       status = pusher(conn, consume_length - length);
00526       if (status != MMQTT_STATUS_OK) {
00527         mmqtt_connection_release_read_stream(conn, &stream);
00528         return status;
00529       }
00530     }
00531   }
00532   mmqtt_connection_release_read_stream(conn, &stream);
00533   return MMQTT_STATUS_OK;
00534 }
00535 
00536 /* NOTE: for smaller buffer, we can also use mmqtt_s_decode_buffer with
00537  * an empty buffer and 0 as buffer length. Here this function is for the
00538  * case where we are skipping a whole packet altogether, which could be
00539  * larger than 65535 in theory.
00540  */
00541 mmqtt_status_t
00542 mmqtt_s_skip_buffer(struct mmqtt_connection *conn, mmqtt_s_pusher pusher,
00543                     uint32_t length)
00544 {
00545   mmqtt_status_t status;
00546   while (length > 65535) {
00547     status = mmqtt_s_decode_buffer(conn, pusher, NULL, 0, 65535);
00548     if (status != MMQTT_STATUS_OK) {
00549       return status;
00550     }
00551     length -= 65535;
00552   }
00553   if (length > 0) {
00554     return mmqtt_s_decode_buffer(conn, pusher, NULL, 0, (uint16_t) length);
00555   } else {
00556     return MMQTT_STATUS_OK;
00557   }
00558 }
00559 
00560 mmqtt_status_t
00561 mmqtt_s_encode_fixed_header(struct mmqtt_connection *conn, mmqtt_s_puller puller,
00562                             uint8_t flag, uint32_t length)
00563 {
00564   mmqtt_status_t status;
00565   mmqtt_ssize_t i;
00566   uint8_t c, buffer[4];
00567 
00568   buffer[0] = flag;
00569   status = mmqtt_s_encode_buffer(conn, puller, buffer, 1);
00570   if (status != MMQTT_STATUS_OK) { return status; }
00571 
00572   i = 0;
00573   do {
00574     c = (uint8_t) length % 128;
00575     length /= 128;
00576     if (length > 0) { c |= 0x80; }
00577     buffer[i++] = c;
00578   } while (length > 0);
00579 
00580   return mmqtt_s_encode_buffer(conn, puller, buffer, i);
00581 }
00582 
00583 mmqtt_status_t
00584 mmqtt_s_decode_fixed_header(struct mmqtt_connection *conn, mmqtt_s_pusher pusher,
00585                             uint8_t *flag, uint32_t *length)
00586 {
00587   mmqtt_status_t status;
00588   uint8_t c;
00589   uint32_t l, factor;
00590 
00591   status = mmqtt_s_decode_buffer(conn, pusher, &c, 1, 1);
00592   if (status != MMQTT_STATUS_OK) { return status; }
00593   if (flag) { *flag = c; }
00594 
00595   l = 0;
00596   factor = 1;
00597   do {
00598     status = mmqtt_s_decode_buffer(conn, pusher, &c, 1, 1);
00599     if (status != MMQTT_STATUS_OK) { return status; }
00600     l += (c & 0x7F) * factor;
00601     factor *= 128;
00602   } while ((c & 0x80) != 0);
00603   if (length) { *length = l; }
00604 
00605   return MMQTT_STATUS_OK;
00606 }
00607 
00608 void mmqtt_s_pack_uint16_(uint16_t length, uint8_t *buffer)
00609 {
00610   buffer[0] = (length >> 8) & 0xFF;
00611   buffer[1] = length & 0xFF;
00612 }
00613 
00614 uint16_t mmqtt_s_unpack_uint16_(uint8_t *buffer)
00615 {
00616   return ((uint16_t) (buffer[0] << 8)) | ((uint16_t) buffer[1]);
00617 }
00618 
00619 /* We don't case uint16_t* directly into uint8_t* because there might be
00620  * endianness differences
00621  */
00622 mmqtt_status_t
00623 mmqtt_s_encode_uint16(struct mmqtt_connection *conn, mmqtt_s_puller puller,
00624                       uint16_t val)
00625 {
00626   uint8_t buffer[2];
00627 
00628   mmqtt_s_pack_uint16_(val, buffer);
00629   return mmqtt_s_encode_buffer(conn, puller, buffer, 2);
00630 }
00631 
00632 mmqtt_status_t
00633 mmqtt_s_decode_uint16(struct mmqtt_connection *conn, mmqtt_s_pusher pusher,
00634                       uint16_t *out_val)
00635 {
00636   mmqtt_status_t status;
00637   uint8_t buffer[2];
00638 
00639   status = mmqtt_s_decode_buffer(conn, pusher, buffer, 2, 2);
00640   if (status != MMQTT_STATUS_OK) { return status; }
00641 
00642   if (out_val) { *out_val = mmqtt_s_unpack_uint16_(buffer); }
00643   return MMQTT_STATUS_OK;
00644 }
00645 
00646 mmqtt_status_t
00647 mmqtt_s_encode_string(struct mmqtt_connection *conn, mmqtt_s_puller puller,
00648                       const uint8_t *str, uint16_t length)
00649 {
00650   mmqtt_status_t status;
00651   uint8_t buffer[2];
00652 
00653   mmqtt_s_pack_uint16_(length, buffer);
00654   status = mmqtt_s_encode_buffer(conn, puller, buffer, 2);
00655   if (status != MMQTT_STATUS_OK) { return status; }
00656 
00657   return mmqtt_s_encode_buffer(conn, puller, str, length);
00658 }
00659 
00660 mmqtt_status_t
00661 mmqtt_s_decode_string(struct mmqtt_connection *conn, mmqtt_s_pusher pusher,
00662                       uint8_t *str, uint16_t max_length,
00663                       uint16_t *out_length, uint16_t *out_true_length) {
00664   mmqtt_status_t status;
00665   uint8_t buffer[2];
00666   uint16_t true_length, length;
00667 
00668   status = mmqtt_s_decode_buffer(conn, pusher, buffer, 2, 2);
00669   if (status != MMQTT_STATUS_OK) { return status; }
00670   true_length = mmqtt_s_unpack_uint16_(buffer);
00671 
00672   length = min(true_length, max_length);
00673   status = mmqtt_s_decode_buffer(conn, pusher, str, length, true_length);
00674   if (status != MMQTT_STATUS_OK) { return status; }
00675 
00676   if (out_length) { *out_length = length; }
00677   if (out_true_length) { *out_true_length = true_length; }
00678   return MMQTT_STATUS_OK;
00679 }
00680 
00681 /* p stands for packet here */
00682 struct mmqtt_p_connect_header {
00683   uint8_t *name;
00684   uint16_t name_length;
00685   uint16_t name_max_length;
00686   uint8_t protocol_version;
00687   uint8_t flags;
00688   uint16_t keepalive;
00689 };
00690 
00691 mmqtt_status_t
00692 mmqtt_s_encode_connect_header(struct mmqtt_connection *conn, mmqtt_s_puller puller,
00693                               const struct mmqtt_p_connect_header *header)
00694 {
00695   mmqtt_status_t status;
00696   uint8_t buffer[2];
00697 
00698   mmqtt_s_pack_uint16_(header->name_length, buffer);
00699   status = mmqtt_s_encode_buffer(conn, puller, buffer, 2);
00700   if (status != MMQTT_STATUS_OK) { return status; }
00701 
00702   status = mmqtt_s_encode_buffer(conn, puller, header->name, header->name_length);
00703   if (status != MMQTT_STATUS_OK) { return status; }
00704 
00705   status = mmqtt_s_encode_buffer(conn, puller, &header->protocol_version, 1);
00706   if (status != MMQTT_STATUS_OK) { return status; }
00707 
00708   status = mmqtt_s_encode_buffer(conn, puller, &header->flags, 1);
00709   if (status != MMQTT_STATUS_OK) { return status; }
00710 
00711   mmqtt_s_pack_uint16_(header->keepalive, buffer);
00712   return mmqtt_s_encode_buffer(conn, puller, buffer, 2);
00713 }
00714 
00715 /* We would rewrite name_length in header to contain returned name length */
00716 mmqtt_status_t
00717 mmqtt_s_decode_connect_header(struct mmqtt_connection *conn, mmqtt_s_pusher pusher,
00718                               struct mmqtt_p_connect_header *header)
00719 {
00720   mmqtt_status_t status;
00721   uint8_t buffer[2];
00722   uint16_t length;
00723 
00724   status = mmqtt_s_decode_buffer(conn, pusher, buffer, 2, 2);
00725   if (status != MMQTT_STATUS_OK) { return status; }
00726   length = mmqtt_s_unpack_uint16_(buffer);
00727 
00728   header->name_length = min(header->name_max_length, length);
00729   status = mmqtt_s_decode_buffer(conn, pusher, header->name,
00730                                  header->name_length, length);
00731   if (status != MMQTT_STATUS_OK) { return status; }
00732 
00733   status = mmqtt_s_decode_buffer(conn, pusher, &header->protocol_version, 1, 1);
00734   if (status != MMQTT_STATUS_OK) { return status; }
00735 
00736   status = mmqtt_s_decode_buffer(conn, pusher, &header->flags, 1, 1);
00737   if (status != MMQTT_STATUS_OK) { return status; }
00738 
00739   status = mmqtt_s_decode_buffer(conn, pusher, buffer, 2, 2);
00740   if (status != MMQTT_STATUS_OK) { return status; }
00741   header->keepalive = mmqtt_s_unpack_uint16_(buffer);
00742 
00743   return MMQTT_STATUS_OK;
00744 }
00745 
00746 struct mmqtt_p_connack_header {
00747   uint8_t reserved;
00748   uint8_t return_code;
00749 };
00750 
00751 mmqtt_status_t
00752 mmqtt_s_encode_connack_header(struct mmqtt_connection *conn, mmqtt_s_puller puller,
00753                               const struct mmqtt_p_connack_header *header)
00754 {
00755   mmqtt_status_t status;
00756 
00757   status = mmqtt_s_encode_buffer(conn, puller, &header->reserved, 1);
00758   if (status != MMQTT_STATUS_OK) { return status; }
00759 
00760   return mmqtt_s_encode_buffer(conn, puller, &header->return_code, 1);
00761 }
00762 
00763 mmqtt_status_t
00764 mmqtt_s_decode_connack_header(struct mmqtt_connection *conn, mmqtt_s_pusher pusher,
00765                               struct mmqtt_p_connack_header *header)
00766 {
00767   mmqtt_status_t status;
00768 
00769   status = mmqtt_s_decode_buffer(conn, pusher, &header->reserved, 1, 1);
00770   if (status != MMQTT_STATUS_OK) { return status; }
00771 
00772   return mmqtt_s_decode_buffer(conn, pusher, &header->return_code, 1, 1);
00773 }
00774 
00775 /* Works for both subscribe and unsubscribe */
00776 struct mmqtt_p_subscribe_decode_context {
00777   uint32_t length;
00778   uint8_t has_qos;
00779 };
00780 
00781 struct mmqtt_p_subscribe_payload_line {
00782   uint8_t *topic;
00783   uint16_t topic_length;
00784   uint16_t topic_max_length;
00785   uint8_t qos;
00786 };
00787 
00788 void
00789 mmqtt_s_subscribe_decode_context_init(struct mmqtt_p_subscribe_decode_context *context,
00790                                       uint32_t length)
00791 {
00792   context->length = length;
00793   context->has_qos = 1;
00794 }
00795 
00796 void
00797 mmqtt_s_unsubscribe_decode_context_init(struct mmqtt_p_subscribe_decode_context *context,
00798                                         uint32_t length)
00799 {
00800   context->length = length;
00801   context->has_qos = 0;
00802 }
00803 
00804 mmqtt_status_t
00805 mmqtt_s_decode_subscribe_payload(struct mmqtt_connection *conn, mmqtt_s_pusher pusher,
00806                                       struct mmqtt_p_subscribe_decode_context *context,
00807                                       struct mmqtt_p_subscribe_payload_line *line)
00808 {
00809   mmqtt_status_t status;
00810   uint16_t string_true_length;
00811 
00812   if (context->length <= 0) { return MMQTT_STATUS_DONE; }
00813   status = mmqtt_s_decode_string(conn, pusher, line->topic, line->topic_max_length,
00814                                  &line->topic_length, &string_true_length);
00815   if (status != MMQTT_STATUS_OK) { return status; }
00816 
00817   context->length -= string_true_length;
00818   context->length -= 2;
00819 
00820   if (context->has_qos != 0) {
00821     status = mmqtt_s_decode_buffer(conn, pusher, &line->qos, 1, 1);
00822     if (status != MMQTT_STATUS_OK) { return status; }
00823     context->length -= 1;
00824   } else {
00825     line->qos = 0;
00826   }
00827   return MMQTT_STATUS_OK;
00828 }
00829 
00830 uint32_t
00831 mmqtt_s_string_encoded_length(uint16_t string_length)
00832 {
00833   return string_length + 2;
00834 }
00835 
00836 uint32_t
00837 mmqtt_s_connect_header_encoded_length(const struct mmqtt_p_connect_header *header)
00838 {
00839   return mmqtt_s_string_encoded_length(header->name_length) + 4;
00840 }
00841 
00842 #endif  /* MMQTT_DISABLE_STATIC */
00843 
00844 #endif  /* MMQTT_H_ */