Blob Blame Raw
diff --git a/modules/remotebackend/Makefile.am b/modules/remotebackend/Makefile.am
index c4bf579..04d110a 100644
--- a/modules/remotebackend/Makefile.am
+++ b/modules/remotebackend/Makefile.am
@@ -31,7 +31,7 @@ libtestremotebackend_la_SOURCES=../../pdns/dnsbackend.hh ../../pdns/dnsbackend.c
         ../../pdns/aes/dns_random.cc ../../pdns/packetcache.hh ../../pdns/packetcache.cc \
         ../../pdns/aes/aescpp.h ../../pdns/dns.hh ../../pdns/dns.cc ../../pdns/json.hh ../../pdns/json.cc \
         ../../pdns/aes/aescrypt.c ../../pdns/aes/aes.h ../../pdns/aes/aeskey.c ../../pdns/aes/aes_modes.c ../../pdns/aes/aesopt.h \
-        ../../pdns/aes/aestab.c ../../pdns/aes/aestab.h ../../pdns/aes/brg_endian.h ../../pdns/aes/brg_types.h ../pipebackend/coprocess.cc \
+        ../../pdns/aes/aestab.c ../../pdns/aes/aestab.h ../../pdns/aes/brg_endian.h ../../pdns/aes/brg_types.h \
         remotebackend.hh remotebackend.cc unixconnector.cc httpconnector.cc pipeconnector.cc
 
 libtestremotebackend_la_CFLAGS=$(BOOST_CPPFLAGS) @THREADFLAGS@ $(LIBCURL_CFLAGS) -g -O0 -I../../pdns
diff --git a/modules/remotebackend/pipeconnector.cc b/modules/remotebackend/pipeconnector.cc
index 9e28980..f86fd28 100644
--- a/modules/remotebackend/pipeconnector.cc
+++ b/modules/remotebackend/pipeconnector.cc
@@ -1,3 +1,5 @@
+#include <sys/types.h>
+#include <sys/wait.h>
 #include "remotebackend.hh"
 
 PipeConnector::PipeConnector(std::map<std::string,std::string> options) {
@@ -7,26 +9,92 @@
   }
   this->command = options.find("command")->second;
   this->options = options;
-  this->coproc = NULL;
+  d_timeout=2000;
+
+  if (options.find("timeout") != options.end()) {
+     d_timeout = boost::lexical_cast<int>(options.find("timeout")->second);
+  }
+
+  d_pid = -1;
+  d_fp = NULL;
   launch();
 }
 
 PipeConnector::~PipeConnector(){
-  if (this->coproc != NULL) 
-    delete coproc; 
+  int status;
+  // just in case...
+  if (d_pid == -1) return;
+
+  if(!waitpid(d_pid, &status, WNOHANG)) {
+    kill(d_pid, 9);
+    waitpid(d_pid, &status, 0);
+  }
+
+  close(d_fd1[1]);
+  if (d_fp != NULL) fclose(d_fp);
 }
 
 void PipeConnector::launch() {
-  if (coproc != NULL) return;
+  // no relaunch
+  if (d_pid > 0 && checkStatus()) return;
+
+  std::vector <std::string> v;
+  split(v, command, is_any_of(" "));
+
+  const char *argv[v.size()+1];
+  argv[v.size()]=0;
+
+  for (size_t n = 0; n < v.size(); n++)
+    argv[n]=v[n].c_str();
+
+  signal(SIGPIPE, SIG_IGN);
+
+  if(access(argv[0],X_OK)) // check before fork so we can throw
+    throw AhuException("Command '"+string(argv[0])+"' cannot be executed: "+stringerror());
+
+  if(pipe(d_fd1)<0 || pipe(d_fd2)<0)
+    throw AhuException("Unable to open pipe for coprocess: "+string(strerror(errno)));
+
+  if((d_pid=fork())<0)
+    throw AhuException("Unable to fork for coprocess: "+stringerror());
+  else if(d_pid>0) { // parent speaking
+    close(d_fd1[0]);
+    Utility::setCloseOnExec(d_fd1[1]);
+    close(d_fd2[1]);
+    Utility::setCloseOnExec(d_fd2[0]);
+    if(!(d_fp=fdopen(d_fd2[0],"r")))
+      throw AhuException("Unable to associate a file pointer with pipe: "+stringerror());
+    setbuf(d_fp,0); // no buffering please, confuses select
+  }
+  else if(!d_pid) { // child
+    signal(SIGCHLD, SIG_DFL); // silence a warning from perl
+    close(d_fd1[1]);
+    close(d_fd2[0]);
+
+    if(d_fd1[0]!= 0) {
+      dup2(d_fd1[0], 0);
+      close(d_fd1[0]);
+    }
+
+    if(d_fd2[1]!= 1) {
+      dup2(d_fd2[1], 1);
+      close(d_fd2[1]);
+    }
+
+    // stdin & stdout are now connected, fire up our coprocess!
+
+    if(execv(argv[0], const_cast<char * const *>(argv))<0) // now what
+      exit(123);
+
+    /* not a lot we can do here. We shouldn't return because that will leave a forked process around.
+       no way to log this either - only thing we can do is make sure that our parent catches this soonest! */
+  }
+
   rapidjson::Value val;
   rapidjson::Document init,res;
-  int timeout=2000;
-  if (options.find("timeout") != options.end()) { 
-     timeout = boost::lexical_cast<int>(options.find("timeout")->second);
-  }
-  coproc = new CoProcess(this->command, timeout);
   init.SetObject();
   val = "initialize";
+
   init.AddMember("method",val, init.GetAllocator());
   val.SetObject();
   init.AddMember("parameters", val, init.GetAllocator());
@@ -44,42 +112,83 @@ void PipeConnector::launch() {
 
 int PipeConnector::send_message(const rapidjson::Document &input)
 {
-   std::string data;
+   std::string line;
+   line = makeStringFromDocument(input);
+   launch();
 
-   data = makeStringFromDocument(input);
+   line.append(1,'\n');
 
-   launch();
-   try {
-      coproc->send(data);
-      return 1;
+   unsigned int sent=0;
+   int bytes;
+
+   // writen routine - socket may not accept al data in one go
+   while(sent<line.size()) {
+     bytes=write(d_fd1[1],line.c_str()+sent,line.length()-sent);
+     if(bytes<0)
+       throw AhuException("Writing to coprocess failed: "+std::string(strerror(errno)));
+
+     sent+=bytes;
    }
-   catch(AhuException &ae) {
-      delete coproc;
-      coproc=NULL;
-      throw;
-   } 
+   return sent;
 }
 
 int PipeConnector::recv_message(rapidjson::Document &output) 
 {
+   std::string receive;
    rapidjson::GenericReader<rapidjson::UTF8<> , rapidjson::MemoryPoolAllocator<> > r;
    std::string tmp;
    std::string s_output;
-
    launch();
-   try {
-      while(1) {
-        coproc->receive(tmp);
-        s_output.append(tmp);
-        rapidjson::StringStream ss(s_output.c_str());
-        output.ParseStream<0>(ss); 
-        if (output.HasParseError() == false)
-          return s_output.size();
-      }
-   } catch(AhuException &ae) {
-      L<<Logger::Warning<<"[pipeconnector] "<<" unable to receive data from coprocess. "<<ae.reason<<endl;
-      delete coproc;
-      coproc = NULL;
-      throw;
+
+   while(1) {
+     receive.clear();
+     if(d_timeout) {
+       struct timeval tv;
+       tv.tv_sec = d_timeout/1000;
+       tv.tv_usec = (d_timeout % 1000) * 1000;
+       fd_set rds;
+       FD_ZERO(&rds);
+       FD_SET(fileno(d_fp),&rds);
+       int ret=select(fileno(d_fp)+1,&rds,0,0,&tv);
+       if(ret<0) 
+         throw AhuException("Error waiting on data from coprocess: "+stringerror());
+       if(!ret)
+         throw AhuException("Timeout waiting for data from coprocess");
+     }
+
+     if(!stringfgets(d_fp, receive))
+       throw AhuException("Child closed pipe");
+  
+      s_output.append(receive);
+      rapidjson::StringStream ss(s_output.c_str());
+      output.ParseStream<0>(ss); 
+      if (output.HasParseError() == false)
+        return s_output.size();
    }
+   return 0;
+}
+
+bool PipeConnector::checkStatus()
+{
+  int status;
+  int ret=waitpid(d_pid, &status, WNOHANG);
+  if(ret<0)
+    throw AhuException("Unable to ascertain status of coprocess "+itoa(d_pid)+" from "+itoa(getpid())+": "+string(strerror(errno)));
+  else if(ret) {
+    if(WIFEXITED(status)) {
+      int ret=WEXITSTATUS(status);
+      throw AhuException("Coprocess exited with code "+itoa(ret));
+    }
+    if(WIFSIGNALED(status)) {
+      int sig=WTERMSIG(status);
+      string reason="CoProcess died on receiving signal "+itoa(sig);
+#ifdef WCOREDUMP
+      if(WCOREDUMP(status))
+        reason+=". Dumped core";
+#endif
+
+      throw AhuException(reason);
+    }
+  }
+  return true;
 }
diff --git a/modules/remotebackend/remotebackend.cc b/modules/remotebackend/remotebackend.cc
index 91d426a..7adedf4 100644
--- a/modules/remotebackend/remotebackend.cc
+++ b/modules/remotebackend/remotebackend.cc
@@ -49,11 +49,14 @@ bool Connector::recv(rapidjson::Document &value) {
 RemoteBackend::RemoteBackend(const std::string &suffix)
 {
       setArgPrefix("remote"+suffix);
-      build(getArg("connection-string"));
+
+      this->d_connstr = getArg("connection-string");
       this->d_result = NULL;
       this->d_dnssec = mustDo("dnssec");
       this->d_index = -1;
       this->d_trxid = 0;
+    
+      build();
 }
 
 RemoteBackend::~RemoteBackend() {
@@ -62,11 +65,40 @@ bool Connector::recv(rapidjson::Document &value) {
      }
 }
 
+bool RemoteBackend::send(rapidjson::Document &value) {
+   try {
+     return connector->send(value);
+   } catch (AhuException &ex) {
+     L<<Logger::Error<<"Exception caught when sending: "<<ex.reason<<std::endl;
+   } catch (...) {
+     L<<Logger::Error<<"Exception caught when sending"<<std::endl;
+   }
+
+   delete this->connector;
+   build();
+   return false;
+}
+
+bool RemoteBackend::recv(rapidjson::Document &value) {
+   try {
+     return connector->recv(value);
+   } catch (AhuException &ex) {
+     L<<Logger::Error<<"Exception caught when receiving: "<<ex.reason<<std::endl;
+   } catch (...) {
+     L<<Logger::Error<<"Exception caught when receiving"<<std::endl;;
+   }
+
+   delete this->connector;
+   build();
+   return false;
+}
+
+
 /** 
  * Builds connector based on options
  * Currently supports unix,pipe and http
  */
-int RemoteBackend::build(const std::string &connstr) {
+int RemoteBackend::build() {
       std::vector<std::string> parts;
       std::string type;
       std::string opts;
@@ -74,12 +106,12 @@ int RemoteBackend::build(const std::string &connstr) {
 
       // connstr is of format "type:options"
       size_t pos;
-      pos = connstr.find_first_of(":");
+      pos = d_connstr.find_first_of(":");
       if (pos == std::string::npos)
          throw AhuException("Invalid connection string: malformed");
 
-      type = connstr.substr(0, pos);
-      opts = connstr.substr(pos+1);
+      type = d_connstr.substr(0, pos);
+      opts = d_connstr.substr(pos+1);
 
       // tokenize the string on comma
       stringtok(parts, opts, ",");
@@ -155,7 +187,7 @@ void RemoteBackend::lookup(const QType &qtype, const std::string &qdomain, DNSPa
 
    d_result = new rapidjson::Document();
 
-   if (connector->send(query) == false || connector->recv(*d_result) == false) { 
+   if (this->send(query) == false || this->recv(*d_result) == false) { 
       delete d_result;
       return;
    }
@@ -186,7 +218,7 @@ bool RemoteBackend::list(const std::string &target, int domain_id) {
 
    d_result = new rapidjson::Document();
 
-   if (connector->send(query) == false || connector->recv(*d_result) == false) {
+   if (this->send(query) == false || this->recv(*d_result) == false) {
      delete d_result;
      return false;
    }
@@ -245,7 +277,7 @@ bool RemoteBackend::getBeforeAndAfterNamesAbsolute(uint32_t id, const std::strin
    JSON_ADD_MEMBER(parameters, "qname", qname.c_str(), query.GetAllocator());
    query.AddMember("parameters", parameters, query.GetAllocator());
 
-   if (connector->send(query) == false || connector->recv(answer) == false)
+   if (this->send(query) == false || this->recv(answer) == false)
      return false;
 
    unhashed = getString(answer["result"]["unhashed"]);
@@ -266,13 +298,13 @@ bool RemoteBackend::getDomainMetadata(const std::string& name, const std::string
    JSON_ADD_MEMBER(parameters, "kind", kind.c_str(), query.GetAllocator());
    query.AddMember("parameters", parameters, query.GetAllocator());
 
-   if (connector->send(query) == false)
+   if (this->send(query) == false)
      return false;
 
    meta.clear();
 
    // not mandatory to implement
-   if (connector->recv(answer) == false)
+   if (this->recv(answer) == false)
      return true;
 
    if (answer["result"].IsArray()) {
@@ -301,7 +333,7 @@ bool RemoteBackend::setDomainMetadata(const string& name, const std::string& kin
    parameters.AddMember("value", val, query.GetAllocator());
    query.AddMember("parameters", parameters, query.GetAllocator());
 
-   if (connector->send(query) == false || connector->recv(answer) == false)
+   if (this->send(query) == false || this->recv(answer) == false)
      return false;
 
    return getBool(answer["result"]);
@@ -321,7 +353,7 @@ bool RemoteBackend::getDomainKeys(const std::string& name, unsigned int kind, st
    JSON_ADD_MEMBER(parameters, "kind", kind, query.GetAllocator());
    query.AddMember("parameters", parameters, query.GetAllocator());
 
-   if (connector->send(query) == false || connector->recv(answer) == false)
+   if (this->send(query) == false || this->recv(answer) == false)
      return false;
 
    keys.clear();
@@ -351,7 +383,7 @@ bool RemoteBackend::removeDomainKey(const string& name, unsigned int id) {
    JSON_ADD_MEMBER(parameters, "id", id, query.GetAllocator());
    query.AddMember("parameters", parameters, query.GetAllocator());
 
-   if (connector->send(query) == false || connector->recv(answer) == false)
+   if (this->send(query) == false || this->recv(answer) == false)
      return false;
 
    return true;
@@ -374,7 +406,7 @@ int RemoteBackend::addDomainKey(const string& name, const KeyData& key) {
    parameters.AddMember("key", jkey, query.GetAllocator());
    query.AddMember("parameters", parameters, query.GetAllocator());
 
-   if (connector->send(query) == false || connector->recv(answer) == false)
+   if (this->send(query) == false || this->recv(answer) == false)
      return false;
 
    return getInt(answer["result"]);
@@ -394,7 +426,7 @@ bool RemoteBackend::activateDomainKey(const string& name, unsigned int id) {
    JSON_ADD_MEMBER(parameters, "id", id, query.GetAllocator());
    query.AddMember("parameters", parameters, query.GetAllocator());
 
-   if (connector->send(query) == false || connector->recv(answer) == false)
+   if (this->send(query) == false || this->recv(answer) == false)
      return false;
 
    return true;
@@ -414,7 +446,7 @@ bool RemoteBackend::deactivateDomainKey(const string& name, unsigned int id) {
    JSON_ADD_MEMBER(parameters, "id", id, query.GetAllocator());
    query.AddMember("parameters", parameters, query.GetAllocator());
 
-   if (connector->send(query) == false || connector->recv(answer) == false)
+   if (this->send(query) == false || this->recv(answer) == false)
      return false;
 
    return true;
@@ -436,7 +468,7 @@ bool RemoteBackend::getTSIGKey(const std::string& name, std::string* algorithm,
    JSON_ADD_MEMBER(parameters, "name", name.c_str(), query.GetAllocator());
    query.AddMember("parameters", parameters, query.GetAllocator());
 
-   if (connector->send(query) == false || connector->recv(answer) == false)
+   if (this->send(query) == false || this->recv(answer) == false)
      return false;
 
    if (algorithm != NULL)
@@ -459,7 +491,7 @@ bool RemoteBackend::getDomainInfo(const string &domain, DomainInfo &di) {
    JSON_ADD_MEMBER(parameters, "name", domain.c_str(), query.GetAllocator());
    query.AddMember("parameters", parameters, query.GetAllocator());
 
-   if (connector->send(query) == false || connector->recv(answer) == false)
+   if (this->send(query) == false || this->recv(answer) == false)
      return false;
 
    // make sure we got zone & kind
@@ -506,7 +538,7 @@ void RemoteBackend::setNotified(uint32_t id, uint32_t serial) {
    JSON_ADD_MEMBER(parameters, "serial", serial, query.GetAllocator());
    query.AddMember("parameters", parameters, query.GetAllocator());
  
-   if (connector->send(query) == false || connector->recv(answer) == false) {
+   if (this->send(query) == false || this->recv(answer) == false) {
       L<<Logger::Error<<kBackendId<<"Failed to execute RPC for RemoteBackend::setNotified("<<id<<","<<serial<<")"<<endl;
    }
 }
@@ -541,7 +573,7 @@ bool RemoteBackend::superMasterBackend(const string &ip, const string &domain, c
 
    *ddb = 0;
 
-   if (connector->send(query) == false || connector->recv(answer) == false)
+   if (this->send(query) == false || this->recv(answer) == false)
      return false;
 
    // we are the backend
@@ -565,7 +597,7 @@ bool RemoteBackend::createSlaveDomain(const string &ip, const string &domain, co
    JSON_ADD_MEMBER(parameters, "account", account.c_str(), query.GetAllocator());
    query.AddMember("parameters", parameters, query.GetAllocator());
 
-   if (connector->send(query) == false || connector->recv(answer) == false)
+   if (this->send(query) == false || this->recv(answer) == false)
      return false;
    return true;
 }
@@ -600,7 +632,7 @@ bool RemoteBackend::replaceRRSet(uint32_t domain_id, const string& qname, const
    parameters.AddMember("rrset", rj_rrset, query.GetAllocator());
    query.AddMember("parameters", parameters, query.GetAllocator());
 
-   if (connector->send(query) == false || connector->recv(answer) == false)
+   if (this->send(query) == false || this->recv(answer) == false)
      return false;
 
    return true;
@@ -630,7 +662,7 @@ bool RemoteBackend::feedRecord(const DNSResourceRecord &rr, string *ordername) {
 
    query.AddMember("parameters", parameters, query.GetAllocator());
 
-   if (connector->send(query) == false || connector->recv(answer) == false)
+   if (this->send(query) == false || this->recv(answer) == false)
      return false;
    return true; // XXX FIXME this API should not return 'true' I think -ahu
 }
@@ -651,7 +683,7 @@ bool RemoteBackend::feedEnts(int domain_id, set<string>& nonterm) {
    parameters.AddMember("nonterm", nts, query.GetAllocator());
    query.AddMember("parameters", parameters, query.GetAllocator());
 
-   if (connector->send(query) == false || connector->recv(answer) == false)
+   if (this->send(query) == false || this->recv(answer) == false)
      return false;
    return true; 
 }
@@ -677,7 +709,7 @@ bool RemoteBackend::feedEnts3(int domain_id, const string &domain, set<string> &
    parameters.AddMember("nonterm", nts, query.GetAllocator());
    query.AddMember("parameters", parameters, query.GetAllocator());
 
-   if (connector->send(query) == false || connector->recv(answer) == false)
+   if (this->send(query) == false || this->recv(answer) == false)
      return false;
    return true;
 }
@@ -696,7 +728,7 @@ bool RemoteBackend::startTransaction(const string &domain, int domain_id) {
 
    query.AddMember("parameters", parameters, query.GetAllocator());
 
-   if (connector->send(query) == false || connector->recv(answer) == false) {
+   if (this->send(query) == false || this->recv(answer) == false) {
      d_trxid = -1;
      return false;
    }
@@ -714,7 +746,7 @@ bool RemoteBackend::commitTransaction() {
    query.AddMember("parameters", parameters, query.GetAllocator());
 
    d_trxid = -1;
-   if (connector->send(query) == false || connector->recv(answer) == false)
+   if (this->send(query) == false || this->recv(answer) == false)
      return false;
    return true;
 }
@@ -730,7 +762,7 @@ bool RemoteBackend::abortTransaction() {
    query.AddMember("parameters", parameters, query.GetAllocator());
 
    d_trxid = -1;
-   if (connector->send(query) == false || connector->recv(answer) == false)
+   if (this->send(query) == false || this->recv(answer) == false)
      return false;
    return true;
 }
@@ -759,7 +791,7 @@ bool RemoteBackend::calculateSOASerial(const string& domain, const SOAData& sd,
    parameters.AddMember("sd", soadata, query.GetAllocator());
    query.AddMember("parameters", parameters, query.GetAllocator());
 
-   if (connector->send(query) == false || connector->recv(answer) == false)
+   if (this->send(query) == false || this->recv(answer) == false)
      return false;
 
    serial = getInt64(answer["result"]);
diff --git a/modules/remotebackend/remotebackend.hh b/modules/remotebackend/remotebackend.hh
index 18b6a97..2f2d717 100644
--- a/modules/remotebackend/remotebackend.hh
+++ b/modules/remotebackend/remotebackend.hh
@@ -92,9 +92,15 @@ class PipeConnector: public Connector {
   private:
 
   void launch();
-  CoProcess *coproc;
+  bool checkStatus();
+
   std::string command;
   std::map<std::string,std::string> options;
+ 
+  int d_fd1[2], d_fd2[2];
+  int d_pid;
+  int d_timeout;
+  FILE *d_fp;
 };
 
 class RemoteBackend : public DNSBackend
@@ -133,12 +139,13 @@ class RemoteBackend : public DNSBackend
   static DNSBackend *maker();
 
   private:
-    int build(const std::string &connstr);
+    int build();
     Connector *connector;
     bool d_dnssec;
     rapidjson::Document *d_result;
     int d_index;
     int64_t d_trxid;
+    std::string d_connstr;
 
     bool getBool(rapidjson::Value &value);
     int getInt(rapidjson::Value &value);
@@ -146,5 +153,8 @@ class RemoteBackend : public DNSBackend
     int64_t getInt64(rapidjson::Value &value);
     std::string getString(rapidjson::Value &value);
     double getDouble(rapidjson::Value &value);
+
+    bool send(rapidjson::Document &value);
+    bool recv(rapidjson::Document &value);
 };
 #endif