Better timing

Dependencies:   FP MQTTPacket

Fork of MQTT by MQTT

Revision:
54:ff9e5c4b52d0
Parent:
49:08571008b958
--- a/MQTTSocket.h	Mon Sep 25 12:06:28 2017 +0000
+++ b/MQTTSocket.h	Sat Sep 30 16:46:21 2017 +0000
@@ -2,42 +2,95 @@
 #define MQTTSOCKET_H
 
 #include "MQTTmbed.h"
-#include "TCPSocketConnection.h"
+#include <EthernetInterface.h>
+#include <Timer.h>
 
 class MQTTSocket
 {
 public:
+    MQTTSocket(EthernetInterface *anet)
+    {
+        net = anet;
+        open = false;
+    }
+    
     int connect(char* hostname, int port, int timeout=1000)
     {
-        mysock.set_blocking(false, timeout);    // 1 second Timeout
-        return mysock.connect(hostname, port);
+        if (open)
+            disconnect();
+        nsapi_error_t rc = mysock.open(net);
+        open = true;
+        mysock.set_blocking(true);
+        mysock.set_timeout((unsigned int)timeout);  
+        rc = mysock.connect(hostname, port);
+        mysock.set_blocking(false);  // blocking timeouts seem not to work
+        return rc;
     }
 
+    // common read/write routine, avoiding blocking timeouts
+    int common(unsigned char* buffer, int len, int timeout, bool read)
+    {
+        timer.start();
+        mysock.set_blocking(false); // blocking timeouts seem not to work
+        int bytes = 0;
+        bool first = true;
+        do 
+        {
+            if (first)
+                first = false;
+            else
+                wait_ms(timeout < 100 ? timeout : 100);
+            int rc;
+            if (read)
+                rc = mysock.recv((char*)buffer, len);
+            else
+                rc = mysock.send((char*)buffer, len);
+            if (rc < 0)
+            {
+                if (rc != NSAPI_ERROR_WOULD_BLOCK)
+                {
+                    bytes = -1;
+                    break;
+                }
+            } 
+            else
+                bytes += rc;
+        }
+        while (bytes < len && timer.read_ms() < timeout);
+        timer.stop();
+        return bytes;
+    }
+
+    /* returns the number of bytes read, which could be 0.
+       -1 if there was an error on the socket
+    */
     int read(unsigned char* buffer, int len, int timeout)
     {
-        mysock.set_blocking(false, timeout);
-        return mysock.receive((char*)buffer, len);
+        return common(buffer, len, timeout, true);
     }
 
     int write(unsigned char* buffer, int len, int timeout)
     {
-        mysock.set_blocking(false, timeout);
-        return mysock.send((char*)buffer, len);
+        return common(buffer, len, timeout, false);
     }
 
     int disconnect()
     {
+        open = false;
         return mysock.close();
     }
 
-    bool is_connected()
+    /*bool is_connected()
     {
         return mysock.is_connected();
-    }
+    }*/
 
 private:
 
-    TCPSocketConnection mysock;
+    bool open;
+    TCPSocket mysock;
+    EthernetInterface *net;
+    Timer timer;
 
 };