A small memory footprint AMQP implimentation
Dependents: iothub_client_sample_amqp remote_monitoring simplesample_amqp
Diff: message_sender.c
- Revision:
- 23:1111ee8bcba4
- Parent:
- 21:f9c433d8e6ca
- Child:
- 24:2c59c2d43ebf
--- a/message_sender.c Thu Apr 06 14:11:27 2017 -0700 +++ b/message_sender.c Fri Apr 21 14:50:32 2017 -0700 @@ -91,7 +91,7 @@ } } -static void on_delivery_settled(void* context, delivery_number delivery_no, AMQP_VALUE delivery_state) +static void on_delivery_settled(void* context, delivery_number delivery_no, LINK_DELIVERY_SETTLE_REASON reason, AMQP_VALUE delivery_state) { MESSAGE_WITH_CALLBACK* message_with_callback = (MESSAGE_WITH_CALLBACK*)context; MESSAGE_SENDER_INSTANCE* message_sender_instance = (MESSAGE_SENDER_INSTANCE*)message_with_callback->message_sender; @@ -99,26 +99,39 @@ if (message_with_callback->on_message_send_complete != NULL) { - AMQP_VALUE descriptor = amqpvalue_get_inplace_descriptor(delivery_state); - if ((descriptor == NULL) && (delivery_state != NULL)) - { - LogError("Error getting descriptor for delivery state"); - } - else + switch (reason) { - MESSAGE_SEND_RESULT message_send_result; - - if ((delivery_state == NULL) || - (is_accepted_type_by_descriptor(descriptor))) + case LINK_DELIVERY_SETTLE_REASON_DISPOSITION_RECEIVED: + if (delivery_state == NULL) { - message_send_result = MESSAGE_SEND_OK; + LogError("delivery state not provided"); } else { - message_send_result = MESSAGE_SEND_ERROR; + AMQP_VALUE descriptor = amqpvalue_get_inplace_descriptor(delivery_state); + + if (descriptor == NULL) + { + LogError("Error getting descriptor for delivery state"); + } + else if (is_accepted_type_by_descriptor(descriptor)) + { + message_with_callback->on_message_send_complete(message_with_callback->context, MESSAGE_SEND_OK); + } + else + { + message_with_callback->on_message_send_complete(message_with_callback->context, MESSAGE_SEND_ERROR); + } } - message_with_callback->on_message_send_complete(message_with_callback->context, message_send_result); + break; + case LINK_DELIVERY_SETTLE_REASON_SETTLED: + message_with_callback->on_message_send_complete(message_with_callback->context, MESSAGE_SEND_OK); + break; + case LINK_DELIVERY_SETTLE_REASON_NOT_DELIVERED: + default: + message_with_callback->on_message_send_complete(message_with_callback->context, MESSAGE_SEND_ERROR); + break; } } @@ -207,12 +220,16 @@ // application properties message_get_application_properties(message, &application_properties); - application_properties_value = amqpvalue_create_application_properties(application_properties); if (application_properties != NULL) { + application_properties_value = amqpvalue_create_application_properties(application_properties); amqpvalue_get_encoded_size(application_properties_value, &encoded_size); total_encoded_size += encoded_size; } + else + { + application_properties_value = NULL; + } result = SEND_ONE_MESSAGE_OK; @@ -298,7 +315,7 @@ { void* data_bytes = malloc(total_encoded_size); PAYLOAD payload; - payload.bytes = data_bytes; + payload.bytes = (const unsigned char*)data_bytes; payload.length = 0; result = SEND_ONE_MESSAGE_OK; @@ -427,10 +444,22 @@ } } - amqpvalue_destroy(application_properties); - amqpvalue_destroy(application_properties_value); - amqpvalue_destroy(properties_amqp_value); - properties_destroy(properties); + if (application_properties != NULL) + { + amqpvalue_destroy(application_properties); + } + if (application_properties_value != NULL) + { + amqpvalue_destroy(application_properties_value); + } + if (properties_amqp_value != NULL) + { + amqpvalue_destroy(properties_amqp_value); + } + if (properties != NULL) + { + properties_destroy(properties); + } } return result; @@ -453,10 +482,10 @@ void* context = message_sender_instance->messages[i]->context; remove_pending_message_by_index(message_sender_instance, i); - if (on_message_send_complete != NULL) - { - on_message_send_complete(context, MESSAGE_SEND_ERROR); - } + if (on_message_send_complete != NULL) + { + on_message_send_complete(context, MESSAGE_SEND_ERROR); + } i = message_sender_instance->message_count; break; @@ -529,8 +558,8 @@ (message_sender_instance->message_sender_state == MESSAGE_SENDER_STATE_CLOSING)) { /* User initiated transition, we should be good */ + set_message_sender_state(message_sender_instance, MESSAGE_SENDER_STATE_IDLE); indicate_all_messages_as_error(message_sender_instance); - set_message_sender_state(message_sender_instance, MESSAGE_SENDER_STATE_IDLE); } else if (message_sender_instance->message_sender_state != MESSAGE_SENDER_STATE_IDLE) { @@ -541,8 +570,8 @@ case LINK_STATE_ERROR: if (message_sender_instance->message_sender_state != MESSAGE_SENDER_STATE_ERROR) { + set_message_sender_state(message_sender_instance, MESSAGE_SENDER_STATE_ERROR); indicate_all_messages_as_error(message_sender_instance); - set_message_sender_state(message_sender_instance, MESSAGE_SENDER_STATE_ERROR); } break; } @@ -556,7 +585,7 @@ MESSAGE_SENDER_HANDLE messagesender_create(LINK_HANDLE link, ON_MESSAGE_SENDER_STATE_CHANGED on_message_sender_state_changed, void* context) { - MESSAGE_SENDER_INSTANCE* result = malloc(sizeof(MESSAGE_SENDER_INSTANCE)); + MESSAGE_SENDER_INSTANCE* result = (MESSAGE_SENDER_INSTANCE*)malloc(sizeof(MESSAGE_SENDER_INSTANCE)); if (result != NULL) { result->messages = NULL; @@ -722,7 +751,7 @@ { default: case SEND_ONE_MESSAGE_ERROR: - + remove_pending_message_by_index(message_sender_instance, message_sender_instance->message_count - 1); result = __FAILURE__; break;