A small memory footprint AMQP implimentation
Dependents: iothub_client_sample_amqp remote_monitoring simplesample_amqp
Diff: message_sender.c
- Revision:
- 46:01f7ca900e07
- Parent:
- 45:83b4eda4891c
- Child:
- 47:365a93fdb5bb
diff -r 83b4eda4891c -r 01f7ca900e07 message_sender.c --- a/message_sender.c Thu Jul 12 18:09:41 2018 -0700 +++ b/message_sender.c Tue Sep 11 11:13:43 2018 -0700 @@ -119,6 +119,7 @@ else { AMQP_VALUE descriptor = amqpvalue_get_inplace_descriptor(delivery_state); + AMQP_VALUE described = amqpvalue_get_inplace_described_value(delivery_state); if (descriptor == NULL) { @@ -126,24 +127,24 @@ } else if (is_accepted_type_by_descriptor(descriptor)) { - message_with_callback->on_message_send_complete(message_with_callback->context, MESSAGE_SEND_OK); + message_with_callback->on_message_send_complete(message_with_callback->context, MESSAGE_SEND_OK, described); } else { - message_with_callback->on_message_send_complete(message_with_callback->context, MESSAGE_SEND_ERROR); + message_with_callback->on_message_send_complete(message_with_callback->context, MESSAGE_SEND_ERROR, described); } } break; case LINK_DELIVERY_SETTLE_REASON_SETTLED: - message_with_callback->on_message_send_complete(message_with_callback->context, MESSAGE_SEND_OK); + message_with_callback->on_message_send_complete(message_with_callback->context, MESSAGE_SEND_OK, NULL); break; case LINK_DELIVERY_SETTLE_REASON_TIMEOUT: - message_with_callback->on_message_send_complete(message_with_callback->context, MESSAGE_SEND_TIMEOUT); + message_with_callback->on_message_send_complete(message_with_callback->context, MESSAGE_SEND_TIMEOUT, NULL); break; case LINK_DELIVERY_SETTLE_REASON_NOT_DELIVERED: default: - message_with_callback->on_message_send_complete(message_with_callback->context, MESSAGE_SEND_ERROR); + message_with_callback->on_message_send_complete(message_with_callback->context, MESSAGE_SEND_ERROR, NULL); break; } } @@ -360,38 +361,46 @@ } else { - for (i = 0; i < body_data_count; i++) + if (body_data_count == 0) { - if (message_get_body_amqp_data_in_place(message, i, &binary_data) != 0) - { - LogError("Cannot get body AMQP data %u", (unsigned int)i); - result = SEND_ONE_MESSAGE_ERROR; - } - else + LogError("Body data count is zero"); + result = SEND_ONE_MESSAGE_ERROR; + } + else + { + for (i = 0; i < body_data_count; i++) { - AMQP_VALUE body_amqp_data; - amqp_binary binary_value; - binary_value.bytes = binary_data.bytes; - binary_value.length = (uint32_t)binary_data.length; - body_amqp_data = amqpvalue_create_data(binary_value); - if (body_amqp_data == NULL) + if (message_get_body_amqp_data_in_place(message, i, &binary_data) != 0) { - LogError("Cannot create body AMQP data"); + LogError("Cannot get body AMQP data %u", (unsigned int)i); result = SEND_ONE_MESSAGE_ERROR; } else { - if (amqpvalue_get_encoded_size(body_amqp_data, &encoded_size) != 0) + AMQP_VALUE body_amqp_data; + amqp_binary binary_value; + binary_value.bytes = binary_data.bytes; + binary_value.length = (uint32_t)binary_data.length; + body_amqp_data = amqpvalue_create_data(binary_value); + if (body_amqp_data == NULL) { - LogError("Cannot get body AMQP data encoded size"); + LogError("Cannot create body AMQP data"); result = SEND_ONE_MESSAGE_ERROR; } else { - total_encoded_size += encoded_size; + if (amqpvalue_get_encoded_size(body_amqp_data, &encoded_size) != 0) + { + LogError("Cannot get body AMQP data encoded size"); + result = SEND_ONE_MESSAGE_ERROR; + } + else + { + total_encoded_size += encoded_size; + } + + amqpvalue_destroy(body_amqp_data); } - - amqpvalue_destroy(body_amqp_data); } } } @@ -520,7 +529,7 @@ LINK_TRANSFER_RESULT link_transfer_error; MESSAGE_WITH_CALLBACK* message_with_callback = GET_ASYNC_OPERATION_CONTEXT(MESSAGE_WITH_CALLBACK, pending_send); message_with_callback->message_send_state = MESSAGE_SEND_STATE_PENDING; - + transfer_async_operation = link_transfer_async(message_sender->link, message_format, &payload, 1, on_delivery_settled, pending_send, &link_transfer_error, message_with_callback->timeout); if (transfer_async_operation == NULL) { @@ -612,7 +621,7 @@ if (on_message_send_complete != NULL) { - on_message_send_complete(context, MESSAGE_SEND_ERROR); + on_message_send_complete(context, MESSAGE_SEND_ERROR, NULL); } i = message_sender->message_count; @@ -650,7 +659,7 @@ MESSAGE_WITH_CALLBACK* message_with_callback = GET_ASYNC_OPERATION_CONTEXT(MESSAGE_WITH_CALLBACK, message_sender->messages[i]); if (message_with_callback->on_message_send_complete != NULL) { - message_with_callback->on_message_send_complete(message_with_callback->context, MESSAGE_SEND_ERROR); + message_with_callback->on_message_send_complete(message_with_callback->context, MESSAGE_SEND_ERROR, NULL); } if (message_with_callback->message != NULL) @@ -689,9 +698,9 @@ if ((message_sender->message_sender_state == MESSAGE_SENDER_STATE_OPEN) || (message_sender->message_sender_state == MESSAGE_SENDER_STATE_CLOSING)) { - /* User initiated transition, we should be good */ + /* switch to closing so that no more requests should be accepted */ + indicate_all_messages_as_error(message_sender); set_message_sender_state(message_sender, MESSAGE_SENDER_STATE_IDLE); - indicate_all_messages_as_error(message_sender); } else if (message_sender->message_sender_state != MESSAGE_SENDER_STATE_IDLE) { @@ -702,8 +711,8 @@ case LINK_STATE_ERROR: if (message_sender->message_sender_state != MESSAGE_SENDER_STATE_ERROR) { + indicate_all_messages_as_error(message_sender); set_message_sender_state(message_sender, MESSAGE_SENDER_STATE_ERROR); - indicate_all_messages_as_error(message_sender); } break; } @@ -744,8 +753,8 @@ } else { + indicate_all_messages_as_error(message_sender); (void)messagesender_close(message_sender); - indicate_all_messages_as_error(message_sender); free(message_sender); } @@ -825,7 +834,7 @@ MESSAGE_WITH_CALLBACK* message_with_callback = GET_ASYNC_OPERATION_CONTEXT(MESSAGE_WITH_CALLBACK, send_operation); if (message_with_callback->on_message_send_complete != NULL) { - message_with_callback->on_message_send_complete(message_with_callback->context, MESSAGE_SEND_CANCELLED); + message_with_callback->on_message_send_complete(message_with_callback->context, MESSAGE_SEND_CANCELLED, NULL); } remove_pending_message(message_with_callback->message_sender, send_operation);