Important changes to repositories hosted on mbed.com
Mbed hosted mercurial repositories are deprecated and are due to be permanently deleted in July 2026.
To keep a copy of this software download the repository Zip archive or clone locally using Mercurial.
It is also possible to export all your personal repositories from the account settings page.
Diff: wakaama/observe.c
- Revision:
- 3:a280069151ac
- Parent:
- 0:f9d13e09cf11
- Child:
- 10:df97539c6ddd
--- a/wakaama/observe.c Fri Apr 28 09:53:26 2017 +0000 +++ b/wakaama/observe.c Fri Apr 28 18:13:27 2017 +0800 @@ -13,6 +13,7 @@ * Contributors: * David Navarro, Intel Corporation - initial API and implementation * Toby Jaffey - Please refer to git log + * Bosch Software Innovations GmbH - Please refer to git log * *******************************************************************************/ @@ -69,45 +70,6 @@ return targetP; } -static obs_list_t * prv_getObservedList(lwm2m_context_t * contextP, - lwm2m_uri_t * uriP) -{ - obs_list_t * resultP; - lwm2m_observed_t * targetP; - - resultP = NULL; - - targetP = contextP->observedList; - while (targetP != NULL) - { - if (targetP->uri.objectId == uriP->objectId) - { - if (!LWM2M_URI_IS_SET_INSTANCE(uriP) - || (targetP->uri.flag & LWM2M_URI_FLAG_INSTANCE_ID) == 0 - || uriP->instanceId == targetP->uri.instanceId) - { - if (!LWM2M_URI_IS_SET_RESOURCE(uriP) - || (targetP->uri.flag & LWM2M_URI_FLAG_RESOURCE_ID) == 0 - || uriP->resourceId == targetP->uri.resourceId) - { - obs_list_t * newP; - - newP = (obs_list_t *)lwm2m_malloc(sizeof(obs_list_t)); - if (newP != NULL) - { - newP->item = targetP; - newP->next = resultP; - resultP = newP; - } - } - } - } - targetP = targetP->next; - } - - return resultP; -} - static void prv_unlinkObserved(lwm2m_context_t * contextP, lwm2m_observed_t * observedP) { @@ -132,21 +94,6 @@ } } -static lwm2m_server_t * prv_findServer(lwm2m_context_t * contextP, - void * fromSessionH) -{ - lwm2m_server_t * targetP; - - targetP = contextP->serverList; - while (targetP != NULL - && targetP->sessionH != fromSessionH) - { - targetP = targetP->next; - } - - return targetP; -} - static lwm2m_watcher_t * prv_findWatcher(lwm2m_observed_t * observedP, lwm2m_server_t * serverP) { @@ -162,29 +109,22 @@ return targetP; } -coap_status_t handle_observe_request(lwm2m_context_t * contextP, - lwm2m_uri_t * uriP, - void * fromSessionH, - coap_packet_t * message, - coap_packet_t * response) +static lwm2m_watcher_t * prv_getWatcher(lwm2m_context_t * contextP, + lwm2m_uri_t * uriP, + lwm2m_server_t * serverP) { lwm2m_observed_t * observedP; + bool allocatedObserver; lwm2m_watcher_t * watcherP; - lwm2m_server_t * serverP; - - LOG("handle_observe_request()\r\n"); - if (!LWM2M_URI_IS_SET_INSTANCE(uriP) && LWM2M_URI_IS_SET_RESOURCE(uriP)) return COAP_400_BAD_REQUEST; - if (message->token_len == 0) return COAP_400_BAD_REQUEST; - - serverP = prv_findServer(contextP, fromSessionH); - if (serverP == NULL || serverP->status != STATE_REGISTERED) return COAP_401_UNAUTHORIZED; + allocatedObserver = false; observedP = prv_findObserved(contextP, uriP); if (observedP == NULL) { observedP = (lwm2m_observed_t *)lwm2m_malloc(sizeof(lwm2m_observed_t)); - if (observedP == NULL) return COAP_500_INTERNAL_SERVER_ERROR; + if (observedP == NULL) return NULL; + allocatedObserver = true; memset(observedP, 0, sizeof(lwm2m_observed_t)); memcpy(&(observedP->uri), uriP, sizeof(lwm2m_uri_t)); observedP->next = contextP->observedList; @@ -195,27 +135,90 @@ if (watcherP == NULL) { watcherP = (lwm2m_watcher_t *)lwm2m_malloc(sizeof(lwm2m_watcher_t)); - if (watcherP == NULL) return COAP_500_INTERNAL_SERVER_ERROR; + if (watcherP == NULL) + { + if (allocatedObserver == true) + { + lwm2m_free(observedP); + } + return NULL; + } memset(watcherP, 0, sizeof(lwm2m_watcher_t)); + watcherP->active = false; watcherP->server = serverP; - watcherP->tokenLen = message->token_len; - memcpy(watcherP->token, message->token, message->token_len); watcherP->next = observedP->watcherList; observedP->watcherList = watcherP; } - coap_set_header_observe(response, watcherP->counter++); - - return COAP_205_CONTENT; + return watcherP; } -void cancel_observe(lwm2m_context_t * contextP, +coap_status_t observe_handleRequest(lwm2m_context_t * contextP, + lwm2m_uri_t * uriP, + lwm2m_server_t * serverP, + int size, + lwm2m_data_t * dataP, + coap_packet_t * message, + coap_packet_t * response) +{ + lwm2m_watcher_t * watcherP; + uint32_t count; + + LOG_ARG("Code: %02X, server status: %s", message->code, STR_STATUS(serverP->status)); + LOG_URI(uriP); + + coap_get_header_observe(message, &count); + + switch (count) + { + case 0: + if (!LWM2M_URI_IS_SET_INSTANCE(uriP) && LWM2M_URI_IS_SET_RESOURCE(uriP)) return COAP_400_BAD_REQUEST; + if (message->token_len == 0) return COAP_400_BAD_REQUEST; + + watcherP = prv_getWatcher(contextP, uriP, serverP); + if (watcherP == NULL) return COAP_500_INTERNAL_SERVER_ERROR; + + watcherP->tokenLen = message->token_len; + memcpy(watcherP->token, message->token, message->token_len); + watcherP->active = true; + watcherP->lastTime = lwm2m_gettime(); + + if (LWM2M_URI_IS_SET_RESOURCE(uriP)) + { + switch (dataP->type) + { + case LWM2M_TYPE_INTEGER: + if (1 != lwm2m_data_decode_int(dataP, &(watcherP->lastValue.asInteger))) return COAP_500_INTERNAL_SERVER_ERROR; + break; + case LWM2M_TYPE_FLOAT: + if (1 != lwm2m_data_decode_float(dataP, &(watcherP->lastValue.asFloat))) return COAP_500_INTERNAL_SERVER_ERROR; + break; + default: + break; + } + } + + coap_set_header_observe(response, watcherP->counter++); + + return COAP_205_CONTENT; + + case 1: + // cancellation + observe_cancel(contextP, LWM2M_MAX_ID, serverP->sessionH); + return COAP_205_CONTENT; + + default: + return COAP_400_BAD_REQUEST; + } +} + +void observe_cancel(lwm2m_context_t * contextP, uint16_t mid, void * fromSessionH) { lwm2m_observed_t * observedP; - LOG("cancel_observe()\r\n"); + LOG_ARG("mid: %d", mid); for (observedP = contextP->observedList; observedP != NULL; @@ -223,8 +226,8 @@ { lwm2m_watcher_t * targetP = NULL; - if (observedP->watcherList->lastMid == mid - && observedP->watcherList->server->sessionH == fromSessionH) + if ((LWM2M_MAX_ID == mid || observedP->watcherList->lastMid == mid) + && lwm2m_session_is_equal(observedP->watcherList->server->sessionH, fromSessionH, contextP->userData)) { targetP = observedP->watcherList; observedP->watcherList = observedP->watcherList->next; @@ -236,7 +239,7 @@ parentP = observedP->watcherList; while (parentP->next != NULL && (parentP->next->lastMid != mid - || parentP->next->server->sessionH != fromSessionH)) + || lwm2m_session_is_equal(parentP->next->server->sessionH, fromSessionH, contextP->userData))) { parentP = parentP->next; } @@ -259,47 +262,449 @@ } } +coap_status_t observe_setParameters(lwm2m_context_t * contextP, + lwm2m_uri_t * uriP, + lwm2m_server_t * serverP, + lwm2m_attributes_t * attrP) +{ + uint8_t result; + lwm2m_watcher_t * watcherP; + + LOG_URI(uriP); + LOG_ARG("toSet: %08X, toClear: %08X, minPeriod: %d, maxPeriod: %d, greaterThan: %f, lessThan: %f, step: %f", + attrP->toSet, attrP->toClear, attrP->minPeriod, attrP->maxPeriod, attrP->greaterThan, attrP->lessThan, attrP->step); + + if (!LWM2M_URI_IS_SET_INSTANCE(uriP) && LWM2M_URI_IS_SET_RESOURCE(uriP)) return COAP_400_BAD_REQUEST; + + result = object_checkReadable(contextP, uriP); + if (COAP_205_CONTENT != result) return result; + + if (0 != (attrP->toSet & ATTR_FLAG_NUMERIC)) + { + result = object_checkNumeric(contextP, uriP); + if (COAP_205_CONTENT != result) return result; + } + + watcherP = prv_getWatcher(contextP, uriP, serverP); + if (watcherP == NULL) return COAP_500_INTERNAL_SERVER_ERROR; + + // Check rule ¡°lt¡± value + 2*¡±stp¡± values < ¡°gt¡± value + if ((((attrP->toSet | (watcherP->parameters?watcherP->parameters->toSet:0)) & ~attrP->toClear) & ATTR_FLAG_NUMERIC) == ATTR_FLAG_NUMERIC) + { + float gt; + float lt; + float stp; + + if (0 != (attrP->toSet & LWM2M_ATTR_FLAG_GREATER_THAN)) + { + gt = attrP->greaterThan; + } + else + { + gt = watcherP->parameters->greaterThan; + } + if (0 != (attrP->toSet & LWM2M_ATTR_FLAG_LESS_THAN)) + { + lt = attrP->lessThan; + } + else + { + lt = watcherP->parameters->lessThan; + } + if (0 != (attrP->toSet & LWM2M_ATTR_FLAG_STEP)) + { + stp = attrP->step; + } + else + { + stp = watcherP->parameters->step; + } + + if (lt + (2 * stp) >= gt) return COAP_400_BAD_REQUEST; + } + + if (watcherP->parameters == NULL) + { + if (attrP->toSet != 0) + { + watcherP->parameters = (lwm2m_attributes_t *)lwm2m_malloc(sizeof(lwm2m_attributes_t)); + if (watcherP->parameters == NULL) return COAP_500_INTERNAL_SERVER_ERROR; + memcpy(watcherP->parameters, attrP, sizeof(lwm2m_attributes_t)); + } + } + else + { + watcherP->parameters->toSet &= ~attrP->toClear; + if (attrP->toSet & LWM2M_ATTR_FLAG_MIN_PERIOD) + { + watcherP->parameters->minPeriod = attrP->minPeriod; + } + if (attrP->toSet & LWM2M_ATTR_FLAG_MAX_PERIOD) + { + watcherP->parameters->maxPeriod = attrP->maxPeriod; + } + if (attrP->toSet & LWM2M_ATTR_FLAG_GREATER_THAN) + { + watcherP->parameters->greaterThan = attrP->greaterThan; + } + if (attrP->toSet & LWM2M_ATTR_FLAG_LESS_THAN) + { + watcherP->parameters->lessThan = attrP->lessThan; + } + if (attrP->toSet & LWM2M_ATTR_FLAG_STEP) + { + watcherP->parameters->step = attrP->step; + } + } + + LOG_ARG("Final toSet: %08X, minPeriod: %d, maxPeriod: %d, greaterThan: %f, lessThan: %f, step: %f", + watcherP->parameters->toSet, watcherP->parameters->minPeriod, watcherP->parameters->maxPeriod, watcherP->parameters->greaterThan, watcherP->parameters->lessThan, watcherP->parameters->step); + + return COAP_204_CHANGED; +} + +lwm2m_observed_t * observe_findByUri(lwm2m_context_t * contextP, + lwm2m_uri_t * uriP) +{ + lwm2m_observed_t * targetP; + + LOG_URI(uriP); + targetP = contextP->observedList; + while (targetP != NULL) + { + if (targetP->uri.objectId == uriP->objectId) + { + if ((!LWM2M_URI_IS_SET_INSTANCE(uriP) && !LWM2M_URI_IS_SET_INSTANCE(&(targetP->uri))) + || (LWM2M_URI_IS_SET_INSTANCE(uriP) && LWM2M_URI_IS_SET_INSTANCE(&(targetP->uri)) && (uriP->instanceId == targetP->uri.instanceId))) + { + if ((!LWM2M_URI_IS_SET_RESOURCE(uriP) && !LWM2M_URI_IS_SET_RESOURCE(&(targetP->uri))) + || (LWM2M_URI_IS_SET_RESOURCE(uriP) && LWM2M_URI_IS_SET_RESOURCE(&(targetP->uri)) && (uriP->resourceId == targetP->uri.resourceId))) + { + LOG_ARG("Found one with%s observers.", targetP->watcherList ? "" : " no"); + LOG_URI(&(targetP->uri)); + return targetP; + } + } + } + targetP = targetP->next; + } + + LOG("Found nothing"); + return NULL; +} + void lwm2m_resource_value_changed(lwm2m_context_t * contextP, lwm2m_uri_t * uriP) { - int result; - obs_list_t * listP; - lwm2m_watcher_t * watcherP; + lwm2m_observed_t * targetP; + + LOG_URI(uriP); + targetP = contextP->observedList; + while (targetP != NULL) + { + if (targetP->uri.objectId == uriP->objectId) + { + if (!LWM2M_URI_IS_SET_INSTANCE(uriP) + || (targetP->uri.flag & LWM2M_URI_FLAG_INSTANCE_ID) == 0 + || uriP->instanceId == targetP->uri.instanceId) + { + if (!LWM2M_URI_IS_SET_RESOURCE(uriP) + || (targetP->uri.flag & LWM2M_URI_FLAG_RESOURCE_ID) == 0 + || uriP->resourceId == targetP->uri.resourceId) + { + lwm2m_watcher_t * watcherP; - listP = prv_getObservedList(contextP, uriP); - while (listP != NULL) - { - obs_list_t * targetP; - char * buffer = NULL; - int length = 0; + LOG("Found an observation"); + LOG_URI(&(targetP->uri)); + + for (watcherP = targetP->watcherList ; watcherP != NULL ; watcherP = watcherP->next) + { + if (watcherP->active == true) + { + LOG("Tagging a watcher"); + watcherP->update = true; + } + } + } + } + } + targetP = targetP->next; + } +} - result = object_read(contextP, &listP->item->uri, &buffer, &length); - if (result == COAP_205_CONTENT) - { - coap_packet_t message[1]; - - coap_init_message(message, COAP_TYPE_NON, COAP_204_CHANGED, 0); - coap_set_payload(message, buffer, length); +void observe_step(lwm2m_context_t * contextP, + time_t currentTime, + time_t * timeoutP) +{ + lwm2m_observed_t * targetP; - for (watcherP = listP->item->watcherList ; watcherP != NULL ; watcherP = watcherP->next) + LOG("Entering"); + for (targetP = contextP->observedList ; targetP != NULL ; targetP = targetP->next) + { + lwm2m_watcher_t * watcherP; + uint8_t * buffer = NULL; + size_t length = 0; + lwm2m_data_t * dataP = NULL; + int size = 0; + double floatValue = 0; + int64_t integerValue = 0; + bool storeValue = false; + lwm2m_media_type_t format = LWM2M_CONTENT_TEXT; + coap_packet_t message[1]; + time_t interval; + + LOG_URI(&(targetP->uri)); + if (LWM2M_URI_IS_SET_RESOURCE(&targetP->uri)) + { + if (COAP_205_CONTENT != object_readData(contextP, &targetP->uri, &size, &dataP)) continue; + switch (dataP->type) { - watcherP->lastMid = contextP->nextMID++; - message->mid = watcherP->lastMid; - coap_set_header_token(message, watcherP->token, watcherP->tokenLen); - coap_set_header_observe(message, watcherP->counter++); - (void)message_send(contextP, message, watcherP->server->sessionH); + case LWM2M_TYPE_INTEGER: + if (1 != lwm2m_data_decode_int(dataP, &integerValue)) continue; + storeValue = true; + break; + case LWM2M_TYPE_FLOAT: + if (1 != lwm2m_data_decode_float(dataP, &floatValue)) continue; + storeValue = true; + break; + default: + break; } } + for (watcherP = targetP->watcherList ; watcherP != NULL ; watcherP = watcherP->next) + { + if (watcherP->active == true) + { + bool notify = false; - targetP = listP; - listP = listP->next; - lwm2m_free(targetP); + if (watcherP->update == true) + { + // value changed, should we notify the server ? + + if (watcherP->parameters == NULL || watcherP->parameters->toSet == 0) + { + // no conditions + notify = true; + LOG("Notify with no conditions"); + LOG_URI(&(targetP->uri)); + } + + if (notify == false + && watcherP->parameters != NULL + && (watcherP->parameters->toSet & ATTR_FLAG_NUMERIC) != 0) + { + if ((watcherP->parameters->toSet & LWM2M_ATTR_FLAG_LESS_THAN) != 0) + { + LOG("Checking lower treshold"); + // Did we cross the lower treshold ? + switch (dataP->type) + { + case LWM2M_TYPE_INTEGER: + if ((integerValue <= watcherP->parameters->lessThan + && watcherP->lastValue.asInteger > watcherP->parameters->lessThan) + || (integerValue >= watcherP->parameters->lessThan + && watcherP->lastValue.asInteger < watcherP->parameters->lessThan)) + { + LOG("Notify on lower treshold crossing"); + notify = true; + } + break; + case LWM2M_TYPE_FLOAT: + if ((floatValue <= watcherP->parameters->lessThan + && watcherP->lastValue.asFloat > watcherP->parameters->lessThan) + || (floatValue >= watcherP->parameters->lessThan + && watcherP->lastValue.asFloat < watcherP->parameters->lessThan)) + { + LOG("Notify on lower treshold crossing"); + notify = true; + } + break; + default: + break; + } + } + if ((watcherP->parameters->toSet & LWM2M_ATTR_FLAG_GREATER_THAN) != 0) + { + LOG("Checking upper treshold"); + // Did we cross the upper treshold ? + switch (dataP->type) + { + case LWM2M_TYPE_INTEGER: + if ((integerValue <= watcherP->parameters->greaterThan + && watcherP->lastValue.asInteger > watcherP->parameters->greaterThan) + || (integerValue >= watcherP->parameters->greaterThan + && watcherP->lastValue.asInteger < watcherP->parameters->greaterThan)) + { + LOG("Notify on lower upper crossing"); + notify = true; + } + break; + case LWM2M_TYPE_FLOAT: + if ((floatValue <= watcherP->parameters->greaterThan + && watcherP->lastValue.asFloat > watcherP->parameters->greaterThan) + || (floatValue >= watcherP->parameters->greaterThan + && watcherP->lastValue.asFloat < watcherP->parameters->greaterThan)) + { + LOG("Notify on lower upper crossing"); + notify = true; + } + break; + default: + break; + } + } + if ((watcherP->parameters->toSet & LWM2M_ATTR_FLAG_STEP) != 0) + { + LOG("Checking step"); + + switch (dataP->type) + { + case LWM2M_TYPE_INTEGER: + { + int64_t diff; + + diff = integerValue - watcherP->lastValue.asInteger; + if ((diff < 0 && (0 - diff) >= watcherP->parameters->step) + || (diff >= 0 && diff >= watcherP->parameters->step)) + { + LOG("Notify on step condition"); + notify = true; + } + } + break; + case LWM2M_TYPE_FLOAT: + { + double diff; + + diff = floatValue - watcherP->lastValue.asFloat; + if ((diff < 0 && (0 - diff) >= watcherP->parameters->step) + || (diff >= 0 && diff >= watcherP->parameters->step)) + { + LOG("Notify on step condition"); + notify = true; + } + } + break; + default: + break; + } + } + } + + if (watcherP->parameters != NULL + && (watcherP->parameters->toSet & LWM2M_ATTR_FLAG_MIN_PERIOD) != 0) + { + LOG_ARG("Checking minimal period (%d s)", watcherP->parameters->minPeriod); + + if (watcherP->lastTime + watcherP->parameters->minPeriod > currentTime) + { + // Minimum Period did not elapse yet + interval = watcherP->lastTime + watcherP->parameters->minPeriod - currentTime; + if (*timeoutP > interval) *timeoutP = interval; + notify = false; + } + else + { + LOG("Notify on minimal period"); + notify = true; + } + } + } + + // Is the Maximum Period reached ? + if (notify == false + && watcherP->parameters != NULL + && (watcherP->parameters->toSet & LWM2M_ATTR_FLAG_MAX_PERIOD) != 0) + { + LOG_ARG("Checking maximal period (%d s)", watcherP->parameters->minPeriod); + + if (watcherP->lastTime + watcherP->parameters->maxPeriod <= currentTime) + { + LOG("Notify on maximal period"); + notify = true; + } + } + + if (notify == true) + { + if (buffer == NULL) + { + if (dataP != NULL) + { + int res; + + res = lwm2m_data_serialize(&targetP->uri, size, dataP, &format, &buffer); + if (res < 0) + { + break; + } + else + { + length = (size_t)res; + } + + } + else + { + if (COAP_205_CONTENT != object_read(contextP, &targetP->uri, &format, &buffer, &length)) + { + buffer = NULL; + break; + } + } + coap_init_message(message, COAP_TYPE_NON, COAP_205_CONTENT, 0); + coap_set_header_content_type(message, format); + coap_set_payload(message, buffer, length); + } + watcherP->lastTime = currentTime; + watcherP->lastMid = contextP->nextMID++; + message->mid = watcherP->lastMid; + coap_set_header_token(message, watcherP->token, watcherP->tokenLen); + coap_set_header_observe(message, watcherP->counter++); + (void)message_send(contextP, message, watcherP->server->sessionH); + watcherP->update = false; + } + + // Store this value + if (notify == true && storeValue == true) + { + switch (dataP->type) + { + case LWM2M_TYPE_INTEGER: + watcherP->lastValue.asInteger = integerValue; + break; + case LWM2M_TYPE_FLOAT: + watcherP->lastValue.asFloat = floatValue; + break; + default: + break; + } + } + + if (watcherP->parameters != NULL && (watcherP->parameters->toSet & LWM2M_ATTR_FLAG_MAX_PERIOD) != 0) + { + // update timers + interval = watcherP->lastTime + watcherP->parameters->maxPeriod - currentTime; + if (*timeoutP > interval) *timeoutP = interval; + } + } + } + if (dataP != NULL) lwm2m_data_free(size, dataP); + if (buffer != NULL) lwm2m_free(buffer); } +} -} #endif #ifdef LWM2M_SERVER_MODE + +typedef struct +{ + lwm2m_observation_t * observationP; + lwm2m_result_callback_t callbackP; + void * userDataP; +} cancellation_data_t; + static lwm2m_observation_t * prv_findObservationByURI(lwm2m_client_t * clientP, lwm2m_uri_t * uriP) { @@ -322,30 +727,10 @@ return targetP; } -void observation_remove(lwm2m_client_t * clientP, - lwm2m_observation_t * observationP) +void observe_remove(lwm2m_observation_t * observationP) { - if (clientP->observationList == observationP) - { - clientP->observationList = clientP->observationList->next; - } - else if (clientP->observationList != NULL) - { - lwm2m_observation_t * parentP; - - parentP = clientP->observationList; - - while (parentP->next != NULL - && parentP->next != observationP) - { - parentP = parentP->next; - } - if (parentP->next != NULL) - { - parentP->next = parentP->next->next; - } - } - + LOG("Entering"); + observationP->clientP->observationList = (lwm2m_observation_t *) LWM2M_LIST_RM(observationP->clientP->observationList, observationP->id, NULL); lwm2m_free(observationP); } @@ -356,6 +741,21 @@ coap_packet_t * packet = (coap_packet_t *)message; uint8_t code; + switch (observationP->status) + { + case STATE_DEREG_PENDING: + // Observation was canceled by the user. + observe_remove(observationP); + return; + + case STATE_REG_PENDING: + observationP->status = STATE_REGISTERED; + break; + + default: + break; + } + if (message == NULL) { code = COAP_503_SERVICE_UNAVAILABLE; @@ -372,24 +772,63 @@ if (code != COAP_205_CONTENT) { - observationP->callback(((lwm2m_client_t*)transacP->peerP)->internalID, + observationP->callback(observationP->clientP->internalID, &observationP->uri, code, - NULL, 0, + LWM2M_CONTENT_TEXT, NULL, 0, observationP->userData); - observation_remove(((lwm2m_client_t*)transacP->peerP), observationP); + observe_remove(observationP); } else { - observationP->clientP->observationList = (lwm2m_observation_t *)LWM2M_LIST_ADD(observationP->clientP->observationList, observationP); - observationP->callback(((lwm2m_client_t*)transacP->peerP)->internalID, + observationP->callback(observationP->clientP->internalID, &observationP->uri, 0, - packet->payload, packet->payload_len, + packet->content_type, packet->payload, packet->payload_len, observationP->userData); } } + +static void prv_obsCancelRequestCallback(lwm2m_transaction_t * transacP, + void * message) +{ + cancellation_data_t * cancelP = (cancellation_data_t *)transacP->userData; + coap_packet_t * packet = (coap_packet_t *)message; + uint8_t code; + + if (message == NULL) + { + code = COAP_503_SERVICE_UNAVAILABLE; + } + else + { + code = packet->code; + } + + if (code != COAP_205_CONTENT) + { + cancelP->callbackP(cancelP->observationP->clientP->internalID, + &cancelP->observationP->uri, + code, + LWM2M_CONTENT_TEXT, NULL, 0, + cancelP->userDataP); + } + else + { + cancelP->callbackP(cancelP->observationP->clientP->internalID, + &cancelP->observationP->uri, + 0, + packet->content_type, packet->payload, packet->payload_len, + cancelP->userDataP); + } + + observe_remove(cancelP->observationP); + + lwm2m_free(cancelP); +} + + int lwm2m_observe(lwm2m_context_t * contextP, uint16_t clientID, lwm2m_uri_t * uriP, @@ -401,6 +840,9 @@ lwm2m_observation_t * observationP; uint8_t token[4]; + LOG_ARG("clientID: %d", clientID); + LOG_URI(uriP); + if (!LWM2M_URI_IS_SET_INSTANCE(uriP) && LWM2M_URI_IS_SET_RESOURCE(uriP)) return COAP_400_BAD_REQUEST; clientP = (lwm2m_client_t *)lwm2m_list_find((lwm2m_list_t *)contextP->clientList, clientID); @@ -410,16 +852,10 @@ if (observationP == NULL) return COAP_500_INTERNAL_SERVER_ERROR; memset(observationP, 0, sizeof(lwm2m_observation_t)); - transactionP = transaction_new(COAP_GET, uriP, contextP->nextMID++, ENDPOINT_CLIENT, (void *)clientP); - if (transactionP == NULL) - { - lwm2m_free(observationP); - return COAP_500_INTERNAL_SERVER_ERROR; - } - observationP->id = lwm2m_list_newId((lwm2m_list_t *)clientP->observationList); memcpy(&observationP->uri, uriP, sizeof(lwm2m_uri_t)); observationP->clientP = clientP; + observationP->status = STATE_REG_PENDING; observationP->callback = callback; observationP->userData = userData; @@ -428,6 +864,15 @@ token[2] = observationP->id >> 8; token[3] = observationP->id & 0xFF; + transactionP = transaction_new(clientP->sessionH, COAP_GET, clientP->altPath, uriP, contextP->nextMID++, 4, token); + if (transactionP == NULL) + { + lwm2m_free(observationP); + return COAP_500_INTERNAL_SERVER_ERROR; + } + + observationP->clientP->observationList = (lwm2m_observation_t *)LWM2M_LIST_ADD(observationP->clientP->observationList, observationP); + coap_set_header_observe(transactionP->message, 0); coap_set_header_token(transactionP->message, token, sizeof(token)); @@ -448,20 +893,64 @@ lwm2m_client_t * clientP; lwm2m_observation_t * observationP; + LOG_ARG("clientID: %d", clientID); + LOG_URI(uriP); + clientP = (lwm2m_client_t *)lwm2m_list_find((lwm2m_list_t *)contextP->clientList, clientID); if (clientP == NULL) return COAP_404_NOT_FOUND; observationP = prv_findObservationByURI(clientP, uriP); if (observationP == NULL) return COAP_404_NOT_FOUND; - observation_remove(clientP, observationP); + switch (observationP->status) + { + case STATE_REGISTERED: + { + lwm2m_transaction_t * transactionP; + cancellation_data_t * cancelP; + + transactionP = transaction_new(clientP->sessionH, COAP_GET, clientP->altPath, uriP, contextP->nextMID++, 0, NULL); + if (transactionP == NULL) + { + return COAP_500_INTERNAL_SERVER_ERROR; + } + cancelP = (cancellation_data_t *)lwm2m_malloc(sizeof(cancellation_data_t)); + if (cancelP == NULL) + { + lwm2m_free(transactionP); + return COAP_500_INTERNAL_SERVER_ERROR; + } + + coap_set_header_observe(transactionP->message, 1); - return 0; + cancelP->observationP = observationP; + cancelP->callbackP = callback; + cancelP->userDataP = userData; + + transactionP->callback = prv_obsCancelRequestCallback; + transactionP->userData = (void *)cancelP; + + contextP->transactionList = (lwm2m_transaction_t *)LWM2M_LIST_ADD(contextP->transactionList, transactionP); + + return transaction_send(contextP, transactionP); + } + + case STATE_REG_PENDING: + observationP->status = STATE_DEREG_PENDING; + break; + + default: + // Should not happen + break; + } + + return COAP_NO_ERROR; } -void handle_observe_notify(lwm2m_context_t * contextP, +bool observe_handleNotify(lwm2m_context_t * contextP, void * fromSessionH, - coap_packet_t * message) + coap_packet_t * message, + coap_packet_t * response) { uint8_t * tokenP; int token_len; @@ -471,33 +960,36 @@ lwm2m_observation_t * observationP; uint32_t count; + LOG("Entering"); token_len = coap_get_header_token(message, (const uint8_t **)&tokenP); - if (token_len != sizeof(uint32_t)) return; + if (token_len != sizeof(uint32_t)) return false; - if (1 != coap_get_header_observe(message, &count)) return; + if (1 != coap_get_header_observe(message, &count)) return false; clientID = (tokenP[0] << 8) | tokenP[1]; obsID = (tokenP[2] << 8) | tokenP[3]; clientP = (lwm2m_client_t *)lwm2m_list_find((lwm2m_list_t *)contextP->clientList, clientID); - if (clientP == NULL) return; + if (clientP == NULL) return false; observationP = (lwm2m_observation_t *)lwm2m_list_find((lwm2m_list_t *)clientP->observationList, obsID); if (observationP == NULL) { - coap_packet_t resetMsg; - - coap_init_message(&resetMsg, COAP_TYPE_RST, 0, message->mid); - - message_send(contextP, &resetMsg, fromSessionH); + coap_init_message(response, COAP_TYPE_RST, 0, message->mid); + message_send(contextP, response, fromSessionH); } else { + if (message->type == COAP_TYPE_CON ) { + coap_init_message(response, COAP_TYPE_ACK, 0, message->mid); + message_send(contextP, response, fromSessionH); + } observationP->callback(clientID, &observationP->uri, (int)count, - message->payload, message->payload_len, + message->content_type, message->payload, message->payload_len, observationP->userData); } + return true; } #endif