Minimal MQTT library. This version simply doubles the MQTT stream and queue size limits.
Dependents: WNCInterface_M2XMQTTdemo
Fork of minimal-mqtt by
minimal-mqtt.h@2:65f85aa601db, 2016-10-08 (annotated)
- Committer:
- JMF
- Date:
- Sat Oct 08 00:46:33 2016 +0000
- Revision:
- 2:65f85aa601db
- Parent:
- 1:311cd16389ff
Doubled MMQTT_STREAM_MAX_LENGTH and MMQTT_QUEUE_MAX_LENGTH to accommodate larger MQTT messages.
Who changed what in which revision?
User | Revision | Line number | New contents of line |
---|---|---|---|
defmacro | 0:a268b80e3093 | 1 | #ifndef MMQTT_H_ |
defmacro | 0:a268b80e3093 | 2 | #define MMQTT_H_ |
defmacro | 0:a268b80e3093 | 3 | |
#include <stddef.h>
#include <stdint.h>
/*
 * Fallback two-argument minimum, defined only when the including code has not
 * already provided `min`. This is a function-like macro: an argument may be
 * evaluated more than once, so do not pass expressions with side effects.
 */
#ifndef min
#define min(a, b) ((b) < (a) ? (b) : (a))
#endif /* min */
defmacro | 0:a268b80e3093 | 8 | |
/* Size, in bytes, of the data buffer embedded in every struct mmqtt_stream. */
#define MMQTT_STREAM_MAX_LENGTH 16
/* Number of stream slots held by each struct mmqtt_queue_. */
#define MMQTT_QUEUE_MAX_LENGTH 4
defmacro | 0:a268b80e3093 | 11 | |
/* mmqtt_ssize_t must be able to hold MMQTT_STREAM_MAX_LENGTH as well as
 * MMQTT_QUEUE_MAX_LENGTH. It is signed because functions returning it also
 * use the negative MMQTT_STATUS_* codes in the same return slot.
 *
 * The checks below turn a limit that no longer fits into a build error
 * instead of silent truncation at run time.
 */
#if defined(MMQTT_STREAM_MAX_LENGTH) && (MMQTT_STREAM_MAX_LENGTH > INT8_MAX)
#error "MMQTT_STREAM_MAX_LENGTH does not fit in mmqtt_ssize_t (int8_t)"
#endif
#if defined(MMQTT_QUEUE_MAX_LENGTH) && (MMQTT_QUEUE_MAX_LENGTH > INT8_MAX)
#error "MMQTT_QUEUE_MAX_LENGTH does not fit in mmqtt_ssize_t (int8_t)"
#endif

/* Signed size/count type; negative values carry MMQTT_STATUS_* codes. */
typedef int8_t mmqtt_ssize_t;
/* Result code type; holds one of the MMQTT_STATUS_* values. */
typedef int8_t mmqtt_status_t;
defmacro | 0:a268b80e3093 | 17 | |
/*
 * Result codes. Zero means success; every other code is negative so it can
 * share a return slot with non-negative byte/element counts.
 */
#define MMQTT_STATUS_OK                 0
#define MMQTT_STATUS_UNKNOWN            (-1)
#define MMQTT_STATUS_DONE               (-2)
#define MMQTT_STATUS_NOT_PULLABLE       (-3)
#define MMQTT_STATUS_NOT_PUSHABLE       (-4)
#define MMQTT_STATUS_NOT_EXPECTED       (-5)
#define MMQTT_STATUS_BROKEN_CONNECTION  (-6)
#define MMQTT_STATUS_INVALID_STATE      (-7)
defmacro | 0:a268b80e3093 | 26 | |
/*
 * MQTT control packet types. Each value is the 4-bit type number that
 * MMQTT_PACK_MESSAGE_TYPE places in the high nibble of the first header byte,
 * so every value must be in the range 0x0..0xF.
 *
 * UNSUBSCRIBE..DISCONNECT are packet types 10..14, i.e. 0xA..0xE in hex; they
 * were previously spelled 0x10..0x14, which does not fit in the nibble.
 */
#define MMQTT_MESSAGE_TYPE_CONNECT      0x1
#define MMQTT_MESSAGE_TYPE_CONNACK      0x2
#define MMQTT_MESSAGE_TYPE_PUBLISH      0x3
#define MMQTT_MESSAGE_TYPE_PUBACK       0x4
#define MMQTT_MESSAGE_TYPE_SUBSCRIBE    0x8
#define MMQTT_MESSAGE_TYPE_SUBACK       0x9
#define MMQTT_MESSAGE_TYPE_UNSUBSCRIBE  0xA
#define MMQTT_MESSAGE_TYPE_UNSUBACK     0xB
#define MMQTT_MESSAGE_TYPE_PINGREQ      0xC
#define MMQTT_MESSAGE_TYPE_PINGRESP     0xD
#define MMQTT_MESSAGE_TYPE_DISCONNECT   0xE
defmacro | 0:a268b80e3093 | 38 | |
/* Moves a message type number into the high nibble of a header byte. */
#define MMQTT_PACK_MESSAGE_TYPE(type_)   ((type_) << 4)
/* Extracts the message type number from the high nibble of a header byte. */
#define MMQTT_UNPACK_MESSAGE_TYPE(type_) (((type_) >> 4) & 0x0F)
defmacro | 0:a268b80e3093 | 41 | |
defmacro | 0:a268b80e3093 | 42 | struct mmqtt_ring_buffer_ { |
defmacro | 0:a268b80e3093 | 43 | mmqtt_ssize_t start; |
defmacro | 0:a268b80e3093 | 44 | mmqtt_ssize_t length; |
defmacro | 0:a268b80e3093 | 45 | mmqtt_ssize_t capacity; |
defmacro | 0:a268b80e3093 | 46 | }; |
defmacro | 0:a268b80e3093 | 47 | |
defmacro | 0:a268b80e3093 | 48 | struct mmqtt_stream { |
defmacro | 0:a268b80e3093 | 49 | struct mmqtt_ring_buffer_ ring_buffer; |
defmacro | 0:a268b80e3093 | 50 | uint8_t data[MMQTT_STREAM_MAX_LENGTH]; |
defmacro | 0:a268b80e3093 | 51 | size_t left; |
defmacro | 0:a268b80e3093 | 52 | }; |
defmacro | 0:a268b80e3093 | 53 | |
defmacro | 0:a268b80e3093 | 54 | struct mmqtt_queue_ { |
defmacro | 0:a268b80e3093 | 55 | struct mmqtt_ring_buffer_ ring_buffer; |
defmacro | 0:a268b80e3093 | 56 | struct mmqtt_stream* streams[MMQTT_QUEUE_MAX_LENGTH]; |
defmacro | 0:a268b80e3093 | 57 | }; |
defmacro | 0:a268b80e3093 | 58 | |
/* Number of streams currently held by `queue`. */
#define MMQTT_QUEUE_LENGTH(queue) ((queue)->ring_buffer.length)
/*
 * The i-th slot counted from the head of `queue`, wrapping around the end of
 * the slot array. `queue` is evaluated more than once.
 */
#define MMQTT_QUEUE_INDEX(queue, i) \
  ((queue)->streams[((queue)->ring_buffer.start + (i)) % ((queue)->ring_buffer.capacity)])
defmacro | 0:a268b80e3093 | 61 | |
defmacro | 0:a268b80e3093 | 62 | struct mmqtt_connection { |
defmacro | 0:a268b80e3093 | 63 | void *connection; |
defmacro | 0:a268b80e3093 | 64 | struct mmqtt_queue_ read_queue; |
defmacro | 0:a268b80e3093 | 65 | struct mmqtt_queue_ write_queue; |
defmacro | 0:a268b80e3093 | 66 | }; |
defmacro | 0:a268b80e3093 | 67 | |
defmacro | 0:a268b80e3093 | 68 | static inline void |
defmacro | 0:a268b80e3093 | 69 | mmqtt_ring_buffer_init_(struct mmqtt_ring_buffer_ *buffer, mmqtt_ssize_t capacity) |
defmacro | 0:a268b80e3093 | 70 | { |
defmacro | 0:a268b80e3093 | 71 | buffer->start = 0; |
defmacro | 0:a268b80e3093 | 72 | buffer->length = 0; |
defmacro | 0:a268b80e3093 | 73 | buffer->capacity = capacity; |
defmacro | 0:a268b80e3093 | 74 | } |
defmacro | 0:a268b80e3093 | 75 | |
defmacro | 0:a268b80e3093 | 76 | static inline mmqtt_ssize_t |
defmacro | 0:a268b80e3093 | 77 | mmqtt_ring_buffer_pullable_(const struct mmqtt_ring_buffer_ *buffer) |
defmacro | 0:a268b80e3093 | 78 | { |
defmacro | 0:a268b80e3093 | 79 | return min(buffer->start + buffer->length, buffer->capacity) - buffer->start; |
defmacro | 0:a268b80e3093 | 80 | } |
defmacro | 0:a268b80e3093 | 81 | |
defmacro | 0:a268b80e3093 | 82 | static inline void |
defmacro | 0:a268b80e3093 | 83 | mmqtt_ring_buffer_pull_(struct mmqtt_ring_buffer_ *buffer, mmqtt_ssize_t length) |
defmacro | 0:a268b80e3093 | 84 | { |
defmacro | 0:a268b80e3093 | 85 | buffer->start = (buffer->start + length) % buffer->capacity; |
defmacro | 0:a268b80e3093 | 86 | buffer->length -= length; |
defmacro | 0:a268b80e3093 | 87 | } |
defmacro | 0:a268b80e3093 | 88 | |
defmacro | 0:a268b80e3093 | 89 | static inline mmqtt_ssize_t |
defmacro | 0:a268b80e3093 | 90 | mmqtt_ring_buffer_pushable_(const struct mmqtt_ring_buffer_ *buffer) |
defmacro | 0:a268b80e3093 | 91 | { |
defmacro | 0:a268b80e3093 | 92 | if (buffer->start + buffer->length > buffer->capacity) { |
defmacro | 0:a268b80e3093 | 93 | return buffer->capacity - buffer->length; |
defmacro | 0:a268b80e3093 | 94 | } |
defmacro | 0:a268b80e3093 | 95 | return buffer->capacity - buffer->length - buffer->start; |
defmacro | 0:a268b80e3093 | 96 | } |
defmacro | 0:a268b80e3093 | 97 | |
defmacro | 0:a268b80e3093 | 98 | static inline void |
defmacro | 0:a268b80e3093 | 99 | mmqtt_ring_buffer_push_(struct mmqtt_ring_buffer_ *buffer, mmqtt_ssize_t length) |
defmacro | 0:a268b80e3093 | 100 | { |
defmacro | 0:a268b80e3093 | 101 | buffer->length += length; |
defmacro | 0:a268b80e3093 | 102 | } |
defmacro | 0:a268b80e3093 | 103 | |
defmacro | 0:a268b80e3093 | 104 | static void |
defmacro | 0:a268b80e3093 | 105 | mmqtt_stream_init(struct mmqtt_stream *stream, size_t total_size) |
defmacro | 0:a268b80e3093 | 106 | { |
defmacro | 0:a268b80e3093 | 107 | mmqtt_ring_buffer_init_(&stream->ring_buffer, MMQTT_STREAM_MAX_LENGTH); |
defmacro | 0:a268b80e3093 | 108 | stream->left = total_size; |
defmacro | 0:a268b80e3093 | 109 | } |
defmacro | 0:a268b80e3093 | 110 | |
defmacro | 0:a268b80e3093 | 111 | static mmqtt_status_t |
defmacro | 0:a268b80e3093 | 112 | mmqtt_stream_running(struct mmqtt_stream *stream) |
defmacro | 0:a268b80e3093 | 113 | { |
defmacro | 0:a268b80e3093 | 114 | if (stream->left > 0 || stream->ring_buffer.length > 0) { |
defmacro | 0:a268b80e3093 | 115 | return MMQTT_STATUS_OK; |
defmacro | 0:a268b80e3093 | 116 | } else { |
defmacro | 0:a268b80e3093 | 117 | return MMQTT_STATUS_DONE; |
defmacro | 0:a268b80e3093 | 118 | } |
defmacro | 0:a268b80e3093 | 119 | } |
defmacro | 0:a268b80e3093 | 120 | |
defmacro | 0:a268b80e3093 | 121 | static mmqtt_ssize_t |
defmacro | 0:a268b80e3093 | 122 | mmqtt_stream_pull(struct mmqtt_stream *stream, uint8_t* out, mmqtt_ssize_t max_length) |
defmacro | 0:a268b80e3093 | 123 | { |
defmacro | 0:a268b80e3093 | 124 | mmqtt_ssize_t pullable, i; |
defmacro | 0:a268b80e3093 | 125 | pullable = mmqtt_ring_buffer_pullable_(&stream->ring_buffer); |
defmacro | 0:a268b80e3093 | 126 | if (pullable <= 0) { |
defmacro | 0:a268b80e3093 | 127 | if (stream->left == 0) { |
defmacro | 0:a268b80e3093 | 128 | return MMQTT_STATUS_DONE; |
defmacro | 0:a268b80e3093 | 129 | } else { |
defmacro | 0:a268b80e3093 | 130 | return MMQTT_STATUS_NOT_PULLABLE; |
defmacro | 0:a268b80e3093 | 131 | } |
defmacro | 0:a268b80e3093 | 132 | } |
defmacro | 0:a268b80e3093 | 133 | pullable = min(pullable, max_length); |
defmacro | 0:a268b80e3093 | 134 | |
defmacro | 0:a268b80e3093 | 135 | if (out) { |
defmacro | 0:a268b80e3093 | 136 | for (i = 0; i < pullable; i++) { |
defmacro | 0:a268b80e3093 | 137 | out[i] = stream->data[stream->ring_buffer.start + i]; |
defmacro | 0:a268b80e3093 | 138 | } |
defmacro | 0:a268b80e3093 | 139 | } |
defmacro | 0:a268b80e3093 | 140 | mmqtt_ring_buffer_pull_(&stream->ring_buffer, pullable); |
defmacro | 0:a268b80e3093 | 141 | return pullable; |
defmacro | 0:a268b80e3093 | 142 | } |
defmacro | 0:a268b80e3093 | 143 | |
defmacro | 0:a268b80e3093 | 144 | static mmqtt_status_t |
defmacro | 0:a268b80e3093 | 145 | mmqtt_stream_external_pullable(struct mmqtt_stream *stream, |
defmacro | 0:a268b80e3093 | 146 | const uint8_t** data, mmqtt_ssize_t* max_length) |
defmacro | 0:a268b80e3093 | 147 | { |
defmacro | 0:a268b80e3093 | 148 | mmqtt_ssize_t pullable; |
defmacro | 0:a268b80e3093 | 149 | pullable = mmqtt_ring_buffer_pullable_(&stream->ring_buffer); |
defmacro | 0:a268b80e3093 | 150 | if (pullable <= 0) { |
defmacro | 0:a268b80e3093 | 151 | if (stream->left == 0) { |
defmacro | 0:a268b80e3093 | 152 | return MMQTT_STATUS_DONE; |
defmacro | 0:a268b80e3093 | 153 | } else { |
defmacro | 0:a268b80e3093 | 154 | return MMQTT_STATUS_NOT_PULLABLE; |
defmacro | 0:a268b80e3093 | 155 | } |
defmacro | 0:a268b80e3093 | 156 | } |
defmacro | 0:a268b80e3093 | 157 | pullable = mmqtt_ring_buffer_pullable_(&stream->ring_buffer); |
defmacro | 0:a268b80e3093 | 158 | if (pullable <= 0) { return MMQTT_STATUS_NOT_PULLABLE; } |
defmacro | 0:a268b80e3093 | 159 | *data = stream->data + stream->ring_buffer.start; |
defmacro | 0:a268b80e3093 | 160 | *max_length = pullable; |
defmacro | 0:a268b80e3093 | 161 | return MMQTT_STATUS_OK; |
defmacro | 0:a268b80e3093 | 162 | } |
defmacro | 0:a268b80e3093 | 163 | |
defmacro | 0:a268b80e3093 | 164 | static mmqtt_status_t |
defmacro | 0:a268b80e3093 | 165 | mmqtt_stream_external_pull(struct mmqtt_stream *stream, mmqtt_ssize_t length) |
defmacro | 0:a268b80e3093 | 166 | { |
defmacro | 0:a268b80e3093 | 167 | mmqtt_ssize_t pullable; |
defmacro | 0:a268b80e3093 | 168 | pullable = mmqtt_ring_buffer_pullable_(&stream->ring_buffer); |
defmacro | 0:a268b80e3093 | 169 | if (pullable < length) { |
defmacro | 0:a268b80e3093 | 170 | return MMQTT_STATUS_NOT_PULLABLE; |
defmacro | 0:a268b80e3093 | 171 | } else { |
defmacro | 0:a268b80e3093 | 172 | mmqtt_ring_buffer_pull_(&stream->ring_buffer, length); |
defmacro | 0:a268b80e3093 | 173 | return MMQTT_STATUS_OK; |
defmacro | 0:a268b80e3093 | 174 | } |
defmacro | 0:a268b80e3093 | 175 | } |
defmacro | 0:a268b80e3093 | 176 | |
defmacro | 0:a268b80e3093 | 177 | static mmqtt_ssize_t |
defmacro | 0:a268b80e3093 | 178 | mmqtt_stream_push(struct mmqtt_stream *stream, const uint8_t* in, mmqtt_ssize_t length) |
defmacro | 0:a268b80e3093 | 179 | { |
defmacro | 0:a268b80e3093 | 180 | mmqtt_ssize_t pushable, i; |
defmacro | 0:a268b80e3093 | 181 | if (stream->left == 0) { return MMQTT_STATUS_DONE; } |
defmacro | 0:a268b80e3093 | 182 | pushable = mmqtt_ring_buffer_pushable_(&stream->ring_buffer); |
defmacro | 0:a268b80e3093 | 183 | if (pushable <= 0) { return MMQTT_STATUS_NOT_PUSHABLE; } |
defmacro | 0:a268b80e3093 | 184 | pushable = min(stream->left, min(pushable, length)); |
defmacro | 0:a268b80e3093 | 185 | |
defmacro | 0:a268b80e3093 | 186 | for (i = 0; i < pushable; i++) { |
defmacro | 0:a268b80e3093 | 187 | stream->data[stream->ring_buffer.start + i] = in[i]; |
defmacro | 0:a268b80e3093 | 188 | } |
defmacro | 0:a268b80e3093 | 189 | mmqtt_ring_buffer_push_(&stream->ring_buffer, pushable); |
defmacro | 0:a268b80e3093 | 190 | if (stream->left > 0) { stream->left -= pushable; } |
defmacro | 0:a268b80e3093 | 191 | return pushable; |
defmacro | 0:a268b80e3093 | 192 | } |
defmacro | 0:a268b80e3093 | 193 | |
defmacro | 0:a268b80e3093 | 194 | static mmqtt_status_t |
defmacro | 0:a268b80e3093 | 195 | mmqtt_stream_external_pushable(struct mmqtt_stream *stream, |
defmacro | 0:a268b80e3093 | 196 | uint8_t** data, mmqtt_ssize_t* max_length) |
defmacro | 0:a268b80e3093 | 197 | { |
defmacro | 0:a268b80e3093 | 198 | mmqtt_ssize_t pushable; |
defmacro | 0:a268b80e3093 | 199 | if (stream->left == 0) { return MMQTT_STATUS_DONE; } |
defmacro | 0:a268b80e3093 | 200 | pushable = mmqtt_ring_buffer_pushable_(&stream->ring_buffer); |
defmacro | 0:a268b80e3093 | 201 | if (pushable <= 0) { return MMQTT_STATUS_NOT_PUSHABLE; } |
defmacro | 0:a268b80e3093 | 202 | pushable = min(pushable, stream->left); |
defmacro | 0:a268b80e3093 | 203 | *data = stream->data + stream->ring_buffer.start; |
defmacro | 0:a268b80e3093 | 204 | *max_length = pushable; |
defmacro | 0:a268b80e3093 | 205 | return MMQTT_STATUS_OK; |
defmacro | 0:a268b80e3093 | 206 | } |
defmacro | 0:a268b80e3093 | 207 | |
defmacro | 0:a268b80e3093 | 208 | static mmqtt_status_t |
defmacro | 0:a268b80e3093 | 209 | mmqtt_stream_external_push(struct mmqtt_stream *stream, mmqtt_ssize_t length) |
defmacro | 0:a268b80e3093 | 210 | { |
defmacro | 0:a268b80e3093 | 211 | mmqtt_ssize_t pushable; |
defmacro | 0:a268b80e3093 | 212 | if (stream->left == 0) { return MMQTT_STATUS_DONE; } |
defmacro | 0:a268b80e3093 | 213 | pushable = mmqtt_ring_buffer_pushable_(&stream->ring_buffer); |
defmacro | 0:a268b80e3093 | 214 | if (pushable <= 0) { return MMQTT_STATUS_NOT_PUSHABLE; } |
defmacro | 0:a268b80e3093 | 215 | pushable = min(pushable, stream->left); |
defmacro | 0:a268b80e3093 | 216 | if (pushable < length) { |
defmacro | 0:a268b80e3093 | 217 | return MMQTT_STATUS_NOT_PUSHABLE; |
defmacro | 0:a268b80e3093 | 218 | } else { |
defmacro | 0:a268b80e3093 | 219 | mmqtt_ring_buffer_push_(&stream->ring_buffer, pushable); |
defmacro | 0:a268b80e3093 | 220 | return MMQTT_STATUS_OK; |
defmacro | 0:a268b80e3093 | 221 | } |
defmacro | 0:a268b80e3093 | 222 | } |
defmacro | 0:a268b80e3093 | 223 | |
defmacro | 0:a268b80e3093 | 224 | static void |
defmacro | 0:a268b80e3093 | 225 | mmqtt_queue_init_(struct mmqtt_queue_ *queue) |
defmacro | 0:a268b80e3093 | 226 | { |
defmacro | 0:a268b80e3093 | 227 | mmqtt_ring_buffer_init_(&queue->ring_buffer, MMQTT_QUEUE_MAX_LENGTH); |
defmacro | 0:a268b80e3093 | 228 | } |
defmacro | 0:a268b80e3093 | 229 | |
defmacro | 0:a268b80e3093 | 230 | static mmqtt_status_t |
defmacro | 0:a268b80e3093 | 231 | mmqtt_queue_add_(struct mmqtt_queue_ *queue, struct mmqtt_stream *stream) |
defmacro | 0:a268b80e3093 | 232 | { |
defmacro | 0:a268b80e3093 | 233 | if (mmqtt_ring_buffer_pushable_(&queue->ring_buffer) <= 0) { |
defmacro | 0:a268b80e3093 | 234 | return MMQTT_STATUS_NOT_PUSHABLE; |
defmacro | 0:a268b80e3093 | 235 | } |
defmacro | 0:a268b80e3093 | 236 | queue->streams[queue->ring_buffer.start] = stream; |
defmacro | 0:a268b80e3093 | 237 | mmqtt_ring_buffer_push_(&queue->ring_buffer, 1); |
defmacro | 0:a268b80e3093 | 238 | return MMQTT_STATUS_OK; |
defmacro | 0:a268b80e3093 | 239 | } |
defmacro | 0:a268b80e3093 | 240 | |
/* Non-zero when `queue` holds at least one stream. */
#define MMQTT_QUEUE_AVAILABLE(queue) ((queue)->ring_buffer.length > 0)
/* The slot at the head of `queue`; only meaningful when the queue is non-empty. */
#define MMQTT_QUEUE_FIRST(queue)     ((queue)->streams[(queue)->ring_buffer.start])
defmacro | 0:a268b80e3093 | 243 | |
defmacro | 0:a268b80e3093 | 244 | static void |
defmacro | 0:a268b80e3093 | 245 | mmqtt_connection_init(struct mmqtt_connection *connection, void *conn) |
defmacro | 0:a268b80e3093 | 246 | { |
defmacro | 0:a268b80e3093 | 247 | connection->connection = conn; |
defmacro | 0:a268b80e3093 | 248 | mmqtt_queue_init_(&connection->read_queue); |
defmacro | 0:a268b80e3093 | 249 | mmqtt_queue_init_(&connection->write_queue); |
defmacro | 0:a268b80e3093 | 250 | } |
defmacro | 0:a268b80e3093 | 251 | |
defmacro | 0:a268b80e3093 | 252 | /* |
defmacro | 0:a268b80e3093 | 253 | * Pulls data out of connection's write queue, the returned data is supposed |
defmacro | 0:a268b80e3093 | 254 | * to be sent to a TCP socket to remote servers. |
defmacro | 0:a268b80e3093 | 255 | */ |
defmacro | 0:a268b80e3093 | 256 | static mmqtt_ssize_t |
defmacro | 0:a268b80e3093 | 257 | mmqtt_connection_pull(struct mmqtt_connection *connection, |
defmacro | 0:a268b80e3093 | 258 | uint8_t *out, mmqtt_ssize_t max_length) |
defmacro | 0:a268b80e3093 | 259 | { |
defmacro | 0:a268b80e3093 | 260 | mmqtt_ssize_t length, current; |
defmacro | 0:a268b80e3093 | 261 | struct mmqtt_queue_ *queue; |
defmacro | 0:a268b80e3093 | 262 | length = 0; |
defmacro | 0:a268b80e3093 | 263 | queue = &connection->write_queue; |
defmacro | 0:a268b80e3093 | 264 | while(MMQTT_QUEUE_AVAILABLE(queue) && length < max_length) { |
defmacro | 0:a268b80e3093 | 265 | current = mmqtt_stream_pull(MMQTT_QUEUE_FIRST(queue), |
defmacro | 0:a268b80e3093 | 266 | out + length, |
defmacro | 0:a268b80e3093 | 267 | max_length - length); |
defmacro | 0:a268b80e3093 | 268 | /* TODO: in case current is 0, should we return non-pullable? */ |
defmacro | 0:a268b80e3093 | 269 | if (current >= 0) { |
defmacro | 0:a268b80e3093 | 270 | length += current; |
defmacro | 0:a268b80e3093 | 271 | } else if (current == MMQTT_STATUS_DONE) { |
defmacro | 0:a268b80e3093 | 272 | /* Release stream from write queue since it is done */ |
defmacro | 0:a268b80e3093 | 273 | mmqtt_ring_buffer_pull_(&queue->ring_buffer, 1); |
defmacro | 0:a268b80e3093 | 274 | } else if (current == MMQTT_STATUS_NOT_PULLABLE) { |
defmacro | 0:a268b80e3093 | 275 | return (length > 0) ? length : MMQTT_STATUS_NOT_PULLABLE; |
defmacro | 0:a268b80e3093 | 276 | } else { |
defmacro | 0:a268b80e3093 | 277 | return MMQTT_STATUS_UNKNOWN; |
defmacro | 0:a268b80e3093 | 278 | } |
defmacro | 0:a268b80e3093 | 279 | } |
defmacro | 0:a268b80e3093 | 280 | return (length > 0) ? length : MMQTT_STATUS_NOT_PULLABLE; |
defmacro | 0:a268b80e3093 | 281 | } |
defmacro | 0:a268b80e3093 | 282 | |
defmacro | 0:a268b80e3093 | 283 | struct mmqtt_stream * |
defmacro | 0:a268b80e3093 | 284 | mmqtt_connection_pullable_stream(struct mmqtt_connection *connection) |
defmacro | 0:a268b80e3093 | 285 | { |
defmacro | 0:a268b80e3093 | 286 | mmqtt_ssize_t i; |
defmacro | 0:a268b80e3093 | 287 | struct mmqtt_queue_ *queue = &connection->write_queue; |
defmacro | 0:a268b80e3093 | 288 | struct mmqtt_stream *stream = NULL; |
defmacro | 0:a268b80e3093 | 289 | for (i = 0; i < MMQTT_QUEUE_LENGTH(queue); i++) { |
defmacro | 0:a268b80e3093 | 290 | stream = MMQTT_QUEUE_INDEX(queue, i); |
defmacro | 0:a268b80e3093 | 291 | if (mmqtt_stream_running(stream) == MMQTT_STATUS_OK) { return stream; } |
defmacro | 0:a268b80e3093 | 292 | } |
defmacro | 0:a268b80e3093 | 293 | return NULL; |
defmacro | 0:a268b80e3093 | 294 | } |
defmacro | 0:a268b80e3093 | 295 | |
defmacro | 0:a268b80e3093 | 296 | /* |
defmacro | 0:a268b80e3093 | 297 | * Pushes data to connection's read queue, the data should come from |
defmacro | 0:a268b80e3093 | 298 | * a TCP socket to a remote machine. |
defmacro | 0:a268b80e3093 | 299 | */ |
defmacro | 0:a268b80e3093 | 300 | static mmqtt_ssize_t |
defmacro | 0:a268b80e3093 | 301 | mmqtt_connection_push(struct mmqtt_connection *connection, |
defmacro | 0:a268b80e3093 | 302 | const uint8_t* in, mmqtt_ssize_t length) |
defmacro | 0:a268b80e3093 | 303 | { |
defmacro | 0:a268b80e3093 | 304 | mmqtt_ssize_t i, current, consumed_length; |
defmacro | 0:a268b80e3093 | 305 | struct mmqtt_queue_ *queue; |
defmacro | 0:a268b80e3093 | 306 | consumed_length = 0; |
defmacro | 0:a268b80e3093 | 307 | queue = &connection->read_queue; |
defmacro | 0:a268b80e3093 | 308 | while (i < MMQTT_QUEUE_LENGTH(queue) && consumed_length < length) { |
defmacro | 0:a268b80e3093 | 309 | current = mmqtt_stream_push(MMQTT_QUEUE_INDEX(queue, i), |
defmacro | 0:a268b80e3093 | 310 | in + consumed_length, |
defmacro | 0:a268b80e3093 | 311 | length - consumed_length); |
defmacro | 0:a268b80e3093 | 312 | if (current >= 0) { |
defmacro | 0:a268b80e3093 | 313 | consumed_length += current; |
defmacro | 0:a268b80e3093 | 314 | } else if (current == MMQTT_STATUS_DONE) { |
defmacro | 0:a268b80e3093 | 315 | i++; |
defmacro | 0:a268b80e3093 | 316 | } else if (current == MMQTT_STATUS_NOT_PUSHABLE) { |
defmacro | 0:a268b80e3093 | 317 | return (consumed_length > 0) ? consumed_length : MMQTT_STATUS_NOT_PUSHABLE; |
defmacro | 0:a268b80e3093 | 318 | } else { |
defmacro | 0:a268b80e3093 | 319 | return MMQTT_STATUS_UNKNOWN; |
defmacro | 0:a268b80e3093 | 320 | } |
defmacro | 0:a268b80e3093 | 321 | } |
defmacro | 0:a268b80e3093 | 322 | return (consumed_length > 0) ? consumed_length : MMQTT_STATUS_NOT_PUSHABLE; |
defmacro | 0:a268b80e3093 | 323 | } |
defmacro | 0:a268b80e3093 | 324 | |
defmacro | 0:a268b80e3093 | 325 | struct mmqtt_stream * |
defmacro | 0:a268b80e3093 | 326 | mmqtt_connection_pushable_stream(struct mmqtt_connection *connection) |
defmacro | 0:a268b80e3093 | 327 | { |
defmacro | 0:a268b80e3093 | 328 | mmqtt_ssize_t i; |
defmacro | 0:a268b80e3093 | 329 | struct mmqtt_queue_ *queue = &connection->read_queue; |
defmacro | 0:a268b80e3093 | 330 | struct mmqtt_stream *stream = NULL; |
defmacro | 0:a268b80e3093 | 331 | for (i = 0; i < MMQTT_QUEUE_LENGTH(queue); i++) { |
defmacro | 0:a268b80e3093 | 332 | stream = MMQTT_QUEUE_INDEX(queue, i); |
defmacro | 0:a268b80e3093 | 333 | if (mmqtt_stream_running(stream) == MMQTT_STATUS_OK) { return stream; } |
defmacro | 0:a268b80e3093 | 334 | } |
defmacro | 0:a268b80e3093 | 335 | return NULL; |
defmacro | 0:a268b80e3093 | 336 | } |
defmacro | 0:a268b80e3093 | 337 | |
defmacro | 0:a268b80e3093 | 338 | static mmqtt_status_t |
defmacro | 0:a268b80e3093 | 339 | mmqtt_connection_add_read_stream(struct mmqtt_connection *connection, |
defmacro | 0:a268b80e3093 | 340 | struct mmqtt_stream *stream) |
defmacro | 0:a268b80e3093 | 341 | { |
defmacro | 0:a268b80e3093 | 342 | return mmqtt_queue_add_(&connection->read_queue, stream); |
defmacro | 0:a268b80e3093 | 343 | } |
defmacro | 0:a268b80e3093 | 344 | |
defmacro | 0:a268b80e3093 | 345 | static mmqtt_status_t |
defmacro | 0:a268b80e3093 | 346 | mmqtt_connection_add_write_stream(struct mmqtt_connection *connection, |
defmacro | 0:a268b80e3093 | 347 | struct mmqtt_stream *stream) |
defmacro | 0:a268b80e3093 | 348 | { |
defmacro | 0:a268b80e3093 | 349 | return mmqtt_queue_add_(&connection->write_queue, stream); |
defmacro | 0:a268b80e3093 | 350 | } |
defmacro | 0:a268b80e3093 | 351 | |
defmacro | 0:a268b80e3093 | 352 | static mmqtt_status_t |
defmacro | 0:a268b80e3093 | 353 | mmqtt_connection_in_read_stream(struct mmqtt_connection *connection, |
defmacro | 0:a268b80e3093 | 354 | struct mmqtt_stream *stream) |
defmacro | 0:a268b80e3093 | 355 | { |
defmacro | 0:a268b80e3093 | 356 | mmqtt_ssize_t i; |
defmacro | 0:a268b80e3093 | 357 | struct mmqtt_queue_ *queue = &connection->read_queue; |
defmacro | 0:a268b80e3093 | 358 | for (i = 0; i < MMQTT_QUEUE_LENGTH(queue); i++) { |
defmacro | 0:a268b80e3093 | 359 | if (MMQTT_QUEUE_INDEX(queue, i) == stream) { return MMQTT_STATUS_OK; } |
defmacro | 0:a268b80e3093 | 360 | } |
defmacro | 0:a268b80e3093 | 361 | return MMQTT_STATUS_NOT_EXPECTED; |
defmacro | 0:a268b80e3093 | 362 | } |
defmacro | 0:a268b80e3093 | 363 | |
defmacro | 0:a268b80e3093 | 364 | static mmqtt_status_t |
defmacro | 0:a268b80e3093 | 365 | mmqtt_connection_in_write_stream(struct mmqtt_connection *connection, |
defmacro | 0:a268b80e3093 | 366 | struct mmqtt_stream *stream) |
defmacro | 0:a268b80e3093 | 367 | { |
defmacro | 0:a268b80e3093 | 368 | mmqtt_ssize_t i; |
defmacro | 0:a268b80e3093 | 369 | struct mmqtt_queue_ *queue = &connection->write_queue; |
defmacro | 0:a268b80e3093 | 370 | for (i = 0; i < MMQTT_QUEUE_LENGTH(queue); i++) { |
defmacro | 0:a268b80e3093 | 371 | if (MMQTT_QUEUE_INDEX(queue, i) == stream) { return MMQTT_STATUS_OK; } |
defmacro | 0:a268b80e3093 | 372 | } |
defmacro | 0:a268b80e3093 | 373 | return MMQTT_STATUS_NOT_EXPECTED; |
defmacro | 0:a268b80e3093 | 374 | } |
defmacro | 0:a268b80e3093 | 375 | |
defmacro | 0:a268b80e3093 | 376 | static mmqtt_ssize_t |
defmacro | 0:a268b80e3093 | 377 | mmqtt_connection_read_stream_length(struct mmqtt_connection *connection) |
defmacro | 0:a268b80e3093 | 378 | { |
defmacro | 0:a268b80e3093 | 379 | return connection->read_queue.ring_buffer.length; |
defmacro | 0:a268b80e3093 | 380 | } |
defmacro | 0:a268b80e3093 | 381 | |
defmacro | 0:a268b80e3093 | 382 | static mmqtt_ssize_t |
defmacro | 0:a268b80e3093 | 383 | mmqtt_connection_write_stream_length(struct mmqtt_connection *connection) |
defmacro | 0:a268b80e3093 | 384 | { |
defmacro | 0:a268b80e3093 | 385 | return connection->write_queue.ring_buffer.length; |
defmacro | 0:a268b80e3093 | 386 | } |
defmacro | 0:a268b80e3093 | 387 | |
defmacro | 0:a268b80e3093 | 388 | static mmqtt_status_t |
defmacro | 0:a268b80e3093 | 389 | mmqtt_connection_release_write_stream(struct mmqtt_connection *connection, |
defmacro | 0:a268b80e3093 | 390 | struct mmqtt_stream *stream) |
defmacro | 0:a268b80e3093 | 391 | { |
defmacro | 0:a268b80e3093 | 392 | if (!MMQTT_QUEUE_AVAILABLE(&connection->write_queue)) { |
defmacro | 0:a268b80e3093 | 393 | return MMQTT_STATUS_NOT_PULLABLE; |
defmacro | 0:a268b80e3093 | 394 | } |
defmacro | 0:a268b80e3093 | 395 | if (stream != MMQTT_QUEUE_FIRST(&connection->write_queue)) { |
defmacro | 0:a268b80e3093 | 396 | return MMQTT_STATUS_NOT_EXPECTED; |
defmacro | 0:a268b80e3093 | 397 | } |
defmacro | 0:a268b80e3093 | 398 | mmqtt_ring_buffer_pull_(&connection->write_queue.ring_buffer, 1); |
defmacro | 0:a268b80e3093 | 399 | return MMQTT_STATUS_OK; |
defmacro | 0:a268b80e3093 | 400 | } |
defmacro | 0:a268b80e3093 | 401 | |
defmacro | 0:a268b80e3093 | 402 | static mmqtt_status_t |
defmacro | 0:a268b80e3093 | 403 | mmqtt_connection_release_read_stream(struct mmqtt_connection *connection, |
defmacro | 0:a268b80e3093 | 404 | struct mmqtt_stream *stream) |
defmacro | 0:a268b80e3093 | 405 | { |
defmacro | 0:a268b80e3093 | 406 | if (!MMQTT_QUEUE_AVAILABLE(&connection->read_queue)) { |
defmacro | 0:a268b80e3093 | 407 | return MMQTT_STATUS_NOT_PULLABLE; |
defmacro | 0:a268b80e3093 | 408 | } |
defmacro | 0:a268b80e3093 | 409 | if (stream != MMQTT_QUEUE_FIRST(&connection->read_queue)) { |
defmacro | 0:a268b80e3093 | 410 | return MMQTT_STATUS_NOT_EXPECTED; |
defmacro | 0:a268b80e3093 | 411 | } |
defmacro | 0:a268b80e3093 | 412 | mmqtt_ring_buffer_pull_(&connection->read_queue.ring_buffer, 1); |
defmacro | 0:a268b80e3093 | 413 | return MMQTT_STATUS_OK; |
defmacro | 0:a268b80e3093 | 414 | } |
defmacro | 0:a268b80e3093 | 415 | |
defmacro | 0:a268b80e3093 | 416 | static mmqtt_status_t |
defmacro | 0:a268b80e3093 | 417 | mmqtt_connection_autorelease_write_streams(struct mmqtt_connection *connection) |
defmacro | 0:a268b80e3093 | 418 | { |
defmacro | 0:a268b80e3093 | 419 | struct mmqtt_stream *stream; |
defmacro | 0:a268b80e3093 | 420 | while (MMQTT_QUEUE_AVAILABLE(&connection->write_queue)) { |
defmacro | 0:a268b80e3093 | 421 | stream = MMQTT_QUEUE_FIRST(&connection->write_queue); |
defmacro | 0:a268b80e3093 | 422 | if (mmqtt_stream_running(stream) == MMQTT_STATUS_DONE) { |
defmacro | 0:a268b80e3093 | 423 | mmqtt_connection_release_write_stream(connection, stream); |
defmacro | 0:a268b80e3093 | 424 | } else { |
defmacro | 0:a268b80e3093 | 425 | break; |
defmacro | 0:a268b80e3093 | 426 | } |
defmacro | 0:a268b80e3093 | 427 | } |
defmacro | 0:a268b80e3093 | 428 | return MMQTT_STATUS_OK; |
defmacro | 0:a268b80e3093 | 429 | } |
defmacro | 0:a268b80e3093 | 430 | |
defmacro | 0:a268b80e3093 | 431 | static mmqtt_status_t |
defmacro | 0:a268b80e3093 | 432 | mmqtt_connection_autorelease_read_streams(struct mmqtt_connection *connection) |
defmacro | 0:a268b80e3093 | 433 | { |
defmacro | 0:a268b80e3093 | 434 | struct mmqtt_stream *stream; |
defmacro | 0:a268b80e3093 | 435 | while (MMQTT_QUEUE_AVAILABLE(&connection->read_queue)) { |
defmacro | 0:a268b80e3093 | 436 | stream = MMQTT_QUEUE_FIRST(&connection->read_queue); |
defmacro | 0:a268b80e3093 | 437 | if (mmqtt_stream_running(stream) == MMQTT_STATUS_DONE) { |
defmacro | 0:a268b80e3093 | 438 | mmqtt_connection_release_read_stream(connection, stream); |
defmacro | 0:a268b80e3093 | 439 | } else { |
defmacro | 0:a268b80e3093 | 440 | break; |
defmacro | 0:a268b80e3093 | 441 | } |
defmacro | 0:a268b80e3093 | 442 | } |
defmacro | 0:a268b80e3093 | 443 | return MMQTT_STATUS_OK; |
defmacro | 0:a268b80e3093 | 444 | } |
defmacro | 0:a268b80e3093 | 445 | |
defmacro | 0:a268b80e3093 | 446 | #ifndef MMQTT_DISABLE_STATIC |
defmacro | 0:a268b80e3093 | 447 | |
defmacro | 0:a268b80e3093 | 448 | typedef mmqtt_status_t (*mmqtt_s_puller)(struct mmqtt_connection*); |
defmacro | 0:a268b80e3093 | 449 | typedef mmqtt_status_t (*mmqtt_s_pusher)(struct mmqtt_connection*, mmqtt_ssize_t); |
defmacro | 0:a268b80e3093 | 450 | |
defmacro | 0:a268b80e3093 | 451 | mmqtt_status_t |
defmacro | 0:a268b80e3093 | 452 | mmqtt_s_encode_buffer(struct mmqtt_connection *conn, mmqtt_s_puller puller, |
defmacro | 0:a268b80e3093 | 453 | const uint8_t *buffer, uint16_t length) |
defmacro | 0:a268b80e3093 | 454 | { |
defmacro | 0:a268b80e3093 | 455 | struct mmqtt_stream stream; |
defmacro | 0:a268b80e3093 | 456 | mmqtt_status_t status; |
defmacro | 0:a268b80e3093 | 457 | mmqtt_ssize_t current; |
defmacro | 0:a268b80e3093 | 458 | uint16_t consumed_length; |
defmacro | 0:a268b80e3093 | 459 | |
defmacro | 0:a268b80e3093 | 460 | mmqtt_stream_init(&stream, length); |
defmacro | 0:a268b80e3093 | 461 | while ((status = mmqtt_connection_add_write_stream(conn, &stream)) == MMQTT_STATUS_NOT_PUSHABLE) { |
defmacro | 0:a268b80e3093 | 462 | status = puller(conn); |
defmacro | 0:a268b80e3093 | 463 | if (status != MMQTT_STATUS_OK) { return status; } |
defmacro | 0:a268b80e3093 | 464 | } |
defmacro | 0:a268b80e3093 | 465 | if (status != MMQTT_STATUS_OK) { return status; } |
defmacro | 0:a268b80e3093 | 466 | |
defmacro | 0:a268b80e3093 | 467 | consumed_length = 0; |
defmacro | 0:a268b80e3093 | 468 | while (consumed_length < length) { |
defmacro | 0:a268b80e3093 | 469 | current = mmqtt_stream_push(&stream, buffer + consumed_length, |
defmacro | 0:a268b80e3093 | 470 | length - consumed_length); |
defmacro | 0:a268b80e3093 | 471 | if (current >= 0) { |
defmacro | 0:a268b80e3093 | 472 | consumed_length += current; |
defmacro | 0:a268b80e3093 | 473 | } else if (current == MMQTT_STATUS_NOT_PUSHABLE) { |
defmacro | 0:a268b80e3093 | 474 | status = puller(conn); |
defmacro | 0:a268b80e3093 | 475 | if (status != MMQTT_STATUS_OK) { return status; } |
defmacro | 0:a268b80e3093 | 476 | } |
defmacro | 0:a268b80e3093 | 477 | } |
defmacro | 0:a268b80e3093 | 478 | while (mmqtt_connection_in_write_stream(conn, &stream) == MMQTT_STATUS_OK) { |
defmacro | 0:a268b80e3093 | 479 | status = puller(conn); |
defmacro | 0:a268b80e3093 | 480 | if (status != MMQTT_STATUS_OK) { return status; } |
defmacro | 0:a268b80e3093 | 481 | } |
defmacro | 0:a268b80e3093 | 482 | return MMQTT_STATUS_OK; |
defmacro | 0:a268b80e3093 | 483 | } |
defmacro | 0:a268b80e3093 | 484 | |
defmacro | 0:a268b80e3093 | 485 | mmqtt_status_t |
defmacro | 0:a268b80e3093 | 486 | mmqtt_s_decode_buffer(struct mmqtt_connection *conn, mmqtt_s_pusher pusher, |
defmacro | 0:a268b80e3093 | 487 | uint8_t *buffer, uint16_t buffer_length, |
defmacro | 0:a268b80e3093 | 488 | uint16_t consume_length) |
defmacro | 0:a268b80e3093 | 489 | { |
defmacro | 0:a268b80e3093 | 490 | struct mmqtt_stream stream; |
defmacro | 0:a268b80e3093 | 491 | mmqtt_status_t status; |
defmacro | 0:a268b80e3093 | 492 | mmqtt_ssize_t current; |
defmacro | 0:a268b80e3093 | 493 | uint16_t length; |
defmacro | 0:a268b80e3093 | 494 | uint8_t temp; |
defmacro | 0:a268b80e3093 | 495 | |
defmacro | 0:a268b80e3093 | 496 | if (mmqtt_connection_read_stream_length(conn) > 0) { |
defmacro | 0:a268b80e3093 | 497 | return MMQTT_STATUS_INVALID_STATE; |
defmacro | 0:a268b80e3093 | 498 | } |
defmacro | 0:a268b80e3093 | 499 | |
defmacro | 0:a268b80e3093 | 500 | if (consume_length < buffer_length) { consume_length = buffer_length; } |
defmacro | 0:a268b80e3093 | 501 | mmqtt_stream_init(&stream, consume_length); |
defmacro | 0:a268b80e3093 | 502 | /* Connection won't remove read stream automatically, pusher won't help here */ |
defmacro | 0:a268b80e3093 | 503 | status = mmqtt_connection_add_read_stream(conn, &stream); |
defmacro | 0:a268b80e3093 | 504 | if (status != MMQTT_STATUS_OK) { return status; } |
defmacro | 0:a268b80e3093 | 505 | |
defmacro | 0:a268b80e3093 | 506 | length = 0; |
defmacro | 0:a268b80e3093 | 507 | while (length < buffer_length) { |
defmacro | 0:a268b80e3093 | 508 | current = mmqtt_stream_pull(&stream, buffer + length, |
defmacro | 0:a268b80e3093 | 509 | buffer_length - length); |
defmacro | 0:a268b80e3093 | 510 | if (current >= 0) { |
defmacro | 0:a268b80e3093 | 511 | length += current; |
defmacro | 0:a268b80e3093 | 512 | } else if (current == MMQTT_STATUS_NOT_PULLABLE) { |
defmacro | 0:a268b80e3093 | 513 | status = pusher(conn, buffer_length - length); |
defmacro | 0:a268b80e3093 | 514 | if (status != MMQTT_STATUS_OK) { |
defmacro | 0:a268b80e3093 | 515 | mmqtt_connection_release_read_stream(conn, &stream); |
defmacro | 0:a268b80e3093 | 516 | return status; |
defmacro | 0:a268b80e3093 | 517 | } |
defmacro | 0:a268b80e3093 | 518 | } |
defmacro | 0:a268b80e3093 | 519 | } |
defmacro | 0:a268b80e3093 | 520 | while (length < consume_length) { |
defmacro | 0:a268b80e3093 | 521 | current = mmqtt_stream_pull(&stream, &temp, 1); |
defmacro | 0:a268b80e3093 | 522 | if (current >= 0) { |
defmacro | 0:a268b80e3093 | 523 | length += current; |
defmacro | 0:a268b80e3093 | 524 | } else if (current == MMQTT_STATUS_NOT_PULLABLE) { |
defmacro | 0:a268b80e3093 | 525 | status = pusher(conn, consume_length - length); |
defmacro | 0:a268b80e3093 | 526 | if (status != MMQTT_STATUS_OK) { |
defmacro | 0:a268b80e3093 | 527 | mmqtt_connection_release_read_stream(conn, &stream); |
defmacro | 0:a268b80e3093 | 528 | return status; |
defmacro | 0:a268b80e3093 | 529 | } |
defmacro | 0:a268b80e3093 | 530 | } |
defmacro | 0:a268b80e3093 | 531 | } |
defmacro | 0:a268b80e3093 | 532 | mmqtt_connection_release_read_stream(conn, &stream); |
defmacro | 0:a268b80e3093 | 533 | return MMQTT_STATUS_OK; |
defmacro | 0:a268b80e3093 | 534 | } |
defmacro | 0:a268b80e3093 | 535 | |
defmacro | 0:a268b80e3093 | 536 | /* NOTE: for smaller buffer, we can also use mmqtt_s_decode_buffer with |
defmacro | 0:a268b80e3093 | 537 | * an empty buffer and 0 as buffer length. Here this function is for the |
defmacro | 0:a268b80e3093 | 538 | * case where we are skipping a whole packet altogether, which could be |
defmacro | 0:a268b80e3093 | 539 | * larger than 65535 in theory. |
defmacro | 0:a268b80e3093 | 540 | */ |
defmacro | 0:a268b80e3093 | 541 | mmqtt_status_t |
defmacro | 0:a268b80e3093 | 542 | mmqtt_s_skip_buffer(struct mmqtt_connection *conn, mmqtt_s_pusher pusher, |
defmacro | 0:a268b80e3093 | 543 | uint32_t length) |
defmacro | 0:a268b80e3093 | 544 | { |
defmacro | 0:a268b80e3093 | 545 | mmqtt_status_t status; |
defmacro | 0:a268b80e3093 | 546 | while (length > 65535) { |
defmacro | 0:a268b80e3093 | 547 | status = mmqtt_s_decode_buffer(conn, pusher, NULL, 0, 65535); |
defmacro | 0:a268b80e3093 | 548 | if (status != MMQTT_STATUS_OK) { |
defmacro | 0:a268b80e3093 | 549 | return status; |
defmacro | 0:a268b80e3093 | 550 | } |
defmacro | 0:a268b80e3093 | 551 | length -= 65535; |
defmacro | 0:a268b80e3093 | 552 | } |
defmacro | 0:a268b80e3093 | 553 | if (length > 0) { |
defmacro | 0:a268b80e3093 | 554 | return mmqtt_s_decode_buffer(conn, pusher, NULL, 0, (uint16_t) length); |
defmacro | 0:a268b80e3093 | 555 | } else { |
defmacro | 0:a268b80e3093 | 556 | return MMQTT_STATUS_OK; |
defmacro | 0:a268b80e3093 | 557 | } |
defmacro | 0:a268b80e3093 | 558 | } |
defmacro | 0:a268b80e3093 | 559 | |
defmacro | 0:a268b80e3093 | 560 | mmqtt_status_t |
defmacro | 0:a268b80e3093 | 561 | mmqtt_s_encode_fixed_header(struct mmqtt_connection *conn, mmqtt_s_puller puller, |
defmacro | 0:a268b80e3093 | 562 | uint8_t flag, uint32_t length) |
defmacro | 0:a268b80e3093 | 563 | { |
defmacro | 0:a268b80e3093 | 564 | mmqtt_status_t status; |
defmacro | 0:a268b80e3093 | 565 | mmqtt_ssize_t i; |
defmacro | 0:a268b80e3093 | 566 | uint8_t c, buffer[4]; |
defmacro | 0:a268b80e3093 | 567 | |
defmacro | 0:a268b80e3093 | 568 | buffer[0] = flag; |
defmacro | 0:a268b80e3093 | 569 | status = mmqtt_s_encode_buffer(conn, puller, buffer, 1); |
defmacro | 0:a268b80e3093 | 570 | if (status != MMQTT_STATUS_OK) { return status; } |
defmacro | 0:a268b80e3093 | 571 | |
defmacro | 0:a268b80e3093 | 572 | i = 0; |
defmacro | 0:a268b80e3093 | 573 | do { |
defmacro | 0:a268b80e3093 | 574 | c = (uint8_t) length % 128; |
defmacro | 0:a268b80e3093 | 575 | length /= 128; |
defmacro | 0:a268b80e3093 | 576 | if (length > 0) { c |= 0x80; } |
defmacro | 0:a268b80e3093 | 577 | buffer[i++] = c; |
defmacro | 0:a268b80e3093 | 578 | } while (length > 0); |
defmacro | 0:a268b80e3093 | 579 | |
defmacro | 0:a268b80e3093 | 580 | return mmqtt_s_encode_buffer(conn, puller, buffer, i); |
defmacro | 0:a268b80e3093 | 581 | } |
defmacro | 0:a268b80e3093 | 582 | |
defmacro | 0:a268b80e3093 | 583 | mmqtt_status_t |
defmacro | 0:a268b80e3093 | 584 | mmqtt_s_decode_fixed_header(struct mmqtt_connection *conn, mmqtt_s_pusher pusher, |
defmacro | 0:a268b80e3093 | 585 | uint8_t *flag, uint32_t *length) |
defmacro | 0:a268b80e3093 | 586 | { |
defmacro | 0:a268b80e3093 | 587 | mmqtt_status_t status; |
defmacro | 0:a268b80e3093 | 588 | uint8_t c; |
defmacro | 0:a268b80e3093 | 589 | uint32_t l, factor; |
defmacro | 0:a268b80e3093 | 590 | |
defmacro | 0:a268b80e3093 | 591 | status = mmqtt_s_decode_buffer(conn, pusher, &c, 1, 1); |
defmacro | 0:a268b80e3093 | 592 | if (status != MMQTT_STATUS_OK) { return status; } |
defmacro | 0:a268b80e3093 | 593 | if (flag) { *flag = c; } |
defmacro | 0:a268b80e3093 | 594 | |
defmacro | 0:a268b80e3093 | 595 | l = 0; |
defmacro | 0:a268b80e3093 | 596 | factor = 1; |
defmacro | 0:a268b80e3093 | 597 | do { |
defmacro | 0:a268b80e3093 | 598 | status = mmqtt_s_decode_buffer(conn, pusher, &c, 1, 1); |
defmacro | 0:a268b80e3093 | 599 | if (status != MMQTT_STATUS_OK) { return status; } |
defmacro | 0:a268b80e3093 | 600 | l += (c & 0x7F) * factor; |
defmacro | 0:a268b80e3093 | 601 | factor *= 128; |
defmacro | 0:a268b80e3093 | 602 | } while ((c & 0x80) != 0); |
defmacro | 0:a268b80e3093 | 603 | if (length) { *length = l; } |
defmacro | 0:a268b80e3093 | 604 | |
defmacro | 0:a268b80e3093 | 605 | return MMQTT_STATUS_OK; |
defmacro | 0:a268b80e3093 | 606 | } |
defmacro | 0:a268b80e3093 | 607 | |
/* Store a 16-bit value into buffer[0..1], most significant byte first. */
void mmqtt_s_pack_uint16_(uint16_t length, uint8_t *buffer)
{
  const uint8_t low = (uint8_t) (length & 0xFF);
  const uint8_t high = (uint8_t) ((length >> 8) & 0xFF);

  buffer[0] = high;
  buffer[1] = low;
}
defmacro | 0:a268b80e3093 | 613 | |
/* Rebuild a 16-bit value from buffer[0..1], most significant byte first. */
uint16_t mmqtt_s_unpack_uint16_(uint8_t *buffer)
{
  uint16_t value;

  value = (uint16_t) (buffer[0] << 8);
  value |= (uint16_t) buffer[1];
  return value;
}
defmacro | 0:a268b80e3093 | 618 | |
defmacro | 0:a268b80e3093 | 619 | /* We don't case uint16_t* directly into uint8_t* because there might be |
defmacro | 0:a268b80e3093 | 620 | * endianness differences |
defmacro | 0:a268b80e3093 | 621 | */ |
defmacro | 0:a268b80e3093 | 622 | mmqtt_status_t |
defmacro | 0:a268b80e3093 | 623 | mmqtt_s_encode_uint16(struct mmqtt_connection *conn, mmqtt_s_puller puller, |
defmacro | 0:a268b80e3093 | 624 | uint16_t val) |
defmacro | 0:a268b80e3093 | 625 | { |
defmacro | 0:a268b80e3093 | 626 | uint8_t buffer[2]; |
defmacro | 0:a268b80e3093 | 627 | |
defmacro | 0:a268b80e3093 | 628 | mmqtt_s_pack_uint16_(val, buffer); |
defmacro | 0:a268b80e3093 | 629 | return mmqtt_s_encode_buffer(conn, puller, buffer, 2); |
defmacro | 0:a268b80e3093 | 630 | } |
defmacro | 0:a268b80e3093 | 631 | |
defmacro | 0:a268b80e3093 | 632 | mmqtt_status_t |
defmacro | 0:a268b80e3093 | 633 | mmqtt_s_decode_uint16(struct mmqtt_connection *conn, mmqtt_s_pusher pusher, |
defmacro | 0:a268b80e3093 | 634 | uint16_t *out_val) |
defmacro | 0:a268b80e3093 | 635 | { |
defmacro | 0:a268b80e3093 | 636 | mmqtt_status_t status; |
defmacro | 0:a268b80e3093 | 637 | uint8_t buffer[2]; |
defmacro | 0:a268b80e3093 | 638 | |
defmacro | 0:a268b80e3093 | 639 | status = mmqtt_s_decode_buffer(conn, pusher, buffer, 2, 2); |
defmacro | 0:a268b80e3093 | 640 | if (status != MMQTT_STATUS_OK) { return status; } |
defmacro | 0:a268b80e3093 | 641 | |
defmacro | 0:a268b80e3093 | 642 | if (out_val) { *out_val = mmqtt_s_unpack_uint16_(buffer); } |
defmacro | 0:a268b80e3093 | 643 | return MMQTT_STATUS_OK; |
defmacro | 0:a268b80e3093 | 644 | } |
defmacro | 0:a268b80e3093 | 645 | |
defmacro | 0:a268b80e3093 | 646 | mmqtt_status_t |
defmacro | 0:a268b80e3093 | 647 | mmqtt_s_encode_string(struct mmqtt_connection *conn, mmqtt_s_puller puller, |
defmacro | 0:a268b80e3093 | 648 | const uint8_t *str, uint16_t length) |
defmacro | 0:a268b80e3093 | 649 | { |
defmacro | 0:a268b80e3093 | 650 | mmqtt_status_t status; |
defmacro | 0:a268b80e3093 | 651 | uint8_t buffer[2]; |
defmacro | 0:a268b80e3093 | 652 | |
defmacro | 0:a268b80e3093 | 653 | mmqtt_s_pack_uint16_(length, buffer); |
defmacro | 0:a268b80e3093 | 654 | status = mmqtt_s_encode_buffer(conn, puller, buffer, 2); |
defmacro | 0:a268b80e3093 | 655 | if (status != MMQTT_STATUS_OK) { return status; } |
defmacro | 0:a268b80e3093 | 656 | |
defmacro | 0:a268b80e3093 | 657 | return mmqtt_s_encode_buffer(conn, puller, str, length); |
defmacro | 0:a268b80e3093 | 658 | } |
defmacro | 0:a268b80e3093 | 659 | |
defmacro | 0:a268b80e3093 | 660 | mmqtt_status_t |
defmacro | 0:a268b80e3093 | 661 | mmqtt_s_decode_string(struct mmqtt_connection *conn, mmqtt_s_pusher pusher, |
defmacro | 0:a268b80e3093 | 662 | uint8_t *str, uint16_t max_length, |
defmacro | 0:a268b80e3093 | 663 | uint16_t *out_length, uint16_t *out_true_length) { |
defmacro | 0:a268b80e3093 | 664 | mmqtt_status_t status; |
defmacro | 0:a268b80e3093 | 665 | uint8_t buffer[2]; |
defmacro | 0:a268b80e3093 | 666 | uint16_t true_length, length; |
defmacro | 0:a268b80e3093 | 667 | |
defmacro | 0:a268b80e3093 | 668 | status = mmqtt_s_decode_buffer(conn, pusher, buffer, 2, 2); |
defmacro | 0:a268b80e3093 | 669 | if (status != MMQTT_STATUS_OK) { return status; } |
defmacro | 0:a268b80e3093 | 670 | true_length = mmqtt_s_unpack_uint16_(buffer); |
defmacro | 0:a268b80e3093 | 671 | |
defmacro | 0:a268b80e3093 | 672 | length = min(true_length, max_length); |
defmacro | 0:a268b80e3093 | 673 | status = mmqtt_s_decode_buffer(conn, pusher, str, length, true_length); |
defmacro | 0:a268b80e3093 | 674 | if (status != MMQTT_STATUS_OK) { return status; } |
defmacro | 0:a268b80e3093 | 675 | |
defmacro | 0:a268b80e3093 | 676 | if (out_length) { *out_length = length; } |
defmacro | 0:a268b80e3093 | 677 | if (out_true_length) { *out_true_length = true_length; } |
defmacro | 0:a268b80e3093 | 678 | return MMQTT_STATUS_OK; |
defmacro | 0:a268b80e3093 | 679 | } |
defmacro | 0:a268b80e3093 | 680 | |
/* p stands for packet here */
/* Fields of the connect header, in the order they are encoded/decoded by
 * mmqtt_s_encode_connect_header / mmqtt_s_decode_connect_header.
 */
struct mmqtt_p_connect_header {
  uint8_t *name;             /* name bytes; source on encode, destination on decode */
  uint16_t name_length;      /* bytes in name; rewritten by the decoder */
  uint16_t name_max_length;  /* capacity of name, used by the decoder to cap name_length */
  uint8_t protocol_version;  /* one byte on the wire, after the name */
  uint8_t flags;             /* one byte on the wire, after protocol_version */
  uint16_t keepalive;        /* two bytes on the wire, most significant first */
};
defmacro | 0:a268b80e3093 | 690 | |
defmacro | 0:a268b80e3093 | 691 | mmqtt_status_t |
defmacro | 0:a268b80e3093 | 692 | mmqtt_s_encode_connect_header(struct mmqtt_connection *conn, mmqtt_s_puller puller, |
defmacro | 0:a268b80e3093 | 693 | const struct mmqtt_p_connect_header *header) |
defmacro | 0:a268b80e3093 | 694 | { |
defmacro | 0:a268b80e3093 | 695 | mmqtt_status_t status; |
defmacro | 0:a268b80e3093 | 696 | uint8_t buffer[2]; |
defmacro | 0:a268b80e3093 | 697 | |
defmacro | 0:a268b80e3093 | 698 | mmqtt_s_pack_uint16_(header->name_length, buffer); |
defmacro | 0:a268b80e3093 | 699 | status = mmqtt_s_encode_buffer(conn, puller, buffer, 2); |
defmacro | 0:a268b80e3093 | 700 | if (status != MMQTT_STATUS_OK) { return status; } |
defmacro | 0:a268b80e3093 | 701 | |
defmacro | 0:a268b80e3093 | 702 | status = mmqtt_s_encode_buffer(conn, puller, header->name, header->name_length); |
defmacro | 0:a268b80e3093 | 703 | if (status != MMQTT_STATUS_OK) { return status; } |
defmacro | 0:a268b80e3093 | 704 | |
defmacro | 0:a268b80e3093 | 705 | status = mmqtt_s_encode_buffer(conn, puller, &header->protocol_version, 1); |
defmacro | 0:a268b80e3093 | 706 | if (status != MMQTT_STATUS_OK) { return status; } |
defmacro | 0:a268b80e3093 | 707 | |
defmacro | 0:a268b80e3093 | 708 | status = mmqtt_s_encode_buffer(conn, puller, &header->flags, 1); |
defmacro | 0:a268b80e3093 | 709 | if (status != MMQTT_STATUS_OK) { return status; } |
defmacro | 0:a268b80e3093 | 710 | |
defmacro | 0:a268b80e3093 | 711 | mmqtt_s_pack_uint16_(header->keepalive, buffer); |
defmacro | 0:a268b80e3093 | 712 | return mmqtt_s_encode_buffer(conn, puller, buffer, 2); |
defmacro | 0:a268b80e3093 | 713 | } |
defmacro | 0:a268b80e3093 | 714 | |
defmacro | 0:a268b80e3093 | 715 | /* We would rewrite name_length in header to contain returned name length */ |
defmacro | 0:a268b80e3093 | 716 | mmqtt_status_t |
defmacro | 0:a268b80e3093 | 717 | mmqtt_s_decode_connect_header(struct mmqtt_connection *conn, mmqtt_s_pusher pusher, |
defmacro | 0:a268b80e3093 | 718 | struct mmqtt_p_connect_header *header) |
defmacro | 0:a268b80e3093 | 719 | { |
defmacro | 0:a268b80e3093 | 720 | mmqtt_status_t status; |
defmacro | 0:a268b80e3093 | 721 | uint8_t buffer[2]; |
defmacro | 0:a268b80e3093 | 722 | uint16_t length; |
defmacro | 0:a268b80e3093 | 723 | |
defmacro | 0:a268b80e3093 | 724 | status = mmqtt_s_decode_buffer(conn, pusher, buffer, 2, 2); |
defmacro | 0:a268b80e3093 | 725 | if (status != MMQTT_STATUS_OK) { return status; } |
defmacro | 0:a268b80e3093 | 726 | length = mmqtt_s_unpack_uint16_(buffer); |
defmacro | 0:a268b80e3093 | 727 | |
defmacro | 0:a268b80e3093 | 728 | header->name_length = min(header->name_max_length, length); |
defmacro | 0:a268b80e3093 | 729 | status = mmqtt_s_decode_buffer(conn, pusher, header->name, |
defmacro | 0:a268b80e3093 | 730 | header->name_length, length); |
defmacro | 0:a268b80e3093 | 731 | if (status != MMQTT_STATUS_OK) { return status; } |
defmacro | 0:a268b80e3093 | 732 | |
defmacro | 0:a268b80e3093 | 733 | status = mmqtt_s_decode_buffer(conn, pusher, &header->protocol_version, 1, 1); |
defmacro | 0:a268b80e3093 | 734 | if (status != MMQTT_STATUS_OK) { return status; } |
defmacro | 0:a268b80e3093 | 735 | |
defmacro | 0:a268b80e3093 | 736 | status = mmqtt_s_decode_buffer(conn, pusher, &header->flags, 1, 1); |
defmacro | 0:a268b80e3093 | 737 | if (status != MMQTT_STATUS_OK) { return status; } |
defmacro | 0:a268b80e3093 | 738 | |
defmacro | 0:a268b80e3093 | 739 | status = mmqtt_s_decode_buffer(conn, pusher, buffer, 2, 2); |
defmacro | 0:a268b80e3093 | 740 | if (status != MMQTT_STATUS_OK) { return status; } |
defmacro | 0:a268b80e3093 | 741 | header->keepalive = mmqtt_s_unpack_uint16_(buffer); |
defmacro | 0:a268b80e3093 | 742 | |
defmacro | 0:a268b80e3093 | 743 | return MMQTT_STATUS_OK; |
defmacro | 0:a268b80e3093 | 744 | } |
defmacro | 0:a268b80e3093 | 745 | |
/* Fields of the connack header; each is a single byte on the wire and
 * `reserved` is encoded/decoded first.
 */
struct mmqtt_p_connack_header {
  uint8_t reserved;     /* first byte */
  uint8_t return_code;  /* second byte */
};
defmacro | 0:a268b80e3093 | 750 | |
defmacro | 0:a268b80e3093 | 751 | mmqtt_status_t |
defmacro | 0:a268b80e3093 | 752 | mmqtt_s_encode_connack_header(struct mmqtt_connection *conn, mmqtt_s_puller puller, |
defmacro | 0:a268b80e3093 | 753 | const struct mmqtt_p_connack_header *header) |
defmacro | 0:a268b80e3093 | 754 | { |
defmacro | 0:a268b80e3093 | 755 | mmqtt_status_t status; |
defmacro | 0:a268b80e3093 | 756 | |
defmacro | 0:a268b80e3093 | 757 | status = mmqtt_s_encode_buffer(conn, puller, &header->reserved, 1); |
defmacro | 0:a268b80e3093 | 758 | if (status != MMQTT_STATUS_OK) { return status; } |
defmacro | 0:a268b80e3093 | 759 | |
defmacro | 0:a268b80e3093 | 760 | return mmqtt_s_encode_buffer(conn, puller, &header->return_code, 1); |
defmacro | 0:a268b80e3093 | 761 | } |
defmacro | 0:a268b80e3093 | 762 | |
defmacro | 0:a268b80e3093 | 763 | mmqtt_status_t |
defmacro | 0:a268b80e3093 | 764 | mmqtt_s_decode_connack_header(struct mmqtt_connection *conn, mmqtt_s_pusher pusher, |
defmacro | 0:a268b80e3093 | 765 | struct mmqtt_p_connack_header *header) |
defmacro | 0:a268b80e3093 | 766 | { |
defmacro | 0:a268b80e3093 | 767 | mmqtt_status_t status; |
defmacro | 0:a268b80e3093 | 768 | |
defmacro | 0:a268b80e3093 | 769 | status = mmqtt_s_decode_buffer(conn, pusher, &header->reserved, 1, 1); |
defmacro | 0:a268b80e3093 | 770 | if (status != MMQTT_STATUS_OK) { return status; } |
defmacro | 0:a268b80e3093 | 771 | |
defmacro | 1:311cd16389ff | 772 | return mmqtt_s_decode_buffer(conn, pusher, &header->return_code, 1, 1); |
defmacro | 0:a268b80e3093 | 773 | } |
defmacro | 0:a268b80e3093 | 774 | |
/* Works for both subscribe and unsubscribe */
/* Progress state for mmqtt_s_decode_subscribe_payload. */
struct mmqtt_p_subscribe_decode_context {
  uint32_t length;  /* payload bytes not yet decoded; decoding reports DONE at 0 */
  uint8_t has_qos;  /* non-zero: each topic is followed by one QoS byte */
};
defmacro | 0:a268b80e3093 | 780 | |
/* One decoded payload entry, filled by mmqtt_s_decode_subscribe_payload. */
struct mmqtt_p_subscribe_payload_line {
  uint8_t *topic;             /* caller-provided buffer receiving the topic bytes */
  uint16_t topic_length;      /* number of bytes stored in topic by the decoder */
  uint16_t topic_max_length;  /* capacity of topic */
  uint8_t qos;                /* QoS byte read from the wire, or 0 when the context has no QoS */
};
defmacro | 0:a268b80e3093 | 787 | |
defmacro | 0:a268b80e3093 | 788 | void |
defmacro | 0:a268b80e3093 | 789 | mmqtt_s_subscribe_decode_context_init(struct mmqtt_p_subscribe_decode_context *context, |
defmacro | 0:a268b80e3093 | 790 | uint32_t length) |
defmacro | 0:a268b80e3093 | 791 | { |
defmacro | 0:a268b80e3093 | 792 | context->length = length; |
defmacro | 0:a268b80e3093 | 793 | context->has_qos = 1; |
defmacro | 0:a268b80e3093 | 794 | } |
defmacro | 0:a268b80e3093 | 795 | |
defmacro | 0:a268b80e3093 | 796 | void |
defmacro | 0:a268b80e3093 | 797 | mmqtt_s_unsubscribe_decode_context_init(struct mmqtt_p_subscribe_decode_context *context, |
defmacro | 0:a268b80e3093 | 798 | uint32_t length) |
defmacro | 0:a268b80e3093 | 799 | { |
defmacro | 0:a268b80e3093 | 800 | context->length = length; |
defmacro | 0:a268b80e3093 | 801 | context->has_qos = 0; |
defmacro | 0:a268b80e3093 | 802 | } |
defmacro | 0:a268b80e3093 | 803 | |
defmacro | 0:a268b80e3093 | 804 | mmqtt_status_t |
defmacro | 0:a268b80e3093 | 805 | mmqtt_s_decode_subscribe_payload(struct mmqtt_connection *conn, mmqtt_s_pusher pusher, |
defmacro | 0:a268b80e3093 | 806 | struct mmqtt_p_subscribe_decode_context *context, |
defmacro | 0:a268b80e3093 | 807 | struct mmqtt_p_subscribe_payload_line *line) |
defmacro | 0:a268b80e3093 | 808 | { |
defmacro | 0:a268b80e3093 | 809 | mmqtt_status_t status; |
defmacro | 0:a268b80e3093 | 810 | uint16_t string_true_length; |
defmacro | 0:a268b80e3093 | 811 | |
defmacro | 0:a268b80e3093 | 812 | if (context->length <= 0) { return MMQTT_STATUS_DONE; } |
defmacro | 0:a268b80e3093 | 813 | status = mmqtt_s_decode_string(conn, pusher, line->topic, line->topic_max_length, |
defmacro | 0:a268b80e3093 | 814 | &line->topic_length, &string_true_length); |
defmacro | 0:a268b80e3093 | 815 | if (status != MMQTT_STATUS_OK) { return status; } |
defmacro | 0:a268b80e3093 | 816 | |
defmacro | 0:a268b80e3093 | 817 | context->length -= string_true_length; |
defmacro | 0:a268b80e3093 | 818 | context->length -= 2; |
defmacro | 0:a268b80e3093 | 819 | |
defmacro | 0:a268b80e3093 | 820 | if (context->has_qos != 0) { |
defmacro | 0:a268b80e3093 | 821 | status = mmqtt_s_decode_buffer(conn, pusher, &line->qos, 1, 1); |
defmacro | 0:a268b80e3093 | 822 | if (status != MMQTT_STATUS_OK) { return status; } |
defmacro | 0:a268b80e3093 | 823 | context->length -= 1; |
defmacro | 0:a268b80e3093 | 824 | } else { |
defmacro | 0:a268b80e3093 | 825 | line->qos = 0; |
defmacro | 0:a268b80e3093 | 826 | } |
defmacro | 0:a268b80e3093 | 827 | return MMQTT_STATUS_OK; |
defmacro | 0:a268b80e3093 | 828 | } |
defmacro | 0:a268b80e3093 | 829 | |
/* Number of bytes a string of string_length bytes occupies on the wire:
 * the bytes themselves plus the 2-byte length prefix.
 *
 * The addition is done in uint32_t so that lengths near 65535 do not wrap
 * on targets where int is only 16 bits wide.
 */
uint32_t
mmqtt_s_string_encoded_length(uint16_t string_length)
{
  return (uint32_t) string_length + 2;
}
defmacro | 0:a268b80e3093 | 835 | |
defmacro | 0:a268b80e3093 | 836 | uint32_t |
defmacro | 0:a268b80e3093 | 837 | mmqtt_s_connect_header_encoded_length(const struct mmqtt_p_connect_header *header) |
defmacro | 0:a268b80e3093 | 838 | { |
defmacro | 0:a268b80e3093 | 839 | return mmqtt_s_string_encoded_length(header->name_length) + 4; |
defmacro | 0:a268b80e3093 | 840 | } |
defmacro | 0:a268b80e3093 | 841 | |
defmacro | 0:a268b80e3093 | 842 | #endif /* MMQTT_DISABLE_STATIC */ |
defmacro | 0:a268b80e3093 | 843 | |
defmacro | 0:a268b80e3093 | 844 | #endif /* MMQTT_H_ */ |