Important changes to repositories hosted on mbed.com
Mbed hosted mercurial repositories are deprecated and are due to be permanently deleted in July 2026.
To keep a copy of this software download the repository Zip archive or clone locally using Mercurial.
It is also possible to export all your personal repositories from the account settings page.
Dependents: Nucleo_mqtt_sim800_final _serial_farfan
Revision 0:e7957e1745cb, committed 2018-04-17
- Comitter:
- rcele_85
- Date:
- Tue Apr 17 05:02:48 2018 +0000
- Commit message:
- MQTT Implementation using sim800 on any Mbed supported board having 1 Hardware UART for SIM800 interfacing and 1 Hardware UART/USB for serial Debug
Changed in this revision
GSM_MQTT.cpp | Show annotated file Show diff for this revision Revisions of this file |
GSM_MQTT.h | Show annotated file Show diff for this revision Revisions of this file |
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/GSM_MQTT.cpp Tue Apr 17 05:02:48 2018 +0000 @@ -0,0 +1,835 @@ +/* ========================================================================================= +* GSM_MQTT == MQTT CLIENT LIBRARY FOR GPRS MODEM +* SUPPORTED GPRS MODEM ==> SIM800, SIM900, SIM300 +* SUPPORTED MBED HARDWARE ==> Any hardware with two Hardware Serial port and 1 timer +* +* Developed By : Ravi Butani +* Prof. Marwadi University, Rajkkot-INDIA +* Contact: ravi_butani@yahoo.com +* +* License : This library released under CC-BY-SA 4.0 license +* https://creativecommons.org/licenses/by-sa/4.0/legalcode.txt +* +* This library is derived from Arduino MQTT Client for SIM800 at +* https://github.com/elementzonline/SIM800_MQTT +*============================================================================================*/ + + +#include "GSM_MQTT.h" +#include "mbed.h" +#include <stdint.h> +#include <string> +#define ESP_RXBUF_LEN 300 + +volatile char esp_buf[ESP_RXBUF_LEN]; // Circular Buffer for Serial Receive As MODSERIAL not supported in GRPeach +volatile uint32_t esp_pos = 0, esp_por = 0, esp_ok = 0, esp_head = 0; // Variable used for hold head and tail of Circular Buffer + +Timer timer800; +extern uint8_t GSM_Response; + +extern string MQTT_HOST;// = "test.mosquitto.org"; +extern string MQTT_PORT;// = "1883"; +extern Serial SerialDBG; +extern Serial Serial800; +extern GSM_MQTT MQTT; +uint8_t GSM_Response = 0; +unsigned long previousMillis = 0; +//char inputString[UART_BUFFER_LENGTH]; // a string to hold incoming data +bool stringComplete = false; // whether the string is complete + + + +// >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> ESP8266 UART returns how many chars available to read <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<< +int Serial800_available(void) +{ + return ((ESP_RXBUF_LEN + esp_pos - esp_por) % ESP_RXBUF_LEN); +} + + +// >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> ESP8266 UART returns read one char <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<< +char Serial800_read(void) +{ + char c = esp_buf[esp_por]; + esp_por = (esp_por + 1)% ESP_RXBUF_LEN; + return(c); +} + +// >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> ESP8266 UART Flush receive Buffer <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<< +void Serial800_flush(void) +{ + esp_pos = 0; + esp_por = 0; +} +// >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> ESP8266 UART Receive CallBack(ISR) <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<< +void Serial800_callback() { + esp_buf[esp_pos] = Serial800.getc(); + esp_pos = (esp_pos+1)% ESP_RXBUF_LEN; +} +void serialEvent(); + +GSM_MQTT::GSM_MQTT(unsigned long KeepAlive) +{ + _KeepAliveTimeOut = KeepAlive; +} + +void GSM_MQTT::begin(void) +{ + TCP_Flag = false; + GSM_ReplyFlag; + pingFlag = false; + tcpATerrorcount = 0; + MQTT_Flag = false; + ConnectionAcknowledgement = NO_ACKNOWLEDGEMENT ; + PublishIndex = 0; + TopicLength = 0; + MessageLength = 0; + MessageFlag = false; + modemStatus = 0; + index = 0; + length = 0; + lengthLocal = 0; + _LastMessaseID = 0; + _ProtocolVersion = 3; + _PingPrevMillis = 0; + _tcpStatus = 0; + _tcpStatusPrev = 0; + _KeepAliveTimeOut; + SerialDBG.baud(9600); + Serial800.baud(9600); + Serial800.attach(&Serial800_callback); + Serial800.printf("AT\r\n"); + timer800.start(); + wait(1); + _tcpInit(); +} +char GSM_MQTT::_sendAT(char *command, unsigned long waitms) +{ + + unsigned long PrevMillis = timer800.read_ms(); + strcpy(reply, "none"); + GSM_Response = 0; + Serial800.printf("%s",command); + unsigned long currentMillis = timer800.read_ms(); + // SerialDBG.println(PrevMillis); + // SerialDBG.println(currentMillis); + while ( (GSM_Response == 0) && ((currentMillis - PrevMillis) < waitms) ) + { + // delay(1); + serialEvent(); + currentMillis = timer800.read_ms(); + } + return GSM_Response; +} +char GSM_MQTT::sendATreply(char *command, char *replystr, unsigned long waitms) +{ + strcpy(reply, replystr); + unsigned long PrevMillis = timer800.read_ms(); + GSM_ReplyFlag = 0; + Serial800.printf("%s",command); + unsigned long currentMillis =timer800.read_ms(); + + // SerialDBG.println(PrevMillis); + // SerialDBG.println(currentMillis); + while ( (GSM_ReplyFlag == 0) && ((currentMillis - PrevMillis) < waitms) ) + { + // delay(1); + serialEvent(); + currentMillis = timer800.read_ms(); + } + return GSM_ReplyFlag; +} +void GSM_MQTT::_tcpInit(void) +{ + switch (modemStatus) + { + case 0: + { + wait(1); + Serial800.printf("+++"); + wait(0.5); + if (_sendAT("AT\r\n", 5000) == 1) + { + modemStatus = 1; + } + else + { + modemStatus = 0; + break; + } + } + case 1: + { + if (_sendAT("ATE1\r\n", 2000) == 1) + { + modemStatus = 2; + } + else + { + modemStatus = 1; + break; + } + } + case 2: + { + if (sendATreply("AT+CREG?\r\n", "0,1", 5000) == 1) + { + _sendAT("AT+CIPMUX=0\r\n", 2000); + _sendAT("AT+CIPMODE=1\r\n", 2000); + if (sendATreply("AT+CGATT?\r\n", ": 1", 4000) != 1) + { + _sendAT("AT+CGATT=1\r\n", 2000); + } + modemStatus = 3; + _tcpStatus = 2; + } + else + { + modemStatus = 2; + break; + } + } + case 3: + { + if (GSM_ReplyFlag != 7) + { + _tcpStatus = sendATreply("AT+CIPSTATUS\r\n", "STATE", 4000); + if (_tcpStatusPrev == _tcpStatus) + { + tcpATerrorcount++; + if (tcpATerrorcount >= 10) + { + tcpATerrorcount = 0; + _tcpStatus = 7; + } + + } + else + { + _tcpStatusPrev = _tcpStatus; + tcpATerrorcount = 0; + } + } + _tcpStatusPrev = _tcpStatus; + SerialDBG.printf("%d",_tcpStatus); + switch (_tcpStatus) + { + case 2: + { + _sendAT("AT+CSTT=\"www\"\r\n", 5000); + break; + } + case 3: + { + _sendAT("AT+CIICR\r\n", 5000) ; + break; + } + case 4: + { + sendATreply("AT+CIFSR\r\n", ".", 4000) ; + break; + } + case 5: + { + Serial800.printf("AT+CIPSTART=\"TCP\",\""); + Serial800.printf("%s",MQTT_HOST); + Serial800.printf("\",\""); + Serial800.printf("%s",MQTT_PORT); + if (_sendAT("\"\r\n", 5000) == 1) + { + unsigned long PrevMillis = timer800.read_ms(); + unsigned long currentMillis = timer800.read_ms(); + while ( (GSM_Response != 4) && ((currentMillis - PrevMillis) < 20000) ) + { + // delay(1); + serialEvent(); + currentMillis = timer800.read_ms(); + } + } + break; + } + case 6: + { + unsigned long PrevMillis =timer800.read_ms(); + unsigned long currentMillis = timer800.read_ms(); + while ( (GSM_Response != 4) && ((currentMillis - PrevMillis) < 20000) ) + { + // delay(1); + serialEvent(); + currentMillis = timer800.read_ms(); + } + break; + } + case 7: + { + sendATreply("AT+CIPSHUT\r\n", "OK", 4000) ; + modemStatus = 0; + _tcpStatus = 2; + break; + } + } + } + } + +} + +void GSM_MQTT::_ping(void) +{ + + if (pingFlag == true) + { + unsigned long currentMillis = timer800.read_ms(); + if ((currentMillis - _PingPrevMillis ) >= _KeepAliveTimeOut * 1000) + { + // save the last time you blinked the LED + _PingPrevMillis = currentMillis; + Serial800.putc(char(PINGREQ * 16)); + _sendLength(0); + } + } +} +void GSM_MQTT::_sendUTFString(char *string1) +{ + int localLength = strlen(string1); + Serial800.putc(char(localLength / 256)); + Serial800.putc(char(localLength % 256)); + Serial800.printf("%s",string1); +} +void GSM_MQTT::_sendLength(int len) +{ + bool length_flag = false; + while (length_flag == false) + { + if ((len / 128) > 0) + { + Serial800.putc(char(len % 128 + 128)); + len /= 128; + } + else + { + length_flag = true; + Serial800.putc(char(len)); + } + } +} +//void GSM_MQTT::connect(char *ClientIdentifier, char UserNameFlag = 0, char PasswordFlag = 0, char *UserName = "", char *Password = "", char CleanSession = 1, char WillFlag = 0, char WillQoS = 0, char WillRetain = 0, char *WillTopic = "", char *WillMessage = "") +void GSM_MQTT::connect(char *ClientIdentifier, char UserNameFlag, char PasswordFlag, char *UserName, char *Password, char CleanSession, char WillFlag, char WillQoS, char WillRetain, char *WillTopic, char *WillMessage) +{ + //SerialDBG.printf("in connect\r\n"); + ConnectionAcknowledgement = NO_ACKNOWLEDGEMENT ; + Serial800.putc(char(CONNECT * 16 )); + char ProtocolName[7] = "MQIsdp"; + int localLength = (2 + strlen(ProtocolName)) + 1 + 3 + (2 + strlen(ClientIdentifier)); + if (WillFlag != 0) + { + localLength = localLength + 2 + strlen(WillTopic) + 2 + strlen(WillMessage); + } + if (UserNameFlag != 0) + { + localLength = localLength + 2 + strlen(UserName); + + if (PasswordFlag != 0) + { + localLength = localLength + 2 + strlen(Password); + } + } + _sendLength(localLength); + _sendUTFString(ProtocolName); + Serial800.putc(char(_ProtocolVersion)); + Serial800.putc(char(UserNameFlag * User_Name_Flag_Mask + PasswordFlag * Password_Flag_Mask + WillRetain * Will_Retain_Mask + WillQoS * Will_QoS_Scale + WillFlag * Will_Flag_Mask + CleanSession * Clean_Session_Mask)); + Serial800.putc(char(_KeepAliveTimeOut / 256)); + Serial800.putc(char(_KeepAliveTimeOut % 256)); + _sendUTFString(ClientIdentifier); + if (WillFlag != 0) + { + _sendUTFString(WillTopic); + _sendUTFString(WillMessage); + } + if (UserNameFlag != 0) + { + _sendUTFString(UserName); + if (PasswordFlag != 0) + { + _sendUTFString(Password); + } + } +} +void GSM_MQTT::publish( char *Topic, char *Message, char DUP, char Qos, char RETAIN, unsigned int MessageID) +{ + Serial800.putc(char(PUBLISH * 16 + DUP * DUP_Mask + Qos * QoS_Scale + RETAIN)); + int localLength = (2 + strlen(Topic)); + if (Qos > 0) + { + localLength += 2; + } + localLength += strlen(Message); + _sendLength(localLength); + _sendUTFString(Topic); + if (Qos > 0) + { + Serial800.putc(char(MessageID / 256)); + Serial800.putc(char(MessageID % 256)); + } + Serial800.printf("%s",Message); +} +void GSM_MQTT::publishACK(unsigned int MessageID) +{ + Serial800.putc(char(PUBACK * 16)); + _sendLength(2); + Serial800.putc(char(MessageID / 256)); + Serial800.putc(char(MessageID % 256)); +} +void GSM_MQTT::publishREC(unsigned int MessageID) +{ + Serial800.putc(char(PUBREC * 16)); + _sendLength(2); + Serial800.putc(char(MessageID / 256)); + Serial800.putc(char(MessageID % 256)); +} +void GSM_MQTT::publishREL(char DUP, unsigned int MessageID) +{ + Serial800.putc(char(PUBREL * 16 + DUP * DUP_Mask + 1 * QoS_Scale)); + _sendLength(2); + Serial800.putc(char(MessageID / 256)); + Serial800.putc(char(MessageID % 256)); +} + +void GSM_MQTT::publishCOMP(unsigned int MessageID) +{ + Serial800.putc(char(PUBCOMP * 16)); + _sendLength(2); + Serial800.putc(char(MessageID / 256)); + Serial800.putc(char(MessageID % 256)); +} +void GSM_MQTT::subscribe(char *SubTopic, char DUP, unsigned int MessageID, char SubQoS) +{ + Serial800.putc(char(SUBSCRIBE * 16 + DUP * DUP_Mask + 1 * QoS_Scale)); + int localLength = 2 + (2 + strlen(SubTopic)) + 1; + _sendLength(localLength); + Serial800.putc(char(MessageID / 256)); + Serial800.putc(char(MessageID % 256)); + _sendUTFString(SubTopic); + Serial800.putc(SubQoS); + +} +void GSM_MQTT::unsubscribe( char *SubTopic, char DUP, unsigned int MessageID) +{ + Serial800.putc(char(UNSUBSCRIBE * 16 + DUP * DUP_Mask + 1 * QoS_Scale)); + int localLength = (2 + strlen(SubTopic)) + 2; + _sendLength(localLength); + + Serial800.putc(char(MessageID / 256)); + Serial800.putc(char(MessageID % 256)); + + _sendUTFString(SubTopic); +} +void GSM_MQTT::disconnect(void) +{ + Serial800.putc(char(DISCONNECT * 16)); + _sendLength(0); + pingFlag = false; +} +//Messages +const char CONNECTMessage[] = "Client request to connect to Server\r\n"; +const char CONNACKMessage[] = "Connect Acknowledgment\r\n"; +const char PUBLISHMessage[] = "Publish message\r\n"; +const char PUBACKMessage[] = "Publish Acknowledgment\r\n"; +const char PUBRECMessage[] = "Publish Received (assured delivery part 1)\r\n"; +const char PUBRELMessage[] = "Publish Release (assured delivery part 2)\r\n"; +const char PUBCOMPMessage[] = "Publish Complete (assured delivery part 3)\r\n"; +const char SUBSCRIBEMessage[] = "Client Subscribe request\r\n"; +const char SUBACKMessage[] = "Subscribe Acknowledgment\r\n"; +const char UNSUBSCRIBEMessage[] = "Client Unsubscribe request\r\n"; +const char UNSUBACKMessage[] = "Unsubscribe Acknowledgment\r\n"; +const char PINGREQMessage[] = "PING Request\r\n"; +const char PINGRESPMessage[] = "PING Response\r\n"; +const char DISCONNECTMessage[] = "Client is Disconnecting\r\n"; + +void GSM_MQTT::printMessageType(uint8_t Message) +{ + switch (Message) + { + case CONNECT: + { + SerialDBG.printf("%s",CONNECTMessage); + break; + } + case CONNACK: + { + SerialDBG.printf("%s",CONNACKMessage); + break; + } + case PUBLISH: + { + SerialDBG.printf("%s",PUBLISHMessage); + break; + } + case PUBACK: + { + SerialDBG.printf("%s",PUBACKMessage); + break; + } + case PUBREC: + { + SerialDBG.printf("%s",PUBRECMessage); + break; + } + case PUBREL: + { + SerialDBG.printf("%s",PUBRELMessage); + break; + } + case PUBCOMP: + { + SerialDBG.printf("%s",PUBCOMPMessage); + break; + } + case SUBSCRIBE: + { + SerialDBG.printf("%s",SUBSCRIBEMessage); + break; + } + case SUBACK: + { + SerialDBG.printf("%s",SUBACKMessage); + break; + } + case UNSUBSCRIBE: + { + SerialDBG.printf("%s",UNSUBSCRIBEMessage); + break; + } + case UNSUBACK: + { + SerialDBG.printf("%s",UNSUBACKMessage); + break; + } + case PINGREQ: + { + SerialDBG.printf("%s",PINGREQMessage); + break; + } + case PINGRESP: + { + SerialDBG.printf("%s",PINGRESPMessage); + break; + } + case DISCONNECT: + { + SerialDBG.printf("%s",DISCONNECTMessage); + break; + } + } +} + +//Connect Ack +const char ConnectAck0[] = "Connection Accepted\r\n"; +const char ConnectAck1[] = "Connection Refused: unacceptable protocol version\r\n"; +const char ConnectAck2[] = "Connection Refused: identifier rejected\r\n"; +const char ConnectAck3[] = "Connection Refused: server unavailable\r\n"; +const char ConnectAck4[] = "Connection Refused: bad user name or password\r\n"; +const char ConnectAck5[] = "Connection Refused: not authorized\r\n"; +void GSM_MQTT::printConnectAck(uint8_t Ack) +{ + switch (Ack) + { + case 0: + { + SerialDBG.printf("%s",ConnectAck0); + break; + } + case 1: + { + SerialDBG.printf("%s",ConnectAck1); + break; + } + case 2: + { + SerialDBG.printf("%s",ConnectAck2); + break; + } + case 3: + { + SerialDBG.printf("%s",ConnectAck3); + break; + } + case 4: + { + SerialDBG.printf("%s",ConnectAck4); + break; + } + case 5: + { + SerialDBG.printf("%s",ConnectAck5); + break; + } + } +} +unsigned int GSM_MQTT::_generateMessageID(void) +{ + if (_LastMessaseID < 65535) + { + return ++_LastMessaseID; + } + else + { + _LastMessaseID = 0; + return _LastMessaseID; + } +} +void GSM_MQTT::processing(void) +{ + serialEvent(); + if (TCP_Flag == false) + { + MQTT_Flag = false; + _tcpInit(); + } + _ping(); +} +bool GSM_MQTT::available(void) +{ + return MQTT_Flag; +} +void serialEvent() +{ +//SerialDBG.printf("in serevent\r\n"); + while (Serial800_available()) + { + char inChar = (char)Serial800_read(); + if (MQTT.TCP_Flag == false) + { + if (MQTT.index < 200) + { + MQTT.inputString[MQTT.index++] = inChar; + } + if (inChar == '\n') + { + MQTT.inputString[MQTT.index] = 0; + stringComplete = true; + SerialDBG.printf("%s",MQTT.inputString); + if (strstr(MQTT.inputString, MQTT.reply) != NULL) + { + MQTT.GSM_ReplyFlag = 1; + if (strstr(MQTT.inputString, " INITIAL") != 0) + { + MQTT.GSM_ReplyFlag = 2; // + } + else if (strstr(MQTT.inputString, " START") != 0) + { + MQTT.GSM_ReplyFlag = 3; // + } + else if (strstr(MQTT.inputString, "IP CONFIG") != 0) + { + wait(0.0000010); + MQTT.GSM_ReplyFlag = 4; + } + else if (strstr(MQTT.inputString, " GPRSACT") != 0) + { + MQTT.GSM_ReplyFlag = 4; // + } + else if ((strstr(MQTT.inputString, " STATUS") != 0) || (strstr(MQTT.inputString, "TCP CLOSED") != 0)) + { + MQTT.GSM_ReplyFlag = 5; // + } + else if (strstr(MQTT.inputString, " TCP CONNECTING") != 0) + { + MQTT.GSM_ReplyFlag = 6; // + } + else if ((strstr(MQTT.inputString, " CONNECT OK") != 0) || (strstr(MQTT.inputString, "CONNECT FAIL") != NULL) || (strstr(MQTT.inputString, "PDP DEACT") != 0)) + { + MQTT.GSM_ReplyFlag = 7; + } + } + else if (strstr(MQTT.inputString, "OK") != NULL) + { + GSM_Response = 1; + } + else if (strstr(MQTT.inputString, "ERROR") != NULL) + { + GSM_Response = 2; + } + else if (strstr(MQTT.inputString, ".") != NULL) + { + GSM_Response = 3; + } + else if (strstr(MQTT.inputString, "CONNECT FAIL") != NULL) + { + GSM_Response = 5; + } + else if (strstr(MQTT.inputString, "CONNECT") != NULL) + { + GSM_Response = 4; + MQTT.TCP_Flag = true; + SerialDBG.printf("MQTT.TCP_Flag = True\r\n"); + //SerialDBG.printf("going to auto connect\r\n"); + MQTT.AutoConnect(); + //SerialDBG.printf("out of auto connect\r\n"); + MQTT.pingFlag = true; + MQTT.tcpATerrorcount = 0; + } + else if (strstr(MQTT.inputString, "CLOSED") != NULL) + { + GSM_Response = 4; + MQTT.TCP_Flag = false; + MQTT.MQTT_Flag = false; + } + MQTT.index = 0; + MQTT.inputString[0] = 0; + } + } + else + { + uint8_t ReceivedMessageType = (inChar / 16) & 0x0F; + uint8_t DUP = (inChar & DUP_Mask) / DUP_Mask; + uint8_t QoS = (inChar & QoS_Mask) / QoS_Scale; + uint8_t RETAIN = (inChar & RETAIN_Mask); + if ((ReceivedMessageType >= CONNECT) && (ReceivedMessageType <= DISCONNECT)) + { + bool NextLengthByte = true; + MQTT.length = 0; + MQTT.lengthLocal = 0; + uint32_t multiplier=1; + wait(0.002); + char Cchar = inChar; + while ( (NextLengthByte == true) && (MQTT.TCP_Flag == true)) + { + if (Serial800_available()>0) + { + inChar = (char)Serial800_read(); + SerialDBG.printf("%d\r\n",inChar); + if ((((Cchar & 0xFF) == 'C') && ((inChar & 0xFF) == 'L') && (MQTT.length == 0)) || (((Cchar & 0xFF) == '+') && ((inChar & 0xFF) == 'P') && (MQTT.length == 0))) + { + MQTT.index = 0; + MQTT.inputString[MQTT.index++] = Cchar; + MQTT.inputString[MQTT.index++] = inChar; + MQTT.TCP_Flag = false; + MQTT.MQTT_Flag = false; + MQTT.pingFlag = false; + SerialDBG.printf("Disconnecting\r\n"); + } + else + { + if ((inChar & 128) == 128) + { + MQTT.length += (inChar & 127) * multiplier; + multiplier *= 128; + SerialDBG.printf("More\r\n"); + } + else + { + NextLengthByte = false; + MQTT.length += (inChar & 127) * multiplier; + multiplier *= 128; + } + } + } + } + MQTT.lengthLocal = MQTT.length; + SerialDBG.printf("%d\r\n",MQTT.length); + if (MQTT.TCP_Flag == true) + { + MQTT.printMessageType(ReceivedMessageType); + MQTT.index = 0L; + uint32_t a = 0; + while ((MQTT.length-- > 0) && (Serial800_available()>0)) + { + MQTT.inputString[uint32_t(MQTT.index++)] = (char)Serial800_read(); + + wait(0.001); + + } + SerialDBG.printf("\r\n"); + if (ReceivedMessageType == CONNACK) + { + MQTT.ConnectionAcknowledgement = MQTT.inputString[0] * 256 + MQTT.inputString[1]; + if (MQTT.ConnectionAcknowledgement == 0) + { + MQTT.MQTT_Flag = true; + MQTT.OnConnect(); + + } + + MQTT.printConnectAck(MQTT.ConnectionAcknowledgement); + // MQTT.OnConnect(); + } + else if (ReceivedMessageType == PUBLISH) + { + uint32_t TopicLength = (MQTT.inputString[0]) * 256 + (MQTT.inputString[1]); + SerialDBG.printf("Topic : '"); + MQTT.PublishIndex = 0; + for (uint32_t iter = 2; iter < TopicLength + 2; iter++) + { + SerialDBG.putc(MQTT.inputString[iter]); + MQTT.Topic[MQTT.PublishIndex++] = MQTT.inputString[iter]; + } + MQTT.Topic[MQTT.PublishIndex] = 0; + SerialDBG.printf("' Message :'"); + MQTT.TopicLength = MQTT.PublishIndex; + + MQTT.PublishIndex = 0; + uint32_t MessageSTART = TopicLength + 2UL; + int MessageID = 0; + if (QoS != 0) + { + MessageSTART += 2; + MessageID = MQTT.inputString[TopicLength + 2UL] * 256 + MQTT.inputString[TopicLength + 3UL]; + } + for (uint32_t iter = (MessageSTART); iter < (MQTT.lengthLocal); iter++) + { + SerialDBG.putc(MQTT.inputString[iter]); + MQTT.Message[MQTT.PublishIndex++] = MQTT.inputString[iter]; + } + MQTT.Message[MQTT.PublishIndex] = 0; + SerialDBG.printf("'\r\n"); + MQTT.MessageLength = MQTT.PublishIndex; + if (QoS == 1) + { + MQTT.publishACK(MessageID); + } + else if (QoS == 2) + { + MQTT.publishREC(MessageID); + } + MQTT.OnMessage(MQTT.Topic, MQTT.TopicLength, MQTT.Message, MQTT.MessageLength); + MQTT.MessageFlag = true; + } + else if (ReceivedMessageType == PUBREC) + { + SerialDBG.printf("Message ID :"); + MQTT.publishREL(0, MQTT.inputString[0] * 256 + MQTT.inputString[1]) ; + SerialDBG.printf("%d",MQTT.inputString[0] * 256 + MQTT.inputString[1]) ; + + } + else if (ReceivedMessageType == PUBREL) + { + SerialDBG.printf("Message ID :"); + MQTT.publishCOMP(MQTT.inputString[0] * 256 + MQTT.inputString[1]) ; + SerialDBG.printf("%d\r\n",MQTT.inputString[0] * 256 + MQTT.inputString[1]) ; + + } + else if ((ReceivedMessageType == PUBACK) || (ReceivedMessageType == PUBCOMP) || (ReceivedMessageType == SUBACK) || (ReceivedMessageType == UNSUBACK)) + { + SerialDBG.printf("Message ID :"); + SerialDBG.printf("%d\r\n",MQTT.inputString[0] * 256 + MQTT.inputString[1]) ; + } + else if (ReceivedMessageType == PINGREQ) + { + MQTT.TCP_Flag = false; + MQTT.pingFlag = false; + SerialDBG.printf("Disconnecting\r\n"); + MQTT.sendATreply("AT+CIPSHUT\r\n", ".", 4000) ; + MQTT.modemStatus = 0; + } + } + } + else if ((inChar == 13) || (inChar == 10)) + { + } + else + { + SerialDBG.printf("Received :Unknown Message Type :"); + SerialDBG.printf("%d",inChar); + } + } + } +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/GSM_MQTT.h Tue Apr 17 05:02:48 2018 +0000 @@ -0,0 +1,124 @@ +/* ========================================================================================= +* GSM_MQTT == MQTT CLIENT LIBRARY FOR GPRS MODEM +* SUPPORTED GPRS MODEM ==> SIM800, SIM900, SIM300 +* SUPPORTED MBED HARDWARE ==> Any hardware with two Hardware Serial port and 1 timer +* +* Developed By : Ravi Butani +* Prof. Marwadi University, Rajkkot-INDIA +* Contact: ravi_butani@yahoo.com +* +* License : This library released under CC-BY-SA 4.0 license +* https://creativecommons.org/licenses/by-sa/4.0/legalcode.txt +* +* This library is derived from Arduino MQTT Client for SIM800 at +* https://github.com/elementzonline/SIM800_MQTT +*============================================================================================*/ +#ifndef GSM_MQTT_H_ +#define GSM_MQTT_H_ +#include <stdint.h> +#include <string> +#define UART_BUFFER_LENGTH 300 //Maximum length allowed for UART data +#define TOPIC_BUFFER_LENGTH 50 //Maximum length allowed Topic +#define MESSAGE_BUFFER_LENGTH 250 //Maximum length allowed data + +// ###################################################################################################################### +#define CONNECT 1 //Client request to connect to Server Client Server +#define CONNACK 2 //Connect Acknowledgment Server/Client Server/Client +#define PUBLISH 3 //Publish message Server/Client Server/Client +#define PUBACK 4 //Publish Acknowledgment Server/Client Server/Client +#define PUBREC 5 //Publish Received (assured delivery part 1) Server/Client Server/Client +#define PUBREL 6 //Publish Release (assured delivery part 2) Server/Client Server/Client +#define PUBCOMP 7 //Publish Complete (assured delivery part 3) Server/Client Server/Client +#define SUBSCRIBE 8 //Client Subscribe request Client Server +#define SUBACK 9 //Subscribe Acknowledgment Server Client +#define UNSUBSCRIBE 10 //Client Unsubscribe request Client Server +#define UNSUBACK 11 //Unsubscribe Acknowledgment Server Client +#define PINGREQ 12 //PING Request Client Server +#define PINGRESP 13 //PING Response Server Client +#define DISCONNECT 14 //Client is Disconnecting Client Server + +// QoS value bit 2 bit 1 Description +// 0 0 0 At most once Fire and Forget <=1 +// 1 0 1 At least once Acknowledged delivery >=1 +// 2 1 0 Exactly once Assured delivery =1 +// 3 1 1 Reserved +#define DUP_Mask 8 // Duplicate delivery Only for QoS>0 +#define QoS_Mask 6 // Quality of Service +#define QoS_Scale 2 // (()&QoS)/QoS_Scale +#define RETAIN_Mask 1 // RETAIN flag + +#define User_Name_Flag_Mask 128 +#define Password_Flag_Mask 64 +#define Will_Retain_Mask 32 +#define Will_QoS_Mask 24 +#define Will_QoS_Scale 8 +#define Will_Flag_Mask 4 +#define Clean_Session_Mask 2 + +#define DISCONNECTED 0 +#define CONNECTED 1 +#define NO_ACKNOWLEDGEMENT 255 + +class GSM_MQTT +{ + public: + volatile bool TCP_Flag;// = false; + volatile char GSM_ReplyFlag; + char reply[10]; + volatile bool pingFlag ;//= false; + volatile char tcpATerrorcount;// = 0; + volatile bool MQTT_Flag ;//= false; + volatile int ConnectionAcknowledgement;// = NO_ACKNOWLEDGEMENT ; + volatile int PublishIndex;// = 0; + char Topic[TOPIC_BUFFER_LENGTH]; + volatile int TopicLength;// = 0; + char Message[MESSAGE_BUFFER_LENGTH]; + volatile int MessageLength ;//= 0; + volatile int MessageFlag ;//= false; + volatile char modemStatus ;//= 0; + volatile uint32_t index ;//= 0; + volatile uint32_t length , lengthLocal; + + char inputString[UART_BUFFER_LENGTH]; + + GSM_MQTT(unsigned long KeepAlive); + void begin(void); + void connect(char *ClientIdentifier, char UserNameFlag = 0, char PasswordFlag = 0, char *UserName = "", char *Password = "", char CleanSession = 1, char WillFlag = 0, char WillQoS = 0, char WillRetain = 0, char *WillTopic = "", char *WillMessage = ""); + void publish(char *Topic, char *Message, char DUP = 0, char Qos = 0, char RETAIN = 0, unsigned int MessageID = 0); + void subscribe(char *SubTopic, char DUP = 0, unsigned int MessageID = 0, char SubQoS = 0); + void unsubscribe(char *SubTopic, char DUP = 0, unsigned int MessageID = 0); + void disconnect(void); + void processing(void); + bool available(void); + + + void AutoConnect(void); + void OnConnect(void); + void OnMessage(char *Topic, int TopicLength, char *Message, int MessageLength); + + void publishACK(unsigned int MessageID); + void publishREC(unsigned int MessageID); + void publishREL(char DUP, unsigned int MessageID); + void publishCOMP(unsigned int MessageID); + + void printMessageType(uint8_t Message); + void printConnectAck(uint8_t Ack); + + char sendATreply(char *command, char *replystr, unsigned long waitms); + + private: + volatile unsigned int _LastMessaseID;// = 0; + volatile char _ProtocolVersion;// = 3; + volatile unsigned long _PingPrevMillis;// = 0; + volatile char _tcpStatus;// = 0; + volatile char _tcpStatusPrev;// = 0; + volatile unsigned long _KeepAliveTimeOut; + + void _sendUTFString(char *string); + void _sendLength(int len); + void _ping(void); + void _tcpInit(void); + char _sendAT(char *command, unsigned long waitms); + unsigned int _generateMessageID(void); +}; +#endif /* GSM_MQTT_H_ */ \ No newline at end of file