Sebastián Pastor / EtheriosCloudConnector
Embed: (wiki syntax)

« Back to documentation index

Show/hide line numbers connector_msg.h Source File

connector_msg.h

00001 /*
00002  * Copyright (c) 2013 Digi International Inc.,
00003  * All rights not expressly granted are reserved.
00004  *
00005  * This Source Code Form is subject to the terms of the Mozilla Public
00006  * License, v. 2.0. If a copy of the MPL was not distributed with this file,
00007  * You can obtain one at http://mozilla.org/MPL/2.0/.
00008  *
00009  * Digi International Inc. 11001 Bren Road East, Minnetonka, MN 55343
00010  * =======================================================================
00011  */
00012 
00013 #if (defined CONNECTOR_COMPRESSION)
00014 #include "zlib.h"
00015 #endif
00016 
00017 #define MSG_FACILITY_VERSION  0x01
00018 #define MSG_COMPRESSION_NONE  0x00
00019 #define MSG_COMPRESSION_LIBZ  0xFF
00020 
00021 #define MSG_INVALID_CLIENT_SESSION  0xFFFF
00022 
00023 #define MSG_FLAG_REQUEST      UINT32_C(0x01)
00024 #define MSG_FLAG_LAST_DATA    UINT32_C(0x02)
00025 #define MSG_FLAG_SENDER       UINT32_C(0x04)
00026 
00027 #define MSG_FLAG_CLIENT_OWNED UINT32_C(0x20)
00028 #define MSG_FLAG_RECEIVING    UINT32_C(0x40)
00029 #define MSG_FLAG_START        UINT32_C(0x80)
00030 #define MSG_FLAG_ACK_PENDING  UINT32_C(0x100)
00031 #define MSG_FLAG_COMPRESSED   UINT32_C(0x200)
00032 #define MSG_FLAG_INFLATED     UINT32_C(0x400)
00033 #define MSG_FLAG_DEFLATED     UINT32_C(0x800)
00034 #define MSG_FLAG_SEND_NOW     UINT32_C(0x1000)
00035 #define MSG_FLAG_DOUBLE_BUF   UINT32_C(0x2000)
00036 
00037 #define MsgIsBitSet(flag, bit)   (connector_bool(((flag) & (bit)) == (bit)))
00038 #define MsgIsBitClear(flag, bit) (connector_bool(((flag) & (bit)) == 0))
00039 #define MsgBitSet(flag, bit)    ((flag) |= (bit))
00040 #define MsgBitClear(flag, bit)  ((flag) &= ~(bit))
00041 
00042 #define MsgIsRequest(flag)      MsgIsBitSet((flag), MSG_FLAG_REQUEST)
00043 #define MsgIsLastData(flag)     MsgIsBitSet((flag), MSG_FLAG_LAST_DATA)
00044 #define MsgIsSender(flag)       MsgIsBitSet((flag), MSG_FLAG_SENDER)
00045 #define MsgIsReceiving(flag)    MsgIsBitSet((flag), MSG_FLAG_RECEIVING)
00046 #define MsgIsCompressed(flag)   MsgIsBitSet((flag), MSG_FLAG_COMPRESSED)
00047 #define MsgIsStart(flag)        MsgIsBitSet((flag), MSG_FLAG_START)
00048 #define MsgIsAckPending(flag)   MsgIsBitSet((flag), MSG_FLAG_ACK_PENDING)
00049 #define MsgIsClientOwned(flag)  MsgIsBitSet((flag), MSG_FLAG_CLIENT_OWNED)
00050 #define MsgIsInflated(flag)     MsgIsBitSet((flag), MSG_FLAG_INFLATED)
00051 #define MsgIsDeflated(flag)     MsgIsBitSet((flag), MSG_FLAG_DEFLATED)
00052 #define MsgIsSendNow(flag)      MsgIsBitSet((flag), MSG_FLAG_SEND_NOW)
00053 #define MsgIsDoubleBuf(flag)    MsgIsBitSet((flag), MSG_FLAG_DOUBLE_BUF)
00054 
00055 #define MsgIsNotRequest(flag)      MsgIsBitClear((flag), MSG_FLAG_REQUEST)
00056 #define MsgIsNotLastData(flag)     MsgIsBitClear((flag), MSG_FLAG_LAST_DATA)
00057 #define MsgIsNotSender(flag)       MsgIsBitClear((flag), MSG_FLAG_SENDER)
00058 #define MsgIsNotReceiving(flag)    MsgIsBitClear((flag), MSG_FLAG_RECEIVING)
00059 #define MsgIsNotClientOwned(flag)  MsgIsBitClear((flag), MSG_FLAG_CLIENT_OWNED)
00060 #define MsgIsNotInflated(flag)     MsgIsBitClear((flag), MSG_FLAG_INFLATED)
00061 #define MsgIsNotDeflated(flag)     MsgIsBitClear((flag), MSG_FLAG_DEFLATED)
00062 
00063 #define MsgSetRequest(flag)     MsgBitSet((flag), MSG_FLAG_REQUEST)
00064 #define MsgSetLastData(flag)    MsgBitSet((flag), MSG_FLAG_LAST_DATA)
00065 #define MsgSetSender(flag)      MsgBitSet((flag), MSG_FLAG_SENDER)
00066 #define MsgSetReceiving(flag)   MsgBitSet((flag), MSG_FLAG_RECEIVING)
00067 #define MsgSetCompression(flag) MsgBitSet((flag), MSG_FLAG_COMPRESSED)
00068 #define MsgSetStart(flag)       MsgBitSet((flag), MSG_FLAG_START)
00069 #define MsgSetAckPending(flag)  MsgBitSet((flag), MSG_FLAG_ACK_PENDING)
00070 #define MsgSetClientOwned(flag) MsgBitSet((flag), MSG_FLAG_CLIENT_OWNED)
00071 #define MsgSetInflated(flag)    MsgBitSet((flag), MSG_FLAG_INFLATED)
00072 #define MsgSetDeflated(flag)    MsgBitSet((flag), MSG_FLAG_DEFLATED)
00073 #define MsgSetSendNow(flag)     MsgBitSet((flag), MSG_FLAG_SEND_NOW)
00074 #define MsgSetDoubleBuf(flag)   MsgBitSet((flag), MSG_FLAG_DOUBLE_BUF)
00075 
00076 #define MsgClearRequest(flag)     MsgBitClear((flag), MSG_FLAG_REQUEST)
00077 #define MsgClearLastData(flag)    MsgBitClear((flag), MSG_FLAG_LAST_DATA)
00078 #define MsgClearReceiving(flag)   MsgBitClear((flag), MSG_FLAG_RECEIVING)
00079 #define MsgClearStart(flag)       MsgBitClear((flag), MSG_FLAG_START)
00080 #define MsgClearAckPending(flag)  MsgBitClear((flag), MSG_FLAG_ACK_PENDING)
00081 #define MsgClearCompression(flag) MsgBitClear((flag), MSG_FLAG_COMPRESSED)
00082 #define MsgClearInflated(flag)    MsgBitClear((flag), MSG_FLAG_INFLATED)
00083 #define MsgClearDeflated(flag)    MsgBitClear((flag), MSG_FLAG_DEFLATED)
00084 #define MsgClearSendNow(flag)     MsgBitClear((flag), MSG_FLAG_SEND_NOW)
00085 
00086 typedef enum
00087 {
00088     msg_service_id_none,
00089     msg_service_id_data,
00090     msg_service_id_file,
00091     msg_service_id_rci,
00092     msg_service_id_brci,
00093     msg_service_id_count
00094 } msg_service_id_t;
00095 
00096 typedef enum
00097 {
00098     msg_opcode_capability,
00099     msg_opcode_start,
00100     msg_opcode_data,
00101     msg_opcode_ack,
00102     msg_opcode_error
00103 } msg_opcode_t;
00104 
00105 typedef enum
00106 {
00107     msg_block_state_send_request,
00108     msg_block_state_recv_request,
00109     msg_block_state_send_response,
00110     msg_block_state_recv_response
00111 } msg_block_state_t;
00112 
00113 typedef enum
00114 {
00115     msg_state_init,
00116     msg_state_get_data,
00117     msg_state_compress,
00118     msg_state_send_data,
00119     msg_state_wait_send_complete,
00120     msg_state_wait_ack,
00121     msg_state_receive,
00122     msg_state_decompress,
00123     msg_state_process_decompressed,
00124     msg_state_send_ack,
00125     msg_state_send_error,
00126     msg_state_delete
00127 } msg_state_t;
00128 
00129 typedef enum
00130 {
00131     msg_service_type_need_data,
00132     msg_service_type_have_data,
00133     msg_service_type_error,
00134     msg_service_type_free,
00135     msg_service_type_pending_request
00136 } msg_service_type_t;
00137 
00138 typedef enum
00139 {
00140     msg_capability_cloud,
00141     msg_capability_client,
00142     msg_capability_count
00143 } msg_capability_type_t;
00144 
00145 enum msg_capability_packet_t
00146 {
00147     field_define(capability_packet, opcode, uint8_t),
00148     field_define(capability_packet, flags, uint8_t),
00149     field_define(capability_packet, version, uint8_t),
00150     field_define(capability_packet, max_transactions, uint8_t),
00151     field_define(capability_packet, window_size, uint32_t),
00152     field_define(capability_packet, compression_count, uint8_t),
00153     record_end(capability_packet)
00154 };
00155 
00156 enum msg_start_packet_t
00157 {
00158     field_define(start_packet, opcode, uint8_t),
00159     field_define(start_packet, flags, uint8_t),
00160     field_define(start_packet, transaction_id, uint16_t),
00161     field_define(start_packet, service_id, uint16_t),
00162     field_define(start_packet, compression_id, uint8_t),
00163     record_end(start_packet)
00164 };
00165 
00166 enum msg_data_packet_t
00167 {
00168     field_define(data_packet, opcode, uint8_t),
00169     field_define(data_packet, flags, uint8_t),
00170     field_define(data_packet, transaction_id, uint16_t),
00171     record_end(data_packet)
00172 };
00173 
00174 enum msg_ack_packet_t
00175 {
00176     field_define(ack_packet, opcode, uint8_t),
00177     field_define(ack_packet, flags, uint8_t),
00178     field_define(ack_packet, transaction_id, uint16_t),
00179     field_define(ack_packet, ack_count, uint32_t),
00180     field_define(ack_packet, window_size, uint32_t),
00181     record_end(ack_packet)
00182 };
00183 
00184 enum msg_error_packet_t
00185 {
00186     field_define(error_packet, opcode, uint8_t),
00187     field_define(error_packet, flags, uint8_t),
00188     field_define(error_packet, transaction_id, uint16_t),
00189     field_define(error_packet, error_code, uint8_t),
00190     record_end(error_packet)
00191 };
00192 
00193 typedef struct msg_data_block_t
00194 {
00195     size_t total_bytes;
00196     size_t available_window;
00197     size_t ack_count;
00198     unsigned int status_flag;
00199 #if (defined CONNECTOR_COMPRESSION)
00200     uint8_t  buffer_in[MSG_MAX_RECV_PACKET_SIZE];
00201     uint8_t  buffer_out[MSG_MAX_RECV_PACKET_SIZE];
00202     size_t   bytes_out;
00203     int      z_flag;
00204     z_stream zlib;
00205 #endif
00206 } msg_data_block_t;
00207 
00208 typedef struct
00209 {
00210     void * data_ptr;
00211     size_t length_in_bytes;
00212     unsigned int flags;
00213 } msg_service_data_t;
00214 
00215 typedef struct
00216 {
00217     void * session;
00218     msg_service_type_t service_type;
00219     msg_service_data_t * need_data;
00220     msg_service_data_t * have_data;
00221     connector_session_error_t error_value;
00222 } msg_service_request_t;
00223 
00224 typedef struct msg_session_t
00225 {
00226     unsigned int session_id;
00227     unsigned int service_id;
00228     void * service_context;
00229     msg_state_t current_state;
00230     msg_state_t saved_state;
00231     uint8_t * send_data_ptr;
00232     size_t send_data_bytes;
00233     msg_data_block_t * in_dblock;
00234     msg_data_block_t * out_dblock;
00235     connector_session_error_t error;
00236     unsigned int error_flag;
00237     msg_service_request_t service_layer_data;
00238     struct msg_session_t * next;
00239     struct msg_session_t * prev;
00240 } msg_session_t;
00241 
00242 typedef connector_status_t connector_msg_callback_t(connector_data_t * const connector_ptr, msg_service_request_t * const service_request);
00243 
00244 typedef struct
00245 {
00246     uint32_t window_size;
00247     connector_bool_t compression_supported;
00248     uint8_t active_transactions;
00249     uint8_t max_transactions;
00250 } msg_capabilities_t;
00251 
00252 typedef struct
00253 {
00254     msg_capabilities_t capabilities[msg_capability_count];
00255     connector_msg_callback_t * service_cb[msg_service_id_count];
00256     connector_bool_t session_locked;
00257     struct
00258     {
00259         msg_session_t * head;
00260         msg_session_t * tail;
00261         msg_session_t * current;
00262     } session;
00263     unsigned int last_assigned_id;
00264     void const * pending_service_request;
00265 } connector_msg_data_t;
00266 
00267 static msg_session_t * msg_find_session(connector_msg_data_t const * const msg_ptr, unsigned int const id, connector_bool_t const client_owned)
00268 {
00269     msg_session_t * session = msg_ptr->session.head;
00270 
00271     while (session != NULL)
00272     {
00273         if (session->session_id == id)
00274         {
00275             unsigned int const status = (session->in_dblock != NULL) ? session->in_dblock->status_flag : session->out_dblock->status_flag;
00276 
00277             if (MsgIsClientOwned(status) == client_owned)
00278                 break;
00279         }
00280 
00281         session = session->next;
00282     }
00283 
00284     return session;
00285 }
00286 
00287 static unsigned int msg_find_next_available_id(connector_msg_data_t * const msg_ptr)
00288 {
00289     unsigned int new_id = MSG_INVALID_CLIENT_SESSION;
00290     unsigned int const last_id = msg_ptr->last_assigned_id;
00291 
00292     do
00293     {
00294         connector_bool_t const client_owned = connector_true;
00295 
00296         msg_ptr->last_assigned_id++;
00297         if (msg_ptr->last_assigned_id >= MSG_INVALID_CLIENT_SESSION)
00298         {
00299             msg_ptr->last_assigned_id = 0;
00300             continue;
00301         }
00302 
00303         if (msg_find_session(msg_ptr, msg_ptr->last_assigned_id, client_owned) == NULL)
00304         {
00305             new_id = msg_ptr->last_assigned_id;
00306             break;
00307         }
00308 
00309     } while (msg_ptr->last_assigned_id != last_id);
00310 
00311     return new_id;
00312 }
00313 
00314 static void msg_set_error(msg_session_t * const session, connector_session_error_t const error_code)
00315 {
00316     msg_data_block_t * const dblock = (session->in_dblock != NULL) ? session->in_dblock : session->out_dblock;
00317     connector_bool_t const client_request_error = connector_bool(MsgIsClientOwned(dblock->status_flag) && MsgIsNotReceiving(dblock->status_flag));
00318     connector_bool_t const client_response_error = connector_bool(MsgIsNotClientOwned(dblock->status_flag) && MsgIsNotReceiving(dblock->status_flag));
00319     connector_bool_t const cloud_request_error = connector_bool(MsgIsNotClientOwned(dblock->status_flag) && MsgIsReceiving(dblock->status_flag));
00320 
00321     if (client_request_error && MsgIsStart(dblock->status_flag))
00322     {
00323         /* no need to send an error. just delete since nothing has been sent to Device Cloud */
00324         session->current_state = msg_state_delete;
00325         goto done;
00326     }
00327 
00328     session->error = error_code;
00329     session->current_state = msg_state_send_error;
00330 
00331     if (client_response_error && MsgIsStart(dblock->status_flag))
00332     {
00333         /* canceling Device Cloud request since no response data has been sent yet */
00334         MsgSetRequest(session->error_flag);
00335         goto done;
00336     }
00337 
00338     if (client_request_error || cloud_request_error)
00339         MsgSetRequest(session->error_flag);
00340 
00341     if (client_request_error || client_response_error)
00342         MsgSetSender(session->error_flag);
00343 
00344 done:
00345     return;
00346 }
00347 
00348 static connector_status_t msg_call_service_layer(connector_data_t * const connector_ptr, msg_session_t * const session, msg_service_type_t const type)
00349 {
00350     connector_status_t status = connector_init_error;
00351     msg_service_request_t * const service_ptr = &session->service_layer_data;
00352     connector_msg_data_t * const msg_ptr = get_facility_data(connector_ptr, E_MSG_FAC_MSG_NUM);
00353     connector_msg_callback_t * cb_fn;
00354 
00355     ASSERT_GOTO(msg_ptr != NULL, error);
00356     cb_fn = msg_ptr->service_cb[session->service_id];
00357     ASSERT_GOTO(cb_fn != NULL, error);
00358 
00359     service_ptr->service_type = type;
00360     service_ptr->session = session;
00361 
00362     status = cb_fn(connector_ptr, service_ptr);
00363     if ((status == connector_working) && (service_ptr->service_type == msg_service_type_error))
00364     {
00365         msg_set_error(session, service_ptr->error_value);
00366         status = connector_unavailable;
00367     }
00368 
00369 error:
00370     return status;
00371 }
00372 
00373 static connector_status_t msg_inform_error(connector_data_t * const connector_ptr, msg_session_t * const session, connector_session_error_t error_code)
00374 {
00375     session->service_layer_data.error_value = error_code;
00376 
00377     return msg_call_service_layer(connector_ptr, session, msg_service_type_error);
00378 }
00379 
00380 static msg_session_t * msg_create_session(connector_data_t * const connector_ptr, connector_msg_data_t * const msg_ptr, unsigned int const service_id,
00381                                           connector_bool_t const client_owned, connector_status_t * const status)
00382 {
00383     unsigned int session_id = MSG_INVALID_CLIENT_SESSION;
00384     msg_capability_type_t const capability_id = client_owned == connector_true ? msg_capability_cloud : msg_capability_client;
00385     msg_session_t * session = NULL;
00386     unsigned int flags = 0;
00387 
00388     *status = connector_pending;
00389     ASSERT_GOTO(msg_ptr != NULL, done);
00390 
00391     {
00392         uint8_t const max_transactions = msg_ptr->capabilities[capability_id].max_transactions;
00393         uint8_t const active_transactions = msg_ptr->capabilities[capability_id].active_transactions;
00394         uint8_t const unlimited_transactions = 0;
00395 
00396         if ((max_transactions != unlimited_transactions) && (active_transactions >= max_transactions))
00397         {
00398             connector_debug_printf("msg_create_session: active transactions reached the limit [%d], service id [%d]\n", active_transactions, max_transactions, service_id);
00399             goto done;
00400         }
00401     }
00402 
00403     if (client_owned == connector_true)
00404     {
00405         session_id = msg_find_next_available_id(msg_ptr);
00406         if (session_id == MSG_INVALID_CLIENT_SESSION) goto done;
00407         MsgSetClientOwned(flags);
00408     }
00409 
00410     MsgSetRequest(flags);
00411     #if (defined CONNECTOR_COMPRESSION)
00412     MsgSetCompression(flags);
00413     #endif
00414 
00415     switch (service_id)
00416     {
00417     case msg_service_id_rci:
00418     case msg_service_id_brci:
00419         MsgSetDoubleBuf(flags);
00420         break;
00421 
00422     default:
00423         break;
00424     }
00425 
00426     {
00427         void * ptr;
00428         uint8_t * data_ptr;
00429         size_t const bytes_in_block = sizeof(msg_data_block_t);
00430         size_t const bytes_in_service_data = sizeof(msg_service_data_t);
00431         size_t const bytes_in_session = sizeof *session;
00432         size_t const single_buffer_bytes = bytes_in_block + bytes_in_service_data;
00433         size_t const double_buffer_bytes = MsgIsCompressed(flags) ? 2 * single_buffer_bytes : (2 * single_buffer_bytes) + MSG_MAX_SEND_PACKET_SIZE;
00434         size_t const total_bytes = bytes_in_session + (MsgIsDoubleBuf(flags) ? double_buffer_bytes : single_buffer_bytes);
00435         connector_static_buffer_id_t buffer_id = client_owned == connector_true ? named_buffer_id(msg_session_client) : named_buffer_id(msg_session);
00436 
00437         *status = malloc_data_buffer(connector_ptr, total_bytes, buffer_id, &ptr);
00438         if (*status != connector_working) goto done;
00439 
00440         data_ptr = ptr;
00441         session = ptr;
00442         data_ptr += bytes_in_session;
00443 
00444         if (MsgIsDoubleBuf(flags))
00445         {
00446             session->out_dblock = (msg_data_block_t *)(session + 1);
00447             session->in_dblock = session->out_dblock + 1;
00448             data_ptr += (2 * bytes_in_block);
00449             session->service_layer_data.have_data = (msg_service_data_t *)(session->in_dblock + 1);
00450             session->service_layer_data.need_data = session->service_layer_data.have_data + 1;
00451             data_ptr += (2 * bytes_in_service_data);
00452             session->send_data_ptr = MsgIsCompressed(flags) ? NULL : data_ptr;
00453         }
00454         else
00455         {
00456             msg_data_block_t ** const current_dblock_ptr = client_owned == connector_true ? &session->out_dblock : &session->in_dblock;
00457             msg_data_block_t ** const other_dblock_ptr = client_owned == connector_true ? &session->in_dblock : &session->out_dblock;
00458 
00459             *current_dblock_ptr = (msg_data_block_t *)(session + 1);
00460             *other_dblock_ptr = NULL;
00461             data_ptr += bytes_in_block;
00462             session->service_layer_data.need_data = (msg_service_data_t *)(client_owned == connector_true ? (*current_dblock_ptr) + 1 : NULL);
00463             session->service_layer_data.have_data = (msg_service_data_t *)(client_owned == connector_true ? NULL : (*current_dblock_ptr) + 1);
00464             session->send_data_ptr = NULL;
00465         }
00466     }
00467 
00468     session->session_id = session_id;
00469     session->service_id = service_id;
00470     session->error = connector_session_error_none;
00471     session->service_layer_data.error_value = connector_session_error_none;
00472     session->error_flag = 0;
00473     session->send_data_bytes = 0;
00474     session->service_context = NULL;
00475     session->current_state = msg_state_init;
00476     session->saved_state = msg_state_init;
00477 
00478     if (session->out_dblock != NULL)
00479         session->out_dblock->status_flag = flags;
00480 
00481     if (session->in_dblock != NULL)
00482         session->in_dblock->status_flag = flags;
00483 
00484     if (msg_ptr->session_locked) goto error;
00485     msg_ptr->session_locked = connector_true;
00486     add_list_node(&msg_ptr->session.head, &msg_ptr->session.tail, session);
00487     msg_ptr->session_locked = connector_false;
00488 
00489     msg_ptr->capabilities[capability_id].active_transactions++;
00490     *status = connector_working;
00491     goto done;
00492 
00493 error:
00494     {
00495         connector_status_t const result = free_data_buffer(connector_ptr, named_buffer_id(msg_session), session);
00496 
00497         *status = (result == connector_abort) ? connector_abort : connector_pending;
00498         session = NULL;
00499     }
00500 
00501 done:
00502     return session;
00503 }
00504 
00505 static connector_status_t msg_delete_session(connector_data_t * const connector_ptr, connector_msg_data_t * const msg_ptr, msg_session_t * const session)
00506 {
00507     connector_status_t status = connector_working;
00508 
00509     ASSERT_GOTO(msg_ptr != NULL, error);
00510     ASSERT_GOTO(session != NULL, error);
00511     ASSERT_GOTO((session->in_dblock != NULL) || (session->out_dblock != NULL), error);
00512 
00513     if (msg_ptr->session_locked) goto error;
00514     msg_ptr->session_locked = connector_true;
00515     remove_list_node(&msg_ptr->session.head, &msg_ptr->session.tail, session);
00516     if (msg_ptr->session.current == session)
00517         msg_ptr->session.current = (session->prev != NULL) ? session->prev : msg_ptr->session.tail;
00518     msg_ptr->session_locked = connector_false;
00519 
00520     #if (defined CONNECTOR_COMPRESSION)
00521     {
00522         if ((session->in_dblock != NULL) && MsgIsInflated(session->in_dblock->status_flag))
00523             inflateEnd(&session->in_dblock->zlib);
00524 
00525         if ((session->out_dblock != NULL) && MsgIsDeflated(session->out_dblock->status_flag))
00526             deflateEnd(&session->out_dblock->zlib);
00527     }
00528     #endif
00529 
00530     {
00531         unsigned int const flag = (session->in_dblock != NULL) ? session->in_dblock->status_flag : session->out_dblock->status_flag;
00532         msg_capability_type_t const capability_id = MsgIsClientOwned(flag) == connector_true ? msg_capability_cloud : msg_capability_client;
00533 
00534         if (msg_ptr->capabilities[capability_id].active_transactions > 0)
00535         {
00536             msg_ptr->capabilities[capability_id].active_transactions--;
00537         }
00538         else
00539         {
00540             ASSERT(connector_false);
00541         }
00542     }
00543 
00544     status = msg_call_service_layer(connector_ptr, session, msg_service_type_free);
00545 
00546     {
00547         connector_status_t const free_status = free_data_buffer(connector_ptr, named_buffer_id(msg_session), session);
00548 
00549         if (status != connector_abort)
00550             status = free_status;
00551     }
00552 
00553 error:
00554     return status;
00555 }
00556 
00557 static void msg_default_data_block(msg_data_block_t * dblock, uint32_t const window_size)
00558 {
00559     #if (defined CONNECTOR_COMPRESSION)
00560     dblock->bytes_out = 0;
00561     dblock->z_flag = Z_NO_FLUSH;
00562     #endif
00563 
00564     dblock->available_window = window_size;
00565     dblock->ack_count = 0;
00566     dblock->total_bytes = 0;
00567     MsgSetStart(dblock->status_flag);
00568     MsgClearLastData(dblock->status_flag);
00569 }
00570 
00571 static connector_session_error_t msg_initialize_data_block(msg_session_t * const session, uint32_t const window_size, msg_block_state_t state)
00572 {
00573     connector_session_error_t result = connector_session_error_none;
00574 
00575     ASSERT_GOTO(session != NULL, error);
00576 
00577     switch(state)
00578     {
00579     case msg_block_state_send_response:
00580         if (session->out_dblock == NULL)
00581         {
00582             #if (defined CONNECTOR_COMPRESSION)
00583             ASSERT_GOTO(session->in_dblock != NULL, compression_error);
00584             inflateEnd(&session->in_dblock->zlib);
00585             MsgClearInflated(session->in_dblock->status_flag);
00586             #endif
00587             session->out_dblock = session->in_dblock;
00588             ASSERT_GOTO(session->service_layer_data.have_data != NULL, error);
00589             session->service_layer_data.need_data = session->service_layer_data.have_data;
00590             session->in_dblock = NULL;
00591             session->service_layer_data.have_data = NULL;
00592         }
00593         MsgClearRequest(session->out_dblock->status_flag);
00594         /* intentional fall through */
00595 
00596     case msg_block_state_send_request:
00597         msg_default_data_block(session->out_dblock, window_size);
00598         MsgClearReceiving(session->out_dblock->status_flag);
00599         session->current_state = msg_state_get_data;
00600         session->service_layer_data.need_data->data_ptr = NULL;
00601         #if (defined CONNECTOR_COMPRESSION)
00602         if (MsgIsNotDeflated(session->out_dblock->status_flag) == connector_true)
00603         {
00604             memset(&session->out_dblock->zlib, 0, sizeof session->out_dblock->zlib);
00605             ASSERT_GOTO(deflateInit(&session->out_dblock->zlib, Z_DEFAULT_COMPRESSION) == Z_OK, compression_error);
00606             MsgSetDeflated(session->out_dblock->status_flag);
00607         }
00608         #endif
00609         break;
00610 
00611     case msg_block_state_recv_response:
00612         if (session->in_dblock == NULL)
00613         {
00614             #if (defined CONNECTOR_COMPRESSION)
00615             ASSERT_GOTO(session->out_dblock != NULL, compression_error);
00616             deflateEnd(&session->out_dblock->zlib);
00617             MsgClearDeflated(session->out_dblock->status_flag);
00618             #endif
00619             session->in_dblock = session->out_dblock;
00620             ASSERT_GOTO(session->service_layer_data.need_data != NULL, error);
00621             session->service_layer_data.have_data = session->service_layer_data.need_data;
00622             MsgClearRequest(session->in_dblock->status_flag);
00623             session->out_dblock = NULL;
00624             session->service_layer_data.need_data = NULL;
00625         }
00626         /* intentional fall through */
00627 
00628     case msg_block_state_recv_request:
00629         msg_default_data_block(session->in_dblock, window_size);
00630         MsgSetReceiving(session->in_dblock->status_flag);
00631         session->current_state = msg_state_receive;
00632         #if (defined CONNECTOR_COMPRESSION)
00633         if (MsgIsNotInflated(session->in_dblock->status_flag) == connector_true)
00634         {
00635             memset(&session->in_dblock->zlib, 0, sizeof session->in_dblock->zlib);
00636             ASSERT_GOTO(inflateInit(&session->in_dblock->zlib) == Z_OK, compression_error);
00637             MsgSetInflated(session->in_dblock->status_flag);
00638         }
00639         #endif
00640         break;
00641 
00642     default:
00643         ASSERT_GOTO(connector_false, error);
00644         break;
00645     }
00646 
00647     goto done;
00648 
00649 #if (defined CONNECTOR_COMPRESSION)
00650 compression_error:
00651     result = connector_session_error_compression_failure;
00652 #endif
00653 
00654 error:
00655 done:
00656     return result;
00657 }
00658 
00659 static connector_status_t msg_send_error(connector_data_t * const connector_ptr, connector_msg_data_t * const msg_ptr, msg_session_t * const session, uint16_t const session_id, connector_session_error_t const error_value, uint8_t const flag)
00660 {
00661     connector_status_t status = connector_working;
00662     uint8_t * error_packet;
00663     uint8_t * const edp_header = tcp_get_packet_buffer(connector_ptr, E_MSG_FAC_MSG_NUM, &error_packet, NULL);
00664 
00665     if (edp_header == NULL)
00666     {
00667         status = connector_pending;
00668         goto done;
00669     }
00670 
00671     message_store_u8(error_packet, opcode, msg_opcode_error);
00672     message_store_u8(error_packet, flags, flag);
00673     message_store_be16(error_packet, transaction_id, session_id);
00674     message_store_u8(error_packet, error_code, error_value);
00675 
00676     status = tcp_initiate_send_facility_packet(connector_ptr, edp_header, record_end(error_packet), E_MSG_FAC_MSG_NUM, tcp_release_packet_buffer, NULL);
00677 
00678 
00679     if ((status != connector_pending) && (session != NULL))
00680         status = msg_delete_session(connector_ptr, msg_ptr, session);
00681 
00682 done:
00683     return status;
00684 }
00685 
00686 static connector_status_t msg_send_capabilities(connector_data_t * const connector_ptr, connector_msg_data_t const * const msg_ptr, uint8_t const flag)
00687 {
00688     connector_status_t status = connector_pending;
00689     uint8_t * capability_packet = NULL;
00690     uint8_t * const edp_header = tcp_get_packet_buffer(connector_ptr, E_MSG_FAC_MSG_NUM, &capability_packet, NULL);
00691     uint8_t * variable_data_ptr = capability_packet + record_bytes(capability_packet);
00692 
00693     if ((edp_header == NULL) || (capability_packet == NULL))
00694         goto error;
00695 
00696     /*
00697      * ------------------------------------------------------------------------------------------
00698      * |   0    |    1  |     2   |    3    |   4-7  |     8    | 9-(8+n) |     9+n   | s bytes |
00699      *  -----------------------------------------------------------------------------------------
00700      * | opcode | flags | version | Max Trx | Window | Count(n) | list    | Count (s) |  list   |
00701      * |        |       |         |         |  size  |  compression algo  |     Service Info    |
00702      *  -----------------------------------------------------------------------------------------
00703      */
00704     message_store_u8(capability_packet, opcode, msg_opcode_capability);
00705     message_store_u8(capability_packet, flags, flag);
00706     message_store_u8(capability_packet, version, MSG_FACILITY_VERSION);
00707     message_store_u8(capability_packet, max_transactions, msg_ptr->capabilities[msg_capability_client].max_transactions);
00708     message_store_be32(capability_packet, window_size, msg_ptr->capabilities[msg_capability_client].window_size);
00709 
00710     /* append compression algorithms supported */
00711     {
00712         uint8_t const compression_length = (msg_ptr->capabilities[msg_capability_client].compression_supported) ? 1 : 0;
00713 
00714         message_store_u8(capability_packet, compression_count, compression_length);
00715         if (compression_length > 0)
00716             *variable_data_ptr++ = MSG_COMPRESSION_LIBZ;
00717     }
00718 
00719     /* append service IDs of all listeners */
00720     {
00721         uint16_t services = 0;
00722         uint16_t service_id;
00723 
00724         for (service_id = 0; service_id < msg_service_id_count; service_id++)
00725             if (msg_ptr->service_cb[service_id] !=  NULL) services++;
00726 
00727         StoreBE16(variable_data_ptr, services);
00728         variable_data_ptr += sizeof services;
00729 
00730         for (service_id = 0; service_id < msg_service_id_count; service_id++)
00731         {
00732             if (msg_ptr->service_cb[service_id] !=  NULL)
00733             {
00734                 StoreBE16(variable_data_ptr, service_id);
00735                 variable_data_ptr += sizeof service_id;
00736             }
00737         }
00738     }
00739 
00740     {
00741         size_t const packet_len = (size_t)(variable_data_ptr - capability_packet);
00742 
00743         status = tcp_initiate_send_facility_packet(connector_ptr, edp_header, packet_len, E_MSG_FAC_MSG_NUM, tcp_release_packet_buffer, NULL);
00744     }
00745 
00746 error:
00747     return status;
00748 }
00749 
00750 static void msg_fill_msg_header(msg_session_t * const session, void * ptr)
00751 {
00752     uint8_t flag = 0;
00753 
00754     if MsgIsRequest(session->out_dblock->status_flag)
00755         MsgSetRequest(flag);
00756 
00757     if (MsgIsLastData(session->out_dblock->status_flag))
00758         MsgSetLastData(flag);
00759 
00760     if (MsgIsStart(session->out_dblock->status_flag))
00761     {
00762         uint8_t * const start_packet = ptr;
00763 
00764         MsgClearStart(session->out_dblock->status_flag);
00765 
00766         /*
00767         * -----------------------------------------------------------------
00768         * |   0    |    1  |   2-3   |    4-5     |    6    |     7 ... n |
00769         *  ----------------------------------------------------------------
00770         * | opcode | flags |   Xid   | Service ID | Comp ID |     Data    |
00771         *  ----------------------------------------------------------------
00772         */
00773         message_store_u8(start_packet, opcode, msg_opcode_start);
00774         message_store_u8(start_packet, flags, flag);
00775         {
00776             uint16_t const session_id = (uint16_t)session->session_id;
00777             uint16_t const service_id = (uint16_t)session->service_id;
00778 
00779             message_store_be16(start_packet, transaction_id, session_id);
00780             message_store_be16(start_packet, service_id, service_id);
00781         }
00782         message_store_u8(start_packet, compression_id, (MsgIsCompressed(session->out_dblock->status_flag) ? MSG_COMPRESSION_LIBZ : MSG_COMPRESSION_NONE));
00783     }
00784     else
00785     {
00786         uint8_t * const data_packet = ptr;
00787 
00788         /*
00789          * ---------------------------------------
00790          * |   0    |    1  |   2-3   |  4 ... n |
00791          *  --------------------------------------
00792          * | opcode | flags |   Xid   |  Data    |
00793          *  --------------------------------------
00794          */
00795         message_store_u8(data_packet, opcode, msg_opcode_data);
00796         message_store_u8(data_packet, flags, flag);
00797         {
00798             uint16_t const session_id = (uint16_t)session->session_id;
00799 
00800             message_store_be16(data_packet, transaction_id, session_id);
00801         }
00802     }
00803 }
00804 
00805 static connector_status_t msg_send_complete(connector_data_t * const connector_ptr, uint8_t const * const packet, connector_status_t const status, void * const user_data)
00806 {
00807     connector_status_t return_status = connector_working;
00808 
00809     msg_session_t * const session = user_data;
00810     msg_data_block_t * const dblock = session->out_dblock;
00811 
00812     ASSERT_GOTO(session != NULL, error);
00813     ASSERT_GOTO(dblock != NULL, error);
00814 
00815     if ((MsgIsDoubleBuf(dblock->status_flag) == connector_false) && (MsgIsCompressed(dblock->status_flag) == connector_false))
00816     {
00817         return_status = tcp_release_packet_buffer(connector_ptr, packet, connector_success, NULL);
00818         if (return_status != connector_working) goto error;
00819     }
00820 
00821     switch (status)
00822     {
00823         case connector_service_busy:
00824             session->current_state = msg_state_send_data;
00825             break;
00826 
00827         case connector_success:
00828             /* update session state */
00829             if (MsgIsLastData(dblock->status_flag))
00830             {
00831                 if (MsgIsRequest(dblock->status_flag))
00832                 {
00833                     session->current_state = msg_state_receive;
00834                 }
00835                 else
00836                 {
00837                     connector_msg_data_t * const msg_ptr = get_facility_data(connector_ptr, E_MSG_FAC_MSG_NUM);
00838 
00839                     return_status = msg_delete_session(connector_ptr, msg_ptr, session);
00840                 }
00841 
00842                 goto done;
00843             }
00844 
00845             #if (defined CONNECTOR_COMPRESSION)
00846             dblock->bytes_out = 0;
00847             if (dblock->zlib.avail_out == 0)
00848             {
00849                 session->current_state = msg_state_compress;
00850                 goto done;
00851             }
00852 
00853             dblock->z_flag = Z_NO_FLUSH;
00854             dblock->zlib.avail_out = 0;
00855             #else
00856             session->send_data_bytes = 0;
00857             #endif
00858 
00859             session->service_layer_data.need_data->data_ptr = NULL;
00860             session->current_state = MsgIsAckPending(dblock->status_flag) == connector_true ? msg_state_wait_ack : session->saved_state;
00861 
00862             break;
00863 
00864         default:
00865             msg_inform_error(connector_ptr, session, connector_session_error_send);
00866             break;
00867     }
00868 
00869 error:
00870 done:
00871     return return_status;
00872 }
00873 
00874 static connector_status_t msg_send_data(connector_data_t * const connector_ptr, msg_session_t * const session)
00875 {
00876     connector_status_t status = connector_working;
00877     uint8_t * buffer;
00878     size_t bytes;
00879 
00880     #if (defined CONNECTOR_COMPRESSION)
00881     {
00882         msg_data_block_t * const dblock = session->out_dblock;
00883 
00884         bytes = dblock->bytes_out;
00885         buffer = dblock->buffer_out;
00886     }
00887     #else
00888     bytes = session->send_data_bytes;
00889     buffer = session->send_data_ptr;
00890     #endif
00891 
00892     ASSERT_GOTO(bytes > 0, error);
00893     status = tcp_initiate_send_facility_packet(connector_ptr, buffer, bytes, E_MSG_FAC_MSG_NUM, msg_send_complete, session);
00894     if (status != connector_working)
00895     {
00896         connector_status_t result = status;
00897 
00898         if (status == connector_pending) result = connector_service_busy;
00899 
00900         msg_send_complete(connector_ptr, buffer, result, session);
00901     }
00902     else
00903         session->current_state = msg_state_wait_send_complete;
00904 
00905 error:
00906     return status;
00907 }
00908 
00909 #if (defined CONNECTOR_COMPRESSION)
00910 static connector_status_t msg_compress_data(connector_data_t * const connector_ptr, msg_session_t * const session)
00911 {
00912     connector_status_t status = connector_working;
00913     msg_data_block_t * const dblock = session->out_dblock;
00914     uint8_t * const msg_buffer = GET_PACKET_DATA_POINTER(dblock->buffer_out, PACKET_EDP_FACILITY_SIZE);
00915     size_t const frame_bytes = sizeof dblock->buffer_out - PACKET_EDP_FACILITY_SIZE;
00916     z_streamp zlib_ptr = &dblock->zlib;
00917     int zret;
00918 
00919     if (zlib_ptr->avail_out == 0)
00920     {
00921         size_t const header_length = MsgIsStart(dblock->status_flag) ? record_end(start_packet) : record_end(data_packet);
00922 
00923         zlib_ptr->next_out = msg_buffer + header_length;
00924         zlib_ptr->avail_out = frame_bytes - header_length;
00925     }
00926 
00927     zret = deflate(zlib_ptr, dblock->z_flag);
00928     switch (zret)
00929     {
00930     case Z_OK:
00931         if ((dblock->z_flag != Z_SYNC_FLUSH) && (zlib_ptr->avail_out > 0))
00932             goto done;
00933         break;
00934 
00935     case Z_STREAM_END:
00936         MsgSetLastData(dblock->status_flag);
00937         break;
00938 
00939     default:
00940         status = msg_inform_error(connector_ptr, session, connector_session_error_compression_failure);
00941         goto done;
00942     }
00943 
00944     msg_fill_msg_header(session, msg_buffer);
00945     dblock->bytes_out = frame_bytes - zlib_ptr->avail_out;
00946     status = msg_send_data(connector_ptr, session);
00947 
00948 done:
00949     return status;
00950 }
00951 
00952 static connector_status_t msg_prepare_send_data(connector_data_t * const connector_ptr, msg_session_t * const session)
00953 {
00954     connector_status_t status = connector_no_resource;
00955     msg_data_block_t * const dblock = session->out_dblock;
00956 
00957     UNUSED_PARAMETER(connector_ptr);
00958     ASSERT_GOTO(dblock != NULL, error);
00959     ASSERT_GOTO(dblock->zlib.avail_in == 0, error);
00960 
00961     {
00962         unsigned int const flag = (dblock->total_bytes == 0) ? MSG_FLAG_START : 0;
00963         msg_service_data_t * const service_data = session->service_layer_data.need_data;
00964 
00965         ASSERT_GOTO(service_data != NULL, error);
00966         service_data->data_ptr = dblock->buffer_in;
00967         service_data->length_in_bytes = sizeof dblock->buffer_in;
00968         service_data->flags = flag;
00969         status = connector_working;
00970     }
00971 
00972 error:
00973     return status;
00974 }
00975 
00976 static connector_status_t msg_process_send_data(connector_data_t * const connector_ptr, msg_session_t * const session)
00977 {
00978     connector_status_t status = connector_no_resource;
00979     msg_data_block_t * const dblock = session->out_dblock;
00980 
00981     ASSERT_GOTO(dblock != NULL, error);
00982     ASSERT_GOTO(dblock->zlib.avail_in == 0, error);
00983 
00984     {
00985         z_streamp const zlib_ptr = &dblock->zlib;
00986         msg_service_data_t * const service_data = session->service_layer_data.need_data;
00987 
00988         zlib_ptr->next_in = dblock->buffer_in;
00989         zlib_ptr->avail_in = service_data->length_in_bytes;
00990         dblock->total_bytes += service_data->length_in_bytes;
00991         if (MsgIsLastData(service_data->flags))
00992             dblock->z_flag = Z_FINISH;
00993     }
00994 
00995     if (dblock->z_flag == Z_NO_FLUSH)
00996     {
00997         size_t const unacked_bytes = dblock->total_bytes - dblock->ack_count;
00998         size_t const safe_window_size = dblock->available_window - sizeof dblock->buffer_in;
00999 
01000         if (unacked_bytes > safe_window_size)
01001         {
01002             dblock->z_flag = Z_SYNC_FLUSH;
01003             MsgSetAckPending(dblock->status_flag);
01004         }
01005     }
01006 
01007     status = msg_compress_data(connector_ptr, session);
01008 
01009 error:
01010     return status;
01011 }
01012 
01013 #else
01014 
01015 static connector_status_t msg_prepare_send_data(connector_data_t * const connector_ptr, msg_session_t * const session)
01016 {
01017     uint8_t * msg_buffer;
01018     connector_status_t status = connector_no_resource;
01019     msg_data_block_t * const dblock = session->out_dblock;
01020 
01021     ASSERT_GOTO(dblock != NULL, error);
01022     if (MsgIsDoubleBuf(dblock->status_flag))
01023     {
01024         msg_buffer = GET_PACKET_DATA_POINTER(session->send_data_ptr, PACKET_EDP_FACILITY_SIZE);
01025         session->send_data_bytes = MSG_MAX_SEND_PACKET_SIZE - PACKET_EDP_FACILITY_SIZE;
01026     }
01027     else
01028     {
01029         session->send_data_ptr = tcp_get_packet_buffer(connector_ptr, E_MSG_FAC_MSG_NUM, &msg_buffer, &session->send_data_bytes);
01030         if (session->send_data_ptr == NULL)
01031         {
01032             status = connector_pending;
01033             goto done;
01034         }
01035     }
01036 
01037     {
01038         size_t const header_bytes = MsgIsStart(dblock->status_flag) == connector_true ? record_end(start_packet) : record_end(data_packet);
01039         msg_service_data_t * const service_data = session->service_layer_data.need_data;
01040         unsigned int const flag = MsgIsStart(dblock->status_flag) ? MSG_FLAG_START : 0;
01041 
01042         ASSERT_GOTO(MsgIsNotLastData(dblock->status_flag), error);
01043         ASSERT_GOTO(session->send_data_bytes > header_bytes, error);
01044         ASSERT_GOTO(service_data != NULL, error);
01045 
01046         service_data->data_ptr = msg_buffer + header_bytes;
01047         service_data->length_in_bytes = session->send_data_bytes - header_bytes;
01048         service_data->flags = flag;
01049         status = connector_working;
01050     }
01051 
01052 error:
01053 done:
01054     return status;
01055 }
01056 
01057 static connector_status_t msg_process_send_data(connector_data_t * const connector_ptr, msg_session_t * const session)
01058 {
01059     connector_status_t status = connector_no_resource;
01060     msg_data_block_t * const dblock = session->out_dblock;
01061     msg_service_data_t * const service_data = session->service_layer_data.need_data;
01062 
01063     ASSERT_GOTO(dblock != NULL, error);
01064     if (MsgIsLastData(service_data->flags))
01065         MsgSetLastData(dblock->status_flag);
01066 
01067     {
01068         size_t const header_bytes = MsgIsStart(dblock->status_flag) == connector_true ? record_end(start_packet) : record_end(data_packet);
01069         uint8_t * const msg_buffer = GET_PACKET_DATA_POINTER(session->send_data_ptr, PACKET_EDP_FACILITY_SIZE);
01070 
01071         session->send_data_bytes = service_data->length_in_bytes + header_bytes;
01072         msg_fill_msg_header(session, msg_buffer);
01073         status = msg_send_data(connector_ptr, session);
01074         if (status != connector_working)
01075             goto error;
01076     }
01077 
01078     dblock->total_bytes += service_data->length_in_bytes;
01079     if ((dblock->total_bytes - dblock->ack_count) > (dblock->available_window - MSG_MAX_SEND_PACKET_SIZE))
01080         MsgSetAckPending(dblock->status_flag);
01081 
01082 error:
01083     return status;
01084 }
01085 
01086 #endif
01087 
01088 static connector_status_t msg_get_service_data(connector_data_t * const connector_ptr, msg_session_t * const session)
01089 {
01090     connector_status_t status = connector_no_resource;
01091     msg_data_block_t * const dblock = session->out_dblock;
01092 
01093     ASSERT_GOTO(dblock != NULL, error);
01094     ASSERT_GOTO(MsgIsNotLastData(dblock->status_flag), done);
01095 
01096     status = msg_prepare_send_data(connector_ptr, session);
01097     if (status != connector_working)
01098         goto error;
01099 
01100     status = msg_call_service_layer(connector_ptr, session, msg_service_type_need_data);
01101     if (status != connector_working)
01102     {
01103         /* release the buffer */
01104         if ((MsgIsDoubleBuf(dblock->status_flag) == connector_false) && (MsgIsCompressed(dblock->status_flag) == connector_false))
01105             tcp_release_packet_buffer(connector_ptr, session->send_data_ptr, connector_success, NULL);
01106 
01107         goto done;
01108     }
01109 
01110     session->saved_state = session->current_state;
01111     status = msg_process_send_data(connector_ptr, session);
01112 
01113 error:
01114 done:
01115     return status;
01116 }
01117 
01118 #if (defined CONNECTOR_DATA_SERVICE)
01119 static connector_bool_t msg_initiate_request(connector_data_t * const connector_ptr, void const * const service_context)
01120 {
01121     connector_bool_t success = connector_false;
01122     connector_msg_data_t * const msg_ptr = get_facility_data(connector_ptr, E_MSG_FAC_MSG_NUM);
01123 
01124     ASSERT_GOTO(msg_ptr != NULL, error);
01125     if (msg_ptr->pending_service_request == NULL)
01126     {
01127         msg_ptr->pending_service_request = service_context;
01128         success = connector_true;
01129     }
01130 
01131 error:
01132     return success;
01133 }
01134 
01135 static connector_status_t msg_handle_pending_request(connector_data_t * const connector_ptr, connector_msg_data_t * const msg_ptr, msg_session_t * const session, connector_session_error_t const result)
01136 {
01137     connector_status_t status = connector_working;
01138     connector_msg_callback_t * const cb_fn = msg_ptr->service_cb[msg_service_id_data];
01139 
01140     if (cb_fn != NULL)
01141     {
01142         msg_service_request_t service_data;
01143 
01144         service_data.session = session;
01145         service_data.service_type = msg_service_type_pending_request;
01146         service_data.have_data = (void *)msg_ptr->pending_service_request;
01147         service_data.error_value = result;
01148 
01149         cb_fn(connector_ptr, &service_data);
01150         if ((service_data.error_value != connector_session_error_none) && (session != NULL))
01151             status = msg_delete_session(connector_ptr, msg_ptr, session);
01152     }
01153 
01154     msg_ptr->pending_service_request = NULL;
01155     return status;
01156 }
01157 
01158 static connector_status_t msg_start_session(connector_data_t * const connector_ptr, connector_msg_data_t * const msg_ptr)
01159 {
01160     connector_status_t status = connector_working;
01161     static connector_bool_t const client_owned = connector_true;
01162     msg_session_t * const session = msg_create_session(connector_ptr, msg_ptr, msg_service_id_data, client_owned, &status);
01163 
01164     if (session == NULL) goto error;
01165 
01166     {
01167         connector_session_error_t const result = msg_initialize_data_block(session, msg_ptr->capabilities[msg_capability_cloud].window_size, msg_block_state_send_request);
01168 
01169         status = msg_handle_pending_request(connector_ptr, msg_ptr, session, result);
01170     }
01171 
01172 error:
01173     return status;
01174 }
01175 #endif
01176 
01177 static connector_status_t msg_send_ack(connector_data_t * const connector_ptr, connector_msg_data_t * const msg_ptr, msg_session_t * const session)
01178 {
01179     connector_status_t status = connector_pending;
01180     uint8_t * ack_packet;
01181     uint8_t * const edp_packet = tcp_get_packet_buffer(connector_ptr, E_MSG_FAC_MSG_NUM, &ack_packet, NULL);
01182     msg_data_block_t * const dblock = session->in_dblock;
01183 
01184     if (edp_packet == NULL)
01185         goto done;
01186 
01187     ASSERT_GOTO(dblock != NULL, error);
01188     message_store_u8(ack_packet, opcode, msg_opcode_ack);
01189     {
01190         uint8_t const flag = MsgIsClientOwned(dblock->status_flag) ? 0 : MSG_FLAG_REQUEST;
01191 
01192         message_store_u8(ack_packet, flags, flag);
01193     }
01194     {
01195         uint16_t const session_id = (uint16_t)session->session_id;
01196 
01197         message_store_be16(ack_packet, transaction_id, session_id);
01198     }
01199     {
01200         uint32_t const val32 = (uint32_t) dblock->total_bytes;
01201 
01202         ASSERT(dblock->total_bytes <= UINT32_MAX);
01203         message_store_be32(ack_packet, ack_count, val32);
01204     }
01205 
01206     message_store_be32(ack_packet, window_size, msg_ptr->capabilities[msg_capability_client].window_size);
01207 
01208     status = tcp_initiate_send_facility_packet(connector_ptr, edp_packet, record_end(ack_packet),
01209                                            E_MSG_FAC_MSG_NUM, tcp_release_packet_buffer, NULL);
01210     if (status != connector_pending)
01211     {
01212         dblock->ack_count = dblock->total_bytes;
01213         MsgClearAckPending(dblock->status_flag);
01214         session->current_state = msg_state_receive;
01215     }
01216 
01217 error:
01218 done:
01219     return status;
01220 }
01221 
01222 static connector_status_t msg_process_capabilities(connector_data_t * const connector_ptr, connector_msg_data_t * const msg_fac, uint8_t * const ptr)
01223 {
01224     connector_status_t status = connector_working;
01225     uint8_t * capability_packet = ptr;
01226     connector_bool_t const request_capabilities = connector_bool((message_load_u8(capability_packet, flags) & MSG_FLAG_REQUEST) == MSG_FLAG_REQUEST);
01227     uint8_t const version = message_load_u8(capability_packet, version);
01228 
01229     ASSERT_GOTO(version == MSG_FACILITY_VERSION, error);
01230     msg_fac->capabilities[msg_capability_cloud].max_transactions = message_load_u8(capability_packet, max_transactions);
01231     msg_fac->capabilities[msg_capability_cloud].window_size = message_load_be32(capability_packet, window_size);
01232 
01233     {
01234         uint8_t const comp_count = message_load_u8(capability_packet, compression_count);
01235         uint8_t const * compression_list = capability_packet + record_bytes(capability_packet);
01236         int i;
01237 
01238         msg_fac->capabilities[msg_capability_cloud].compression_supported = connector_false;
01239         for (i = 0; i < comp_count; i++)
01240         {
01241             uint8_t const compression_id = *compression_list++;
01242 
01243             if (compression_id == MSG_COMPRESSION_LIBZ)
01244             {
01245                 msg_fac->capabilities[msg_capability_cloud].compression_supported = connector_true;
01246                 break;
01247             }
01248         }
01249     }
01250 
01251     if (request_capabilities)
01252     {
01253         uint8_t const capability_flag = 0;
01254 
01255         status = msg_send_capabilities(connector_ptr, msg_fac, capability_flag);
01256     }
01257 
01258 error:
01259     return status;
01260 }
01261 
01262 static connector_status_t msg_pass_service_data(connector_data_t * const connector_ptr, msg_session_t * const session, void * data, size_t const bytes)
01263 {
01264     connector_status_t status = connector_working;
01265     msg_data_block_t * const dblock = session->in_dblock;
01266 
01267     ASSERT_GOTO(session != NULL, error);
01268     ASSERT_GOTO(dblock != NULL, error);
01269 
01270     {
01271         unsigned int service_flag = 0;
01272         msg_service_data_t * const service_data = session->service_layer_data.have_data;
01273 
01274         ASSERT_GOTO(service_data != NULL, error);
01275         if (MsgIsStart(dblock->status_flag))
01276             MsgSetStart(service_flag);
01277 
01278         if (MsgIsLastData(dblock->status_flag))
01279             MsgSetLastData(service_flag);
01280 
01281         service_data->data_ptr = data;
01282         service_data->length_in_bytes = bytes;
01283         service_data->flags = service_flag;
01284         if ((MsgIsDoubleBuf(dblock->status_flag)) && (session->service_layer_data.need_data->data_ptr == NULL))
01285         {
01286             status = msg_prepare_send_data(connector_ptr, session);
01287             ASSERT_GOTO(status == connector_working, error);
01288         }
01289         status = msg_call_service_layer(connector_ptr, session, msg_service_type_have_data);
01290     }
01291 
01292     if (status != connector_working && status != connector_idle && status != connector_pending && status != connector_active)
01293         goto error;
01294 
01295     if (status == connector_working)
01296     {
01297         MsgClearStart(dblock->status_flag);
01298 
01299         if (MsgIsLastData(dblock->status_flag))
01300         {
01301             if (MsgIsRequest(dblock->status_flag))
01302             {
01303                 connector_msg_data_t * const msg_ptr = get_facility_data(connector_ptr, E_MSG_FAC_MSG_NUM);
01304                 connector_session_error_t result;
01305 
01306                 ASSERT_GOTO(msg_ptr != NULL, error);
01307                 if (MsgIsDoubleBuf(dblock->status_flag) == connector_false)
01308                 {
01309                     result = msg_initialize_data_block(session, msg_ptr->capabilities[msg_capability_cloud].window_size, msg_block_state_send_response);
01310                     if (result != connector_session_error_none)
01311                         status = msg_inform_error(connector_ptr, session, result);
01312                 }
01313                 else
01314                 {
01315                     MsgClearRequest(dblock->status_flag);
01316                     MsgClearRequest(session->out_dblock->status_flag);
01317                 }
01318             }
01319             else
01320                 session->current_state = msg_state_delete;
01321         }
01322         else
01323         {
01324             dblock->total_bytes += bytes;
01325 
01326             if ((dblock->total_bytes - dblock->ack_count) > (dblock->available_window/2))
01327             {
01328                 MsgSetAckPending(dblock->status_flag);
01329                 session->current_state = msg_state_send_ack;
01330             }
01331             else
01332                 session->current_state = msg_state_receive;
01333         }
01334     }
01335 
01336     if (MsgIsDoubleBuf(dblock->status_flag))
01337     {
01338         ASSERT_GOTO(session->service_layer_data.need_data != NULL, error);
01339         session->saved_state = session->current_state;
01340 
01341         if (MsgIsSendNow(session->service_layer_data.need_data->flags))
01342         {
01343             connector_status_t const send_status = msg_process_send_data(connector_ptr, session);
01344 
01345             ASSERT_GOTO(((send_status == connector_working) || (send_status == connector_pending) || (send_status == connector_active)), error);
01346             session->service_layer_data.need_data->data_ptr = NULL;
01347         }
01348     }
01349 
01350 error:
01351     return status;
01352 }
01353 
01354 #if (defined CONNECTOR_COMPRESSION)
01355 static connector_status_t msg_process_decompressed_data(connector_data_t * const connector_ptr, msg_session_t * const session)
01356 {
01357     connector_status_t status = connector_working;
01358     msg_data_block_t * const dblock = session->in_dblock;
01359 
01360     ASSERT_GOTO(dblock != NULL, error);
01361     ASSERT_GOTO(dblock->bytes_out > 0, error);
01362 
01363     status = msg_pass_service_data(connector_ptr, session, dblock->buffer_out, dblock->bytes_out);
01364     if (status ==  connector_working)
01365     {
01366         dblock->bytes_out = 0;
01367         if (session->current_state != msg_state_get_data)
01368         {
01369             if (dblock->zlib.avail_out == 0)
01370                 session->current_state = msg_state_decompress;
01371             else
01372             {
01373                 if (MsgIsAckPending(dblock->status_flag))
01374                     session->current_state = msg_state_send_ack;
01375             }
01376         }
01377     }
01378     else if ((status == connector_active) || (status == connector_pending))
01379     {
01380         session->current_state = msg_state_process_decompressed;
01381         status = connector_working;
01382     }
01383 
01384 error:
01385     return status;
01386 }
01387 
01388 static connector_status_t msg_decompress_data(connector_data_t * const connector_ptr, msg_session_t * const session)
01389 {
01390     connector_status_t status = connector_working;
01391     msg_data_block_t * const dblock = session->in_dblock;
01392     z_streamp zlib_ptr;
01393 
01394     ASSERT_GOTO(dblock != NULL, error);
01395     zlib_ptr = &dblock->zlib;
01396 
01397     if (zlib_ptr->avail_out == 0)
01398     {
01399         zlib_ptr->next_out = dblock->buffer_out;
01400         zlib_ptr->avail_out = sizeof dblock->buffer_out;
01401     }
01402 
01403     {
01404         int const zret = inflate(zlib_ptr, Z_NO_FLUSH);
01405 
01406         session->current_state = MsgIsAckPending(dblock->status_flag) ? msg_state_send_ack : msg_state_receive;
01407         switch(zret)
01408         {
01409         case Z_BUF_ERROR:
01410             if (zlib_ptr->avail_in > 0)
01411                 goto error;
01412             goto done;
01413 
01414         case Z_OK:
01415             if (zlib_ptr->avail_out > 0)
01416                 goto done;
01417             break;
01418 
01419         case Z_STREAM_END:
01420             MsgSetLastData(dblock->status_flag);
01421             break;
01422 
01423         default:
01424             goto error;
01425         }
01426 
01427         dblock->bytes_out = sizeof dblock->buffer_out - zlib_ptr->avail_out;
01428         status = msg_process_decompressed_data(connector_ptr, session);
01429     }
01430 
01431     goto done;
01432 
01433 error:
01434     status = msg_inform_error(connector_ptr, session, connector_session_error_decompression_failure);
01435 
01436 done:
01437     return status;
01438 }
01439 
01440 static connector_status_t msg_process_compressed_data(connector_data_t * const connector_ptr, msg_session_t * const session, uint8_t * data, size_t const bytes)
01441 {
01442     connector_status_t status = connector_working;
01443     msg_data_block_t * const dblock = session->in_dblock;
01444     z_streamp zlib_ptr;
01445 
01446     ASSERT_GOTO(dblock != NULL, error);
01447     zlib_ptr = &dblock->zlib;
01448 
01449     if (zlib_ptr->avail_in > 0)
01450     {
01451         status = connector_pending;
01452         goto done;
01453     }
01454 
01455     ASSERT_GOTO(bytes < sizeof dblock->buffer_in, error);
01456     memcpy(dblock->buffer_in, data, bytes);
01457 
01458     zlib_ptr->next_in = dblock->buffer_in;
01459     zlib_ptr->avail_in = bytes;
01460 
01461     status = msg_decompress_data(connector_ptr, session);
01462 
01463 error:
01464 done:
01465     return status;
01466 }
01467 #endif
01468 
01469 static connector_status_t msg_process_service_data(connector_data_t * const connector_ptr, msg_session_t * const session, uint8_t * msg_data, size_t const frame_bytes, size_t const header_bytes, unsigned const flag)
01470 {
01471     connector_status_t status = connector_working;
01472     uint8_t * buffer = msg_data + header_bytes;
01473     size_t const bytes = frame_bytes - header_bytes;
01474     msg_data_block_t * const dblock = session->in_dblock;
01475 
01476     ASSERT_GOTO(dblock != NULL, error);
01477     if (session->current_state != msg_state_receive)
01478     {
01479         status = ((session->current_state == msg_state_send_error) || (session->current_state == msg_state_delete)) ? connector_working : connector_active;
01480         goto done;
01481     }
01482 
01483     #if (defined CONNECTOR_COMPRESSION)
01484     if (MsgIsCompressed(dblock->status_flag))
01485     {
01486         status = msg_process_compressed_data(connector_ptr, session, buffer, bytes);
01487         goto done;
01488     }
01489     #endif
01490 
01491     if (MsgIsLastData(flag))
01492         MsgSetLastData(dblock->status_flag);
01493 
01494     status = msg_pass_service_data(connector_ptr, session, buffer, bytes);
01495 
01496 error:
01497 done:
01498     return status;
01499 }
01500 
01501 static connector_status_t msg_process_start(connector_data_t * const connector_ptr, connector_msg_data_t * const msg_ptr, uint8_t * ptr, uint16_t const length)
01502 {
01503     connector_status_t status = connector_working;
01504     connector_session_error_t result = connector_session_error_none;
01505     msg_session_t * session;
01506     uint8_t * start_packet = ptr;
01507     uint8_t flag = message_load_u8(start_packet, flags);
01508     uint16_t const session_id = message_load_be16(start_packet, transaction_id);
01509     connector_bool_t const client_owned = MsgIsNotRequest(flag);
01510 
01511     session = msg_find_session(msg_ptr, session_id, client_owned);
01512     if (session == NULL)
01513     {
01514         uint16_t const service_id = message_load_be16(start_packet, service_id);
01515 
01516         if (client_owned)
01517         {
01518              result = connector_session_error_unknown_session;
01519              goto error;
01520         }
01521 
01522         session = msg_create_session(connector_ptr, msg_ptr, service_id, client_owned, &status);
01523         if (session == NULL)
01524         {
01525             switch (status)
01526             {
01527                 case connector_pending:
01528                     result = connector_session_error_busy;
01529                     status = connector_working;
01530                     break;
01531 
01532                 default:
01533                     result = connector_session_error_fatal;
01534                     break;
01535             }
01536 
01537             goto error;
01538         }
01539 
01540         session->session_id = session_id;
01541         if (session->out_dblock != NULL)
01542         {
01543             result = msg_initialize_data_block(session, msg_ptr->capabilities[msg_capability_client].window_size, msg_block_state_send_response);
01544             if (result != connector_session_error_none)
01545                 goto error;
01546         }
01547 
01548         result = msg_initialize_data_block(session, msg_ptr->capabilities[msg_capability_client].window_size, msg_block_state_recv_request);
01549         if (result != connector_session_error_none)
01550             goto error;
01551     }
01552     else
01553     {
01554         if (session->current_state == msg_state_send_error)
01555             goto done;
01556 
01557         if (client_owned)
01558         {
01559             result = msg_initialize_data_block(session, msg_ptr->capabilities[msg_capability_client].window_size, msg_block_state_recv_response);
01560             if (result != connector_session_error_none)
01561                 goto error;
01562         }
01563     }
01564 
01565     {
01566         uint8_t const compression = message_load_u8(start_packet, compression_id);
01567 
01568         #if (defined CONNECTOR_COMPRESSION)
01569         if (compression == MSG_COMPRESSION_NONE)
01570             MsgClearCompression(session->in_dblock->status_flag);
01571         #else
01572         if (compression != MSG_COMPRESSION_NONE)
01573         {
01574             result = connector_session_error_decompression_failure;
01575             goto error;
01576         }
01577         #endif
01578     }
01579 
01580     MsgSetStart(flag);
01581     status = msg_process_service_data(connector_ptr, session, ptr, length, record_end(start_packet), flag);
01582     goto done;
01583 
01584 error:
01585     {
01586         connector_status_t error_status = (session != NULL) ? msg_inform_error(connector_ptr, session, result) : msg_send_error(connector_ptr, msg_ptr, session, session_id, result, flag);
01587 
01588         if (error_status == connector_abort)
01589             status = connector_abort;
01590     }
01591 
01592 done:
01593     return status;
01594 }
01595 
01596 static connector_status_t msg_process_data(connector_data_t * const connector_ptr, connector_msg_data_t * const msg_ptr, uint8_t * ptr, uint16_t const length)
01597 {
01598     connector_status_t status = connector_working;
01599     msg_session_t * session;
01600     uint8_t * const data_packet = ptr;
01601     uint8_t const flag = message_load_u8(data_packet, flags);
01602     uint16_t const session_id = message_load_be16(data_packet, transaction_id);
01603     connector_bool_t const client_owned = MsgIsNotRequest(flag);
01604 
01605     session = msg_find_session(msg_ptr, session_id, client_owned);
01606     if (session == NULL)
01607     {
01608         status = msg_send_error(connector_ptr, msg_ptr, session, session_id, connector_session_error_unknown_session, flag);
01609         goto done;
01610     }
01611 
01612     status = msg_process_service_data(connector_ptr, session, ptr, length, record_end(data_packet), flag);
01613 
01614 done:
01615     return status;
01616 }
01617 
01618 static connector_status_t msg_process_ack(connector_msg_data_t * const msg_fac, uint8_t const * ptr)
01619 {
01620     msg_session_t * session;
01621     uint8_t const * const ack_packet = ptr;
01622 
01623     {
01624         uint16_t const session_id = message_load_be16(ack_packet, transaction_id);
01625         uint8_t const flag = message_load_u8(ack_packet, flags);
01626         connector_bool_t const client_owned = MsgIsRequest(flag);
01627 
01628         session = msg_find_session(msg_fac, session_id, client_owned);
01629         /* already closed? done sending all data */
01630         if (session == NULL)
01631             goto done;
01632     }
01633 
01634     {
01635         msg_data_block_t * const dblock = session->out_dblock;
01636 
01637         ASSERT_GOTO(dblock != NULL, error);
01638         dblock->available_window = message_load_be32(ack_packet, window_size);
01639         dblock->ack_count = message_load_be32(ack_packet, ack_count);
01640 
01641         if (dblock->available_window > 0)
01642         {
01643             MsgClearAckPending(dblock->status_flag);
01644             if (session->current_state == msg_state_wait_ack)
01645                 session->current_state = session->saved_state;
01646         }
01647     }
01648 
01649 error:
01650 done:
01651     return connector_working;
01652 }
01653 
01654 static connector_status_t msg_process_error(connector_data_t * const connector_ptr, connector_msg_data_t * const msg_fac, uint8_t * const ptr)
01655 {
01656     connector_status_t status = connector_working;
01657     uint8_t * const error_packet = ptr;
01658     uint8_t const flag = message_load_u8(error_packet, flags);
01659     connector_bool_t const client_request_error = connector_bool(MsgIsRequest(flag) && MsgIsNotSender(flag));
01660     connector_bool_t const cloud_response_error = connector_bool(MsgIsNotRequest(flag) && MsgIsSender(flag));
01661     connector_bool_t const client_owned = connector_bool(client_request_error || cloud_response_error);
01662     uint16_t const session_id = message_load_be16(error_packet, transaction_id);
01663     msg_session_t * const session = msg_find_session(msg_fac, session_id, client_owned);
01664 
01665     if (session != NULL)
01666     {
01667         if ((session->current_state != msg_state_delete) && (session->current_state != msg_state_send_error))
01668         {
01669             uint8_t const error_val = message_load_u8(error_packet, error_code);
01670 
01671             ASSERT_GOTO(error_val < connector_session_error_count, error);
01672             {
01673                 connector_session_error_t const msg_error = (connector_session_error_t)error_val;
01674 
01675                 status = msg_inform_error(connector_ptr, session, msg_error);
01676                 if ((status != connector_pending) && (status != connector_abort))
01677                     status = msg_delete_session(connector_ptr, msg_fac, session);
01678             }
01679         }
01680     }
01681     else
01682     {
01683         connector_debug_printf("msg_process_error: unable to find session id = %d\n", session_id);
01684     }
01685 
01686 error:
01687     return status;
01688 }
01689 
01690 static connector_status_t msg_discovery(connector_data_t * const connector_ptr, void * const facility_data, uint8_t * const packet, unsigned int * const receive_timeout)
01691 {
01692     uint8_t const capability_flag = MSG_FLAG_REQUEST;
01693 
01694     UNUSED_PARAMETER(packet);
01695     UNUSED_PARAMETER(receive_timeout);
01696 
01697     return msg_send_capabilities(connector_ptr, facility_data, capability_flag);
01698 }
01699 
01700 static void msg_switch_session(connector_msg_data_t * const msg_ptr, msg_session_t * const session)
01701 {
01702     if (!msg_ptr->session_locked)
01703     {
01704         msg_ptr->session_locked = connector_true;
01705         msg_ptr->session.current = (session->prev != NULL) ? session->prev : msg_ptr->session.tail;
01706         msg_ptr->session_locked = connector_false;
01707     }
01708 }
01709 
01710 static connector_status_t msg_process_pending(connector_data_t * const connector_ptr, connector_msg_data_t * const msg_ptr, unsigned int * const receive_timeout)
01711 {
01712     connector_status_t status = connector_idle;
01713 
01714     if (msg_ptr->session_locked) goto done;
01715 
01716 #if (defined CONNECTOR_DATA_SERVICE)
01717     if (msg_ptr->pending_service_request != NULL)
01718     {
01719         status = msg_start_session(connector_ptr, msg_ptr);
01720         if (status != connector_working) goto done;
01721     }
01722 #endif
01723 
01724 #if (defined CONNECTOR_DATA_POINTS)
01725     status = dp_process_request(connector_ptr, connector_transport_tcp);
01726     if ((status != connector_idle) && (status != connector_working))
01727         goto done;
01728 #endif
01729 
01730     msg_ptr->session_locked = connector_true;
01731     if (msg_ptr->session.current == NULL)
01732         msg_ptr->session.current = msg_ptr->session.tail;
01733     msg_ptr->session_locked = connector_false;
01734 
01735     *receive_timeout = MAX_RECEIVE_TIMEOUT_IN_SECONDS;
01736     if (msg_ptr->session.current != NULL)
01737     {
01738         msg_session_t * const session = msg_ptr->session.current;
01739 
01740         status = connector_working;
01741 
01742         *receive_timeout = MIN_RECEIVE_TIMEOUT_IN_SECONDS;
01743         switch (session->current_state)
01744         {
01745         case msg_state_init:
01746         case msg_state_wait_ack:
01747         case msg_state_receive:
01748         case msg_state_wait_send_complete:
01749             msg_switch_session(msg_ptr, session);
01750             break;
01751 
01752         case msg_state_get_data:
01753             session->saved_state = msg_state_get_data;
01754             status = msg_get_service_data(connector_ptr, session);
01755             if (status == connector_pending)
01756                 msg_switch_session(msg_ptr, session);
01757             break;
01758 
01759         case msg_state_send_data:
01760             status = msg_send_data(connector_ptr, session);
01761             break;
01762 
01763         #if (defined CONNECTOR_COMPRESSION)
01764         case msg_state_compress:
01765             status = msg_compress_data(connector_ptr, session);
01766             break;
01767 
01768         case msg_state_decompress:
01769             status = msg_decompress_data(connector_ptr, session);
01770             break;
01771 
01772         case msg_state_process_decompressed:
01773             status = msg_process_decompressed_data(connector_ptr, session);
01774             break;
01775         #endif
01776 
01777         case msg_state_send_ack:
01778             status = msg_send_ack(connector_ptr, msg_ptr, session);
01779             break;
01780 
01781         case msg_state_send_error:
01782         {
01783             uint8_t const flag = (uint8_t)session->error_flag;
01784             uint16_t const session_id = (uint16_t)session->session_id;
01785 
01786             status = msg_send_error(connector_ptr, msg_ptr, session, session_id, session->error, flag);
01787             break;
01788         }
01789 
01790         case msg_state_delete:
01791             status = msg_delete_session(connector_ptr, msg_ptr, session);
01792             break;
01793 
01794         default:
01795             status = connector_init_error;
01796             connector_debug_printf("Failed %X, state%d\n", session, session->current_state);
01797             ASSERT_GOTO(connector_false, done);
01798             break;
01799         }
01800     }
01801 
01802 done:
01803     return status;
01804 }
01805 
01806 static connector_status_t msg_process(connector_data_t * const connector_ptr, void * const facility_data, uint8_t * const edp_header, unsigned int * const receive_timeout)
01807 {
01808     connector_status_t status = connector_idle;
01809     connector_msg_data_t * const msg_ptr = facility_data;
01810 
01811     ASSERT_GOTO(connector_ptr != NULL, error);
01812     ASSERT_GOTO(msg_ptr != NULL, error);
01813 
01814     if (edp_header != NULL)
01815     {
01816         uint8_t * const data_ptr = GET_PACKET_DATA_POINTER(edp_header, PACKET_EDP_FACILITY_SIZE);
01817         uint16_t const length = message_load_be16(edp_header, length);
01818         uint8_t const opcode = *data_ptr;
01819 
01820         switch (opcode)
01821         {
01822             case msg_opcode_capability:
01823                 status = msg_process_capabilities(connector_ptr, msg_ptr, data_ptr);
01824                 break;
01825 
01826             case msg_opcode_start:
01827                 status = msg_process_start(connector_ptr, msg_ptr, data_ptr, length);
01828                 break;
01829 
01830             case msg_opcode_data:
01831                 status = msg_process_data(connector_ptr, msg_ptr, data_ptr, length);
01832                 break;
01833 
01834             case msg_opcode_ack:
01835                 status = msg_process_ack(msg_ptr, data_ptr);
01836                 break;
01837 
01838             case msg_opcode_error:
01839                 status = msg_process_error(connector_ptr, msg_ptr, data_ptr);
01840                 break;
01841 
01842             default:
01843                 connector_debug_printf("msg_process: Invalid opcode\n");
01844                 break;
01845         }
01846     }
01847 
01848     {
01849         connector_status_t const pending_state =  msg_process_pending(connector_ptr, msg_ptr, receive_timeout);
01850 
01851         if ((pending_state == connector_pending) || (pending_state == connector_working) || (pending_state == connector_active))
01852         {
01853             if (status == connector_idle)
01854                 status = pending_state;
01855         }
01856         else
01857         {
01858             if (pending_state != connector_idle)
01859                 status = pending_state;
01860         }
01861     }
01862 
01863     if (status == connector_unavailable) status = connector_working;
01864 
01865 error:
01866     return status;
01867 }
01868 
01869 static connector_status_t msg_cleanup_all_sessions(connector_data_t * const connector_ptr, uint16_t const service_id)
01870 {
01871     connector_status_t status = connector_working;
01872     connector_msg_data_t * const msg_ptr = get_facility_data(connector_ptr, E_MSG_FAC_MSG_NUM);
01873     msg_session_t * session;
01874 
01875     ASSERT_GOTO(msg_ptr != NULL, error);
01876     session= msg_ptr->session.head;
01877 
01878     while(session != NULL)
01879     {
01880         msg_session_t * next_session = session->next;
01881 
01882         if (session->service_id == service_id)
01883         {
01884             if (session->current_state != msg_state_delete && session->current_state != msg_state_send_error)
01885                 msg_inform_error(connector_ptr, session, connector_session_error_cancel);
01886 
01887             status = msg_delete_session(connector_ptr, msg_ptr, session);
01888             if (status != connector_working) goto error;
01889         }
01890 
01891         session = next_session;
01892     }
01893 
01894 #if (defined CONNECTOR_DATA_SERVICE)
01895     if (msg_ptr->pending_service_request != NULL)
01896         status = msg_handle_pending_request(connector_ptr, msg_ptr, NULL, connector_session_error_cancel);
01897 #endif
01898 
01899 error:
01900     return status;
01901 }
01902 
01903 static connector_status_t msg_delete_facility(connector_data_t * const connector_ptr, uint16_t const service_id)
01904 {
01905     connector_status_t status = connector_no_resource;
01906     connector_msg_data_t * const msg_ptr = get_facility_data(connector_ptr, E_MSG_FAC_MSG_NUM);
01907 
01908     if (msg_ptr == NULL)
01909     {
01910         status = connector_idle;
01911         goto error;
01912     }
01913 
01914     ASSERT_GOTO(service_id < msg_service_id_count, error);
01915     ASSERT_GOTO(msg_ptr->service_cb[service_id] != NULL, error);
01916 
01917     msg_ptr->service_cb[service_id] = NULL;
01918 
01919     {
01920         connector_bool_t is_empty = connector_true;
01921         int i;
01922 
01923         for(i = 0; i < msg_service_id_count; i++)
01924         {
01925             if (msg_ptr->service_cb[i] != NULL)
01926             {
01927                 is_empty = connector_false;
01928                 break;
01929             }
01930         }
01931 
01932         status = is_empty == connector_true ? del_facility_data(connector_ptr, E_MSG_FAC_MSG_NUM) : connector_working;
01933     }
01934 
01935 error:
01936     return status;
01937 }
01938 
01939 
01940 static connector_status_t msg_init_facility(connector_data_t * const connector_ptr, unsigned int const facility_index, uint16_t service_id, connector_msg_callback_t callback)
01941 {
01942     connector_status_t status = connector_no_resource;
01943     connector_msg_data_t * msg_ptr = get_facility_data(connector_ptr, E_MSG_FAC_MSG_NUM);
01944 
01945     if (msg_ptr == NULL)
01946     {
01947         void * fac_ptr = NULL;
01948         connector_config_max_transaction_t config_max_transaction;
01949 
01950         status = add_facility_data(connector_ptr, facility_index, E_MSG_FAC_MSG_NUM, &fac_ptr, sizeof *msg_ptr);
01951         ASSERT_GOTO(status == connector_working, done);
01952         ASSERT_GOTO(fac_ptr != NULL, done);
01953 
01954         msg_ptr = fac_ptr;
01955         memset(msg_ptr, 0, sizeof *msg_ptr);
01956 
01957         #if (defined CONNECTOR_COMPRESSION)
01958         msg_ptr->capabilities[msg_capability_client].compression_supported = connector_true;
01959         #endif
01960 
01961         #if (defined CONNECTOR_MSG_MAX_TRANSACTION)
01962         config_max_transaction.count = CONNECTOR_MSG_MAX_TRANSACTION;
01963         #else
01964         {
01965             connector_request_id_t request_id;
01966             connector_callback_status_t callback_status;
01967 
01968             request_id.config_request = connector_request_id_config_max_transaction;
01969             callback_status = connector_callback(connector_ptr->callback, connector_class_id_config, request_id, &config_max_transaction);
01970             if (callback_status != connector_callback_continue && callback_status != connector_callback_unrecognized)
01971             {
01972                 status = connector_abort;
01973                 goto done;
01974             }
01975         }
01976         #endif
01977 
01978         msg_ptr->capabilities[msg_capability_client].max_transactions = config_max_transaction.count;
01979 
01980         {
01981             uint32_t const recv_window = 16384;
01982 
01983             ASSERT(recv_window > MSG_MAX_RECV_PACKET_SIZE);
01984             msg_ptr->capabilities[msg_capability_client].window_size = recv_window;
01985         }
01986     }
01987 
01988     msg_ptr->service_cb[service_id] = callback;
01989     status = connector_working;
01990 
01991 done:
01992     return status;
01993 }
01994 
01995 #if (defined CONNECTOR_RCI_SERVICE)
01996 
01997 #include "rci_service.h"
01998 
01999 #if (!defined CONNECTOR_FIRMWARE_SERVICE)
02000 #include "rci_fwstub.h"
02001 #endif
02002 #endif
02003 
02004