Important changes to repositories hosted on mbed.com
Mbed-hosted Mercurial repositories are deprecated and are due to be permanently deleted in July 2026.
To keep a copy of this software, download the repository Zip archive or clone it locally using Mercurial.
It is also possible to export all your personal repositories from the account settings page.
connector_msg.h
00001 /* 00002 * Copyright (c) 2013 Digi International Inc., 00003 * All rights not expressly granted are reserved. 00004 * 00005 * This Source Code Form is subject to the terms of the Mozilla Public 00006 * License, v. 2.0. If a copy of the MPL was not distributed with this file, 00007 * You can obtain one at http://mozilla.org/MPL/2.0/. 00008 * 00009 * Digi International Inc. 11001 Bren Road East, Minnetonka, MN 55343 00010 * ======================================================================= 00011 */ 00012 00013 #if (defined CONNECTOR_COMPRESSION) 00014 #include "zlib.h" 00015 #endif 00016 00017 #define MSG_FACILITY_VERSION 0x01 00018 #define MSG_COMPRESSION_NONE 0x00 00019 #define MSG_COMPRESSION_LIBZ 0xFF 00020 00021 #define MSG_INVALID_CLIENT_SESSION 0xFFFF 00022 00023 #define MSG_FLAG_REQUEST UINT32_C(0x01) 00024 #define MSG_FLAG_LAST_DATA UINT32_C(0x02) 00025 #define MSG_FLAG_SENDER UINT32_C(0x04) 00026 00027 #define MSG_FLAG_CLIENT_OWNED UINT32_C(0x20) 00028 #define MSG_FLAG_RECEIVING UINT32_C(0x40) 00029 #define MSG_FLAG_START UINT32_C(0x80) 00030 #define MSG_FLAG_ACK_PENDING UINT32_C(0x100) 00031 #define MSG_FLAG_COMPRESSED UINT32_C(0x200) 00032 #define MSG_FLAG_INFLATED UINT32_C(0x400) 00033 #define MSG_FLAG_DEFLATED UINT32_C(0x800) 00034 #define MSG_FLAG_SEND_NOW UINT32_C(0x1000) 00035 #define MSG_FLAG_DOUBLE_BUF UINT32_C(0x2000) 00036 00037 #define MsgIsBitSet(flag, bit) (connector_bool(((flag) & (bit)) == (bit))) 00038 #define MsgIsBitClear(flag, bit) (connector_bool(((flag) & (bit)) == 0)) 00039 #define MsgBitSet(flag, bit) ((flag) |= (bit)) 00040 #define MsgBitClear(flag, bit) ((flag) &= ~(bit)) 00041 00042 #define MsgIsRequest(flag) MsgIsBitSet((flag), MSG_FLAG_REQUEST) 00043 #define MsgIsLastData(flag) MsgIsBitSet((flag), MSG_FLAG_LAST_DATA) 00044 #define MsgIsSender(flag) MsgIsBitSet((flag), MSG_FLAG_SENDER) 00045 #define MsgIsReceiving(flag) MsgIsBitSet((flag), MSG_FLAG_RECEIVING) 00046 #define MsgIsCompressed(flag) MsgIsBitSet((flag), 
MSG_FLAG_COMPRESSED) 00047 #define MsgIsStart(flag) MsgIsBitSet((flag), MSG_FLAG_START) 00048 #define MsgIsAckPending(flag) MsgIsBitSet((flag), MSG_FLAG_ACK_PENDING) 00049 #define MsgIsClientOwned(flag) MsgIsBitSet((flag), MSG_FLAG_CLIENT_OWNED) 00050 #define MsgIsInflated(flag) MsgIsBitSet((flag), MSG_FLAG_INFLATED) 00051 #define MsgIsDeflated(flag) MsgIsBitSet((flag), MSG_FLAG_DEFLATED) 00052 #define MsgIsSendNow(flag) MsgIsBitSet((flag), MSG_FLAG_SEND_NOW) 00053 #define MsgIsDoubleBuf(flag) MsgIsBitSet((flag), MSG_FLAG_DOUBLE_BUF) 00054 00055 #define MsgIsNotRequest(flag) MsgIsBitClear((flag), MSG_FLAG_REQUEST) 00056 #define MsgIsNotLastData(flag) MsgIsBitClear((flag), MSG_FLAG_LAST_DATA) 00057 #define MsgIsNotSender(flag) MsgIsBitClear((flag), MSG_FLAG_SENDER) 00058 #define MsgIsNotReceiving(flag) MsgIsBitClear((flag), MSG_FLAG_RECEIVING) 00059 #define MsgIsNotClientOwned(flag) MsgIsBitClear((flag), MSG_FLAG_CLIENT_OWNED) 00060 #define MsgIsNotInflated(flag) MsgIsBitClear((flag), MSG_FLAG_INFLATED) 00061 #define MsgIsNotDeflated(flag) MsgIsBitClear((flag), MSG_FLAG_DEFLATED) 00062 00063 #define MsgSetRequest(flag) MsgBitSet((flag), MSG_FLAG_REQUEST) 00064 #define MsgSetLastData(flag) MsgBitSet((flag), MSG_FLAG_LAST_DATA) 00065 #define MsgSetSender(flag) MsgBitSet((flag), MSG_FLAG_SENDER) 00066 #define MsgSetReceiving(flag) MsgBitSet((flag), MSG_FLAG_RECEIVING) 00067 #define MsgSetCompression(flag) MsgBitSet((flag), MSG_FLAG_COMPRESSED) 00068 #define MsgSetStart(flag) MsgBitSet((flag), MSG_FLAG_START) 00069 #define MsgSetAckPending(flag) MsgBitSet((flag), MSG_FLAG_ACK_PENDING) 00070 #define MsgSetClientOwned(flag) MsgBitSet((flag), MSG_FLAG_CLIENT_OWNED) 00071 #define MsgSetInflated(flag) MsgBitSet((flag), MSG_FLAG_INFLATED) 00072 #define MsgSetDeflated(flag) MsgBitSet((flag), MSG_FLAG_DEFLATED) 00073 #define MsgSetSendNow(flag) MsgBitSet((flag), MSG_FLAG_SEND_NOW) 00074 #define MsgSetDoubleBuf(flag) MsgBitSet((flag), MSG_FLAG_DOUBLE_BUF) 00075 00076 #define 
MsgClearRequest(flag) MsgBitClear((flag), MSG_FLAG_REQUEST) 00077 #define MsgClearLastData(flag) MsgBitClear((flag), MSG_FLAG_LAST_DATA) 00078 #define MsgClearReceiving(flag) MsgBitClear((flag), MSG_FLAG_RECEIVING) 00079 #define MsgClearStart(flag) MsgBitClear((flag), MSG_FLAG_START) 00080 #define MsgClearAckPending(flag) MsgBitClear((flag), MSG_FLAG_ACK_PENDING) 00081 #define MsgClearCompression(flag) MsgBitClear((flag), MSG_FLAG_COMPRESSED) 00082 #define MsgClearInflated(flag) MsgBitClear((flag), MSG_FLAG_INFLATED) 00083 #define MsgClearDeflated(flag) MsgBitClear((flag), MSG_FLAG_DEFLATED) 00084 #define MsgClearSendNow(flag) MsgBitClear((flag), MSG_FLAG_SEND_NOW) 00085 00086 typedef enum 00087 { 00088 msg_service_id_none, 00089 msg_service_id_data, 00090 msg_service_id_file, 00091 msg_service_id_rci, 00092 msg_service_id_brci, 00093 msg_service_id_count 00094 } msg_service_id_t; 00095 00096 typedef enum 00097 { 00098 msg_opcode_capability, 00099 msg_opcode_start, 00100 msg_opcode_data, 00101 msg_opcode_ack, 00102 msg_opcode_error 00103 } msg_opcode_t; 00104 00105 typedef enum 00106 { 00107 msg_block_state_send_request, 00108 msg_block_state_recv_request, 00109 msg_block_state_send_response, 00110 msg_block_state_recv_response 00111 } msg_block_state_t; 00112 00113 typedef enum 00114 { 00115 msg_state_init, 00116 msg_state_get_data, 00117 msg_state_compress, 00118 msg_state_send_data, 00119 msg_state_wait_send_complete, 00120 msg_state_wait_ack, 00121 msg_state_receive, 00122 msg_state_decompress, 00123 msg_state_process_decompressed, 00124 msg_state_send_ack, 00125 msg_state_send_error, 00126 msg_state_delete 00127 } msg_state_t; 00128 00129 typedef enum 00130 { 00131 msg_service_type_need_data, 00132 msg_service_type_have_data, 00133 msg_service_type_error, 00134 msg_service_type_free, 00135 msg_service_type_pending_request 00136 } msg_service_type_t; 00137 00138 typedef enum 00139 { 00140 msg_capability_cloud, 00141 msg_capability_client, 00142 
msg_capability_count 00143 } msg_capability_type_t; 00144 00145 enum msg_capability_packet_t 00146 { 00147 field_define(capability_packet, opcode, uint8_t), 00148 field_define(capability_packet, flags, uint8_t), 00149 field_define(capability_packet, version, uint8_t), 00150 field_define(capability_packet, max_transactions, uint8_t), 00151 field_define(capability_packet, window_size, uint32_t), 00152 field_define(capability_packet, compression_count, uint8_t), 00153 record_end(capability_packet) 00154 }; 00155 00156 enum msg_start_packet_t 00157 { 00158 field_define(start_packet, opcode, uint8_t), 00159 field_define(start_packet, flags, uint8_t), 00160 field_define(start_packet, transaction_id, uint16_t), 00161 field_define(start_packet, service_id, uint16_t), 00162 field_define(start_packet, compression_id, uint8_t), 00163 record_end(start_packet) 00164 }; 00165 00166 enum msg_data_packet_t 00167 { 00168 field_define(data_packet, opcode, uint8_t), 00169 field_define(data_packet, flags, uint8_t), 00170 field_define(data_packet, transaction_id, uint16_t), 00171 record_end(data_packet) 00172 }; 00173 00174 enum msg_ack_packet_t 00175 { 00176 field_define(ack_packet, opcode, uint8_t), 00177 field_define(ack_packet, flags, uint8_t), 00178 field_define(ack_packet, transaction_id, uint16_t), 00179 field_define(ack_packet, ack_count, uint32_t), 00180 field_define(ack_packet, window_size, uint32_t), 00181 record_end(ack_packet) 00182 }; 00183 00184 enum msg_error_packet_t 00185 { 00186 field_define(error_packet, opcode, uint8_t), 00187 field_define(error_packet, flags, uint8_t), 00188 field_define(error_packet, transaction_id, uint16_t), 00189 field_define(error_packet, error_code, uint8_t), 00190 record_end(error_packet) 00191 }; 00192 00193 typedef struct msg_data_block_t 00194 { 00195 size_t total_bytes; 00196 size_t available_window; 00197 size_t ack_count; 00198 unsigned int status_flag; 00199 #if (defined CONNECTOR_COMPRESSION) 00200 uint8_t 
buffer_in[MSG_MAX_RECV_PACKET_SIZE]; 00201 uint8_t buffer_out[MSG_MAX_RECV_PACKET_SIZE]; 00202 size_t bytes_out; 00203 int z_flag; 00204 z_stream zlib; 00205 #endif 00206 } msg_data_block_t; 00207 00208 typedef struct 00209 { 00210 void * data_ptr; 00211 size_t length_in_bytes; 00212 unsigned int flags; 00213 } msg_service_data_t; 00214 00215 typedef struct 00216 { 00217 void * session; 00218 msg_service_type_t service_type; 00219 msg_service_data_t * need_data; 00220 msg_service_data_t * have_data; 00221 connector_session_error_t error_value; 00222 } msg_service_request_t; 00223 00224 typedef struct msg_session_t 00225 { 00226 unsigned int session_id; 00227 unsigned int service_id; 00228 void * service_context; 00229 msg_state_t current_state; 00230 msg_state_t saved_state; 00231 uint8_t * send_data_ptr; 00232 size_t send_data_bytes; 00233 msg_data_block_t * in_dblock; 00234 msg_data_block_t * out_dblock; 00235 connector_session_error_t error; 00236 unsigned int error_flag; 00237 msg_service_request_t service_layer_data; 00238 struct msg_session_t * next; 00239 struct msg_session_t * prev; 00240 } msg_session_t; 00241 00242 typedef connector_status_t connector_msg_callback_t(connector_data_t * const connector_ptr, msg_service_request_t * const service_request); 00243 00244 typedef struct 00245 { 00246 uint32_t window_size; 00247 connector_bool_t compression_supported; 00248 uint8_t active_transactions; 00249 uint8_t max_transactions; 00250 } msg_capabilities_t; 00251 00252 typedef struct 00253 { 00254 msg_capabilities_t capabilities[msg_capability_count]; 00255 connector_msg_callback_t * service_cb[msg_service_id_count]; 00256 connector_bool_t session_locked; 00257 struct 00258 { 00259 msg_session_t * head; 00260 msg_session_t * tail; 00261 msg_session_t * current; 00262 } session; 00263 unsigned int last_assigned_id; 00264 void const * pending_service_request; 00265 } connector_msg_data_t; 00266 00267 static msg_session_t * msg_find_session(connector_msg_data_t 
const * const msg_ptr, unsigned int const id, connector_bool_t const client_owned) 00268 { 00269 msg_session_t * session = msg_ptr->session.head; 00270 00271 while (session != NULL) 00272 { 00273 if (session->session_id == id) 00274 { 00275 unsigned int const status = (session->in_dblock != NULL) ? session->in_dblock->status_flag : session->out_dblock->status_flag; 00276 00277 if (MsgIsClientOwned(status) == client_owned) 00278 break; 00279 } 00280 00281 session = session->next; 00282 } 00283 00284 return session; 00285 } 00286 00287 static unsigned int msg_find_next_available_id(connector_msg_data_t * const msg_ptr) 00288 { 00289 unsigned int new_id = MSG_INVALID_CLIENT_SESSION; 00290 unsigned int const last_id = msg_ptr->last_assigned_id; 00291 00292 do 00293 { 00294 connector_bool_t const client_owned = connector_true; 00295 00296 msg_ptr->last_assigned_id++; 00297 if (msg_ptr->last_assigned_id >= MSG_INVALID_CLIENT_SESSION) 00298 { 00299 msg_ptr->last_assigned_id = 0; 00300 continue; 00301 } 00302 00303 if (msg_find_session(msg_ptr, msg_ptr->last_assigned_id, client_owned) == NULL) 00304 { 00305 new_id = msg_ptr->last_assigned_id; 00306 break; 00307 } 00308 00309 } while (msg_ptr->last_assigned_id != last_id); 00310 00311 return new_id; 00312 } 00313 00314 static void msg_set_error(msg_session_t * const session, connector_session_error_t const error_code) 00315 { 00316 msg_data_block_t * const dblock = (session->in_dblock != NULL) ? 
session->in_dblock : session->out_dblock; 00317 connector_bool_t const client_request_error = connector_bool(MsgIsClientOwned(dblock->status_flag) && MsgIsNotReceiving(dblock->status_flag)); 00318 connector_bool_t const client_response_error = connector_bool(MsgIsNotClientOwned(dblock->status_flag) && MsgIsNotReceiving(dblock->status_flag)); 00319 connector_bool_t const cloud_request_error = connector_bool(MsgIsNotClientOwned(dblock->status_flag) && MsgIsReceiving(dblock->status_flag)); 00320 00321 if (client_request_error && MsgIsStart(dblock->status_flag)) 00322 { 00323 /* no need to send an error. just delete since nothing has been sent to Device Cloud */ 00324 session->current_state = msg_state_delete; 00325 goto done; 00326 } 00327 00328 session->error = error_code; 00329 session->current_state = msg_state_send_error; 00330 00331 if (client_response_error && MsgIsStart(dblock->status_flag)) 00332 { 00333 /* canceling Device Cloud request since no response data has been sent yet */ 00334 MsgSetRequest(session->error_flag); 00335 goto done; 00336 } 00337 00338 if (client_request_error || cloud_request_error) 00339 MsgSetRequest(session->error_flag); 00340 00341 if (client_request_error || client_response_error) 00342 MsgSetSender(session->error_flag); 00343 00344 done: 00345 return; 00346 } 00347 00348 static connector_status_t msg_call_service_layer(connector_data_t * const connector_ptr, msg_session_t * const session, msg_service_type_t const type) 00349 { 00350 connector_status_t status = connector_init_error; 00351 msg_service_request_t * const service_ptr = &session->service_layer_data; 00352 connector_msg_data_t * const msg_ptr = get_facility_data(connector_ptr, E_MSG_FAC_MSG_NUM); 00353 connector_msg_callback_t * cb_fn; 00354 00355 ASSERT_GOTO(msg_ptr != NULL, error); 00356 cb_fn = msg_ptr->service_cb[session->service_id]; 00357 ASSERT_GOTO(cb_fn != NULL, error); 00358 00359 service_ptr->service_type = type; 00360 service_ptr->session = session; 00361 
00362 status = cb_fn(connector_ptr, service_ptr); 00363 if ((status == connector_working) && (service_ptr->service_type == msg_service_type_error)) 00364 { 00365 msg_set_error(session, service_ptr->error_value); 00366 status = connector_unavailable; 00367 } 00368 00369 error: 00370 return status; 00371 } 00372 00373 static connector_status_t msg_inform_error(connector_data_t * const connector_ptr, msg_session_t * const session, connector_session_error_t error_code) 00374 { 00375 session->service_layer_data.error_value = error_code; 00376 00377 return msg_call_service_layer(connector_ptr, session, msg_service_type_error); 00378 } 00379 00380 static msg_session_t * msg_create_session(connector_data_t * const connector_ptr, connector_msg_data_t * const msg_ptr, unsigned int const service_id, 00381 connector_bool_t const client_owned, connector_status_t * const status) 00382 { 00383 unsigned int session_id = MSG_INVALID_CLIENT_SESSION; 00384 msg_capability_type_t const capability_id = client_owned == connector_true ? 
msg_capability_cloud : msg_capability_client; 00385 msg_session_t * session = NULL; 00386 unsigned int flags = 0; 00387 00388 *status = connector_pending; 00389 ASSERT_GOTO(msg_ptr != NULL, done); 00390 00391 { 00392 uint8_t const max_transactions = msg_ptr->capabilities[capability_id].max_transactions; 00393 uint8_t const active_transactions = msg_ptr->capabilities[capability_id].active_transactions; 00394 uint8_t const unlimited_transactions = 0; 00395 00396 if ((max_transactions != unlimited_transactions) && (active_transactions >= max_transactions)) 00397 { 00398 connector_debug_printf("msg_create_session: active transactions reached the limit [%d], service id [%d]\n", active_transactions, max_transactions, service_id); 00399 goto done; 00400 } 00401 } 00402 00403 if (client_owned == connector_true) 00404 { 00405 session_id = msg_find_next_available_id(msg_ptr); 00406 if (session_id == MSG_INVALID_CLIENT_SESSION) goto done; 00407 MsgSetClientOwned(flags); 00408 } 00409 00410 MsgSetRequest(flags); 00411 #if (defined CONNECTOR_COMPRESSION) 00412 MsgSetCompression(flags); 00413 #endif 00414 00415 switch (service_id) 00416 { 00417 case msg_service_id_rci: 00418 case msg_service_id_brci: 00419 MsgSetDoubleBuf(flags); 00420 break; 00421 00422 default: 00423 break; 00424 } 00425 00426 { 00427 void * ptr; 00428 uint8_t * data_ptr; 00429 size_t const bytes_in_block = sizeof(msg_data_block_t); 00430 size_t const bytes_in_service_data = sizeof(msg_service_data_t); 00431 size_t const bytes_in_session = sizeof *session; 00432 size_t const single_buffer_bytes = bytes_in_block + bytes_in_service_data; 00433 size_t const double_buffer_bytes = MsgIsCompressed(flags) ? 2 * single_buffer_bytes : (2 * single_buffer_bytes) + MSG_MAX_SEND_PACKET_SIZE; 00434 size_t const total_bytes = bytes_in_session + (MsgIsDoubleBuf(flags) ? double_buffer_bytes : single_buffer_bytes); 00435 connector_static_buffer_id_t buffer_id = client_owned == connector_true ? 
named_buffer_id(msg_session_client) : named_buffer_id(msg_session); 00436 00437 *status = malloc_data_buffer(connector_ptr, total_bytes, buffer_id, &ptr); 00438 if (*status != connector_working) goto done; 00439 00440 data_ptr = ptr; 00441 session = ptr; 00442 data_ptr += bytes_in_session; 00443 00444 if (MsgIsDoubleBuf(flags)) 00445 { 00446 session->out_dblock = (msg_data_block_t *)(session + 1); 00447 session->in_dblock = session->out_dblock + 1; 00448 data_ptr += (2 * bytes_in_block); 00449 session->service_layer_data.have_data = (msg_service_data_t *)(session->in_dblock + 1); 00450 session->service_layer_data.need_data = session->service_layer_data.have_data + 1; 00451 data_ptr += (2 * bytes_in_service_data); 00452 session->send_data_ptr = MsgIsCompressed(flags) ? NULL : data_ptr; 00453 } 00454 else 00455 { 00456 msg_data_block_t ** const current_dblock_ptr = client_owned == connector_true ? &session->out_dblock : &session->in_dblock; 00457 msg_data_block_t ** const other_dblock_ptr = client_owned == connector_true ? &session->in_dblock : &session->out_dblock; 00458 00459 *current_dblock_ptr = (msg_data_block_t *)(session + 1); 00460 *other_dblock_ptr = NULL; 00461 data_ptr += bytes_in_block; 00462 session->service_layer_data.need_data = (msg_service_data_t *)(client_owned == connector_true ? (*current_dblock_ptr) + 1 : NULL); 00463 session->service_layer_data.have_data = (msg_service_data_t *)(client_owned == connector_true ? 
NULL : (*current_dblock_ptr) + 1); 00464 session->send_data_ptr = NULL; 00465 } 00466 } 00467 00468 session->session_id = session_id; 00469 session->service_id = service_id; 00470 session->error = connector_session_error_none; 00471 session->service_layer_data.error_value = connector_session_error_none; 00472 session->error_flag = 0; 00473 session->send_data_bytes = 0; 00474 session->service_context = NULL; 00475 session->current_state = msg_state_init; 00476 session->saved_state = msg_state_init; 00477 00478 if (session->out_dblock != NULL) 00479 session->out_dblock->status_flag = flags; 00480 00481 if (session->in_dblock != NULL) 00482 session->in_dblock->status_flag = flags; 00483 00484 if (msg_ptr->session_locked) goto error; 00485 msg_ptr->session_locked = connector_true; 00486 add_list_node(&msg_ptr->session.head, &msg_ptr->session.tail, session); 00487 msg_ptr->session_locked = connector_false; 00488 00489 msg_ptr->capabilities[capability_id].active_transactions++; 00490 *status = connector_working; 00491 goto done; 00492 00493 error: 00494 { 00495 connector_status_t const result = free_data_buffer(connector_ptr, named_buffer_id(msg_session), session); 00496 00497 *status = (result == connector_abort) ? 
connector_abort : connector_pending; 00498 session = NULL; 00499 } 00500 00501 done: 00502 return session; 00503 } 00504 00505 static connector_status_t msg_delete_session(connector_data_t * const connector_ptr, connector_msg_data_t * const msg_ptr, msg_session_t * const session) 00506 { 00507 connector_status_t status = connector_working; 00508 00509 ASSERT_GOTO(msg_ptr != NULL, error); 00510 ASSERT_GOTO(session != NULL, error); 00511 ASSERT_GOTO((session->in_dblock != NULL) || (session->out_dblock != NULL), error); 00512 00513 if (msg_ptr->session_locked) goto error; 00514 msg_ptr->session_locked = connector_true; 00515 remove_list_node(&msg_ptr->session.head, &msg_ptr->session.tail, session); 00516 if (msg_ptr->session.current == session) 00517 msg_ptr->session.current = (session->prev != NULL) ? session->prev : msg_ptr->session.tail; 00518 msg_ptr->session_locked = connector_false; 00519 00520 #if (defined CONNECTOR_COMPRESSION) 00521 { 00522 if ((session->in_dblock != NULL) && MsgIsInflated(session->in_dblock->status_flag)) 00523 inflateEnd(&session->in_dblock->zlib); 00524 00525 if ((session->out_dblock != NULL) && MsgIsDeflated(session->out_dblock->status_flag)) 00526 deflateEnd(&session->out_dblock->zlib); 00527 } 00528 #endif 00529 00530 { 00531 unsigned int const flag = (session->in_dblock != NULL) ? session->in_dblock->status_flag : session->out_dblock->status_flag; 00532 msg_capability_type_t const capability_id = MsgIsClientOwned(flag) == connector_true ? 
msg_capability_cloud : msg_capability_client; 00533 00534 if (msg_ptr->capabilities[capability_id].active_transactions > 0) 00535 { 00536 msg_ptr->capabilities[capability_id].active_transactions--; 00537 } 00538 else 00539 { 00540 ASSERT(connector_false); 00541 } 00542 } 00543 00544 status = msg_call_service_layer(connector_ptr, session, msg_service_type_free); 00545 00546 { 00547 connector_status_t const free_status = free_data_buffer(connector_ptr, named_buffer_id(msg_session), session); 00548 00549 if (status != connector_abort) 00550 status = free_status; 00551 } 00552 00553 error: 00554 return status; 00555 } 00556 00557 static void msg_default_data_block(msg_data_block_t * dblock, uint32_t const window_size) 00558 { 00559 #if (defined CONNECTOR_COMPRESSION) 00560 dblock->bytes_out = 0; 00561 dblock->z_flag = Z_NO_FLUSH; 00562 #endif 00563 00564 dblock->available_window = window_size; 00565 dblock->ack_count = 0; 00566 dblock->total_bytes = 0; 00567 MsgSetStart(dblock->status_flag); 00568 MsgClearLastData(dblock->status_flag); 00569 } 00570 00571 static connector_session_error_t msg_initialize_data_block(msg_session_t * const session, uint32_t const window_size, msg_block_state_t state) 00572 { 00573 connector_session_error_t result = connector_session_error_none; 00574 00575 ASSERT_GOTO(session != NULL, error); 00576 00577 switch(state) 00578 { 00579 case msg_block_state_send_response: 00580 if (session->out_dblock == NULL) 00581 { 00582 #if (defined CONNECTOR_COMPRESSION) 00583 ASSERT_GOTO(session->in_dblock != NULL, compression_error); 00584 inflateEnd(&session->in_dblock->zlib); 00585 MsgClearInflated(session->in_dblock->status_flag); 00586 #endif 00587 session->out_dblock = session->in_dblock; 00588 ASSERT_GOTO(session->service_layer_data.have_data != NULL, error); 00589 session->service_layer_data.need_data = session->service_layer_data.have_data; 00590 session->in_dblock = NULL; 00591 session->service_layer_data.have_data = NULL; 00592 } 00593 
MsgClearRequest(session->out_dblock->status_flag); 00594 /* intentional fall through */ 00595 00596 case msg_block_state_send_request: 00597 msg_default_data_block(session->out_dblock, window_size); 00598 MsgClearReceiving(session->out_dblock->status_flag); 00599 session->current_state = msg_state_get_data; 00600 session->service_layer_data.need_data->data_ptr = NULL; 00601 #if (defined CONNECTOR_COMPRESSION) 00602 if (MsgIsNotDeflated(session->out_dblock->status_flag) == connector_true) 00603 { 00604 memset(&session->out_dblock->zlib, 0, sizeof session->out_dblock->zlib); 00605 ASSERT_GOTO(deflateInit(&session->out_dblock->zlib, Z_DEFAULT_COMPRESSION) == Z_OK, compression_error); 00606 MsgSetDeflated(session->out_dblock->status_flag); 00607 } 00608 #endif 00609 break; 00610 00611 case msg_block_state_recv_response: 00612 if (session->in_dblock == NULL) 00613 { 00614 #if (defined CONNECTOR_COMPRESSION) 00615 ASSERT_GOTO(session->out_dblock != NULL, compression_error); 00616 deflateEnd(&session->out_dblock->zlib); 00617 MsgClearDeflated(session->out_dblock->status_flag); 00618 #endif 00619 session->in_dblock = session->out_dblock; 00620 ASSERT_GOTO(session->service_layer_data.need_data != NULL, error); 00621 session->service_layer_data.have_data = session->service_layer_data.need_data; 00622 MsgClearRequest(session->in_dblock->status_flag); 00623 session->out_dblock = NULL; 00624 session->service_layer_data.need_data = NULL; 00625 } 00626 /* intentional fall through */ 00627 00628 case msg_block_state_recv_request: 00629 msg_default_data_block(session->in_dblock, window_size); 00630 MsgSetReceiving(session->in_dblock->status_flag); 00631 session->current_state = msg_state_receive; 00632 #if (defined CONNECTOR_COMPRESSION) 00633 if (MsgIsNotInflated(session->in_dblock->status_flag) == connector_true) 00634 { 00635 memset(&session->in_dblock->zlib, 0, sizeof session->in_dblock->zlib); 00636 ASSERT_GOTO(inflateInit(&session->in_dblock->zlib) == Z_OK, 
compression_error); 00637 MsgSetInflated(session->in_dblock->status_flag); 00638 } 00639 #endif 00640 break; 00641 00642 default: 00643 ASSERT_GOTO(connector_false, error); 00644 break; 00645 } 00646 00647 goto done; 00648 00649 #if (defined CONNECTOR_COMPRESSION) 00650 compression_error: 00651 result = connector_session_error_compression_failure; 00652 #endif 00653 00654 error: 00655 done: 00656 return result; 00657 } 00658 00659 static connector_status_t msg_send_error(connector_data_t * const connector_ptr, connector_msg_data_t * const msg_ptr, msg_session_t * const session, uint16_t const session_id, connector_session_error_t const error_value, uint8_t const flag) 00660 { 00661 connector_status_t status = connector_working; 00662 uint8_t * error_packet; 00663 uint8_t * const edp_header = tcp_get_packet_buffer(connector_ptr, E_MSG_FAC_MSG_NUM, &error_packet, NULL); 00664 00665 if (edp_header == NULL) 00666 { 00667 status = connector_pending; 00668 goto done; 00669 } 00670 00671 message_store_u8(error_packet, opcode, msg_opcode_error); 00672 message_store_u8(error_packet, flags, flag); 00673 message_store_be16(error_packet, transaction_id, session_id); 00674 message_store_u8(error_packet, error_code, error_value); 00675 00676 status = tcp_initiate_send_facility_packet(connector_ptr, edp_header, record_end(error_packet), E_MSG_FAC_MSG_NUM, tcp_release_packet_buffer, NULL); 00677 00678 00679 if ((status != connector_pending) && (session != NULL)) 00680 status = msg_delete_session(connector_ptr, msg_ptr, session); 00681 00682 done: 00683 return status; 00684 } 00685 00686 static connector_status_t msg_send_capabilities(connector_data_t * const connector_ptr, connector_msg_data_t const * const msg_ptr, uint8_t const flag) 00687 { 00688 connector_status_t status = connector_pending; 00689 uint8_t * capability_packet = NULL; 00690 uint8_t * const edp_header = tcp_get_packet_buffer(connector_ptr, E_MSG_FAC_MSG_NUM, &capability_packet, NULL); 00691 uint8_t * 
variable_data_ptr = capability_packet + record_bytes(capability_packet); 00692 00693 if ((edp_header == NULL) || (capability_packet == NULL)) 00694 goto error; 00695 00696 /* 00697 * ------------------------------------------------------------------------------------------ 00698 * | 0 | 1 | 2 | 3 | 4-7 | 8 | 9-(8+n) | 9+n | s bytes | 00699 * ----------------------------------------------------------------------------------------- 00700 * | opcode | flags | version | Max Trx | Window | Count(n) | list | Count (s) | list | 00701 * | | | | | size | compression algo | Service Info | 00702 * ----------------------------------------------------------------------------------------- 00703 */ 00704 message_store_u8(capability_packet, opcode, msg_opcode_capability); 00705 message_store_u8(capability_packet, flags, flag); 00706 message_store_u8(capability_packet, version, MSG_FACILITY_VERSION); 00707 message_store_u8(capability_packet, max_transactions, msg_ptr->capabilities[msg_capability_client].max_transactions); 00708 message_store_be32(capability_packet, window_size, msg_ptr->capabilities[msg_capability_client].window_size); 00709 00710 /* append compression algorithms supported */ 00711 { 00712 uint8_t const compression_length = (msg_ptr->capabilities[msg_capability_client].compression_supported) ? 
1 : 0; 00713 00714 message_store_u8(capability_packet, compression_count, compression_length); 00715 if (compression_length > 0) 00716 *variable_data_ptr++ = MSG_COMPRESSION_LIBZ; 00717 } 00718 00719 /* append service IDs of all listeners */ 00720 { 00721 uint16_t services = 0; 00722 uint16_t service_id; 00723 00724 for (service_id = 0; service_id < msg_service_id_count; service_id++) 00725 if (msg_ptr->service_cb[service_id] != NULL) services++; 00726 00727 StoreBE16(variable_data_ptr, services); 00728 variable_data_ptr += sizeof services; 00729 00730 for (service_id = 0; service_id < msg_service_id_count; service_id++) 00731 { 00732 if (msg_ptr->service_cb[service_id] != NULL) 00733 { 00734 StoreBE16(variable_data_ptr, service_id); 00735 variable_data_ptr += sizeof service_id; 00736 } 00737 } 00738 } 00739 00740 { 00741 size_t const packet_len = (size_t)(variable_data_ptr - capability_packet); 00742 00743 status = tcp_initiate_send_facility_packet(connector_ptr, edp_header, packet_len, E_MSG_FAC_MSG_NUM, tcp_release_packet_buffer, NULL); 00744 } 00745 00746 error: 00747 return status; 00748 } 00749 00750 static void msg_fill_msg_header(msg_session_t * const session, void * ptr) 00751 { 00752 uint8_t flag = 0; 00753 00754 if MsgIsRequest(session->out_dblock->status_flag) 00755 MsgSetRequest(flag); 00756 00757 if (MsgIsLastData(session->out_dblock->status_flag)) 00758 MsgSetLastData(flag); 00759 00760 if (MsgIsStart(session->out_dblock->status_flag)) 00761 { 00762 uint8_t * const start_packet = ptr; 00763 00764 MsgClearStart(session->out_dblock->status_flag); 00765 00766 /* 00767 * ----------------------------------------------------------------- 00768 * | 0 | 1 | 2-3 | 4-5 | 6 | 7 ... 
n | 00769 * ---------------------------------------------------------------- 00770 * | opcode | flags | Xid | Service ID | Comp ID | Data | 00771 * ---------------------------------------------------------------- 00772 */ 00773 message_store_u8(start_packet, opcode, msg_opcode_start); 00774 message_store_u8(start_packet, flags, flag); 00775 { 00776 uint16_t const session_id = (uint16_t)session->session_id; 00777 uint16_t const service_id = (uint16_t)session->service_id; 00778 00779 message_store_be16(start_packet, transaction_id, session_id); 00780 message_store_be16(start_packet, service_id, service_id); 00781 } 00782 message_store_u8(start_packet, compression_id, (MsgIsCompressed(session->out_dblock->status_flag) ? MSG_COMPRESSION_LIBZ : MSG_COMPRESSION_NONE)); 00783 } 00784 else 00785 { 00786 uint8_t * const data_packet = ptr; 00787 00788 /* 00789 * --------------------------------------- 00790 * | 0 | 1 | 2-3 | 4 ... n | 00791 * -------------------------------------- 00792 * | opcode | flags | Xid | Data | 00793 * -------------------------------------- 00794 */ 00795 message_store_u8(data_packet, opcode, msg_opcode_data); 00796 message_store_u8(data_packet, flags, flag); 00797 { 00798 uint16_t const session_id = (uint16_t)session->session_id; 00799 00800 message_store_be16(data_packet, transaction_id, session_id); 00801 } 00802 } 00803 } 00804 00805 static connector_status_t msg_send_complete(connector_data_t * const connector_ptr, uint8_t const * const packet, connector_status_t const status, void * const user_data) 00806 { 00807 connector_status_t return_status = connector_working; 00808 00809 msg_session_t * const session = user_data; 00810 msg_data_block_t * const dblock = session->out_dblock; 00811 00812 ASSERT_GOTO(session != NULL, error); 00813 ASSERT_GOTO(dblock != NULL, error); 00814 00815 if ((MsgIsDoubleBuf(dblock->status_flag) == connector_false) && (MsgIsCompressed(dblock->status_flag) == connector_false)) 00816 { 00817 return_status = 
tcp_release_packet_buffer(connector_ptr, packet, connector_success, NULL); 00818 if (return_status != connector_working) goto error; 00819 } 00820 00821 switch (status) 00822 { 00823 case connector_service_busy: 00824 session->current_state = msg_state_send_data; 00825 break; 00826 00827 case connector_success: 00828 /* update session state */ 00829 if (MsgIsLastData(dblock->status_flag)) 00830 { 00831 if (MsgIsRequest(dblock->status_flag)) 00832 { 00833 session->current_state = msg_state_receive; 00834 } 00835 else 00836 { 00837 connector_msg_data_t * const msg_ptr = get_facility_data(connector_ptr, E_MSG_FAC_MSG_NUM); 00838 00839 return_status = msg_delete_session(connector_ptr, msg_ptr, session); 00840 } 00841 00842 goto done; 00843 } 00844 00845 #if (defined CONNECTOR_COMPRESSION) 00846 dblock->bytes_out = 0; 00847 if (dblock->zlib.avail_out == 0) 00848 { 00849 session->current_state = msg_state_compress; 00850 goto done; 00851 } 00852 00853 dblock->z_flag = Z_NO_FLUSH; 00854 dblock->zlib.avail_out = 0; 00855 #else 00856 session->send_data_bytes = 0; 00857 #endif 00858 00859 session->service_layer_data.need_data->data_ptr = NULL; 00860 session->current_state = MsgIsAckPending(dblock->status_flag) == connector_true ? 
msg_state_wait_ack : session->saved_state; 00861 00862 break; 00863 00864 default: 00865 msg_inform_error(connector_ptr, session, connector_session_error_send); 00866 break; 00867 } 00868 00869 error: 00870 done: 00871 return return_status; 00872 } 00873 00874 static connector_status_t msg_send_data(connector_data_t * const connector_ptr, msg_session_t * const session) 00875 { 00876 connector_status_t status = connector_working; 00877 uint8_t * buffer; 00878 size_t bytes; 00879 00880 #if (defined CONNECTOR_COMPRESSION) 00881 { 00882 msg_data_block_t * const dblock = session->out_dblock; 00883 00884 bytes = dblock->bytes_out; 00885 buffer = dblock->buffer_out; 00886 } 00887 #else 00888 bytes = session->send_data_bytes; 00889 buffer = session->send_data_ptr; 00890 #endif 00891 00892 ASSERT_GOTO(bytes > 0, error); 00893 status = tcp_initiate_send_facility_packet(connector_ptr, buffer, bytes, E_MSG_FAC_MSG_NUM, msg_send_complete, session); 00894 if (status != connector_working) 00895 { 00896 connector_status_t result = status; 00897 00898 if (status == connector_pending) result = connector_service_busy; 00899 00900 msg_send_complete(connector_ptr, buffer, result, session); 00901 } 00902 else 00903 session->current_state = msg_state_wait_send_complete; 00904 00905 error: 00906 return status; 00907 } 00908 00909 #if (defined CONNECTOR_COMPRESSION) 00910 static connector_status_t msg_compress_data(connector_data_t * const connector_ptr, msg_session_t * const session) 00911 { 00912 connector_status_t status = connector_working; 00913 msg_data_block_t * const dblock = session->out_dblock; 00914 uint8_t * const msg_buffer = GET_PACKET_DATA_POINTER(dblock->buffer_out, PACKET_EDP_FACILITY_SIZE); 00915 size_t const frame_bytes = sizeof dblock->buffer_out - PACKET_EDP_FACILITY_SIZE; 00916 z_streamp zlib_ptr = &dblock->zlib; 00917 int zret; 00918 00919 if (zlib_ptr->avail_out == 0) 00920 { 00921 size_t const header_length = MsgIsStart(dblock->status_flag) ? 
record_end(start_packet) : record_end(data_packet); 00922 00923 zlib_ptr->next_out = msg_buffer + header_length; 00924 zlib_ptr->avail_out = frame_bytes - header_length; 00925 } 00926 00927 zret = deflate(zlib_ptr, dblock->z_flag); 00928 switch (zret) 00929 { 00930 case Z_OK: 00931 if ((dblock->z_flag != Z_SYNC_FLUSH) && (zlib_ptr->avail_out > 0)) 00932 goto done; 00933 break; 00934 00935 case Z_STREAM_END: 00936 MsgSetLastData(dblock->status_flag); 00937 break; 00938 00939 default: 00940 status = msg_inform_error(connector_ptr, session, connector_session_error_compression_failure); 00941 goto done; 00942 } 00943 00944 msg_fill_msg_header(session, msg_buffer); 00945 dblock->bytes_out = frame_bytes - zlib_ptr->avail_out; 00946 status = msg_send_data(connector_ptr, session); 00947 00948 done: 00949 return status; 00950 } 00951 00952 static connector_status_t msg_prepare_send_data(connector_data_t * const connector_ptr, msg_session_t * const session) 00953 { 00954 connector_status_t status = connector_no_resource; 00955 msg_data_block_t * const dblock = session->out_dblock; 00956 00957 UNUSED_PARAMETER(connector_ptr); 00958 ASSERT_GOTO(dblock != NULL, error); 00959 ASSERT_GOTO(dblock->zlib.avail_in == 0, error); 00960 00961 { 00962 unsigned int const flag = (dblock->total_bytes == 0) ? 
MSG_FLAG_START : 0; 00963 msg_service_data_t * const service_data = session->service_layer_data.need_data; 00964 00965 ASSERT_GOTO(service_data != NULL, error); 00966 service_data->data_ptr = dblock->buffer_in; 00967 service_data->length_in_bytes = sizeof dblock->buffer_in; 00968 service_data->flags = flag; 00969 status = connector_working; 00970 } 00971 00972 error: 00973 return status; 00974 } 00975 00976 static connector_status_t msg_process_send_data(connector_data_t * const connector_ptr, msg_session_t * const session) 00977 { 00978 connector_status_t status = connector_no_resource; 00979 msg_data_block_t * const dblock = session->out_dblock; 00980 00981 ASSERT_GOTO(dblock != NULL, error); 00982 ASSERT_GOTO(dblock->zlib.avail_in == 0, error); 00983 00984 { 00985 z_streamp const zlib_ptr = &dblock->zlib; 00986 msg_service_data_t * const service_data = session->service_layer_data.need_data; 00987 00988 zlib_ptr->next_in = dblock->buffer_in; 00989 zlib_ptr->avail_in = service_data->length_in_bytes; 00990 dblock->total_bytes += service_data->length_in_bytes; 00991 if (MsgIsLastData(service_data->flags)) 00992 dblock->z_flag = Z_FINISH; 00993 } 00994 00995 if (dblock->z_flag == Z_NO_FLUSH) 00996 { 00997 size_t const unacked_bytes = dblock->total_bytes - dblock->ack_count; 00998 size_t const safe_window_size = dblock->available_window - sizeof dblock->buffer_in; 00999 01000 if (unacked_bytes > safe_window_size) 01001 { 01002 dblock->z_flag = Z_SYNC_FLUSH; 01003 MsgSetAckPending(dblock->status_flag); 01004 } 01005 } 01006 01007 status = msg_compress_data(connector_ptr, session); 01008 01009 error: 01010 return status; 01011 } 01012 01013 #else 01014 01015 static connector_status_t msg_prepare_send_data(connector_data_t * const connector_ptr, msg_session_t * const session) 01016 { 01017 uint8_t * msg_buffer; 01018 connector_status_t status = connector_no_resource; 01019 msg_data_block_t * const dblock = session->out_dblock; 01020 01021 ASSERT_GOTO(dblock != NULL, 
error); 01022 if (MsgIsDoubleBuf(dblock->status_flag)) 01023 { 01024 msg_buffer = GET_PACKET_DATA_POINTER(session->send_data_ptr, PACKET_EDP_FACILITY_SIZE); 01025 session->send_data_bytes = MSG_MAX_SEND_PACKET_SIZE - PACKET_EDP_FACILITY_SIZE; 01026 } 01027 else 01028 { 01029 session->send_data_ptr = tcp_get_packet_buffer(connector_ptr, E_MSG_FAC_MSG_NUM, &msg_buffer, &session->send_data_bytes); 01030 if (session->send_data_ptr == NULL) 01031 { 01032 status = connector_pending; 01033 goto done; 01034 } 01035 } 01036 01037 { 01038 size_t const header_bytes = MsgIsStart(dblock->status_flag) == connector_true ? record_end(start_packet) : record_end(data_packet); 01039 msg_service_data_t * const service_data = session->service_layer_data.need_data; 01040 unsigned int const flag = MsgIsStart(dblock->status_flag) ? MSG_FLAG_START : 0; 01041 01042 ASSERT_GOTO(MsgIsNotLastData(dblock->status_flag), error); 01043 ASSERT_GOTO(session->send_data_bytes > header_bytes, error); 01044 ASSERT_GOTO(service_data != NULL, error); 01045 01046 service_data->data_ptr = msg_buffer + header_bytes; 01047 service_data->length_in_bytes = session->send_data_bytes - header_bytes; 01048 service_data->flags = flag; 01049 status = connector_working; 01050 } 01051 01052 error: 01053 done: 01054 return status; 01055 } 01056 01057 static connector_status_t msg_process_send_data(connector_data_t * const connector_ptr, msg_session_t * const session) 01058 { 01059 connector_status_t status = connector_no_resource; 01060 msg_data_block_t * const dblock = session->out_dblock; 01061 msg_service_data_t * const service_data = session->service_layer_data.need_data; 01062 01063 ASSERT_GOTO(dblock != NULL, error); 01064 if (MsgIsLastData(service_data->flags)) 01065 MsgSetLastData(dblock->status_flag); 01066 01067 { 01068 size_t const header_bytes = MsgIsStart(dblock->status_flag) == connector_true ? 
record_end(start_packet) : record_end(data_packet); 01069 uint8_t * const msg_buffer = GET_PACKET_DATA_POINTER(session->send_data_ptr, PACKET_EDP_FACILITY_SIZE); 01070 01071 session->send_data_bytes = service_data->length_in_bytes + header_bytes; 01072 msg_fill_msg_header(session, msg_buffer); 01073 status = msg_send_data(connector_ptr, session); 01074 if (status != connector_working) 01075 goto error; 01076 } 01077 01078 dblock->total_bytes += service_data->length_in_bytes; 01079 if ((dblock->total_bytes - dblock->ack_count) > (dblock->available_window - MSG_MAX_SEND_PACKET_SIZE)) 01080 MsgSetAckPending(dblock->status_flag); 01081 01082 error: 01083 return status; 01084 } 01085 01086 #endif 01087 01088 static connector_status_t msg_get_service_data(connector_data_t * const connector_ptr, msg_session_t * const session) 01089 { 01090 connector_status_t status = connector_no_resource; 01091 msg_data_block_t * const dblock = session->out_dblock; 01092 01093 ASSERT_GOTO(dblock != NULL, error); 01094 ASSERT_GOTO(MsgIsNotLastData(dblock->status_flag), done); 01095 01096 status = msg_prepare_send_data(connector_ptr, session); 01097 if (status != connector_working) 01098 goto error; 01099 01100 status = msg_call_service_layer(connector_ptr, session, msg_service_type_need_data); 01101 if (status != connector_working) 01102 { 01103 /* release the buffer */ 01104 if ((MsgIsDoubleBuf(dblock->status_flag) == connector_false) && (MsgIsCompressed(dblock->status_flag) == connector_false)) 01105 tcp_release_packet_buffer(connector_ptr, session->send_data_ptr, connector_success, NULL); 01106 01107 goto done; 01108 } 01109 01110 session->saved_state = session->current_state; 01111 status = msg_process_send_data(connector_ptr, session); 01112 01113 error: 01114 done: 01115 return status; 01116 } 01117 01118 #if (defined CONNECTOR_DATA_SERVICE) 01119 static connector_bool_t msg_initiate_request(connector_data_t * const connector_ptr, void const * const service_context) 01120 { 01121 
connector_bool_t success = connector_false; 01122 connector_msg_data_t * const msg_ptr = get_facility_data(connector_ptr, E_MSG_FAC_MSG_NUM); 01123 01124 ASSERT_GOTO(msg_ptr != NULL, error); 01125 if (msg_ptr->pending_service_request == NULL) 01126 { 01127 msg_ptr->pending_service_request = service_context; 01128 success = connector_true; 01129 } 01130 01131 error: 01132 return success; 01133 } 01134 01135 static connector_status_t msg_handle_pending_request(connector_data_t * const connector_ptr, connector_msg_data_t * const msg_ptr, msg_session_t * const session, connector_session_error_t const result) 01136 { 01137 connector_status_t status = connector_working; 01138 connector_msg_callback_t * const cb_fn = msg_ptr->service_cb[msg_service_id_data]; 01139 01140 if (cb_fn != NULL) 01141 { 01142 msg_service_request_t service_data; 01143 01144 service_data.session = session; 01145 service_data.service_type = msg_service_type_pending_request; 01146 service_data.have_data = (void *)msg_ptr->pending_service_request; 01147 service_data.error_value = result; 01148 01149 cb_fn(connector_ptr, &service_data); 01150 if ((service_data.error_value != connector_session_error_none) && (session != NULL)) 01151 status = msg_delete_session(connector_ptr, msg_ptr, session); 01152 } 01153 01154 msg_ptr->pending_service_request = NULL; 01155 return status; 01156 } 01157 01158 static connector_status_t msg_start_session(connector_data_t * const connector_ptr, connector_msg_data_t * const msg_ptr) 01159 { 01160 connector_status_t status = connector_working; 01161 static connector_bool_t const client_owned = connector_true; 01162 msg_session_t * const session = msg_create_session(connector_ptr, msg_ptr, msg_service_id_data, client_owned, &status); 01163 01164 if (session == NULL) goto error; 01165 01166 { 01167 connector_session_error_t const result = msg_initialize_data_block(session, msg_ptr->capabilities[msg_capability_cloud].window_size, msg_block_state_send_request); 01168 01169 
status = msg_handle_pending_request(connector_ptr, msg_ptr, session, result); 01170 } 01171 01172 error: 01173 return status; 01174 } 01175 #endif 01176 01177 static connector_status_t msg_send_ack(connector_data_t * const connector_ptr, connector_msg_data_t * const msg_ptr, msg_session_t * const session) 01178 { 01179 connector_status_t status = connector_pending; 01180 uint8_t * ack_packet; 01181 uint8_t * const edp_packet = tcp_get_packet_buffer(connector_ptr, E_MSG_FAC_MSG_NUM, &ack_packet, NULL); 01182 msg_data_block_t * const dblock = session->in_dblock; 01183 01184 if (edp_packet == NULL) 01185 goto done; 01186 01187 ASSERT_GOTO(dblock != NULL, error); 01188 message_store_u8(ack_packet, opcode, msg_opcode_ack); 01189 { 01190 uint8_t const flag = MsgIsClientOwned(dblock->status_flag) ? 0 : MSG_FLAG_REQUEST; 01191 01192 message_store_u8(ack_packet, flags, flag); 01193 } 01194 { 01195 uint16_t const session_id = (uint16_t)session->session_id; 01196 01197 message_store_be16(ack_packet, transaction_id, session_id); 01198 } 01199 { 01200 uint32_t const val32 = (uint32_t) dblock->total_bytes; 01201 01202 ASSERT(dblock->total_bytes <= UINT32_MAX); 01203 message_store_be32(ack_packet, ack_count, val32); 01204 } 01205 01206 message_store_be32(ack_packet, window_size, msg_ptr->capabilities[msg_capability_client].window_size); 01207 01208 status = tcp_initiate_send_facility_packet(connector_ptr, edp_packet, record_end(ack_packet), 01209 E_MSG_FAC_MSG_NUM, tcp_release_packet_buffer, NULL); 01210 if (status != connector_pending) 01211 { 01212 dblock->ack_count = dblock->total_bytes; 01213 MsgClearAckPending(dblock->status_flag); 01214 session->current_state = msg_state_receive; 01215 } 01216 01217 error: 01218 done: 01219 return status; 01220 } 01221 01222 static connector_status_t msg_process_capabilities(connector_data_t * const connector_ptr, connector_msg_data_t * const msg_fac, uint8_t * const ptr) 01223 { 01224 connector_status_t status = connector_working; 01225 
uint8_t * capability_packet = ptr; 01226 connector_bool_t const request_capabilities = connector_bool((message_load_u8(capability_packet, flags) & MSG_FLAG_REQUEST) == MSG_FLAG_REQUEST); 01227 uint8_t const version = message_load_u8(capability_packet, version); 01228 01229 ASSERT_GOTO(version == MSG_FACILITY_VERSION, error); 01230 msg_fac->capabilities[msg_capability_cloud].max_transactions = message_load_u8(capability_packet, max_transactions); 01231 msg_fac->capabilities[msg_capability_cloud].window_size = message_load_be32(capability_packet, window_size); 01232 01233 { 01234 uint8_t const comp_count = message_load_u8(capability_packet, compression_count); 01235 uint8_t const * compression_list = capability_packet + record_bytes(capability_packet); 01236 int i; 01237 01238 msg_fac->capabilities[msg_capability_cloud].compression_supported = connector_false; 01239 for (i = 0; i < comp_count; i++) 01240 { 01241 uint8_t const compression_id = *compression_list++; 01242 01243 if (compression_id == MSG_COMPRESSION_LIBZ) 01244 { 01245 msg_fac->capabilities[msg_capability_cloud].compression_supported = connector_true; 01246 break; 01247 } 01248 } 01249 } 01250 01251 if (request_capabilities) 01252 { 01253 uint8_t const capability_flag = 0; 01254 01255 status = msg_send_capabilities(connector_ptr, msg_fac, capability_flag); 01256 } 01257 01258 error: 01259 return status; 01260 } 01261 01262 static connector_status_t msg_pass_service_data(connector_data_t * const connector_ptr, msg_session_t * const session, void * data, size_t const bytes) 01263 { 01264 connector_status_t status = connector_working; 01265 msg_data_block_t * const dblock = session->in_dblock; 01266 01267 ASSERT_GOTO(session != NULL, error); 01268 ASSERT_GOTO(dblock != NULL, error); 01269 01270 { 01271 unsigned int service_flag = 0; 01272 msg_service_data_t * const service_data = session->service_layer_data.have_data; 01273 01274 ASSERT_GOTO(service_data != NULL, error); 01275 if 
(MsgIsStart(dblock->status_flag)) 01276 MsgSetStart(service_flag); 01277 01278 if (MsgIsLastData(dblock->status_flag)) 01279 MsgSetLastData(service_flag); 01280 01281 service_data->data_ptr = data; 01282 service_data->length_in_bytes = bytes; 01283 service_data->flags = service_flag; 01284 if ((MsgIsDoubleBuf(dblock->status_flag)) && (session->service_layer_data.need_data->data_ptr == NULL)) 01285 { 01286 status = msg_prepare_send_data(connector_ptr, session); 01287 ASSERT_GOTO(status == connector_working, error); 01288 } 01289 status = msg_call_service_layer(connector_ptr, session, msg_service_type_have_data); 01290 } 01291 01292 if (status != connector_working && status != connector_idle && status != connector_pending && status != connector_active) 01293 goto error; 01294 01295 if (status == connector_working) 01296 { 01297 MsgClearStart(dblock->status_flag); 01298 01299 if (MsgIsLastData(dblock->status_flag)) 01300 { 01301 if (MsgIsRequest(dblock->status_flag)) 01302 { 01303 connector_msg_data_t * const msg_ptr = get_facility_data(connector_ptr, E_MSG_FAC_MSG_NUM); 01304 connector_session_error_t result; 01305 01306 ASSERT_GOTO(msg_ptr != NULL, error); 01307 if (MsgIsDoubleBuf(dblock->status_flag) == connector_false) 01308 { 01309 result = msg_initialize_data_block(session, msg_ptr->capabilities[msg_capability_cloud].window_size, msg_block_state_send_response); 01310 if (result != connector_session_error_none) 01311 status = msg_inform_error(connector_ptr, session, result); 01312 } 01313 else 01314 { 01315 MsgClearRequest(dblock->status_flag); 01316 MsgClearRequest(session->out_dblock->status_flag); 01317 } 01318 } 01319 else 01320 session->current_state = msg_state_delete; 01321 } 01322 else 01323 { 01324 dblock->total_bytes += bytes; 01325 01326 if ((dblock->total_bytes - dblock->ack_count) > (dblock->available_window/2)) 01327 { 01328 MsgSetAckPending(dblock->status_flag); 01329 session->current_state = msg_state_send_ack; 01330 } 01331 else 01332 
session->current_state = msg_state_receive; 01333 } 01334 } 01335 01336 if (MsgIsDoubleBuf(dblock->status_flag)) 01337 { 01338 ASSERT_GOTO(session->service_layer_data.need_data != NULL, error); 01339 session->saved_state = session->current_state; 01340 01341 if (MsgIsSendNow(session->service_layer_data.need_data->flags)) 01342 { 01343 connector_status_t const send_status = msg_process_send_data(connector_ptr, session); 01344 01345 ASSERT_GOTO(((send_status == connector_working) || (send_status == connector_pending) || (send_status == connector_active)), error); 01346 session->service_layer_data.need_data->data_ptr = NULL; 01347 } 01348 } 01349 01350 error: 01351 return status; 01352 } 01353 01354 #if (defined CONNECTOR_COMPRESSION) 01355 static connector_status_t msg_process_decompressed_data(connector_data_t * const connector_ptr, msg_session_t * const session) 01356 { 01357 connector_status_t status = connector_working; 01358 msg_data_block_t * const dblock = session->in_dblock; 01359 01360 ASSERT_GOTO(dblock != NULL, error); 01361 ASSERT_GOTO(dblock->bytes_out > 0, error); 01362 01363 status = msg_pass_service_data(connector_ptr, session, dblock->buffer_out, dblock->bytes_out); 01364 if (status == connector_working) 01365 { 01366 dblock->bytes_out = 0; 01367 if (session->current_state != msg_state_get_data) 01368 { 01369 if (dblock->zlib.avail_out == 0) 01370 session->current_state = msg_state_decompress; 01371 else 01372 { 01373 if (MsgIsAckPending(dblock->status_flag)) 01374 session->current_state = msg_state_send_ack; 01375 } 01376 } 01377 } 01378 else if ((status == connector_active) || (status == connector_pending)) 01379 { 01380 session->current_state = msg_state_process_decompressed; 01381 status = connector_working; 01382 } 01383 01384 error: 01385 return status; 01386 } 01387 01388 static connector_status_t msg_decompress_data(connector_data_t * const connector_ptr, msg_session_t * const session) 01389 { 01390 connector_status_t status = 
connector_working; 01391 msg_data_block_t * const dblock = session->in_dblock; 01392 z_streamp zlib_ptr; 01393 01394 ASSERT_GOTO(dblock != NULL, error); 01395 zlib_ptr = &dblock->zlib; 01396 01397 if (zlib_ptr->avail_out == 0) 01398 { 01399 zlib_ptr->next_out = dblock->buffer_out; 01400 zlib_ptr->avail_out = sizeof dblock->buffer_out; 01401 } 01402 01403 { 01404 int const zret = inflate(zlib_ptr, Z_NO_FLUSH); 01405 01406 session->current_state = MsgIsAckPending(dblock->status_flag) ? msg_state_send_ack : msg_state_receive; 01407 switch(zret) 01408 { 01409 case Z_BUF_ERROR: 01410 if (zlib_ptr->avail_in > 0) 01411 goto error; 01412 goto done; 01413 01414 case Z_OK: 01415 if (zlib_ptr->avail_out > 0) 01416 goto done; 01417 break; 01418 01419 case Z_STREAM_END: 01420 MsgSetLastData(dblock->status_flag); 01421 break; 01422 01423 default: 01424 goto error; 01425 } 01426 01427 dblock->bytes_out = sizeof dblock->buffer_out - zlib_ptr->avail_out; 01428 status = msg_process_decompressed_data(connector_ptr, session); 01429 } 01430 01431 goto done; 01432 01433 error: 01434 status = msg_inform_error(connector_ptr, session, connector_session_error_decompression_failure); 01435 01436 done: 01437 return status; 01438 } 01439 01440 static connector_status_t msg_process_compressed_data(connector_data_t * const connector_ptr, msg_session_t * const session, uint8_t * data, size_t const bytes) 01441 { 01442 connector_status_t status = connector_working; 01443 msg_data_block_t * const dblock = session->in_dblock; 01444 z_streamp zlib_ptr; 01445 01446 ASSERT_GOTO(dblock != NULL, error); 01447 zlib_ptr = &dblock->zlib; 01448 01449 if (zlib_ptr->avail_in > 0) 01450 { 01451 status = connector_pending; 01452 goto done; 01453 } 01454 01455 ASSERT_GOTO(bytes < sizeof dblock->buffer_in, error); 01456 memcpy(dblock->buffer_in, data, bytes); 01457 01458 zlib_ptr->next_in = dblock->buffer_in; 01459 zlib_ptr->avail_in = bytes; 01460 01461 status = msg_decompress_data(connector_ptr, session); 01462 
01463 error: 01464 done: 01465 return status; 01466 } 01467 #endif 01468 01469 static connector_status_t msg_process_service_data(connector_data_t * const connector_ptr, msg_session_t * const session, uint8_t * msg_data, size_t const frame_bytes, size_t const header_bytes, unsigned const flag) 01470 { 01471 connector_status_t status = connector_working; 01472 uint8_t * buffer = msg_data + header_bytes; 01473 size_t const bytes = frame_bytes - header_bytes; 01474 msg_data_block_t * const dblock = session->in_dblock; 01475 01476 ASSERT_GOTO(dblock != NULL, error); 01477 if (session->current_state != msg_state_receive) 01478 { 01479 status = ((session->current_state == msg_state_send_error) || (session->current_state == msg_state_delete)) ? connector_working : connector_active; 01480 goto done; 01481 } 01482 01483 #if (defined CONNECTOR_COMPRESSION) 01484 if (MsgIsCompressed(dblock->status_flag)) 01485 { 01486 status = msg_process_compressed_data(connector_ptr, session, buffer, bytes); 01487 goto done; 01488 } 01489 #endif 01490 01491 if (MsgIsLastData(flag)) 01492 MsgSetLastData(dblock->status_flag); 01493 01494 status = msg_pass_service_data(connector_ptr, session, buffer, bytes); 01495 01496 error: 01497 done: 01498 return status; 01499 } 01500 01501 static connector_status_t msg_process_start(connector_data_t * const connector_ptr, connector_msg_data_t * const msg_ptr, uint8_t * ptr, uint16_t const length) 01502 { 01503 connector_status_t status = connector_working; 01504 connector_session_error_t result = connector_session_error_none; 01505 msg_session_t * session; 01506 uint8_t * start_packet = ptr; 01507 uint8_t flag = message_load_u8(start_packet, flags); 01508 uint16_t const session_id = message_load_be16(start_packet, transaction_id); 01509 connector_bool_t const client_owned = MsgIsNotRequest(flag); 01510 01511 session = msg_find_session(msg_ptr, session_id, client_owned); 01512 if (session == NULL) 01513 { 01514 uint16_t const service_id = 
message_load_be16(start_packet, service_id); 01515 01516 if (client_owned) 01517 { 01518 result = connector_session_error_unknown_session; 01519 goto error; 01520 } 01521 01522 session = msg_create_session(connector_ptr, msg_ptr, service_id, client_owned, &status); 01523 if (session == NULL) 01524 { 01525 switch (status) 01526 { 01527 case connector_pending: 01528 result = connector_session_error_busy; 01529 status = connector_working; 01530 break; 01531 01532 default: 01533 result = connector_session_error_fatal; 01534 break; 01535 } 01536 01537 goto error; 01538 } 01539 01540 session->session_id = session_id; 01541 if (session->out_dblock != NULL) 01542 { 01543 result = msg_initialize_data_block(session, msg_ptr->capabilities[msg_capability_client].window_size, msg_block_state_send_response); 01544 if (result != connector_session_error_none) 01545 goto error; 01546 } 01547 01548 result = msg_initialize_data_block(session, msg_ptr->capabilities[msg_capability_client].window_size, msg_block_state_recv_request); 01549 if (result != connector_session_error_none) 01550 goto error; 01551 } 01552 else 01553 { 01554 if (session->current_state == msg_state_send_error) 01555 goto done; 01556 01557 if (client_owned) 01558 { 01559 result = msg_initialize_data_block(session, msg_ptr->capabilities[msg_capability_client].window_size, msg_block_state_recv_response); 01560 if (result != connector_session_error_none) 01561 goto error; 01562 } 01563 } 01564 01565 { 01566 uint8_t const compression = message_load_u8(start_packet, compression_id); 01567 01568 #if (defined CONNECTOR_COMPRESSION) 01569 if (compression == MSG_COMPRESSION_NONE) 01570 MsgClearCompression(session->in_dblock->status_flag); 01571 #else 01572 if (compression != MSG_COMPRESSION_NONE) 01573 { 01574 result = connector_session_error_decompression_failure; 01575 goto error; 01576 } 01577 #endif 01578 } 01579 01580 MsgSetStart(flag); 01581 status = msg_process_service_data(connector_ptr, session, ptr, length, 
record_end(start_packet), flag); 01582 goto done; 01583 01584 error: 01585 { 01586 connector_status_t error_status = (session != NULL) ? msg_inform_error(connector_ptr, session, result) : msg_send_error(connector_ptr, msg_ptr, session, session_id, result, flag); 01587 01588 if (error_status == connector_abort) 01589 status = connector_abort; 01590 } 01591 01592 done: 01593 return status; 01594 } 01595 01596 static connector_status_t msg_process_data(connector_data_t * const connector_ptr, connector_msg_data_t * const msg_ptr, uint8_t * ptr, uint16_t const length) 01597 { 01598 connector_status_t status = connector_working; 01599 msg_session_t * session; 01600 uint8_t * const data_packet = ptr; 01601 uint8_t const flag = message_load_u8(data_packet, flags); 01602 uint16_t const session_id = message_load_be16(data_packet, transaction_id); 01603 connector_bool_t const client_owned = MsgIsNotRequest(flag); 01604 01605 session = msg_find_session(msg_ptr, session_id, client_owned); 01606 if (session == NULL) 01607 { 01608 status = msg_send_error(connector_ptr, msg_ptr, session, session_id, connector_session_error_unknown_session, flag); 01609 goto done; 01610 } 01611 01612 status = msg_process_service_data(connector_ptr, session, ptr, length, record_end(data_packet), flag); 01613 01614 done: 01615 return status; 01616 } 01617 01618 static connector_status_t msg_process_ack(connector_msg_data_t * const msg_fac, uint8_t const * ptr) 01619 { 01620 msg_session_t * session; 01621 uint8_t const * const ack_packet = ptr; 01622 01623 { 01624 uint16_t const session_id = message_load_be16(ack_packet, transaction_id); 01625 uint8_t const flag = message_load_u8(ack_packet, flags); 01626 connector_bool_t const client_owned = MsgIsRequest(flag); 01627 01628 session = msg_find_session(msg_fac, session_id, client_owned); 01629 /* already closed? 
done sending all data */ 01630 if (session == NULL) 01631 goto done; 01632 } 01633 01634 { 01635 msg_data_block_t * const dblock = session->out_dblock; 01636 01637 ASSERT_GOTO(dblock != NULL, error); 01638 dblock->available_window = message_load_be32(ack_packet, window_size); 01639 dblock->ack_count = message_load_be32(ack_packet, ack_count); 01640 01641 if (dblock->available_window > 0) 01642 { 01643 MsgClearAckPending(dblock->status_flag); 01644 if (session->current_state == msg_state_wait_ack) 01645 session->current_state = session->saved_state; 01646 } 01647 } 01648 01649 error: 01650 done: 01651 return connector_working; 01652 } 01653 01654 static connector_status_t msg_process_error(connector_data_t * const connector_ptr, connector_msg_data_t * const msg_fac, uint8_t * const ptr) 01655 { 01656 connector_status_t status = connector_working; 01657 uint8_t * const error_packet = ptr; 01658 uint8_t const flag = message_load_u8(error_packet, flags); 01659 connector_bool_t const client_request_error = connector_bool(MsgIsRequest(flag) && MsgIsNotSender(flag)); 01660 connector_bool_t const cloud_response_error = connector_bool(MsgIsNotRequest(flag) && MsgIsSender(flag)); 01661 connector_bool_t const client_owned = connector_bool(client_request_error || cloud_response_error); 01662 uint16_t const session_id = message_load_be16(error_packet, transaction_id); 01663 msg_session_t * const session = msg_find_session(msg_fac, session_id, client_owned); 01664 01665 if (session != NULL) 01666 { 01667 if ((session->current_state != msg_state_delete) && (session->current_state != msg_state_send_error)) 01668 { 01669 uint8_t const error_val = message_load_u8(error_packet, error_code); 01670 01671 ASSERT_GOTO(error_val < connector_session_error_count, error); 01672 { 01673 connector_session_error_t const msg_error = (connector_session_error_t)error_val; 01674 01675 status = msg_inform_error(connector_ptr, session, msg_error); 01676 if ((status != connector_pending) && (status != 
connector_abort)) 01677 status = msg_delete_session(connector_ptr, msg_fac, session); 01678 } 01679 } 01680 } 01681 else 01682 { 01683 connector_debug_printf("msg_process_error: unable to find session id = %d\n", session_id); 01684 } 01685 01686 error: 01687 return status; 01688 } 01689 01690 static connector_status_t msg_discovery(connector_data_t * const connector_ptr, void * const facility_data, uint8_t * const packet, unsigned int * const receive_timeout) 01691 { 01692 uint8_t const capability_flag = MSG_FLAG_REQUEST; 01693 01694 UNUSED_PARAMETER(packet); 01695 UNUSED_PARAMETER(receive_timeout); 01696 01697 return msg_send_capabilities(connector_ptr, facility_data, capability_flag); 01698 } 01699 01700 static void msg_switch_session(connector_msg_data_t * const msg_ptr, msg_session_t * const session) 01701 { 01702 if (!msg_ptr->session_locked) 01703 { 01704 msg_ptr->session_locked = connector_true; 01705 msg_ptr->session.current = (session->prev != NULL) ? session->prev : msg_ptr->session.tail; 01706 msg_ptr->session_locked = connector_false; 01707 } 01708 } 01709 01710 static connector_status_t msg_process_pending(connector_data_t * const connector_ptr, connector_msg_data_t * const msg_ptr, unsigned int * const receive_timeout) 01711 { 01712 connector_status_t status = connector_idle; 01713 01714 if (msg_ptr->session_locked) goto done; 01715 01716 #if (defined CONNECTOR_DATA_SERVICE) 01717 if (msg_ptr->pending_service_request != NULL) 01718 { 01719 status = msg_start_session(connector_ptr, msg_ptr); 01720 if (status != connector_working) goto done; 01721 } 01722 #endif 01723 01724 #if (defined CONNECTOR_DATA_POINTS) 01725 status = dp_process_request(connector_ptr, connector_transport_tcp); 01726 if ((status != connector_idle) && (status != connector_working)) 01727 goto done; 01728 #endif 01729 01730 msg_ptr->session_locked = connector_true; 01731 if (msg_ptr->session.current == NULL) 01732 msg_ptr->session.current = msg_ptr->session.tail; 01733 
msg_ptr->session_locked = connector_false; 01734 01735 *receive_timeout = MAX_RECEIVE_TIMEOUT_IN_SECONDS; 01736 if (msg_ptr->session.current != NULL) 01737 { 01738 msg_session_t * const session = msg_ptr->session.current; 01739 01740 status = connector_working; 01741 01742 *receive_timeout = MIN_RECEIVE_TIMEOUT_IN_SECONDS; 01743 switch (session->current_state) 01744 { 01745 case msg_state_init: 01746 case msg_state_wait_ack: 01747 case msg_state_receive: 01748 case msg_state_wait_send_complete: 01749 msg_switch_session(msg_ptr, session); 01750 break; 01751 01752 case msg_state_get_data: 01753 session->saved_state = msg_state_get_data; 01754 status = msg_get_service_data(connector_ptr, session); 01755 if (status == connector_pending) 01756 msg_switch_session(msg_ptr, session); 01757 break; 01758 01759 case msg_state_send_data: 01760 status = msg_send_data(connector_ptr, session); 01761 break; 01762 01763 #if (defined CONNECTOR_COMPRESSION) 01764 case msg_state_compress: 01765 status = msg_compress_data(connector_ptr, session); 01766 break; 01767 01768 case msg_state_decompress: 01769 status = msg_decompress_data(connector_ptr, session); 01770 break; 01771 01772 case msg_state_process_decompressed: 01773 status = msg_process_decompressed_data(connector_ptr, session); 01774 break; 01775 #endif 01776 01777 case msg_state_send_ack: 01778 status = msg_send_ack(connector_ptr, msg_ptr, session); 01779 break; 01780 01781 case msg_state_send_error: 01782 { 01783 uint8_t const flag = (uint8_t)session->error_flag; 01784 uint16_t const session_id = (uint16_t)session->session_id; 01785 01786 status = msg_send_error(connector_ptr, msg_ptr, session, session_id, session->error, flag); 01787 break; 01788 } 01789 01790 case msg_state_delete: 01791 status = msg_delete_session(connector_ptr, msg_ptr, session); 01792 break; 01793 01794 default: 01795 status = connector_init_error; 01796 connector_debug_printf("Failed %X, state%d\n", session, session->current_state); 01797 
ASSERT_GOTO(connector_false, done); 01798 break; 01799 } 01800 } 01801 01802 done: 01803 return status; 01804 } 01805 01806 static connector_status_t msg_process(connector_data_t * const connector_ptr, void * const facility_data, uint8_t * const edp_header, unsigned int * const receive_timeout) 01807 { 01808 connector_status_t status = connector_idle; 01809 connector_msg_data_t * const msg_ptr = facility_data; 01810 01811 ASSERT_GOTO(connector_ptr != NULL, error); 01812 ASSERT_GOTO(msg_ptr != NULL, error); 01813 01814 if (edp_header != NULL) 01815 { 01816 uint8_t * const data_ptr = GET_PACKET_DATA_POINTER(edp_header, PACKET_EDP_FACILITY_SIZE); 01817 uint16_t const length = message_load_be16(edp_header, length); 01818 uint8_t const opcode = *data_ptr; 01819 01820 switch (opcode) 01821 { 01822 case msg_opcode_capability: 01823 status = msg_process_capabilities(connector_ptr, msg_ptr, data_ptr); 01824 break; 01825 01826 case msg_opcode_start: 01827 status = msg_process_start(connector_ptr, msg_ptr, data_ptr, length); 01828 break; 01829 01830 case msg_opcode_data: 01831 status = msg_process_data(connector_ptr, msg_ptr, data_ptr, length); 01832 break; 01833 01834 case msg_opcode_ack: 01835 status = msg_process_ack(msg_ptr, data_ptr); 01836 break; 01837 01838 case msg_opcode_error: 01839 status = msg_process_error(connector_ptr, msg_ptr, data_ptr); 01840 break; 01841 01842 default: 01843 connector_debug_printf("msg_process: Invalid opcode\n"); 01844 break; 01845 } 01846 } 01847 01848 { 01849 connector_status_t const pending_state = msg_process_pending(connector_ptr, msg_ptr, receive_timeout); 01850 01851 if ((pending_state == connector_pending) || (pending_state == connector_working) || (pending_state == connector_active)) 01852 { 01853 if (status == connector_idle) 01854 status = pending_state; 01855 } 01856 else 01857 { 01858 if (pending_state != connector_idle) 01859 status = pending_state; 01860 } 01861 } 01862 01863 if (status == connector_unavailable) status = 
connector_working; 01864 01865 error: 01866 return status; 01867 } 01868 01869 static connector_status_t msg_cleanup_all_sessions(connector_data_t * const connector_ptr, uint16_t const service_id) 01870 { 01871 connector_status_t status = connector_working; 01872 connector_msg_data_t * const msg_ptr = get_facility_data(connector_ptr, E_MSG_FAC_MSG_NUM); 01873 msg_session_t * session; 01874 01875 ASSERT_GOTO(msg_ptr != NULL, error); 01876 session= msg_ptr->session.head; 01877 01878 while(session != NULL) 01879 { 01880 msg_session_t * next_session = session->next; 01881 01882 if (session->service_id == service_id) 01883 { 01884 if (session->current_state != msg_state_delete && session->current_state != msg_state_send_error) 01885 msg_inform_error(connector_ptr, session, connector_session_error_cancel); 01886 01887 status = msg_delete_session(connector_ptr, msg_ptr, session); 01888 if (status != connector_working) goto error; 01889 } 01890 01891 session = next_session; 01892 } 01893 01894 #if (defined CONNECTOR_DATA_SERVICE) 01895 if (msg_ptr->pending_service_request != NULL) 01896 status = msg_handle_pending_request(connector_ptr, msg_ptr, NULL, connector_session_error_cancel); 01897 #endif 01898 01899 error: 01900 return status; 01901 } 01902 01903 static connector_status_t msg_delete_facility(connector_data_t * const connector_ptr, uint16_t const service_id) 01904 { 01905 connector_status_t status = connector_no_resource; 01906 connector_msg_data_t * const msg_ptr = get_facility_data(connector_ptr, E_MSG_FAC_MSG_NUM); 01907 01908 if (msg_ptr == NULL) 01909 { 01910 status = connector_idle; 01911 goto error; 01912 } 01913 01914 ASSERT_GOTO(service_id < msg_service_id_count, error); 01915 ASSERT_GOTO(msg_ptr->service_cb[service_id] != NULL, error); 01916 01917 msg_ptr->service_cb[service_id] = NULL; 01918 01919 { 01920 connector_bool_t is_empty = connector_true; 01921 int i; 01922 01923 for(i = 0; i < msg_service_id_count; i++) 01924 { 01925 if (msg_ptr->service_cb[i] 
!= NULL) 01926 { 01927 is_empty = connector_false; 01928 break; 01929 } 01930 } 01931 01932 status = is_empty == connector_true ? del_facility_data(connector_ptr, E_MSG_FAC_MSG_NUM) : connector_working; 01933 } 01934 01935 error: 01936 return status; 01937 } 01938 01939 01940 static connector_status_t msg_init_facility(connector_data_t * const connector_ptr, unsigned int const facility_index, uint16_t service_id, connector_msg_callback_t callback) 01941 { 01942 connector_status_t status = connector_no_resource; 01943 connector_msg_data_t * msg_ptr = get_facility_data(connector_ptr, E_MSG_FAC_MSG_NUM); 01944 01945 if (msg_ptr == NULL) 01946 { 01947 void * fac_ptr = NULL; 01948 connector_config_max_transaction_t config_max_transaction; 01949 01950 status = add_facility_data(connector_ptr, facility_index, E_MSG_FAC_MSG_NUM, &fac_ptr, sizeof *msg_ptr); 01951 ASSERT_GOTO(status == connector_working, done); 01952 ASSERT_GOTO(fac_ptr != NULL, done); 01953 01954 msg_ptr = fac_ptr; 01955 memset(msg_ptr, 0, sizeof *msg_ptr); 01956 01957 #if (defined CONNECTOR_COMPRESSION) 01958 msg_ptr->capabilities[msg_capability_client].compression_supported = connector_true; 01959 #endif 01960 01961 #if (defined CONNECTOR_MSG_MAX_TRANSACTION) 01962 config_max_transaction.count = CONNECTOR_MSG_MAX_TRANSACTION; 01963 #else 01964 { 01965 connector_request_id_t request_id; 01966 connector_callback_status_t callback_status; 01967 01968 request_id.config_request = connector_request_id_config_max_transaction; 01969 callback_status = connector_callback(connector_ptr->callback, connector_class_id_config, request_id, &config_max_transaction); 01970 if (callback_status != connector_callback_continue && callback_status != connector_callback_unrecognized) 01971 { 01972 status = connector_abort; 01973 goto done; 01974 } 01975 } 01976 #endif 01977 01978 msg_ptr->capabilities[msg_capability_client].max_transactions = config_max_transaction.count; 01979 01980 { 01981 uint32_t const recv_window = 16384; 
01982 01983 ASSERT(recv_window > MSG_MAX_RECV_PACKET_SIZE); 01984 msg_ptr->capabilities[msg_capability_client].window_size = recv_window; 01985 } 01986 } 01987 01988 msg_ptr->service_cb[service_id] = callback; 01989 status = connector_working; 01990 01991 done: 01992 return status; 01993 } 01994 01995 #if (defined CONNECTOR_RCI_SERVICE) 01996 01997 #include "rci_service.h" 01998 01999 #if (!defined CONNECTOR_FIRMWARE_SERVICE) 02000 #include "rci_fwstub.h" 02001 #endif 02002 #endif 02003 02004
Generated on Tue Jul 12 2022 19:18:38 by
1.7.2